IGNITE-10369 Avoid race on handling discovery messages if last custom message is not processed completely - Fixes #5462.

Signed-off-by: Pavel Kovalenko <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0cd303f1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0cd303f1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0cd303f1

Branch: refs/heads/ignite-10044
Commit: 0cd303f1f1b4be23be8f3be6301d4defd4be6ea1
Parents: 504393f
Author: ibessonov <[email protected]>
Authored: Tue Dec 4 12:55:45 2018 +0300
Committer: Pavel Kovalenko <[email protected]>
Committed: Tue Dec 4 12:55:45 2018 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |  4 +-
 .../cluster/GridClusterStateProcessor.java      |  2 +-
 .../ignite/spi/discovery/DiscoveryDataBag.java  |  6 +++
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 41 +++++++++++++++++++-
 .../IgniteClusterActivateDeactivateTest.java    |  4 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 21 ++++++++++
 6 files changed, 70 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index beb8d21..5abe63c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -765,8 +765,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     // Current version.
                     discoCache = discoCache();
 
-                final DiscoCache discoCache0 = discoCache;
-
                 // If this is a local join event, just save it and do not notify listeners.
                 if (locJoinEvt) {
                     if (gridStartTime == 0)
@@ -854,7 +852,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                             try {
                                 fut.get();
 
-                                discoWrk.addEvent(type, nextTopVer, node, discoCache0, topSnapshot, null);
+                                discoWrk.addEvent(EVT_CLIENT_NODE_RECONNECTED, nextTopVer, node, discoCache, topSnapshot, null);
                             }
                             catch (IgniteException ignore) {
                                 // No-op.

http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index c48128e..9d2adae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -414,7 +414,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I
         if (msg.requestId().equals(state.transitionRequestId())) {
             log.info("Received state change finish message: " + msg.clusterActive());
 
-            globalState = globalState.finish(msg.success());
+            globalState = state.finish(msg.success());
 
             afterStateChangeFinished(msg.id(), msg.success());
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
index b8d8908..2b6bbca 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -312,4 +313,9 @@ public class DiscoveryDataBag {
     @Nullable public Map<Integer, Serializable> localNodeSpecificData() {
         return nodeSpecificData.get(DEFAULT_KEY);
     }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(DiscoveryDataBag.class, this);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index ce69e78..bab9ec0 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -261,6 +261,9 @@ class ServerImpl extends TcpDiscoveryImpl {
     private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap =
         new ConcurrentHashMap<>();
 
+    /** Last listener future. */
+    private IgniteFuture<?> lastCustomEvtLsnrFut;
+
     /**
      * @param adapter Adapter.
      */
@@ -2157,6 +2160,20 @@ class ServerImpl extends TcpDiscoveryImpl {
     }
 
     /**
+     * Wait for all the listeners from previous discovery message to be completed.
+     */
+    private void waitForLastCustomEventListenerFuture() {
+        if (lastCustomEvtLsnrFut != null) {
+            try {
+                lastCustomEvtLsnrFut.get();
+            }
+            finally {
+                lastCustomEvtLsnrFut = null;
+            }
+        }
+    }
+
+    /**
      * Discovery messages history used for client reconnect.
      */
     private class EnsuredMessageHistory {
@@ -4160,6 +4177,15 @@ class ServerImpl extends TcpDiscoveryImpl {
         private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
             assert msg != null;
 
+            blockingSectionBegin();
+
+            try {
+                waitForLastCustomEventListenerFuture();
+            }
+            finally {
+                blockingSectionEnd();
+            }
+
             TcpDiscoveryNode node = msg.node();
 
             assert node != null;
@@ -5606,8 +5632,19 @@ class ServerImpl extends TcpDiscoveryImpl {
                     hist,
                     msgObj);
 
-                if (waitForNotification || msgObj.isMutable())
-                    fut.get();
+                if (waitForNotification || msgObj.isMutable()) {
+                    blockingSectionBegin();
+
+                    try {
+                        fut.get();
+                    }
+                    finally {
+                        blockingSectionEnd();
+                    }
+                }
+                else {
+                    lastCustomEvtLsnrFut = fut;
+                }
 
                 if (msgObj.isMutable()) {
                     try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
index ed6eb86..cc65ada 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java
@@ -1201,13 +1201,13 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest
 
         client = false;
 
-        IgniteInternalFuture startFut1 = GridTestUtils.runAsync((Callable)() -> {
+        IgniteInternalFuture<?> startFut1 = GridTestUtils.runAsync(() -> {
             startGrid(4);
 
             return null;
         }, "start-node1");
 
-        IgniteInternalFuture startFut2 = GridTestUtils.runAsync((Callable)() -> {
+        IgniteInternalFuture<?> startFut2 = GridTestUtils.runAsync(() -> {
             startGrid(5);
 
             return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index abe1877..284f642 100644
--- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -192,6 +192,9 @@ public class ZookeeperDiscoveryImpl {
     /** */
     private final ZookeeperDiscoveryStatistics stats;
 
+    /** Last listener future. */
+    private IgniteFuture<?> lastCustomEvtLsnrFut;
+
     /**
      * @param spi Discovery SPI.
      * @param igniteInstanceName Instance name.
@@ -2752,6 +2755,8 @@ public class ZookeeperDiscoveryImpl {
     private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData)
         throws Exception
     {
+        waitForLastListenerFuture();
+
         boolean evtProcessed = false;
 
         for (int i = 0; i < evtData.joinedNodes.size(); i++) {
@@ -3430,6 +3435,8 @@ public class ZookeeperDiscoveryImpl {
 
         if (msg != null && msg.isMutable())
             fut.get();
+        else
+            lastCustomEvtLsnrFut = fut;
     }
 
     /**
@@ -3987,6 +3994,20 @@ public class ZookeeperDiscoveryImpl {
     }
 
     /**
+     * Wait for all the listeners from previous discovery message to be completed.
+     */
+    private void waitForLastListenerFuture() {
+        if (lastCustomEvtLsnrFut != null) {
+            try {
+                lastCustomEvtLsnrFut.get();
+            }
+            finally {
+                lastCustomEvtLsnrFut = null;
+            }
+        }
+    }
+
+    /**
      *
      */
     private class ReconnectClosure implements Runnable {

Reply via email to