Repository: ignite Updated Branches: refs/heads/ignite-5578 81dbfd527 -> a97266f07
ignite-5578 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a97266f0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a97266f0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a97266f0 Branch: refs/heads/ignite-5578 Commit: a97266f07a9d0f6b09db782e967d0824bf60cb20 Parents: 81dbfd5 Author: sboikov <[email protected]> Authored: Wed Aug 2 17:08:22 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Aug 2 17:08:22 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 9 ++- .../dht/GridDhtPartitionTopologyImpl.java | 18 +++--- .../distributed/CacheExchangeMergeTest.java | 65 +++++++++++++++++++- 3 files changed, 77 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a97266f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index ea3196c..b28cd95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1141,6 +1141,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap ) throws IgniteCheckedException { caches.initStartedCaches(descs); + if (fut.context().mergeExchanges()) + return; + if (crd) { forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { @@ -1369,7 +1372,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @throws IgniteCheckedException If failed. */ public Map<Integer, CacheGroupAffinityMessage> onServerLeftWithExchangeMergeProtocol( - GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException + final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { final ExchangeDiscoveryEvents evts = fut.context().events(); @@ -1385,7 +1388,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<List<ClusterNode>> assign = cache.affinity().calculate(topVer, evts.lastEvent(), evts.discoveryCache()); - if (!cache.rebalanceEnabled) + if (!cache.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom())) cache.affinity().initialize(topVer, assign); } }); @@ -2061,7 +2064,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap @Override public void applyx(CacheGroupDescriptor desc) throws IgniteCheckedException { CacheGroupHolder grpHolder = groupHolder(topVer, desc); - if (!grpHolder.rebalanceEnabled) + if (!grpHolder.rebalanceEnabled || fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom())) return; AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a97266f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 48192c9..3deda12 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -461,13 +461,6 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { lastTopChangeVer = readyTopVer = evts.topologyVersion(); } - if (evts.hasServerLeft()) { - for (DiscoveryEvent evt : evts.events()) { - if (evts.serverLeftEvent(evt)) - removeNode(evt.eventNode().id()); - } - } - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); if (log.isDebugEnabled()) { @@ -516,6 +509,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } + if (evts.hasServerLeft()) { + for (DiscoveryEvent evt : evts.events()) { + if (evts.serverLeftEvent(evt)) + removeNode(evt.eventNode().id()); + } + } + if (grp.affinityNode()) { if (grpStarted || exchFut.discoveryEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || @@ -1047,9 +1047,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + + ", grp=" + grp.cacheOrGroupName() + ", allIds=" + allIds + - ", node2part=" + node2part + - ", grp=" + grp.cacheOrGroupName() + ']'; + ", node2part=" + node2part + ']'; // Node IDs can be null if both, primary and backup, nodes disappear. List<ClusterNode> nodes = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a97266f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java index 8d0cb39..83ed16a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java @@ -529,9 +529,68 @@ public class CacheExchangeMergeTest extends GridCommonAbstractTest { checkCaches(); } - // TODO IGNITE-5578 join with start cache, merge with fail - // TODO IGNITE-5578 join with start cache, merge with join, coordinator left - // TODO IGNITE-5578 join with start cache, merge with join, become coordinator + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndFailMerge() throws Exception { + cfgCache = false; + + final Ignite srv0 = startGrids(2); + + mergeExchangeWaitVersion(srv0, 5); + + cfgCache = true; + + IgniteInternalFuture fut = startGrids(srv0, 2, 2); + + stopGrid(1); + + fut.get(); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception { + cfgCache = false; + + final Ignite srv0 = startGrids(2); + + mergeExchangeWaitVersion(srv0, 5); + + cfgCache = true; + + IgniteInternalFuture fut = startGrids(srv0, 2, 2); + + stopGrid(0); + + fut.get(); + + checkCaches(); + } + + /** + * @throws Exception If failed. + */ + public void testStartCacheOnJoinAndCoordinatorFailed2() throws Exception { + cfgCache = false; + + final Ignite srv0 = startGrid(0); + + mergeExchangeWaitVersion(srv0, 3); + + cfgCache = true; + + IgniteInternalFuture fut = startGrids(srv0, 1, 2); + + stopGrid(0); + + fut.get(); + + checkCaches(); + } /** * @throws Exception If failed.
