IGNITE-9398 Reduce time on CustomDiscoveryMessage processing by discovery 
message worker. Fixes #4636.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b13f373c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b13f373c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b13f373c

Branch: refs/heads/ignite-7251
Commit: b13f373c24bf2538603a7f3fee5af70405ad8356
Parents: fb97066
Author: Pavel Kovalenko <[email protected]>
Authored: Fri Sep 14 18:02:36 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Fri Sep 14 18:02:36 2018 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         | 120 +++++++++++++++++--
 .../cluster/GridClusterStateProcessor.java      |   9 +-
 .../org/apache/ignite/spi/IgniteSpiAdapter.java |   3 +-
 .../spi/discovery/DiscoverySpiListener.java     |   5 +-
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  33 +++--
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  64 ++++++----
 ...iteMarshallerCacheClassNameConflictTest.java |  10 +-
 .../IgniteMarshallerCacheFSRestoreTest.java     |   8 +-
 ...gniteAbstractStandByClientReconnectTest.java |  10 +-
 .../discovery/AbstractDiscoverySelfTest.java    |  21 +++-
 .../ignite/testframework/GridTestUtils.java     |   5 +-
 .../zk/internal/ZookeeperDiscoveryImpl.java     | 113 +++++++++++------
 12 files changed, 300 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index d19e08b..48e3318 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -70,6 +70,7 @@ import org.apache.ignite.internal.GridComponent;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
 import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.cluster.NodeOrderComparator;
@@ -103,6 +104,7 @@ import 
org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -221,6 +223,9 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
     /** Discovery event worker. */
     private final DiscoveryWorker discoWrk = new DiscoveryWorker();
 
+    /** Discovery event notyfier worker. */
+    private final DiscoveryMessageNotifyerWorker discoNotifierWrk = new 
DiscoveryMessageNotifyerWorker();
+
     /** Network segment check worker. */
     private SegmentCheckWorker segChkWrk;
 
@@ -583,16 +588,23 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                 }
             }
 
-            @Override public void onDiscovery(
+            @Override public IgniteInternalFuture onDiscovery(
                 final int type,
                 final long topVer,
                 final ClusterNode node,
                 final Collection<ClusterNode> topSnapshot,
                 final Map<Long, Collection<ClusterNode>> snapshots,
-                @Nullable DiscoverySpiCustomMessage spiCustomMsg) {
-                synchronized (discoEvtMux) {
-                    onDiscovery0(type, topVer, node, topSnapshot, snapshots, 
spiCustomMsg);
-                }
+                @Nullable DiscoverySpiCustomMessage spiCustomMsg
+            ) {
+                GridFutureAdapter notificationFut = new GridFutureAdapter();
+
+                discoNotifierWrk.submit(notificationFut, () -> {
+                    synchronized (discoEvtMux) {
+                        onDiscovery0(type, topVer, node, topSnapshot, 
snapshots, spiCustomMsg);
+                    }
+                });
+
+                return notificationFut;
             }
 
             /**
@@ -919,6 +931,8 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
             }
         });
 
+        new IgniteThread(discoNotifierWrk).start();
+
         startSpi();
 
         registeredDiscoSpi = true;
@@ -1698,6 +1712,10 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
         U.join(discoWrk, log);
 
+        U.cancel(discoNotifierWrk);
+
+        U.join(discoNotifierWrk, log);
+
         // Stop SPI itself.
         stopSpi();
 
@@ -2664,6 +2682,86 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
         }
     }
 
+    /**
+     *
+     */
+    private class DiscoveryMessageNotifyerWorker extends GridWorker {
+        /** Queue. */
+        private final BlockingQueue<T2<GridFutureAdapter, Runnable>> queue = 
new LinkedBlockingQueue<>();
+
+        /**
+         * Default constructor.
+         */
+        protected DiscoveryMessageNotifyerWorker() {
+            super(ctx.igniteInstanceName(), "disco-notyfier-worker", 
GridDiscoveryManager.this.log, ctx.workersRegistry());
+        }
+
+        /**
+         *
+         */
+        private void body0() throws InterruptedException {
+            T2<GridFutureAdapter, Runnable> notification = queue.take();
+
+            try {
+                notification.get2().run();
+            }
+            finally {
+                notification.get1().onDone();
+            }
+        }
+
+        /**
+         * @param cmd Command.
+         */
+        public synchronized void submit(GridFutureAdapter notificationFut, 
Runnable cmd) {
+            if (isCancelled()) {
+                notificationFut.onDone();
+
+                return;
+            }
+
+            queue.add(new T2<>(notificationFut, cmd));
+        }
+
+        /**
+         * Cancel thread execution and completes all notification futures.
+         */
+        @Override public synchronized void cancel() {
+            super.cancel();
+
+            while (!queue.isEmpty()) {
+                T2<GridFutureAdapter, Runnable> notification = queue.poll();
+
+                if (notification != null)
+                    notification.get1().onDone();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            while (!isCancelled()) {
+                try {
+                    body0();
+                }
+                catch (InterruptedException e) {
+                    if (!isCancelled)
+                        ctx.failure().process(new 
FailureContext(SYSTEM_WORKER_TERMINATION, e));
+
+                    throw e;
+                }
+                catch (Throwable t) {
+                    U.error(log, "Exception in discovery notyfier worker 
thread.", t);
+
+                    FailureType type = t instanceof OutOfMemoryError ? 
CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;
+
+                    ctx.failure().process(new FailureContext(type, t));
+
+                    throw t;
+                }
+            }
+        }
+    }
+
     /** Worker for discovery events. */
     private class DiscoveryWorker extends GridWorker {
         /** */
@@ -2772,15 +2870,13 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
                     throw e;
                 }
                 catch (Throwable t) {
-                    U.error(log, "Exception in discovery worker thread.", t);
+                    U.error(log, "Exception in discovery event worker 
thread.", t);
 
-                    if (t instanceof Error) {
-                        FailureType type = t instanceof OutOfMemoryError ? 
CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;
+                    FailureType type = t instanceof OutOfMemoryError ? 
CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;
 
-                        ctx.failure().process(new FailureContext(type, t));
+                    ctx.failure().process(new FailureContext(type, t));
 
-                        throw t;
-                    }
+                    throw t;
                 }
             }
         }
@@ -2795,7 +2891,7 @@ public class GridDiscoveryManager extends 
GridManagerAdapter<DiscoverySpi> {
 
             AffinityTopologyVersion topVer = evt.get2();
 
-            if (type == EVT_NODE_METRICS_UPDATED && 
topVer.compareTo(discoCache.version()) < 0)
+            if (type == EVT_NODE_METRICS_UPDATED && (discoCache == null || 
topVer.compareTo(discoCache.version()) < 0))
                 return;
 
             ClusterNode node = evt.get3();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
index fea751c..727a372 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java
@@ -511,13 +511,16 @@ public class GridClusterStateProcessor extends 
GridProcessorAdapter implements I
 
                 transitionFuts.put(msg.requestId(), new 
GridFutureAdapter<Void>());
 
+                DiscoveryDataClusterState prevState = globalState;
+
                 globalState = DiscoveryDataClusterState.createTransitionState(
-                    globalState,
+                    prevState,
                     msg.activate(),
-                    msg.activate() ? msg.baselineTopology() : 
globalState.baselineTopology(),
+                    msg.activate() ? msg.baselineTopology() : 
prevState.baselineTopology(),
                     msg.requestId(),
                     topVer,
-                    nodeIds);
+                    nodeIds
+                );
 
                 if (msg.forceChangeBaselineTopology())
                     globalState.setTransitionResult(msg.requestId(), 
msg.activate());

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index bad0186..a7e6e8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -547,7 +547,8 @@ public abstract class IgniteSpiAdapter implements IgniteSpi 
{
         if (rmtCls == null) {
             if (!optional && starting)
                 throw new IgniteSpiException("Remote SPI with the same name is 
not configured" + tipStr +
-                    " [name=" + name + ", loc=" + locCls + ']');
+                    " [name=" + name + ", loc=" + locCls + ", locNode=" + 
spiCtx.localNode() + ", rmt=" + rmtCls +
+                    ", rmtNode=" + node + ']');
 
             sb.a(format(">>> Remote SPI with the same name is not configured: 
" + name, locCls));
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
index 2b2ac94..519a235 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Map;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -48,8 +49,10 @@ public interface DiscoverySpiListener {
      *      {@code EVT_NODE_JOINED}, then joined node will be in snapshot).
      * @param topHist Topology snapshots history.
      * @param data Data for custom event.
+     *
+     * @return A future that will be completed when notification process has 
finished.
      */
-    public void onDiscovery(
+    public IgniteInternalFuture onDiscovery(
         int type,
         long topVer,
         ClusterNode node,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index c88ef20..312f737 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -470,7 +470,12 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 Collection<ClusterNode> top = updateTopologyHistory(topVer + 
1, null);
 
-                lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new 
TreeMap<>(topHist), null);
+                try {
+                    lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new 
TreeMap<>(topHist), null).get();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException("Failed to wait for discovery 
listener notification", e);
+                }
             }
         }
 
@@ -2483,21 +2488,31 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param top Topology snapshot.
          * @param data Optional custom message data.
          */
-        private void notifyDiscovery(int type, long topVer, ClusterNode node, 
Collection<ClusterNode> top,
-            @Nullable DiscoverySpiCustomMessage data) {
+        private void notifyDiscovery(
+            int type,
+            long topVer,
+            ClusterNode node,
+            Collection<ClusterNode> top,
+            @Nullable DiscoverySpiCustomMessage data
+        ) {
             DiscoverySpiListener lsnr = spi.lsnr;
 
-            DebugLogger log = type == EVT_NODE_METRICS_UPDATED ? traceLog : 
debugLog;
+            DebugLogger debugLog = type == EVT_NODE_METRICS_UPDATED ? traceLog 
: ClientImpl.this.debugLog;
 
             if (lsnr != null) {
-                if (log.isDebugEnabled())
-                    log.debug("Discovery notification [node=" + node + ", 
type=" + U.gridEventName(type) +
+                if (debugLog.isDebugEnabled())
+                    debugLog.debug("Discovery notification [node=" + node + ", 
type=" + U.gridEventName(type) +
                         ", topVer=" + topVer + ']');
 
-                lsnr.onDiscovery(type, topVer, node, top, new 
TreeMap<>(topHist), data);
+                try {
+                    lsnr.onDiscovery(type, topVer, node, top, new 
TreeMap<>(topHist), data).get();
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IgniteException("Failed to wait for discovery 
listener notification", e);
+                }
             }
-            else if (log.isDebugEnabled())
-                log.debug("Skipped discovery notification [node=" + node + ", 
type=" + U.gridEventName(type) +
+            else if (debugLog.isDebugEnabled())
+                debugLog.debug("Skipped discovery notification [node=" + node 
+ ", type=" + U.gridEventName(type) +
                     ", topVer=" + topVer + ']');
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 3e7537a..f82af61 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -70,6 +70,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.IgnitionEx;
@@ -2822,7 +2823,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 processDiscardMessage((TcpDiscoveryDiscardMessage)msg);
 
             else if (msg instanceof TcpDiscoveryCustomEventMessage)
-                processCustomMessage((TcpDiscoveryCustomEventMessage)msg);
+                processCustomMessage((TcpDiscoveryCustomEventMessage)msg, 
false);
 
             else if (msg instanceof TcpDiscoveryClientPingRequest)
                 processClientPingRequest((TcpDiscoveryClientPingRequest)msg);
@@ -5406,8 +5407,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /**
          * @param msg Message.
+         * @param waitForNotification If {@code true} then thread will wait 
when discovery event notification has finished.
          */
-        private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
+        private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, 
boolean waitForNotification) {
             if (isLocalNodeCoordinator()) {
                 boolean delayMsg;
 
@@ -5441,14 +5443,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                     msg.topologyVersion(ring.topologyVersion());
 
                     if (pendingMsgs.procCustomMsgs.add(msg.id())) {
-                        notifyDiscoveryListener(msg);
+                        notifyDiscoveryListener(msg, waitForNotification);
 
                         if (sendMessageToRemotes(msg))
                             sendMessageAcrossRing(msg);
                         else {
                             registerPendingMessage(msg);
 
-                            processCustomMessage(msg);
+                            processCustomMessage(msg, waitForNotification);
                         }
                     }
 
@@ -5478,7 +5480,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                                 ackMsg.topologyVersion(msg.topologyVersion());
 
-                                processCustomMessage(ackMsg);
+                                processCustomMessage(ackMsg, 
waitForNotification);
                             }
                             catch (IgniteCheckedException e) {
                                 U.error(log, "Failed to marshal discovery 
custom message.", e);
@@ -5505,7 +5507,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     assert msg.topologyVersion() == ring.topologyVersion() :
                         "msg: " + msg + ", topVer=" + ring.topologyVersion();
 
-                    notifyDiscoveryListener(msg);
+                    notifyDiscoveryListener(msg, waitForNotification);
                 }
 
                 if (msg.verified())
@@ -5581,7 +5583,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 TcpDiscoveryCustomEventMessage msg;
 
                 while ((msg = pollPendingCustomeMessage()) != null)
-                    processCustomMessage(msg);
+                    processCustomMessage(msg, true);
             }
         }
 
@@ -5596,8 +5598,9 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         /**
          * @param msg Custom message.
+         * @param waitForNotification If {@code true} thread will wait when 
discovery event notification has finished.
          */
-        private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage 
msg) {
+        private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage 
msg, boolean waitForNotification) {
             DiscoverySpiListener lsnr = spi.lsnr;
 
             TcpDiscoverySpiState spiState = spiStateCopy();
@@ -5613,23 +5616,40 @@ class ServerImpl extends TcpDiscoveryImpl {
             if (lsnr != null && (spiState == CONNECTED || spiState == 
DISCONNECTING)) {
                 TcpDiscoveryNode node = ring.node(msg.creatorNodeId());
 
-                if (node != null) {
-                    try {
-                        DiscoverySpiCustomMessage msgObj = 
msg.message(spi.marshaller(),
-                            
U.resolveClassLoader(spi.ignite().configuration()));
+                if (node == null)
+                    return;
 
-                        
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
-                            msg.topologyVersion(),
-                            node,
-                            snapshot,
-                            hist,
-                            msgObj);
+                DiscoverySpiCustomMessage msgObj;
 
-                        if (msgObj.isMutable())
-                            msg.message(msgObj, U.marshal(spi.marshaller(), 
msgObj));
+                try {
+                    msgObj = msg.message(spi.marshaller(), 
U.resolveClassLoader(spi.ignite().configuration()));
+                }
+                catch (Throwable t) {
+                    throw new IgniteException("Failed to unmarshal discovery 
custom message: " + msg, t);
+                }
+
+                IgniteInternalFuture fut = 
lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
+                    msg.topologyVersion(),
+                    node,
+                    snapshot,
+                    hist,
+                    msgObj);
+
+                if (waitForNotification || msgObj.isMutable()) {
+                    try {
+                        fut.get();
                     }
-                    catch (Throwable e) {
-                        U.error(log, "Failed to unmarshal discovery custom 
message.", e);
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException("Failed to wait for 
discovery listener notification", e);
+                    }
+                }
+
+                if (msgObj.isMutable()) {
+                    try {
+                        msg.message(msgObj, U.marshal(spi.marshaller(), 
msgObj));
+                    }
+                    catch (Throwable t) {
+                        throw new IgniteException("Failed to marshal mutable 
discovery message: " + msgObj, t);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
index 80d0fd1..64c7817 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java
@@ -31,13 +31,13 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 
@@ -193,7 +193,7 @@ public class IgniteMarshallerCacheClassNameConflictTest 
extends GridCommonAbstra
             }
 
             /** {@inheritDoc} */
-            @Override public void onDiscovery(
+            @Override public IgniteInternalFuture onDiscovery(
                     int type,
                     long topVer,
                     ClusterNode node,
@@ -219,7 +219,9 @@ public class IgniteMarshallerCacheClassNameConflictTest 
extends GridCommonAbstra
                 }
 
                 if (delegate != null)
-                    delegate.onDiscovery(type, topVer, node, topSnapshot, 
topHist, spiCustomMsg);
+                    return delegate.onDiscovery(type, topVer, node, 
topSnapshot, topHist, spiCustomMsg);
+
+                return new GridFinishedFuture();
             }
 
             /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
index 49f5311..7aa61eb 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java
@@ -34,8 +34,10 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.PersistentStoreConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
 import org.apache.ignite.spi.discovery.DiscoverySpiListener;
@@ -243,7 +245,7 @@ public class IgniteMarshallerCacheFSRestoreTest extends 
GridCommonAbstractTest {
             }
 
             /** {@inheritDoc} */
-            @Override public void onDiscovery(
+            @Override public IgniteInternalFuture onDiscovery(
                 int type,
                 long topVer,
                 ClusterNode node,
@@ -267,7 +269,9 @@ public class IgniteMarshallerCacheFSRestoreTest extends 
GridCommonAbstractTest {
                 }
 
                 if (delegate != null)
-                    delegate.onDiscovery(type, topVer, node, topSnapshot, 
topHist, spiCustomMsg);
+                    return delegate.onDiscovery(type, topVer, node, 
topSnapshot, topHist, spiCustomMsg);
+
+                return new GridFinishedFuture();
             }
 
             /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java
index f7b0ed7..d01e11a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -388,7 +389,7 @@ public abstract class 
IgniteAbstractStandByClientReconnectTest extends GridCommo
         }
 
         /** {@inheritDoc} */
-        @Override public void onDiscovery(
+        @Override public IgniteInternalFuture onDiscovery(
             int type,
             long topVer,
             ClusterNode node,
@@ -396,9 +397,9 @@ public abstract class 
IgniteAbstractStandByClientReconnectTest extends GridCommo
             @Nullable Map<Long, Collection<ClusterNode>> topHist,
             @Nullable DiscoverySpiCustomMessage data
         ) {
-            delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, 
data);
+            IgniteInternalFuture fut = delegate.onDiscovery(type, topVer, 
node, topSnapshot, topHist, data);
 
-            if (type == EVT_CLIENT_NODE_DISCONNECTED)
+            if (type == EVT_CLIENT_NODE_DISCONNECTED) {
                 try {
                     System.out.println("Await cluster change state");
 
@@ -407,6 +408,9 @@ public abstract class 
IgniteAbstractStandByClientReconnectTest extends GridCommo
                 catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }
+            }
+
+            return fut;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
index fa1a2ae..e59d24a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java
@@ -34,6 +34,8 @@ import javax.management.ObjectName;
 import mx4j.tools.adaptor.http.HttpAdaptor;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.spi.IgniteSpi;
@@ -160,10 +162,17 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
         }
 
         /** {@inheritDoc} */
-        @Override public void onDiscovery(int type, long topVer, ClusterNode 
node, Collection<ClusterNode> topSnapshot,
-            Map<Long, Collection<ClusterNode>> topHist, @Nullable 
DiscoverySpiCustomMessage data) {
+        @Override public IgniteInternalFuture onDiscovery(
+            int type,
+            long topVer,
+            ClusterNode node,
+            Collection<ClusterNode> topSnapshot,
+            Map<Long, Collection<ClusterNode>> topHist, @Nullable 
DiscoverySpiCustomMessage data
+        ) {
             if (type == EVT_NODE_METRICS_UPDATED)
                 isMetricsUpdate = true;
+
+            return new GridFinishedFuture();
         }
     }
 
@@ -237,13 +246,15 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
                     // No-op.
                 }
 
-                @Override public void onDiscovery(int type, long topVer, 
ClusterNode node,
+                @Override public IgniteInternalFuture onDiscovery(int type, 
long topVer, ClusterNode node,
                     Collection<ClusterNode> topSnapshot, Map<Long, 
Collection<ClusterNode>> topHist,
                     @Nullable DiscoverySpiCustomMessage data) {
                     // If METRICS_UPDATED came from local node
                     if (type == EVT_NODE_METRICS_UPDATED
                         && node.id().equals(spi.getLocalNode().id()))
                         spiCnt.addAndGet(1);
+
+                    return new GridFinishedFuture();
                 }
             };
 
@@ -405,7 +416,7 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
                     }
 
                     @SuppressWarnings({"NakedNotify"})
-                    @Override public void onDiscovery(int type, long topVer, 
ClusterNode node,
+                    @Override public IgniteInternalFuture onDiscovery(int 
type, long topVer, ClusterNode node,
                         Collection<ClusterNode> topSnapshot, Map<Long, 
Collection<ClusterNode>> topHist,
                         @Nullable DiscoverySpiCustomMessage data) {
                         info("Discovery event [type=" + type + ", node=" + 
node + ']');
@@ -413,6 +424,8 @@ public abstract class AbstractDiscoverySelfTest<T extends 
IgniteSpi> extends Gri
                         synchronized (mux) {
                             mux.notifyAll();
                         }
+
+                        return new GridFinishedFuture();
                     }
                 });
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index c6d9527..77f5324 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -159,9 +159,10 @@ public final class GridTestUtils {
         }
 
         /** {@inheritDoc} */
-        @Override public void onDiscovery(int type, long topVer, ClusterNode 
node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, 
Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage 
spiCustomMsg) {
+        @Override public IgniteInternalFuture onDiscovery(int type, long 
topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable 
Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage 
spiCustomMsg) {
             hook.handleDiscoveryMessage(spiCustomMsg);
-            delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, 
spiCustomMsg);
+
+            return delegate.onDiscovery(type, topVer, node, topSnapshot, 
topHist, spiCustomMsg);
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
index 2427c77..b89fbe4 100644
--- 
a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
+++ 
b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java
@@ -467,12 +467,17 @@ public class ZookeeperDiscoveryImpl {
         if (rtState.joined) {
             assert rtState.evtsData != null;
 
-            lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
-                rtState.evtsData.topVer,
-                locNode,
-                rtState.top.topologySnapshot(),
-                Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                null);
+            try {
+                lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED,
+                    rtState.evtsData.topVer,
+                    locNode,
+                    rtState.top.topologySnapshot(),
+                    Collections.emptyMap(),
+                    null).get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException("Failed to wait for discovery 
listener notification", e);
+            }
         }
 
         try {
@@ -533,14 +538,19 @@ public class ZookeeperDiscoveryImpl {
         List<ClusterNode> nodes = rtState.top.topologySnapshot();
 
         if (nodes.isEmpty())
-            nodes = Collections.singletonList((ClusterNode)locNode);
+            nodes = Collections.singletonList(locNode);
 
-        lsnr.onDiscovery(EVT_NODE_SEGMENTED,
-            rtState.evtsData != null ? rtState.evtsData.topVer : 1L,
-            locNode,
-            nodes,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+        try {
+            lsnr.onDiscovery(EVT_NODE_SEGMENTED,
+                rtState.evtsData != null ? rtState.evtsData.topVer : 1L,
+                locNode,
+                nodes,
+                Collections.emptyMap(),
+                null).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to wait for discovery listener 
notification", e);
+        }
     }
 
     /**
@@ -2242,12 +2252,19 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = 
Collections.singletonList((ClusterNode)locNode);
 
-        lsnr.onDiscovery(EVT_NODE_JOINED,
-            1L,
-            locNode,
-            topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+        try {
+            lsnr.onDiscovery(EVT_NODE_JOINED,
+                1L,
+                locNode,
+                topSnapshot,
+                Collections.emptyMap(),
+                null).get();
+        }
+        catch (IgniteCheckedException e) {
+            joinFut.onDone(e);
+
+            throw new IgniteException("Failed to wait for discovery listener 
notification", e);
+        }
 
         // Reset events (this is also notification for clients left from 
previous cluster).
         rtState.zkClient.setData(zkPaths.evtsPath, 
marshalZip(rtState.evtsData), -1);
@@ -2942,16 +2959,16 @@ public class ZookeeperDiscoveryImpl {
                 joinedEvtData.topVer,
                 locNode,
                 topSnapshot,
-                Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                null);
+                Collections.emptyMap(),
+                null).get();
 
             if (rtState.prevJoined) {
                 lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED,
                     joinedEvtData.topVer,
                     locNode,
                     topSnapshot,
-                    Collections.<Long, Collection<ClusterNode>>emptyMap(),
-                    null);
+                    Collections.emptyMap(),
+                    null).get();
 
                 U.quietAndWarn(log, "Client node was reconnected after it was 
already considered failed [locId=" + locNode.id() + ']');
             }
@@ -3402,13 +3419,23 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
-        lsnr.onDiscovery(
+        IgniteInternalFuture fut = lsnr.onDiscovery(
             DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT,
             evtData.topologyVersion(),
             sndNode,
             topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            msg);
+            Collections.emptyMap(),
+            msg
+        );
+
+        if (msg != null && msg.isMutable()) {
+            try {
+                fut.get();
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException("Failed to wait for discovery 
listener notification", e);
+            }
+        }
     }
 
     /**
@@ -3426,12 +3453,17 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
-        lsnr.onDiscovery(EVT_NODE_JOINED,
-            joinedEvtData.topVer,
-            joinedNode,
-            topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+        try {
+            lsnr.onDiscovery(EVT_NODE_JOINED,
+                joinedEvtData.topVer,
+                joinedNode,
+                topSnapshot,
+                Collections.emptyMap(),
+                null).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to wait for discovery listener 
notification", e);
+        }
     }
 
     /**
@@ -3457,12 +3489,17 @@ public class ZookeeperDiscoveryImpl {
 
         final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot();
 
-        lsnr.onDiscovery(EVT_NODE_FAILED,
-            topVer,
-            failedNode,
-            topSnapshot,
-            Collections.<Long, Collection<ClusterNode>>emptyMap(),
-            null);
+        try {
+            lsnr.onDiscovery(EVT_NODE_FAILED,
+                topVer,
+                failedNode,
+                topSnapshot,
+                Collections.emptyMap(),
+                null).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to wait for discovery listener 
notification", e);
+        }
 
         stats.onNodeFailed();
     }

Reply via email to