Repository: ignite
Updated Branches:
  refs/heads/master d17ac69c4 -> ebd669e4c


IGNITE-8474 Fixed WalStateNodeLeaveExchangeTask preventing exchange merge - 
Fixes #3990.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebd669e4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebd669e4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebd669e4

Branch: refs/heads/master
Commit: ebd669e4c53cfd66708ff18dd59071e4aace38ae
Parents: d17ac69
Author: Sergey Chugunov <sergey.chugu...@gmail.com>
Authored: Thu May 17 13:10:01 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Thu May 17 14:23:15 2018 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 115 ++++++++++++-------
 .../cache/WalStateNodeLeaveExchangeTask.java    |   2 +-
 .../distributed/CacheExchangeMergeTest.java     |  25 +++-
 .../ignite/testframework/GridTestUtils.java     |  11 +-
 4 files changed, 108 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ebd669e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 28d5d20..c3a0add 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -217,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
     /** For tests only. */
     private volatile AffinityTopologyVersion exchMergeTestWaitVer;
 
+    /** For tests only. */
+    private volatile List mergedEvtsForTest;
+
     /** Distributed latch manager. */
     private ExchangeLatchManager latchMgr;
 
@@ -1879,9 +1882,14 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
      * For testing only.
      *
      * @param exchMergeTestWaitVer Version to wait for.
+     * @param mergedEvtsForTest List to collect discovery events with merged 
exchanges.
      */
-    public void mergeExchangesTestWaitVersion(AffinityTopologyVersion 
exchMergeTestWaitVer) {
+    public void mergeExchangesTestWaitVersion(
+        AffinityTopologyVersion exchMergeTestWaitVer,
+        @Nullable List mergedEvtsForTest
+    ) {
         this.exchMergeTestWaitVer = exchMergeTestWaitVer;
+        this.mergedEvtsForTest = mergedEvtsForTest;
     }
 
     /**
@@ -1968,46 +1976,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         AffinityTopologyVersion exchMergeTestWaitVer = 
this.exchMergeTestWaitVer;
 
-        if (exchMergeTestWaitVer != null) {
-            if (log.isInfoEnabled()) {
-                log.info("Exchange merge test, waiting for version [exch=" + 
curFut.initialVersion() +
-                    ", waitVer=" + exchMergeTestWaitVer + ']');
-            }
-
-            long end = U.currentTimeMillis() + 10_000;
-
-            while (U.currentTimeMillis() < end) {
-                boolean found = false;
-
-                for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
-                    if (task instanceof GridDhtPartitionsExchangeFuture) {
-                        GridDhtPartitionsExchangeFuture fut = 
(GridDhtPartitionsExchangeFuture)task;
-
-                        if (exchMergeTestWaitVer.equals(fut.initialVersion())) 
{
-                            if (log.isInfoEnabled())
-                                log.info("Exchange merge test, found awaited 
version: " + exchMergeTestWaitVer);
-
-                            found = true;
-
-                            break;
-                        }
-                    }
-                }
-
-                if (found)
-                    break;
-                else {
-                    try {
-                        U.sleep(100);
-                    }
-                    catch (IgniteInterruptedCheckedException e) {
-                        break;
-                    }
-                }
-            }
-
-            this.exchMergeTestWaitVer = null;
-        }
+        if (exchMergeTestWaitVer != null)
+            waitForTestVersion(exchMergeTestWaitVer, curFut);
 
         synchronized (curFut.mutex()) {
             int awaited = 0;
@@ -2048,6 +2018,8 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
                             ", evtNodeClient=" + 
CU.clientNode(fut.firstEvent().eventNode())+ ']');
                     }
 
+                    addDiscoEvtForTest(fut.firstEvent());
+
                     curFut.context().events().addEvent(fut.initialVersion(),
                         fut.firstEvent(),
                         fut.firstEventCache());
@@ -2071,6 +2043,67 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
         }
     }
 
+
+    /**
+     * For testing purposes. Stores discovery events with merged exchanges to 
enable examining them later.
+     *
+     * @param discoEvt Discovery event.
+     */
+    private void addDiscoEvtForTest(DiscoveryEvent discoEvt) {
+        List mergedEvtsForTest = this.mergedEvtsForTest;
+
+        if (mergedEvtsForTest != null)
+            mergedEvtsForTest.add(discoEvt);
+    }
+
+    /**
+     * For testing purposes. Method allows to wait for an exchange future of 
specific version
+     * to appear in exchange worker queue.
+     *
+     * @param exchMergeTestWaitVer Topology Version to wait for.
+     * @param curFut Current Exchange Future.
+     */
+    private void waitForTestVersion(AffinityTopologyVersion 
exchMergeTestWaitVer, GridDhtPartitionsExchangeFuture curFut) {
+        if (log.isInfoEnabled()) {
+            log.info("Exchange merge test, waiting for version [exch=" + 
curFut.initialVersion() +
+                ", waitVer=" + exchMergeTestWaitVer + ']');
+        }
+
+        long end = U.currentTimeMillis() + 10_000;
+
+        while (U.currentTimeMillis() < end) {
+            boolean found = false;
+
+            for (CachePartitionExchangeWorkerTask task : exchWorker.futQ) {
+                if (task instanceof GridDhtPartitionsExchangeFuture) {
+                    GridDhtPartitionsExchangeFuture fut = 
(GridDhtPartitionsExchangeFuture)task;
+
+                    if (exchMergeTestWaitVer.equals(fut.initialVersion())) {
+                        if (log.isInfoEnabled())
+                            log.info("Exchange merge test, found awaited 
version: " + exchMergeTestWaitVer);
+
+                        found = true;
+
+                        break;
+                    }
+                }
+            }
+
+            if (found)
+                break;
+            else {
+                try {
+                    U.sleep(100);
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    break;
+                }
+            }
+        }
+
+        this.exchMergeTestWaitVer = null;
+    }
+
     /**
      * Exchange future thread. All exchanges happen only by one thread and next
      * exchange will not start until previous one completes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebd669e4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java
index 3ac12fc..77dfc34 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateNodeLeaveExchangeTask.java
@@ -47,7 +47,7 @@ public class WalStateNodeLeaveExchangeTask implements 
CachePartitionExchangeWork
 
     /** {@inheritDoc} */
     @Override public boolean skipForExchangeMerge() {
-        return false;
+        return true;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebd669e4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
index 6c714b1..53a75d4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheExchangeMergeTest.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -775,17 +776,37 @@ public class CacheExchangeMergeTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void mergeServersFail1(boolean waitRebalance) throws Exception {
-        final Ignite srv0 = startGrids(4);
+        final Ignite srv0 = startGrids(5);
 
         if (waitRebalance)
             awaitPartitionMapExchange();
 
-        mergeExchangeWaitVersion(srv0, 6);
+        final List<DiscoveryEvent> mergedEvts = new ArrayList<>();
+
+        mergeExchangeWaitVersion(srv0, 8, mergedEvts);
+
+        UUID grid3Id = grid(3).localNode().id();
+        UUID grid2Id = grid(2).localNode().id();
 
+        stopGrid(getTestIgniteInstanceName(4), true, false);
         stopGrid(getTestIgniteInstanceName(3), true, false);
         stopGrid(getTestIgniteInstanceName(2), true, false);
 
         checkCaches();
+
+        awaitPartitionMapExchange();
+
+        assertTrue("Unexpected number of merged disco events: " + 
mergedEvts.size(), mergedEvts.size() == 2);
+
+        for (DiscoveryEvent discoEvt : mergedEvts) {
+            ClusterNode evtNode = discoEvt.eventNode();
+
+            assertTrue("eventNode is null for DiscoEvent " + discoEvt, evtNode 
!= null);
+
+            assertTrue("Unexpected eventNode ID: "
+                    + evtNode.id() + " while expecting " + grid2Id + " or " + 
grid3Id,
+                evtNode.id().equals(grid2Id) || evtNode.id().equals(grid3Id));
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/ebd669e4/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index e6c6657..9390d6b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -1949,6 +1949,15 @@ public final class GridTestUtils {
      */
     public static void mergeExchangeWaitVersion(Ignite node, long topVer) {
         
((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
-            new AffinityTopologyVersion(topVer, 0));
+            new AffinityTopologyVersion(topVer, 0), null);
+    }
+
+    /**
+     * @param node Node.
+     * @param topVer Ready exchange version to wait for before trying to merge 
exchanges.
+     */
+    public static void mergeExchangeWaitVersion(Ignite node, long topVer, List 
mergedEvts) {
+        
((IgniteEx)node).context().cache().context().exchange().mergeExchangesTestWaitVersion(
+            new AffinityTopologyVersion(topVer, 0), mergedEvts);
     }
 }

Reply via email to