IGNITE-9398 Reduce time on CustomDiscoveryMessage processing by discovery message worker. Fixes #4636.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b13f373c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b13f373c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b13f373c Branch: refs/heads/ignite-7251 Commit: b13f373c24bf2538603a7f3fee5af70405ad8356 Parents: fb97066 Author: Pavel Kovalenko <[email protected]> Authored: Fri Sep 14 18:02:36 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Sep 14 18:02:36 2018 +0300 ---------------------------------------------------------------------- .../discovery/GridDiscoveryManager.java | 120 +++++++++++++++++-- .../cluster/GridClusterStateProcessor.java | 9 +- .../org/apache/ignite/spi/IgniteSpiAdapter.java | 3 +- .../spi/discovery/DiscoverySpiListener.java | 5 +- .../ignite/spi/discovery/tcp/ClientImpl.java | 33 +++-- .../ignite/spi/discovery/tcp/ServerImpl.java | 64 ++++++---- ...iteMarshallerCacheClassNameConflictTest.java | 10 +- .../IgniteMarshallerCacheFSRestoreTest.java | 8 +- ...gniteAbstractStandByClientReconnectTest.java | 10 +- .../discovery/AbstractDiscoverySelfTest.java | 21 +++- .../ignite/testframework/GridTestUtils.java | 5 +- .../zk/internal/ZookeeperDiscoveryImpl.java | 113 +++++++++++------ 12 files changed, 300 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index d19e08b..48e3318 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -70,6 +70,7 @@ import org.apache.ignite.internal.GridComponent; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.NodeOrderComparator; @@ -103,6 +104,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.S; @@ -221,6 +223,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { /** Discovery event worker. */ private final DiscoveryWorker discoWrk = new DiscoveryWorker(); + /** Discovery event notyfier worker. */ + private final DiscoveryMessageNotifyerWorker discoNotifierWrk = new DiscoveryMessageNotifyerWorker(); + /** Network segment check worker. */ private SegmentCheckWorker segChkWrk; @@ -583,16 +588,23 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } - @Override public void onDiscovery( + @Override public IgniteInternalFuture onDiscovery( final int type, final long topVer, final ClusterNode node, final Collection<ClusterNode> topSnapshot, final Map<Long, Collection<ClusterNode>> snapshots, - @Nullable DiscoverySpiCustomMessage spiCustomMsg) { - synchronized (discoEvtMux) { - onDiscovery0(type, topVer, node, topSnapshot, snapshots, spiCustomMsg); - } + @Nullable DiscoverySpiCustomMessage spiCustomMsg + ) { + GridFutureAdapter notificationFut = new GridFutureAdapter(); + + discoNotifierWrk.submit(notificationFut, () -> { + synchronized (discoEvtMux) { + onDiscovery0(type, topVer, node, topSnapshot, snapshots, spiCustomMsg); + } + }); + + return notificationFut; } /** @@ -919,6 +931,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } }); + new IgniteThread(discoNotifierWrk).start(); + startSpi(); registeredDiscoSpi = true; @@ -1698,6 +1712,10 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { U.join(discoWrk, log); + U.cancel(discoNotifierWrk); + + U.join(discoNotifierWrk, log); + // Stop SPI itself. stopSpi(); @@ -2664,6 +2682,86 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } } + /** + * + */ + private class DiscoveryMessageNotifyerWorker extends GridWorker { + /** Queue. */ + private final BlockingQueue<T2<GridFutureAdapter, Runnable>> queue = new LinkedBlockingQueue<>(); + + /** + * Default constructor. + */ + protected DiscoveryMessageNotifyerWorker() { + super(ctx.igniteInstanceName(), "disco-notyfier-worker", GridDiscoveryManager.this.log, ctx.workersRegistry()); + } + + /** + * + */ + private void body0() throws InterruptedException { + T2<GridFutureAdapter, Runnable> notification = queue.take(); + + try { + notification.get2().run(); + } + finally { + notification.get1().onDone(); + } + } + + /** + * @param cmd Command. + */ + public synchronized void submit(GridFutureAdapter notificationFut, Runnable cmd) { + if (isCancelled()) { + notificationFut.onDone(); + + return; + } + + queue.add(new T2<>(notificationFut, cmd)); + } + + /** + * Cancel thread execution and completes all notification futures. + */ + @Override public synchronized void cancel() { + super.cancel(); + + while (!queue.isEmpty()) { + T2<GridFutureAdapter, Runnable> notification = queue.poll(); + + if (notification != null) + notification.get1().onDone(); + } + } + + /** {@inheritDoc} */ + @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { + while (!isCancelled()) { + try { + body0(); + } + catch (InterruptedException e) { + if (!isCancelled) + ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, e)); + + throw e; + } + catch (Throwable t) { + U.error(log, "Exception in discovery notyfier worker thread.", t); + + FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION; + + ctx.failure().process(new FailureContext(type, t)); + + throw t; + } + } + } + } + /** Worker for discovery events. */ private class DiscoveryWorker extends GridWorker { /** */ @@ -2772,15 +2870,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { throw e; } catch (Throwable t) { - U.error(log, "Exception in discovery worker thread.", t); + U.error(log, "Exception in discovery event worker thread.", t); - if (t instanceof Error) { - FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION; + FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION; - ctx.failure().process(new FailureContext(type, t)); + ctx.failure().process(new FailureContext(type, t)); - throw t; - } + throw t; } } } @@ -2795,7 +2891,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { AffinityTopologyVersion topVer = evt.get2(); - if (type == EVT_NODE_METRICS_UPDATED && topVer.compareTo(discoCache.version()) < 0) + if (type == EVT_NODE_METRICS_UPDATED && (discoCache == null || topVer.compareTo(discoCache.version()) < 0)) return; ClusterNode node = evt.get3(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index fea751c..727a372 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -511,13 +511,16 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I transitionFuts.put(msg.requestId(), new GridFutureAdapter<Void>()); + DiscoveryDataClusterState prevState = globalState; + globalState = DiscoveryDataClusterState.createTransitionState( - globalState, + prevState, msg.activate(), - msg.activate() ? msg.baselineTopology() : globalState.baselineTopology(), + msg.activate() ? msg.baselineTopology() : prevState.baselineTopology(), msg.requestId(), topVer, - nodeIds); + nodeIds + ); if (msg.forceChangeBaselineTopology()) globalState.setTransitionResult(msg.requestId(), msg.activate()); http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index bad0186..a7e6e8c 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -547,7 +547,8 @@ public abstract class IgniteSpiAdapter implements IgniteSpi { if (rmtCls == null) { if (!optional && starting) throw new IgniteSpiException("Remote SPI with the same name is not configured" + tipStr + - " [name=" + name + ", loc=" + locCls + ']'); + " [name=" + name + ", loc=" + locCls + ", locNode=" + spiCtx.localNode() + ", rmt=" + rmtCls + + ", rmtNode=" + node + ']'); sb.a(format(">>> Remote SPI with the same name is not configured: " + name, locCls)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java index 2b2ac94..519a235 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiListener.java @@ -21,6 +21,7 @@ import java.util.Collection; import java.util.Map; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.IgniteInternalFuture; import org.jetbrains.annotations.Nullable; /** @@ -48,8 +49,10 @@ public interface DiscoverySpiListener { * {@code EVT_NODE_JOINED}, then joined node will be in snapshot). * @param topHist Topology snapshots history. * @param data Data for custom event. + * + * @return A future that will be completed when notification process has finished. */ - public void onDiscovery( + public IgniteInternalFuture onDiscovery( int type, long topVer, ClusterNode node, http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index c88ef20..312f737 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -470,7 +470,12 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<ClusterNode> top = updateTopologyHistory(topVer + 1, null); - lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null); + try { + lsnr.onDiscovery(EVT_NODE_FAILED, topVer, n, top, new TreeMap<>(topHist), null).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for discovery listener notification", e); + } } } @@ -2483,21 +2488,31 @@ class ClientImpl extends TcpDiscoveryImpl { * @param top Topology snapshot. * @param data Optional custom message data. */ - private void notifyDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> top, - @Nullable DiscoverySpiCustomMessage data) { + private void notifyDiscovery( + int type, + long topVer, + ClusterNode node, + Collection<ClusterNode> top, + @Nullable DiscoverySpiCustomMessage data + ) { DiscoverySpiListener lsnr = spi.lsnr; - DebugLogger log = type == EVT_NODE_METRICS_UPDATED ? traceLog : debugLog; + DebugLogger debugLog = type == EVT_NODE_METRICS_UPDATED ? traceLog : ClientImpl.this.debugLog; if (lsnr != null) { - if (log.isDebugEnabled()) - log.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) + + if (debugLog.isDebugEnabled()) + debugLog.debug("Discovery notification [node=" + node + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); - lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data); + try { + lsnr.onDiscovery(type, topVer, node, top, new TreeMap<>(topHist), data).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for discovery listener notification", e); + } } - else if (log.isDebugEnabled()) - log.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) + + else if (debugLog.isDebugEnabled()) + debugLog.debug("Skipped discovery notification [node=" + node + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 3e7537a..f82af61 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -70,6 +70,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.IgnitionEx; @@ -2822,7 +2823,7 @@ class ServerImpl extends TcpDiscoveryImpl { processDiscardMessage((TcpDiscoveryDiscardMessage)msg); else if (msg instanceof TcpDiscoveryCustomEventMessage) - processCustomMessage((TcpDiscoveryCustomEventMessage)msg); + processCustomMessage((TcpDiscoveryCustomEventMessage)msg, false); else if (msg instanceof TcpDiscoveryClientPingRequest) processClientPingRequest((TcpDiscoveryClientPingRequest)msg); @@ -5406,8 +5407,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** * @param msg Message. + * @param waitForNotification If {@code true} then thread will wait when discovery event notification has finished. */ - private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { + private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) { if (isLocalNodeCoordinator()) { boolean delayMsg; @@ -5441,14 +5443,14 @@ class ServerImpl extends TcpDiscoveryImpl { msg.topologyVersion(ring.topologyVersion()); if (pendingMsgs.procCustomMsgs.add(msg.id())) { - notifyDiscoveryListener(msg); + notifyDiscoveryListener(msg, waitForNotification); if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); else { registerPendingMessage(msg); - processCustomMessage(msg); + processCustomMessage(msg, waitForNotification); } } @@ -5478,7 +5480,7 @@ class ServerImpl extends TcpDiscoveryImpl { ackMsg.topologyVersion(msg.topologyVersion()); - processCustomMessage(ackMsg); + processCustomMessage(ackMsg, waitForNotification); } catch (IgniteCheckedException e) { U.error(log, "Failed to marshal discovery custom message.", e); @@ -5505,7 +5507,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert msg.topologyVersion() == ring.topologyVersion() : "msg: " + msg + ", topVer=" + ring.topologyVersion(); - notifyDiscoveryListener(msg); + notifyDiscoveryListener(msg, waitForNotification); } if (msg.verified()) @@ -5581,7 +5583,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryCustomEventMessage msg; while ((msg = pollPendingCustomeMessage()) != null) - processCustomMessage(msg); + processCustomMessage(msg, true); } } @@ -5596,8 +5598,9 @@ class ServerImpl extends TcpDiscoveryImpl { /** * @param msg Custom message. + * @param waitForNotification If {@code true} thread will wait when discovery event notification has finished. */ - private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg) { + private void notifyDiscoveryListener(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) { DiscoverySpiListener lsnr = spi.lsnr; TcpDiscoverySpiState spiState = spiStateCopy(); @@ -5613,23 +5616,40 @@ class ServerImpl extends TcpDiscoveryImpl { if (lsnr != null && (spiState == CONNECTED || spiState == DISCONNECTING)) { TcpDiscoveryNode node = ring.node(msg.creatorNodeId()); - if (node != null) { - try { - DiscoverySpiCustomMessage msgObj = msg.message(spi.marshaller(), - U.resolveClassLoader(spi.ignite().configuration())); + if (node == null) + return; - lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, - msg.topologyVersion(), - node, - snapshot, - hist, - msgObj); + DiscoverySpiCustomMessage msgObj; - if (msgObj.isMutable()) - msg.message(msgObj, U.marshal(spi.marshaller(), msgObj)); + try { + msgObj = msg.message(spi.marshaller(), U.resolveClassLoader(spi.ignite().configuration())); + } + catch (Throwable t) { + throw new IgniteException("Failed to unmarshal discovery custom message: " + msg, t); + } + + IgniteInternalFuture fut = lsnr.onDiscovery(DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, + msg.topologyVersion(), + node, + snapshot, + hist, + msgObj); + + if (waitForNotification || msgObj.isMutable()) { + try { + fut.get(); } - catch (Throwable e) { - U.error(log, "Failed to unmarshal discovery custom message.", e); + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for discovery listener notification", e); + } + } + + if (msgObj.isMutable()) { + try { + msg.message(msgObj, U.marshal(spi.marshaller(), msgObj)); + } + catch (Throwable t) { + throw new IgniteException("Failed to marshal mutable discovery message: " + msgObj, t); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java index 80d0fd1..64c7817 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheClassNameConflictTest.java @@ -31,13 +31,13 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.BinaryConfiguration; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -193,7 +193,7 @@ public class IgniteMarshallerCacheClassNameConflictTest extends GridCommonAbstra } /** {@inheritDoc} */ - @Override public void onDiscovery( + @Override public IgniteInternalFuture onDiscovery( int type, long topVer, ClusterNode node, @@ -219,7 +219,9 @@ public class IgniteMarshallerCacheClassNameConflictTest extends GridCommonAbstra } if (delegate != null) - delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); + return delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); + + return new GridFinishedFuture(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java index 49f5311..7aa61eb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheFSRestoreTest.java @@ -34,8 +34,10 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage; import org.apache.ignite.spi.discovery.DiscoverySpiListener; @@ -243,7 +245,7 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public void onDiscovery( + @Override public IgniteInternalFuture onDiscovery( int type, long topVer, ClusterNode node, @@ -267,7 +269,9 @@ public class IgniteMarshallerCacheFSRestoreTest extends GridCommonAbstractTest { } if (delegate != null) - delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); + return delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); + + return new GridFinishedFuture(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java index f7b0ed7..d01e11a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/standbycluster/reconnect/IgniteAbstractStandByClientReconnectTest.java @@ -31,6 +31,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgnitePredicate; @@ -388,7 +389,7 @@ public abstract class IgniteAbstractStandByClientReconnectTest extends GridCommo } /** {@inheritDoc} */ - @Override public void onDiscovery( + @Override public IgniteInternalFuture onDiscovery( int type, long topVer, ClusterNode node, @@ -396,9 +397,9 @@ public abstract class IgniteAbstractStandByClientReconnectTest extends GridCommo @Nullable Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage data ) { - delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, data); + IgniteInternalFuture fut = delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, data); - if (type == EVT_CLIENT_NODE_DISCONNECTED) + if (type == EVT_CLIENT_NODE_DISCONNECTED) { try { System.out.println("Await cluster change state"); @@ -407,6 +408,9 @@ public abstract class IgniteAbstractStandByClientReconnectTest extends GridCommo catch (InterruptedException e) { throw new RuntimeException(e); } + } + + return fut; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java index fa1a2ae..e59d24a 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/AbstractDiscoverySelfTest.java @@ -34,6 +34,8 @@ import javax.management.ObjectName; import mx4j.tools.adaptor.http.HttpAdaptor; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.marshaller.Marshaller; import org.apache.ignite.spi.IgniteSpi; @@ -160,10 +162,17 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri } /** {@inheritDoc} */ - @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, - Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage data) { + @Override public IgniteInternalFuture onDiscovery( + int type, + long topVer, + ClusterNode node, + Collection<ClusterNode> topSnapshot, + Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage data + ) { if (type == EVT_NODE_METRICS_UPDATED) isMetricsUpdate = true; + + return new GridFinishedFuture(); } } @@ -237,13 +246,15 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri // No-op. } - @Override public void onDiscovery(int type, long topVer, ClusterNode node, + @Override public IgniteInternalFuture onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage data) { // If METRICS_UPDATED came from local node if (type == EVT_NODE_METRICS_UPDATED && node.id().equals(spi.getLocalNode().id())) spiCnt.addAndGet(1); + + return new GridFinishedFuture(); } }; @@ -405,7 +416,7 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri } @SuppressWarnings({"NakedNotify"}) - @Override public void onDiscovery(int type, long topVer, ClusterNode node, + @Override public IgniteInternalFuture onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage data) { info("Discovery event [type=" + type + ", node=" + node + ']'); @@ -413,6 +424,8 @@ public abstract class AbstractDiscoverySelfTest<T extends IgniteSpi> extends Gri synchronized (mux) { mux.notifyAll(); } + + return new GridFinishedFuture(); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java index c6d9527..77f5324 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java @@ -159,9 +159,10 @@ public final class GridTestUtils { } /** {@inheritDoc} */ - @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage spiCustomMsg) { + @Override public IgniteInternalFuture onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage spiCustomMsg) { hook.handleDiscoveryMessage(spiCustomMsg); - delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); + + return delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/b13f373c/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java ---------------------------------------------------------------------- diff --git a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java index 2427c77..b89fbe4 100644 --- a/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java +++ b/modules/zookeeper/src/main/java/org/apache/ignite/spi/discovery/zk/internal/ZookeeperDiscoveryImpl.java @@ -467,12 +467,17 @@ public class ZookeeperDiscoveryImpl { if (rtState.joined) { assert rtState.evtsData != null; - lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED, - rtState.evtsData.topVer, - locNode, - rtState.top.topologySnapshot(), - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + try { + lsnr.onDiscovery(EVT_CLIENT_NODE_DISCONNECTED, + rtState.evtsData.topVer, + locNode, + rtState.top.topologySnapshot(), + Collections.emptyMap(), + null).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for discovery listener notification", e); + } } try { @@ -533,14 +538,19 @@ public class ZookeeperDiscoveryImpl { List<ClusterNode> nodes = rtState.top.topologySnapshot(); if (nodes.isEmpty()) - nodes = Collections.singletonList((ClusterNode)locNode); + nodes = Collections.singletonList(locNode); - lsnr.onDiscovery(EVT_NODE_SEGMENTED, - rtState.evtsData != null ? rtState.evtsData.topVer : 1L, - locNode, - nodes, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + try { + lsnr.onDiscovery(EVT_NODE_SEGMENTED, + rtState.evtsData != null ? rtState.evtsData.topVer : 1L, + locNode, + nodes, + Collections.emptyMap(), + null).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for discovery listener notification", e); + } } /** @@ -2242,12 +2252,19 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = Collections.singletonList((ClusterNode)locNode); - lsnr.onDiscovery(EVT_NODE_JOINED, - 1L, - locNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + try { + lsnr.onDiscovery(EVT_NODE_JOINED, + 1L, + locNode, + topSnapshot, + Collections.emptyMap(), + null).get(); + } + catch (IgniteCheckedException e) { + joinFut.onDone(e); + + throw new IgniteException("Failed to wait for discovery listener notification", e); + } // Reset events (this is also notification for clients left from previous cluster). rtState.zkClient.setData(zkPaths.evtsPath, marshalZip(rtState.evtsData), -1); @@ -2942,16 +2959,16 @@ public class ZookeeperDiscoveryImpl { joinedEvtData.topVer, locNode, topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + Collections.emptyMap(), + null).get(); if (rtState.prevJoined) { lsnr.onDiscovery(EVT_CLIENT_NODE_RECONNECTED, joinedEvtData.topVer, locNode, topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + Collections.emptyMap(), + null).get(); U.quietAndWarn(log, "Client node was reconnected after it was already considered failed [locId=" + locNode.id() + ']'); } @@ -3402,13 +3419,23 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); - lsnr.onDiscovery( + IgniteInternalFuture fut = lsnr.onDiscovery( DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT, evtData.topologyVersion(), sndNode, topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - msg); + Collections.emptyMap(), + msg + ); + + if (msg != null && msg.isMutable()) { + try { + fut.get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for discovery listener notification", e); + } + } } /** @@ -3426,12 +3453,17 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); - lsnr.onDiscovery(EVT_NODE_JOINED, - joinedEvtData.topVer, - joinedNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + try { + lsnr.onDiscovery(EVT_NODE_JOINED, + joinedEvtData.topVer, + joinedNode, + topSnapshot, + Collections.emptyMap(), + null).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for discovery listener notification", e); + } } /** @@ -3457,12 +3489,17 @@ public class ZookeeperDiscoveryImpl { final List<ClusterNode> topSnapshot = rtState.top.topologySnapshot(); - lsnr.onDiscovery(EVT_NODE_FAILED, - topVer, - failedNode, - topSnapshot, - Collections.<Long, Collection<ClusterNode>>emptyMap(), - null); + try { + lsnr.onDiscovery(EVT_NODE_FAILED, + topVer, + failedNode, + topSnapshot, + Collections.emptyMap(), + null).get(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to wait for discovery listener notification", e); + } stats.onNodeFailed(); }
