Repository: ignite Updated Branches: refs/heads/ignite-5578 ef038c4cd -> c7a606905
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c7a60690 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c7a60690 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c7a60690 Branch: refs/heads/ignite-5578 Commit: c7a606905c7fd1ab40240f0cdc87947432759fdd Parents: ef038c4 Author: sboikov <[email protected]> Authored: Tue Jul 18 17:58:31 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Jul 18 18:56:05 2017 +0300 ---------------------------------------------------------------------- .../internal/managers/discovery/DiscoCache.java | 13 ++++ .../discovery/GridDiscoveryManager.java | 21 +++++-- .../cache/CacheAffinitySharedManager.java | 63 +++++--------------- .../dht/GridDhtPartitionTopologyImpl.java | 3 + .../preloader/CacheGroupAffinityMessage.java | 9 ++- .../distributed/CacheExchangeMergeTest.java | 16 +++-- 6 files changed, 67 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index f63c5f6..ac2d0b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -85,6 +86,9 @@ public class DiscoCache { /** */ private final IgniteProductVersion minNodeVer; + /** */ + private final AffinityTopologyVersion topVer; + /** * @param state Current cluster state. * @param loc Local node. @@ -101,6 +105,7 @@ public class DiscoCache { * @param alives Alive nodes. */ DiscoCache( + AffinityTopologyVersion topVer, DiscoveryDataClusterState state, ClusterNode loc, List<ClusterNode> rmtNodes, @@ -114,6 +119,7 @@ public class DiscoCache { Map<Integer, List<ClusterNode>> cacheGrpAffNodes, Map<UUID, ClusterNode> nodeMap, Set<UUID> alives) { + this.topVer = topVer; this.state = state; this.loc = loc; this.rmtNodes = rmtNodes; @@ -143,6 +149,13 @@ public class DiscoCache { } /** + * @return + */ + public AffinityTopologyVersion version() { + return topVer; + } + + /** * @return Minimum node version. */ public IgniteProductVersion minimumNodeVersion() { http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index 9f5bd3f..adf4431 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -597,7 +597,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ChangeGlobalStateFinishMessage stateFinishMsg = null; if (locJoinEvt) { - discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + discoCache = createDiscoCache(new AffinityTopologyVersion(topVer, minorTopVer), + ctx.state().clusterState(), + locNode, + topSnapshot); transitionWaitFut = ctx.state().onLocalJoin(discoCache); } @@ -620,7 +623,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { else if (customMsg instanceof ChangeGlobalStateFinishMessage) { ctx.state().onStateFinishMessage((ChangeGlobalStateFinishMessage)customMsg); - discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + discoCache = createDiscoCache(topSnap.get().topVer, + ctx.state().clusterState(), + locNode, + topSnapshot); topSnap.set(new Snapshot(topSnap.get().topVer, discoCache)); @@ -668,7 +674,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // event notifications, since SPI notifies manager about all events from this listener. if (verChanged) { if (discoCache == null) - discoCache = createDiscoCache(ctx.state().clusterState(), locNode, topSnapshot); + discoCache = createDiscoCache(nextTopVer, ctx.state().clusterState(), locNode, topSnapshot); discoCacheHist.put(nextTopVer, discoCache); @@ -739,7 +745,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { topHist.clear(); topSnap.set(new Snapshot(AffinityTopologyVersion.ZERO, - createDiscoCache(ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet()))); + createDiscoCache(AffinityTopologyVersion.ZERO, ctx.state().clusterState(), locNode, Collections.<ClusterNode>emptySet()))); } else if (type == EVT_CLIENT_NODE_RECONNECTED) { assert locNode.isClient() : locNode; @@ -2149,7 +2155,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { * @param topSnapshot Topology snapshot. * @return Newly created discovery cache. */ - @NotNull private DiscoCache createDiscoCache(DiscoveryDataClusterState state, + @NotNull private DiscoCache createDiscoCache( + AffinityTopologyVersion topVer, + DiscoveryDataClusterState state, ClusterNode loc, Collection<ClusterNode> topSnapshot) { HashSet<UUID> alives = U.newHashSet(topSnapshot.size()); @@ -2226,6 +2234,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } return new DiscoCache( + topVer, state, loc, Collections.unmodifiableList(rmtNodes), @@ -2368,7 +2377,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { discoWrk.addEvent(EVT_NODE_SEGMENTED, AffinityTopologyVersion.NONE, node, - createDiscoCache(null, node, empty), + createDiscoCache(AffinityTopologyVersion.NONE, null, node, empty), empty, null); http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index df7bf03..ba6a22b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -89,14 +89,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT, 10_000); /** */ - static final IgniteClosure<ClusterNode, UUID> NODE_TO_ID = new IgniteClosure<ClusterNode, UUID>() { + private static final IgniteClosure<ClusterNode, UUID> NODE_TO_ID = new IgniteClosure<ClusterNode, UUID>() { @Override public UUID apply(ClusterNode node) { return node.id(); } }; /** */ - static final IgniteClosure<ClusterNode, Long> NODE_TO_ORDER = new IgniteClosure<ClusterNode, Long>() { + private static final IgniteClosure<ClusterNode, Long> NODE_TO_ORDER = new IgniteClosure<ClusterNode, Long>() { @Override public Long apply(ClusterNode node) { return node.order(); } @@ -105,9 +105,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** Affinity information for all started caches (initialized on coordinator). */ private ConcurrentMap<Integer, CacheGroupHolder> grpHolders = new ConcurrentHashMap<>(); - /** Last topology version when affinity was calculated (updated from exchange thread). */ - private AffinityTopologyVersion affCalcVer; - /** Topology version which requires affinity re-calculation (set from discovery thread). */ private AffinityTopologyVersion lastAffVer; @@ -172,8 +169,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap // Clean-up in case of client reconnect. caches.clear(); - affCalcVer = null; - lastAffVer = null; caches.init(cctx.cache().cacheGroupDescriptors(), cctx.cache().cacheDescriptors()); @@ -272,10 +267,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (waitInfo == null) return; - assert affCalcVer != null; - assert affCalcVer.equals(waitInfo.topVer) : "Invalid affinity version [calcVer=" + affCalcVer + - ", waitVer=" + waitInfo.topVer + ']'; - Map<Integer, UUID> partWait = waitInfo.waitGrps.get(checkGrpId); boolean rebalanced = true; @@ -841,7 +832,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } if (stoppedGrps != null) { - boolean notify = false; + AffinityTopologyVersion notifyTopVer = null; synchronized (mux) { if (waitInfo != null) { @@ -849,7 +840,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean rmv = waitInfo.waitGrps.remove(grpId) != null; if (rmv) { - notify = true; + notifyTopVer = waitInfo.topVer; waitInfo.assignments.remove(grpId); } @@ -857,8 +848,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } - if (notify) { - final AffinityTopologyVersion topVer = affCalcVer; + if (notifyTopVer != null) { + final AffinityTopologyVersion topVer = notifyTopVer; cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { @@ -948,16 +939,12 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean crd, final CacheAffinityChangeMessage msg) throws IgniteCheckedException { - assert affCalcVer != null || cctx.kernalContext().clientNode(); assert msg.topologyVersion() != null && msg.exchangeId() == null : msg; - assert affCalcVer == null || affCalcVer.equals(msg.topologyVersion()) : - "Invalid version [affCalcVer=" + affCalcVer + ", msg=" + msg + ']'; final AffinityTopologyVersion topVer = exchFut.topologyVersion(); if (log.isDebugEnabled()) { log.debug("Process affinity change message [exchVer=" + exchFut.topologyVersion() + - ", affCalcVer=" + affCalcVer + ", msgVer=" + msg.topologyVersion() + ']'); } @@ -1019,11 +1006,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap aff.clientEventTopologyChange(exchFut.discoveryEvent(), topVer); } }); - - synchronized (mux) { - if (affCalcVer == null) - affCalcVer = msg.topologyVersion(); - } } /** @@ -1267,6 +1249,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); + log.info("mergeExchangesOnServerLeft [topVer=" + fut.context().events().discoveryCache().version() + ']'); + forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { ExchangeDiscoveryEvents evts = fut.context().events(); @@ -1301,10 +1285,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap aff.initialize(evts.topologyVersion(), cachedAssignment(aff, newAssignment, affCache)); } }); - - synchronized (mux) { - affCalcVer = fut.context().events().waitRebalanceEventVersion(); - } } public void onJoin(final GridDhtPartitionsExchangeFuture fut, GridDhtPartitionsFullMessage msg) @@ -1345,18 +1325,14 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } }); - - if (!cctx.kernalContext().clientNode()) { - synchronized (mux) { - affCalcVer = fut.context().events().topologyVersion(); - } - } } public void mergeExchangesOnServerJoin(GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { final ExchangeDiscoveryEvents evts = fut.context().events(); + log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']'); + assert evts.serverJoin() && !evts.serverLeft(); WaitRebalanceInfo waitRebalanceInfo = initAffinityOnNodeJoin(evts, crd); @@ -1371,6 +1347,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert evts.serverLeft(); + log.info("mergeExchangesInitAffinityOnServerLeft [topVer=" + evts.discoveryCache().version() + ']'); + forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { AffinityTopologyVersion topVer = evts.topologyVersion(); @@ -1381,10 +1359,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } }); - synchronized (mux) { - affCalcVer = evts.waitRebalanceEventVersion(); - } - Map<Integer, Map<Integer, List<Long>>> diff = initAffinityOnNodeLeft0(evts.topologyVersion(), fut, NODE_TO_ORDER, @@ -1433,8 +1407,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } private void setWaitRebalanceInfo(WaitRebalanceInfo waitRebalanceInfo, AffinityTopologyVersion topVer, boolean crd) { - affCalcVer = topVer; - this.waitInfo = waitRebalanceInfo != null && !waitRebalanceInfo.empty() ? waitRebalanceInfo : null; WaitRebalanceInfo info = this.waitInfo; @@ -1606,8 +1578,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } synchronized (mux) { - affCalcVer = fut.topologyVersion(); - this.waitInfo = null; } @@ -1992,7 +1962,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap throws IgniteCheckedException { final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(fut.context().events().waitRebalanceEventVersion()); - final Collection<ClusterNode> aliveNodes = cctx.discovery().nodes(topVer); + final Collection<ClusterNode> aliveNodes = fut.context().events().discoveryCache().serverNodes(); final Map<Integer, Map<Integer, List<T>>> assignment = new HashMap<>(); @@ -2013,7 +1983,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert newAssignment != null; - List<List<ClusterNode>> newAssignment0 = initAff ? new ArrayList<>(newAssignment) : null; + List<List<ClusterNode>> newAssignment0 = initAff ? new ArrayList<>(newAssignment) : null; GridDhtPartitionTopology top = grpHolder.topology(fut); @@ -2052,7 +2022,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (int i = 1; i < curNodes.size(); i++) { ClusterNode curNode = curNodes.get(i); - if (top.partitionState(curNode.id(), p) == GridDhtPartitionState.OWNING) { + if (top.partitionState(curNode.id(), p) == GridDhtPartitionState.OWNING && + aliveNodes.contains(curNode)) { newNodes0 = latePrimaryAssignment(grpHolder.affinity(), p, curNode, @@ -2107,8 +2078,6 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap }); synchronized (mux) { - assert affCalcVer.equals(topVer); - this.waitInfo = !waitRebalanceInfo.empty() ? waitRebalanceInfo : null; WaitRebalanceInfo info = this.waitInfo; http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 74156d9..52df4a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1118,6 +1118,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (stopping) return false; + if (exchangeVer == null && !topReadyFut.isDone()) + return false; + if (exchangeVer != null) { assert exchangeVer.compareTo(topVer) >= 0 : exchangeVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java index 69a507c..fcfec1d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java @@ -135,6 +135,12 @@ public class CacheGroupAffinityMessage implements Message { return cachesAff; } + /** + * @param assign Nodes orders. + * @param nodesByOrder Nodes by order cache. + * @param discoCache Discovery data cache. + * @return Nodes list. + */ public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) { List<ClusterNode> assign0 = new ArrayList<>(assign.size()); @@ -146,7 +152,8 @@ public class CacheGroupAffinityMessage implements Message { if (affNode == null) { affNode = discoCache.serverNodeByOrder(order); - assert affNode != null : order; + assert affNode != null : "Failed to find node by order [order=" + order + + ", topVer=" + discoCache.version() + ']'; nodesByOrder.put(order, affNode); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c7a60690/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 5e0d7e2..376224a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -67,8 +67,14 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { } cfg.setCacheConfiguration( - cacheConfiguration("c1", ATOMIC), - cacheConfiguration("c2", TRANSACTIONAL)); + cacheConfiguration("c1", ATOMIC, 0), + cacheConfiguration("c2", ATOMIC, 1), + cacheConfiguration("c3", ATOMIC, 2), + cacheConfiguration("c4", ATOMIC, 10), + cacheConfiguration("c5", TRANSACTIONAL, 0), + cacheConfiguration("c6", TRANSACTIONAL, 1), + cacheConfiguration("c7", TRANSACTIONAL, 2), + cacheConfiguration("c8", TRANSACTIONAL, 10)); return cfg; } @@ -83,13 +89,15 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { /** * @param name Cache name. * @param atomicityMode Cache atomicity mode. + * @param backups Number of backups. * @return Cache configuration. */ - private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode) { + private CacheConfiguration cacheConfiguration(String name, CacheAtomicityMode atomicityMode, int backups) { CacheConfiguration ccfg = new CacheConfiguration(name); ccfg.setAtomicityMode(atomicityMode); ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setBackups(backups); return ccfg; } @@ -246,7 +254,7 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { assertTrue(nodes.size() > 0); - String[] cacheNames = {"c1", "c2"}; + String[] cacheNames = {"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c8"}; ThreadLocalRandom rnd = ThreadLocalRandom.current();
