Repository: ignite
Updated Branches:
  refs/heads/ignite-5578 81dbfd527 -> a97266f07


ignite-5578


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a97266f0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a97266f0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a97266f0

Branch: refs/heads/ignite-5578
Commit: a97266f07a9d0f6b09db782e967d0824bf60cb20
Parents: 81dbfd5
Author: sboikov <[email protected]>
Authored: Wed Aug 2 17:08:22 2017 +0300
Committer: sboikov <[email protected]>
Committed: Wed Aug 2 17:08:22 2017 +0300

----------------------------------------------------------------------
 .../cache/CacheAffinitySharedManager.java       |  9 ++-
 .../dht/GridDhtPartitionTopologyImpl.java       | 18 +++---
 .../distributed/CacheExchangeMergeTest.java     | 65 +++++++++++++++++++-
 3 files changed, 77 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a97266f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index ea3196c..b28cd95 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -1141,6 +1141,9 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     ) throws IgniteCheckedException {
         caches.initStartedCaches(descs);
 
+        if (fut.context().mergeExchanges())
+            return;
+
         if (crd) {
             forAllRegisteredCacheGroups(new 
IgniteInClosureX<CacheGroupDescriptor>() {
                 @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
@@ -1369,7 +1372,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
      * @throws IgniteCheckedException If failed.
      */
     public  Map<Integer, CacheGroupAffinityMessage> 
onServerLeftWithExchangeMergeProtocol(
-        GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException
+        final GridDhtPartitionsExchangeFuture fut) throws 
IgniteCheckedException
     {
         final ExchangeDiscoveryEvents evts = fut.context().events();
 
@@ -1385,7 +1388,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
                 List<List<ClusterNode>> assign =
                     cache.affinity().calculate(topVer, evts.lastEvent(), 
evts.discoveryCache());
 
-                if (!cache.rebalanceEnabled)
+                if (!cache.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
                     cache.affinity().initialize(topVer, assign);
             }
         });
@@ -2061,7 +2064,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
             @Override public void applyx(CacheGroupDescriptor desc) throws 
IgniteCheckedException {
                 CacheGroupHolder grpHolder = groupHolder(topVer, desc);
 
-                if (!grpHolder.rebalanceEnabled)
+                if (!grpHolder.rebalanceEnabled || 
fut.cacheGroupAddedOnExchange(desc.groupId(), desc.receivedFrom()))
                     return;
 
                 AffinityTopologyVersion affTopVer = 
grpHolder.affinity().lastVersion();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a97266f0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 48192c9..3deda12 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -461,13 +461,6 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         lastTopChangeVer = readyTopVer = 
evts.topologyVersion();
                     }
 
-                    if (evts.hasServerLeft()) {
-                        for (DiscoveryEvent evt : evts.events()) {
-                            if (evts.serverLeftEvent(evt))
-                                removeNode(evt.eventNode().id());
-                        }
-                    }
-
                     ClusterNode oldest = 
discoCache.oldestAliveServerNodeWithCache();
 
                     if (log.isDebugEnabled()) {
@@ -516,6 +509,13 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
                         }
                     }
 
+                    if (evts.hasServerLeft()) {
+                        for (DiscoveryEvent evt : evts.events()) {
+                            if (evts.serverLeftEvent(evt))
+                                removeNode(evt.eventNode().id());
+                        }
+                    }
+
                     if (grp.affinityNode()) {
                         if (grpStarted ||
                             exchFut.discoveryEvent().type() == 
EVT_DISCOVERY_CUSTOM_EVT ||
@@ -1047,9 +1047,9 @@ public class GridDhtPartitionTopologyImpl implements 
GridDhtPartitionTopology {
 
         try {
             assert node2part != null && node2part.valid() : "Invalid 
node-to-partitions map [topVer=" + topVer +
+                ", grp=" + grp.cacheOrGroupName() +
                 ", allIds=" + allIds +
-                ", node2part=" + node2part +
-                ", grp=" + grp.cacheOrGroupName() + ']';
+                ", node2part=" + node2part + ']';
 
             // Node IDs can be null if both, primary and backup, nodes 
disappear.
             List<ClusterNode> nodes = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/a97266f0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 8d0cb39..83ed16a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -529,9 +529,68 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
         checkCaches();
     }
 
-    // TODO IGNITE-5578 join with start cache, merge with fail
-    // TODO IGNITE-5578 join with start cache, merge with join, coordinator 
left
-    // TODO IGNITE-5578 join with start cache, merge with join, become 
coordinator
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndFailMerge() throws Exception {
+        cfgCache = false;
+
+        final Ignite srv0 = startGrids(2);
+
+        mergeExchangeWaitVersion(srv0, 5);
+
+        cfgCache = true;
+
+        IgniteInternalFuture fut = startGrids(srv0, 2, 2);
+
+        stopGrid(1);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndCoordinatorFailed1() throws Exception {
+        cfgCache = false;
+
+        final Ignite srv0 = startGrids(2);
+
+        mergeExchangeWaitVersion(srv0, 5);
+
+        cfgCache = true;
+
+        IgniteInternalFuture fut = startGrids(srv0, 2, 2);
+
+        stopGrid(0);
+
+        fut.get();
+
+        checkCaches();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testStartCacheOnJoinAndCoordinatorFailed2() throws Exception {
+        cfgCache = false;
+
+        final Ignite srv0 = startGrid(0);
+
+        mergeExchangeWaitVersion(srv0, 3);
+
+        cfgCache = true;
+
+        IgniteInternalFuture fut = startGrids(srv0, 1, 2);
+
+        stopGrid(0);
+
+        fut.get();
+
+        checkCaches();
+    }
 
     /**
      * @throws Exception If failed.

Reply via email to