IGNITE-10369 Avoid race on handling discovery messages if last custom message is not processed completely - Fixes #5462.
Signed-off-by: Pavel Kovalenko <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0cd303f1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0cd303f1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0cd303f1 Branch: refs/heads/ignite-10044 Commit: 0cd303f1f1b4be23be8f3be6301d4defd4be6ea1 Parents: 504393f Author: ibessonov <[email protected]> Authored: Tue Dec 4 12:55:45 2018 +0300 Committer: Pavel Kovalenko <[email protected]> Committed: Tue Dec 4 12:55:45 2018 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 4 +- .../cluster/GridClusterStateProcessor.java | 2 +- .../ignite/spi/discovery/DiscoveryDataBag.java | 6 +++ .../ignite/spi/discovery/tcp/ServerImpl.java | 41 +++++++++++++++++++- .../IgniteClusterActivateDeactivateTest.java | 4 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 21 ++++++++++ 6 files changed, 70 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index beb8d21..5abe63c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -765,8 +765,6 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { // Current version. discoCache = discoCache(); - final DiscoCache discoCache0 = discoCache; - // If this is a local join event, just save it and do not notify listeners. if (locJoinEvt) { if (gridStartTime == 0) @@ -854,7 +852,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { try { fut.get(); - discoWrk.addEvent(type, nextTopVer, node, discoCache0, topSnapshot, null); + discoWrk.addEvent(EVT_CLIENT_NODE_RECONNECTED, nextTopVer, node, discoCache, topSnapshot, null); } catch (IgniteException ignore) { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index c48128e..9d2adae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -414,7 +414,7 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I if (msg.requestId().equals(state.transitionRequestId())) { log.info("Received state change finish message: " + msg.clusterActive()); - globalState = globalState.finish(msg.success()); + globalState = state.finish(msg.success()); afterStateChangeFinished(msg.id(), msg.success()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java index b8d8908..2b6bbca 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.GridComponent; +import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; /** @@ -312,4 +313,9 @@ public class DiscoveryDataBag { @Nullable public Map<Integer, Serializable> localNodeSpecificData() { return nodeSpecificData.get(DEFAULT_KEY); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DiscoveryDataBag.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index ce69e78..bab9ec0 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -261,6 +261,9 @@ class ServerImpl extends TcpDiscoveryImpl { private final ConcurrentMap<InetSocketAddress, GridPingFutureAdapter<IgniteBiTuple<UUID, Boolean>>> pingMap = new ConcurrentHashMap<>(); + /** Last listener future. */ + private IgniteFuture<?> lastCustomEvtLsnrFut; + /** * @param adapter Adapter. */ @@ -2157,6 +2160,20 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * Wait for all the listeners from previous discovery message to be completed. + */ + private void waitForLastCustomEventListenerFuture() { + if (lastCustomEvtLsnrFut != null) { + try { + lastCustomEvtLsnrFut.get(); + } + finally { + lastCustomEvtLsnrFut = null; + } + } + } + + /** * Discovery messages history used for client reconnect. */ private class EnsuredMessageHistory { @@ -4160,6 +4177,15 @@ class ServerImpl extends TcpDiscoveryImpl { private void processNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { assert msg != null; + blockingSectionBegin(); + + try { + waitForLastCustomEventListenerFuture(); + } + finally { + blockingSectionEnd(); + } + TcpDiscoveryNode node = msg.node(); assert node != null; @@ -5606,8 +5632,19 @@ class ServerImpl extends TcpDiscoveryImpl { hist, msgObj); - if (waitForNotification || msgObj.isMutable()) - fut.get(); + if (waitForNotification || msgObj.isMutable()) { + blockingSectionBegin(); + + try { + fut.get(); + } + finally { + blockingSectionEnd(); + } + } + else { + lastCustomEvtLsnrFut = fut; + } if (msgObj.isMutable()) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java index ed6eb86..cc65ada 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteClusterActivateDeactivateTest.java @@ -1201,13 +1201,13 @@ public class IgniteClusterActivateDeactivateTest extends GridCommonAbstractTest client = false; - IgniteInternalFuture startFut1 = GridTestUtils.runAsync((Callable)() -> { + IgniteInternalFuture<?> startFut1 = GridTestUtils.runAsync(() -> { startGrid(4); return null; }, "start-node1"); - IgniteInternalFuture startFut2 = GridTestUtils.runAsync((Callable)() -> { + IgniteInternalFuture<?> startFut2 = GridTestUtils.runAsync(() -> { startGrid(5); return null; http://git-wip-us.apache.org/repos/asf/ignite/blob/0cd303f1/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index abe1877..284f642 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -192,6 +192,9 @@ public class ZookeeperDiscoveryImpl { /** */ private final ZookeeperDiscoveryStatistics stats; + /** Last listener future. */ + private IgniteFuture<?> lastCustomEvtLsnrFut; + /** * @param spi Discovery SPI. * @param igniteInstanceName Instance name. @@ -2752,6 +2755,8 @@ public class ZookeeperDiscoveryImpl { private boolean processBulkJoin(ZkDiscoveryEventsData evtsData, ZkDiscoveryNodeJoinEventData evtData) throws Exception { + waitForLastListenerFuture(); + boolean evtProcessed = false; for (int i = 0; i < evtData.joinedNodes.size(); i++) { @@ -3430,6 +3435,8 @@ public class ZookeeperDiscoveryImpl { if (msg != null && msg.isMutable()) fut.get(); + else + lastCustomEvtLsnrFut = fut; } /** @@ -3987,6 +3994,20 @@ public class ZookeeperDiscoveryImpl { } /** + * Wait for all the listeners from previous discovery message to be completed. + */ + private void waitForLastListenerFuture() { + if (lastCustomEvtLsnrFut != null) { + try { + lastCustomEvtLsnrFut.get(); + } + finally { + lastCustomEvtLsnrFut = null; + } + } + } + + /** * */ private class ReconnectClosure implements Runnable {
