ignite-1758 Fixed issues with client reconnect handling
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6ea3b562 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6ea3b562 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6ea3b562 Branch: refs/heads/ignite-1819 Commit: 6ea3b56205de19ceac89762d9c20c3fe62ab13b9 Parents: 04964b9 Author: sboikov <[email protected]> Authored: Fri Oct 30 16:33:40 2015 +0300 Committer: sboikov <[email protected]> Committed: Fri Oct 30 16:33:40 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 3 + .../apache/ignite/internal/IgniteKernal.java | 14 +- .../processors/cache/GridCacheProcessor.java | 77 +++-- .../dht/preloader/GridDhtPreloader.java | 4 +- .../CacheObjectPortableProcessorImpl.java | 9 + .../util/nio/GridNioRecoveryDescriptor.java | 11 +- .../communication/tcp/TcpCommunicationSpi.java | 40 ++- .../ignite/spi/discovery/tcp/ClientImpl.java | 205 ++++++++----- .../ignite/spi/discovery/tcp/ServerImpl.java | 213 +++++++++---- .../messages/TcpDiscoveryAbstractMessage.java | 11 + .../messages/TcpDiscoveryNodeAddedMessage.java | 39 +++ .../IgniteClientReconnectCacheTest.java | 33 ++ .../cache/GridCacheAbstractFullApiSelfTest.java | 3 + .../CacheGetFutureHangsSelfTest.java | 8 + .../IgniteCacheClientReconnectTest.java | 2 + .../distributed/IgniteCacheManyClientsTest.java | 14 +- ...gniteClientReconnectMassiveShutdownTest.java | 303 +++++++++++++++++++ .../tcp/TcpDiscoveryMultiThreadedTest.java | 285 +++++++++++++---- 18 files changed, 1021 insertions(+), 253 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 1e7d002..de7c10b 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -355,6 +355,9 @@ public final class IgniteSystemProperties { /** Maximum size for affinity assignment history. */ public static final String IGNITE_AFFINITY_HISTORY_SIZE = "IGNITE_AFFINITY_HISTORY_SIZE"; + /** Maximum size for discovery messages history. */ + public static final String IGNITE_DISCOVERY_HISTORY_SIZE = "IGNITE_DISCOVERY_HISTORY_SIZE"; + /** Number of cache operation retries in case of topology exceptions. */ public static final String IGNITE_CACHE_RETRIES_COUNT = "IGNITE_CACHE_RETRIES_COUNT"; http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 4820a93..5a0fe16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -165,6 +165,7 @@ import org.apache.ignite.plugin.PluginNotFoundException; import org.apache.ignite.plugin.PluginProvider; import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.IgniteSpiVersionCheckException; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL; @@ -3158,10 +3159,17 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { /** {@inheritDoc} */ public void dumpDebugInfo() { - U.warn(log, "Dumping debug info for node [id=" + ctx.localNodeId() + + boolean client = 
ctx.clientNode(); + + ClusterNode locNode = ctx.discovery().localNode(); + + UUID routerId = locNode instanceof TcpDiscoveryNode ? ((TcpDiscoveryNode)locNode).clientRouterNodeId() : null; + + U.warn(log, "Dumping debug info for node [id=" + locNode.id() + ", name=" + ctx.gridName() + - ", order=" + ctx.discovery().localNode().order() + - ", client=" + ctx.clientNode() + ']'); + ", order=" + locNode.order() + + ", client=" + client + + (client && routerId != null ? ", routerId=" + routerId : "") + ']'); ctx.cache().context().exchange().dumpDebugInfo(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 5bf4ac7..301e7d3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1803,61 +1803,80 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** {@inheritDoc} */ @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) { + boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null; + // Collect dynamically started caches to a single object. - Collection<DynamicCacheChangeRequest> reqs = - new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); + Collection<DynamicCacheChangeRequest> reqs; - boolean reconnect = ctx.localNodeId().equals(nodeId) && cachesOnDisconnect != null; + Map<String, Map<UUID, Boolean>> clientNodesMap; - Map<String, DynamicCacheDescriptor> descs = reconnect ? 
cachesOnDisconnect : registeredCaches; + if (reconnect) { + reqs = new ArrayList<>(caches.size()); - for (DynamicCacheDescriptor desc : descs.values()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); + clientNodesMap = U.newHashMap(caches.size()); - req.startCacheConfiguration(desc.cacheConfiguration()); + for (GridCacheAdapter<?, ?> cache : caches.values()) { + DynamicCacheDescriptor desc = cachesOnDisconnect.get(maskNull(cache.name())); - req.cacheType(desc.cacheType()); + if (desc == null) + continue; - req.deploymentId(desc.deploymentId()); + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(cache.name(), null); - reqs.add(req); - } + req.startCacheConfiguration(desc.cacheConfiguration()); - for (DynamicCacheDescriptor desc : registeredTemplates.values()) { - DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); + req.cacheType(desc.cacheType()); - req.startCacheConfiguration(desc.cacheConfiguration()); + req.deploymentId(desc.deploymentId()); - req.template(true); + reqs.add(req); - req.deploymentId(desc.deploymentId()); + Boolean nearEnabled = cache.isNear(); + + Map<UUID, Boolean> map = U.newHashMap(1); + + map.put(nodeId, nearEnabled); - reqs.add(req); + clientNodesMap.put(cache.name(), map); + } } + else { + reqs = new ArrayList<>(registeredCaches.size() + registeredTemplates.size()); - DynamicCacheChangeBatch req = new DynamicCacheChangeBatch(reqs); + for (DynamicCacheDescriptor desc : registeredCaches.values()) { + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); - Map<String, Map<UUID, Boolean>> clientNodesMap = ctx.discovery().clientNodesMap(); + req.startCacheConfiguration(desc.cacheConfiguration()); - if (reconnect) { - clientNodesMap = U.newHashMap(caches.size()); + req.cacheType(desc.cacheType()); - for (GridCacheAdapter<?, ?> cache : caches.values()) { - Boolean 
nearEnabled = cache.isNear(); + req.deploymentId(desc.deploymentId()); - Map<UUID, Boolean> map = U.newHashMap(1); + reqs.add(req); + } - map.put(nodeId, nearEnabled); + for (DynamicCacheDescriptor desc : registeredTemplates.values()) { + DynamicCacheChangeRequest req = new DynamicCacheChangeRequest(desc.cacheConfiguration().getName(), null); - clientNodesMap.put(cache.name(), map); + req.startCacheConfiguration(desc.cacheConfiguration()); + + req.template(true); + + req.deploymentId(desc.deploymentId()); + + reqs.add(req); } + + clientNodesMap = ctx.discovery().clientNodesMap(); } - req.clientNodes(clientNodesMap); + DynamicCacheChangeBatch batch = new DynamicCacheChangeBatch(reqs); + + batch.clientNodes(clientNodesMap); - req.clientReconnect(reconnect); + batch.clientReconnect(reconnect); - return req; + return batch; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 83867f4..356a85b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -192,9 +192,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { ClusterNode loc = cctx.localNode(); - long startTime = loc.metrics().getStartTime(); - - assert startTime > 0; + assert loc.metrics().getStartTime() > 0; final long startTopVer = loc.order(); 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java index 2de9d84..f0319aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java @@ -39,6 +39,8 @@ import javax.cache.processor.EntryProcessor; import javax.cache.processor.MutableEntry; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.portable.api.IgnitePortables; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cluster.ClusterNode; @@ -75,6 +77,7 @@ import org.apache.ignite.internal.util.lang.GridMapEntry; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -371,6 +374,12 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor else throw e; } + catch (CacheException e) { + if (X.hasCause(e, ClusterTopologyCheckedException.class, 
ClusterTopologyException.class)) + continue; + else + throw e; + } break; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index 88837de..5647239 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -193,14 +193,19 @@ public class GridNioRecoveryDescriptor { /** * Node left callback. + * + * @return {@code False} if descriptor is reserved. */ - public void onNodeLeft() { + public boolean onNodeLeft() { GridNioFuture<?>[] futs = null; synchronized (this) { nodeLeft = true; - if (!reserved && !msgFuts.isEmpty()) { + if (reserved) + return false; + + if (!msgFuts.isEmpty()) { futs = msgFuts.toArray(new GridNioFuture<?>[msgFuts.size()]); msgFuts.clear(); @@ -209,6 +214,8 @@ public class GridNioRecoveryDescriptor { if (futs != null) completeOnNodeLeft(futs); + + return true; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 5ea2c02..e8bd8a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -61,6 +61,7 @@ import 
org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.util.GridConcurrentFactory; import org.apache.ignite.internal.util.GridSpinReadWriteLock; @@ -1358,7 +1359,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public int getOutboundMessagesQueueSize() { - return nioSrvr.outboundMessagesQueueSize(); + GridNioServer<Message> srv = nioSrvr; + + return srv != null ? srv.outboundMessagesQueueSize() : 0; } /** {@inheritDoc} */ @@ -1870,25 +1873,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter * * @param node Destination node. * @param msg Message to send. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message. * Note that this is not guaranteed that failed communication will result * in thrown exception as this is dependant on SPI implementation. */ - public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { - sendMessage0(node, msg, ackClosure); + sendMessage0(node, msg, ackC); } /** * @param node Destination node. * @param msg Message to send. - * @param ackClosure Ack closure. + * @param ackC Ack closure. * @throws org.apache.ignite.spi.IgniteSpiException Thrown in case of any error during sending the message. * Note that this is not guaranteed that failed communication will result * in thrown exception as this is dependant on SPI implementation. 
*/ - private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackClosure) + private void sendMessage0(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { assert node != null; assert msg != null; @@ -1896,13 +1899,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isTraceEnabled()) log.trace("Sending message with ack to node [node=" + node + ", msg=" + msg + ']'); - ClusterNode localNode = getLocalNode(); + ClusterNode locNode = getLocalNode(); - if (localNode == null) + if (locNode == null) throw new IgniteSpiException("Local node has not been started or fully initialized " + "[isStopping=" + getSpiContext().isStopping() + ']'); - if (node.id().equals(localNode.id())) + if (node.id().equals(locNode.id())) notifyListener(node.id(), msg, NOOP); else { GridCommunicationClient client = null; @@ -1915,10 +1918,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter UUID nodeId = null; - if (!client.async() && !localNode.version().equals(node.version())) + if (!client.async() && !locNode.version().equals(node.version())) nodeId = node.id(); - retry = client.sendMessage(nodeId, msg, ackClosure); + retry = client.sendMessage(nodeId, msg, ackC); client.release(); @@ -2292,6 +2295,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter return null; } + if (getSpiContext().node(node.id()) == null) { + recoveryDesc.release(); + + U.closeQuiet(ch); + + throw new ClusterTopologyCheckedException("Failed to send message, " + + "node left cluster: " + node); + } + long rcvCnt = -1; SSLEngine sslEngine = null; @@ -3100,10 +3112,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter assert !left.isEmpty(); for (ClientKey id : left) { - GridNioRecoveryDescriptor recoverySnd = recoveryDescs.remove(id); + GridNioRecoveryDescriptor recoverySnd = recoveryDescs.get(id); - if (recoverySnd != null) - recoverySnd.onNodeLeft(); + if (recoverySnd != null && 
recoverySnd.onNodeLeft()) + recoveryDescs.remove(id); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index e4c29db..a4619c6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -270,8 +270,6 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void spiStop() throws IgniteSpiException { - timer.cancel(); - if (msgWorker != null && msgWorker.isAlive()) { // Should always be alive msgWorker.addMessage(SPI_STOP); @@ -297,6 +295,8 @@ class ClientImpl extends TcpDiscoveryImpl { U.join(sockWriter, log); U.join(sockReader, log); + timer.cancel(); + spi.printStopInfo(); } @@ -461,7 +461,8 @@ class ClientImpl extends TcpDiscoveryImpl { * @see TcpDiscoverySpi#joinTimeout */ @SuppressWarnings("BusyWait") - @Nullable private T2<Socket, Boolean> joinTopology(boolean recon, long timeout) throws IgniteSpiException, InterruptedException { + @Nullable private T2<SocketStream, Boolean> joinTopology(boolean recon, long timeout) + throws IgniteSpiException, InterruptedException { Collection<InetSocketAddress> addrs = null; long startTime = U.currentTimeMillis(); @@ -501,7 +502,7 @@ class ClientImpl extends TcpDiscoveryImpl { InetSocketAddress addr = it.next(); - T3<Socket, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr); + T3<SocketStream, Integer, Boolean> sockAndRes = sendJoinRequest(recon, addr); if (sockAndRes == null) { it.remove(); @@ -511,11 +512,11 @@ class ClientImpl extends TcpDiscoveryImpl { assert sockAndRes.get1() != null && sockAndRes.get2() != null : sockAndRes; - 
Socket sock = sockAndRes.get1(); + Socket sock = sockAndRes.get1().socket(); switch (sockAndRes.get2()) { case RES_OK: - return new T2<>(sock, sockAndRes.get3()); + return new T2<>(sockAndRes.get1(), sockAndRes.get3()); case RES_CONTINUE_JOIN: case RES_WAIT: @@ -548,7 +549,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param addr Address. * @return Socket, connect response and client acknowledge support flag. */ - @Nullable private T3<Socket, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) { + @Nullable private T3<SocketStream, Integer, Boolean> sendJoinRequest(boolean recon, InetSocketAddress addr) { assert addr != null; if (log.isDebugEnabled()) @@ -621,7 +622,8 @@ class ClientImpl extends TcpDiscoveryImpl { log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + ", rmtNodeId=" + rmtNodeId + ']'); - return new T3<>(sock, spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), + return new T3<>(new SocketStream(sock), + spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)), res.clientAck()); } catch (IOException | IgniteCheckedException e) { @@ -708,7 +710,7 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<ClusterNode> top = topHist.get(topVer); - assert top != null : msg; + assert top != null : "Failed to find topology history [msg=" + msg + ", hist=" + topHist + ']'; return top; } @@ -765,7 +767,10 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override public void brakeConnection() { - U.closeQuiet(msgWorker.currSock); + SocketStream sockStream = msgWorker.currSock; + + if (sockStream != null) + U.closeQuiet(sockStream.socket()); } /** {@inheritDoc} */ @@ -826,7 +831,7 @@ class ClientImpl extends TcpDiscoveryImpl { private final Object mux = new Object(); /** */ - private Socket sock; + private SocketStream sockStream; /** */ private UUID rmtNodeId; @@ -838,12 +843,12 @@ class ClientImpl extends TcpDiscoveryImpl { } /** - * @param sock Socket. 
+ * @param sockStream Socket. * @param rmtNodeId Rmt node id. */ - public void setSocket(Socket sock, UUID rmtNodeId) { + public void setSocket(SocketStream sockStream, UUID rmtNodeId) { synchronized (mux) { - this.sock = sock; + this.sockStream = sockStream; this.rmtNodeId = rmtNodeId; @@ -854,22 +859,24 @@ class ClientImpl extends TcpDiscoveryImpl { /** {@inheritDoc} */ @Override protected void body() throws InterruptedException { while (!isInterrupted()) { - Socket sock; + SocketStream sockStream; UUID rmtNodeId; synchronized (mux) { - if (this.sock == null) { + if (this.sockStream == null) { mux.wait(); continue; } - sock = this.sock; + sockStream = this.sockStream; rmtNodeId = this.rmtNodeId; } + Socket sock = sockStream.socket(); + try { - InputStream in = new BufferedInputStream(sock.getInputStream()); + InputStream in = sockStream.stream(); sock.setKeepAlive(true); sock.setTcpNoDelay(true); @@ -912,18 +919,14 @@ class ClientImpl extends TcpDiscoveryImpl { boolean ack = msg instanceof TcpDiscoveryClientAckResponse; - if (!ack) { - if (spi.ensured(msg) && joinLatch.getCount() == 0L) - lastMsgId = msg.id(); - + if (!ack) msgWorker.addMessage(msg); - } else sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg); } } catch (IOException e) { - msgWorker.addMessage(new SocketClosedMessage(sock)); + msgWorker.addMessage(new SocketClosedMessage(sockStream)); if (log.isDebugEnabled()) U.error(log, "Connection failed [sock=" + sock + ", locNodeId=" + getLocalNodeId() + ']', e); @@ -932,8 +935,8 @@ class ClientImpl extends TcpDiscoveryImpl { U.closeQuiet(sock); synchronized (mux) { - if (this.sock == sock) { - this.sock = null; + if (this.sockStream == sockStream) { + this.sockStream = null; this.rmtNodeId = null; } } @@ -1125,7 +1128,7 @@ class ClientImpl extends TcpDiscoveryImpl { */ private class Reconnector extends IgniteSpiThread { /** */ - private volatile Socket sock; + private volatile SocketStream sockStream; /** */ private boolean clientAck; @@ -1148,7 
+1151,10 @@ class ClientImpl extends TcpDiscoveryImpl { public void cancel() { interrupt(); - U.closeQuiet(sock); + SocketStream sockStream = this.sockStream; + + if (sockStream != null) + U.closeQuiet(sockStream.socket()); } /** {@inheritDoc} */ @@ -1166,24 +1172,26 @@ class ClientImpl extends TcpDiscoveryImpl { try { while (true) { - T2<Socket, Boolean> joinRes = joinTopology(true, timeout); + T2<SocketStream, Boolean> joinRes = joinTopology(true, timeout); if (joinRes == null) { if (join) { joinError(new IgniteSpiException("Join process timed out, connection failed and " + "failed to reconnect (consider increasing 'joinTimeout' configuration property) " + - "[joinTimeout=" + spi.joinTimeout + ", sock=" + sock + ']')); + "[joinTimeout=" + spi.joinTimeout + ']')); } else U.error(log, "Failed to reconnect to cluster (consider increasing 'networkTimeout'" + - " configuration property) [networkTimeout=" + spi.netTimeout + ", sock=" + sock + ']'); + " configuration property) [networkTimeout=" + spi.netTimeout + ']'); return; } - sock = joinRes.get1(); + sockStream = joinRes.get1(); clientAck = joinRes.get2(); + Socket sock = sockStream.socket(); + if (isInterrupted()) throw new InterruptedException(); @@ -1194,7 +1202,7 @@ class ClientImpl extends TcpDiscoveryImpl { sock.setSoTimeout((int)spi.netTimeout); - InputStream in = new BufferedInputStream(sock.getInputStream()); + InputStream in = sockStream.stream(); sock.setKeepAlive(true); sock.setTcpNoDelay(true); @@ -1264,11 +1272,16 @@ class ClientImpl extends TcpDiscoveryImpl { catch (IOException | IgniteCheckedException e) { err = e; + success = false; + U.error(log, "Failed to reconnect", e); } finally { if (!success) { - U.closeQuiet(sock); + SocketStream sockStream = this.sockStream; + + if (sockStream != null) + U.closeQuiet(sockStream.socket()); if (join) joinError(new IgniteSpiException("Failed to connect to cluster, connection failed and failed " + @@ -1288,10 +1301,7 @@ class ClientImpl extends 
TcpDiscoveryImpl { private final BlockingDeque<Object> queue = new LinkedBlockingDeque<>(); /** */ - private Socket currSock; - - /** Indicates that pending messages are currently processed. */ - private boolean pending; + private SocketStream currSock; /** */ private Reconnector reconnector; @@ -1338,11 +1348,13 @@ class ClientImpl extends TcpDiscoveryImpl { } } else if (msg == SPI_STOP) { + boolean connected = state == CONNECTED; + state = STOPPED; assert spi.getSpiContext().isStopping(); - if (currSock != null) { + if (connected && currSock != null) { TcpDiscoveryAbstractMessage leftMsg = new TcpDiscoveryNodeLeftMessage(getLocalNodeId()); leftMsg.client(true); @@ -1467,7 +1479,10 @@ class ClientImpl extends TcpDiscoveryImpl { } } finally { - U.closeQuiet(currSock); + SocketStream currSock = this.currSock; + + if (currSock != null) + U.closeQuiet(currSock.socket()); if (joinLatch.getCount() > 0) joinError(new IgniteSpiException("Some error in join process.")); // This should not occur. 
@@ -1490,7 +1505,7 @@ class ClientImpl extends TcpDiscoveryImpl { joinCnt++; - T2<Socket, Boolean> joinRes = joinTopology(false, spi.joinTimeout); + T2<SocketStream, Boolean> joinRes = joinTopology(false, spi.joinTimeout); if (joinRes == null) { if (join) @@ -1506,7 +1521,7 @@ class ClientImpl extends TcpDiscoveryImpl { currSock = joinRes.get1(); - sockWriter.setSocket(joinRes.get1(), joinRes.get2()); + sockWriter.setSocket(joinRes.get1().socket(), joinRes.get2()); if (spi.joinTimeout > 0) { final int joinCnt0 = joinCnt; @@ -1551,6 +1566,9 @@ class ClientImpl extends TcpDiscoveryImpl { processPingRequest(); spi.stats.onMessageProcessingFinished(msg); + + if (spi.ensured(msg) && state == CONNECTED) + lastMsgId = msg.id(); } /** @@ -1604,8 +1622,10 @@ class ClientImpl extends TcpDiscoveryImpl { if (msg.topologyHistory() != null) topHist.putAll(msg.topologyHistory()); } - else if (log.isDebugEnabled()) - log.debug("Discarding node added message with empty topology: " + msg); + else { + if (log.isDebugEnabled()) + log.debug("Discarding node added message with empty topology: " + msg); + } } else if (log.isDebugEnabled()) log.debug("Discarding node added message (this message has already been processed) " + @@ -1625,8 +1645,10 @@ class ClientImpl extends TcpDiscoveryImpl { spi.onExchange(newNodeId, newNodeId, data, null); } } - else if (log.isDebugEnabled()) - log.debug("Ignore topology message, local node not added to topology: " + msg); + else { + if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); + } } } @@ -1653,6 +1675,11 @@ class ClientImpl extends TcpDiscoveryImpl { locNode.order(topVer); + for (Iterator<Long> it = topHist.keySet().iterator(); it.hasNext();) { + if (it.next() >= topVer) + it.remove(); + } + Collection<ClusterNode> nodes = updateTopologyHistory(topVer, msg); notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes); @@ -1712,7 +1739,7 @@ class ClientImpl extends TcpDiscoveryImpl { assert top 
!= null && top.contains(node) : "Topology does not contain node [msg=" + msg + ", node=" + node + ", top=" + top + ']'; - if (!pending && joinLatch.getCount() > 0) { + if (state != CONNECTED) { if (log.isDebugEnabled()) log.debug("Discarding node add finished message (join process is not finished): " + msg); @@ -1725,8 +1752,10 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onNodeJoined(); } } - else if (log.isDebugEnabled()) - log.debug("Ignore topology message, local node not added to topology: " + msg); + else { + if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); + } } } @@ -1756,7 +1785,7 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); - if (!pending && joinLatch.getCount() > 0) { + if (state != CONNECTED) { if (log.isDebugEnabled()) log.debug("Discarding node left message (join process is not finished): " + msg); @@ -1767,8 +1796,10 @@ class ClientImpl extends TcpDiscoveryImpl { spi.stats.onNodeLeft(); } - else if (log.isDebugEnabled()) - log.debug("Ignore topology message, local node not added to topology: " + msg); + else { + if (log.isDebugEnabled()) + log.debug("Ignore topology message, local node not added to topology: " + msg); + } } } @@ -1809,7 +1840,7 @@ class ClientImpl extends TcpDiscoveryImpl { Collection<ClusterNode> top = updateTopologyHistory(msg.topologyVersion(), msg); - if (!pending && joinLatch.getCount() > 0) { + if (state != CONNECTED) { if (log.isDebugEnabled()) log.debug("Discarding node failed message (join process is not finished): " + msg); @@ -1875,25 +1906,18 @@ class ClientImpl extends TcpDiscoveryImpl { if (reconnector != null) { assert msg.success() : msg; - currSock = reconnector.sock; + currSock = reconnector.sockStream; - sockWriter.setSocket(currSock, reconnector.clientAck); + sockWriter.setSocket(currSock.socket(), reconnector.clientAck); sockReader.setSocket(currSock, 
locNode.clientRouterNodeId()); reconnector = null; - pending = true; - - try { - for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { - if (log.isDebugEnabled()) - log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']'); + for (TcpDiscoveryAbstractMessage pendingMsg : msg.pendingMessages()) { + if (log.isDebugEnabled()) + log.debug("Process pending message on reconnect [msg=" + pendingMsg + ']'); - processDiscoveryMessage(pendingMsg); - } - } - finally { - pending = false; + processDiscoveryMessage(pendingMsg); } } else { @@ -1921,7 +1945,7 @@ class ClientImpl extends TcpDiscoveryImpl { * @param msg Message. */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) { - if (msg.verified() && state == CONNECTED) { + if (state == CONNECTED) { DiscoverySpiListener lsnr = spi.lsnr; if (lsnr != null) { @@ -2048,13 +2072,56 @@ class ClientImpl extends TcpDiscoveryImpl { */ private static class SocketClosedMessage { /** */ + private final SocketStream sock; + + /** + * @param sock Socket. + */ + private SocketClosedMessage(SocketStream sock) { + this.sock = sock; + } + } + + /** + * + */ + private static class SocketStream { + /** */ private final Socket sock; + /** */ + private final InputStream in; + /** * @param sock Socket. + * @throws IOException If failed to create stream. */ - private SocketClosedMessage(Socket sock) { + public SocketStream(Socket sock) throws IOException { + assert sock != null; + this.sock = sock; + + this.in = new BufferedInputStream(sock.getInputStream()); + } + + /** + * @return Socket. + */ + Socket socket() { + return sock; + + } + + /** + * @return Socket input stream. 
+ */ + InputStream stream() { + return in; + } + + /** {@inheritDoc} */ + public String toString() { + return sock.toString(); } } @@ -2077,4 +2144,4 @@ class ClientImpl extends TcpDiscoveryImpl { /** */ STOPPED } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index b8df846..ee9f818 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -126,7 +126,9 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessa import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID; +import static org.apache.ignite.IgniteSystemProperties.getInteger; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; @@ -154,6 +156,9 @@ import static org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusChe @SuppressWarnings("All") class ServerImpl extends TcpDiscoveryImpl { /** */ + private static final int ENSURED_MSG_HIST_SIZE = getInteger(IGNITE_DISCOVERY_HISTORY_SIZE, 1024 * 10); + + /** */ private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); @@ -1250,9 +1255,11 @@ class ServerImpl extends TcpDiscoveryImpl { lsnr.onDiscovery(type, topVer, 
node, top, hist, null); } - else if (log.isDebugEnabled()) - log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + - ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + else { + if (log.isDebugEnabled()) + log.debug("Skipped discovery notification [node=" + node + ", spiState=" + spiState + + ", type=" + U.gridEventName(type) + ", topVer=" + topVer + ']'); + } } /** @@ -1447,6 +1454,12 @@ class ServerImpl extends TcpDiscoveryImpl { tmp = U.arrayList(readers); } + for (ClientMessageWorker msgWorker : clientMsgWorkers.values()) { + U.interrupt(msgWorker); + + U.join(msgWorker, log); + } + U.interrupt(tmp); U.joinThreads(tmp, log); @@ -1744,22 +1757,36 @@ class ServerImpl extends TcpDiscoveryImpl { * Discovery messages history used for client reconnect. */ private class EnsuredMessageHistory { - /** */ - private static final int MAX = 1024; - /** Pending messages. */ - private final ArrayDeque<TcpDiscoveryAbstractMessage> msgs = new ArrayDeque<>(MAX * 2); + private final GridBoundedLinkedHashSet<TcpDiscoveryAbstractMessage> + msgs = new GridBoundedLinkedHashSet<>(ENSURED_MSG_HIST_SIZE); /** * @param msg Adds message. 
*/ void add(TcpDiscoveryAbstractMessage msg) { - assert spi.ensured(msg) : msg; + assert spi.ensured(msg) && msg.verified() : msg; + + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; - msgs.addLast(msg); + TcpDiscoveryNode node = addedMsg.node(); - while (msgs.size() > MAX) - msgs.pollFirst(); + if (node.isClient() && !msgs.contains(msg)) { + Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); + + Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size()); + + for (TcpDiscoveryNode n0 : allNodes) { + if (n0.internalOrder() != 0 && n0.internalOrder() < node.internalOrder()) + top.add(n0); + } + + addedMsg.clientTopology(top); + } + } + + msgs.add(msg); } /** @@ -1782,11 +1809,11 @@ class ServerImpl extends TcpDiscoveryImpl { for (TcpDiscoveryAbstractMessage msg : msgs) { if (msg instanceof TcpDiscoveryNodeAddedMessage) { - if (node.id().equals(((TcpDiscoveryNodeAddedMessage) msg).node().id())) + if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id())) res = new ArrayList<>(msgs.size()); } - if (res != null && msg.verified()) + if (res != null) res.add(prepare(msg, node.id())); } @@ -1812,7 +1839,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (msg.id().equals(lastMsgId)) skip = false; } - else if (msg.verified()) + else cp.add(prepare(msg, node.id())); } @@ -1820,7 +1847,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (log.isDebugEnabled()) { if (cp == null) - log.debug("Failed to find messages history [node=" + node + ", lastMsgId" + lastMsgId + ']'); + log.debug("Failed to find messages history [node=" + node + ", lastMsgId=" + lastMsgId + ']'); else log.debug("Found messages history [node=" + node + ", hist=" + cp + ']'); } @@ -1835,8 +1862,21 @@ class ServerImpl extends TcpDiscoveryImpl { * @return Prepared message. 
*/ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) { - if (msg instanceof TcpDiscoveryNodeAddedMessage) - prepareNodeAddedMessage(msg, destNodeId, null, null, null); + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + if (addedMsg.node().id().equals(destNodeId)) { + assert addedMsg.clientTopology() != null : addedMsg; + + TcpDiscoveryNodeAddedMessage msg0 = new TcpDiscoveryNodeAddedMessage(addedMsg); + + prepareNodeAddedMessage(msg0, destNodeId, null, null, null); + + msg0.topology(addedMsg.clientTopology()); + + return msg0; + } + } return msg; } @@ -2132,7 +2172,7 @@ class ServerImpl extends TcpDiscoveryImpl { else assert false : "Unknown message type: " + msg.getClass().getSimpleName(); - if (spi.ensured(msg)) + if (spi.ensured(msg) && redirectToClients(msg)) msgHist.add(msg); if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) { @@ -2161,19 +2201,9 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Sends message across the ring. - * - * @param msg Message to send + * @param msg Message. */ - @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"}) - private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { - assert msg != null; - - assert ring.hasRemoteNodes(); - - for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) - msgLsnr.apply(msg); - + private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) { if (redirectToClients(msg)) { byte[] marshalledMsg = null; @@ -2193,9 +2223,28 @@ class ServerImpl extends TcpDiscoveryImpl { msgClone = msg; } + prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null, null, null); + clientMsgWorker.addMessage(msgClone); } } + } + + /** + * Sends message across the ring. 
+ * + * @param msg Message to send + */ + @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", "ContinueStatementWithLabel"}) + private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) { + assert msg != null; + + assert ring.hasRemoteNodes(); + + for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : spi.sendMsgLsnrs) + msgLsnr.apply(msg); + + sendMessageToClients(msg); Collection<TcpDiscoveryNode> failedNodes; @@ -2810,7 +2859,7 @@ class ServerImpl extends TcpDiscoveryImpl { "[clientNode=" + existingNode + ", msg=" + reconMsg + ']'); } else { - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(reconMsg)) sendMessageAcrossRing(reconMsg); } } @@ -3052,8 +3101,11 @@ class ServerImpl extends TcpDiscoveryImpl { nodeAddedMsg.client(msg.client()); processNodeAddedMessage(nodeAddedMsg); + + if (nodeAddedMsg.verified()) + msgHist.add(nodeAddedMsg); } - else if (ring.hasRemoteNodes()) + else if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } @@ -3155,8 +3207,13 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Failing reconnecting client node because failed to restore pending " + "messages [locNodeId=" + locNodeId + ", clientNodeId=" + nodeId + ']'); - processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, - node.id(), node.internalOrder())); + TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage(locNodeId, + node.id(), node.internalOrder()); + + processNodeFailedMessage(nodeFailedMsg); + + if (nodeFailedMsg.verified()) + msgHist.add(nodeFailedMsg); } } else if (log.isDebugEnabled()) @@ -3172,12 +3229,12 @@ class ServerImpl extends TcpDiscoveryImpl { locNodeId + ", clientNodeId=" + nodeId + ']'); } else { - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } } else { - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } } @@ -3239,6 +3296,9 @@ class ServerImpl extends TcpDiscoveryImpl { 
processNodeAddFinishedMessage(addFinishMsg); + if (addFinishMsg.verified()) + msgHist.add(addFinishMsg); + addMessage(new TcpDiscoveryDiscardMessage(locNodeId, msg.id(), false)); return; @@ -3249,7 +3309,7 @@ class ServerImpl extends TcpDiscoveryImpl { else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { // Local node already has node from message in local topology. // Just pass it to coordinator via the ring. - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); if (log.isDebugEnabled()) @@ -3437,7 +3497,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } @@ -3572,7 +3632,7 @@ class ServerImpl extends TcpDiscoveryImpl { notifyDiscovery(EVT_NODE_JOINED, topVer, locNode); } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); checkPendingCustomMessages(); @@ -3740,7 +3800,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) { + if (sendMessageToRemotes(msg)) { try { sendMessageAcrossRing(msg); } @@ -3761,6 +3821,19 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param msg Message to send. + * @return {@code True} if message should be send across the ring. + */ + private boolean sendMessageToRemotes(TcpDiscoveryAbstractMessage msg) { + if (ring.hasRemoteNodes()) + return true; + + sendMessageToClients(msg); + + return false; + } + + /** * Processes node failed message. * * @param msg Node failed message. 
@@ -3892,7 +3965,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onNodeFailed(); } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); else { if (log.isDebugEnabled()) @@ -4032,7 +4105,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } @@ -4098,7 +4171,7 @@ class ServerImpl extends TcpDiscoveryImpl { } } - if (ring.hasRemoteNodes()) { + if (sendMessageToRemotes(msg)) { if ((locNodeId.equals(msg.creatorNodeId()) && msg.senderNodeId() == null || !hasMetrics(msg, locNodeId)) && spiStateCopy() == CONNECTED) { // Message is on its first ring or just created on coordinator. @@ -4135,16 +4208,22 @@ class ServerImpl extends TcpDiscoveryImpl { failedNode = failedNodes.contains(clientNode); } - if (!failedNode) - processNodeFailedMessage(new TcpDiscoveryNodeFailedMessage(locNodeId, - clientNode.id(), clientNode.internalOrder())); + if (!failedNode) { + TcpDiscoveryNodeFailedMessage nodeFailedMsg = new TcpDiscoveryNodeFailedMessage( + locNodeId, clientNode.id(), clientNode.internalOrder()); + + processNodeFailedMessage(nodeFailedMsg); + + if (nodeFailedMsg.verified()) + msgHist.add(nodeFailedMsg); + } } } } } } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } else { @@ -4351,7 +4430,7 @@ class ServerImpl extends TcpDiscoveryImpl { notifyDiscoveryListener(msg); } - if (ring.hasRemoteNodes()) + if (sendMessageToRemotes(msg)) sendMessageAcrossRing(msg); } } @@ -4363,8 +4442,12 @@ class ServerImpl extends TcpDiscoveryImpl { if (joiningNodes.isEmpty() && isLocalNodeCoordinator()) { TcpDiscoveryCustomEventMessage msg; - while ((msg = pendingCustomMsgs.poll()) != null) + while ((msg = pendingCustomMsgs.poll()) != null) { processCustomMessage(msg); + + if (msg.verified()) + msgHist.add(msg); + } } } @@ -5293,19 +5376,14 @@ class ServerImpl extends TcpDiscoveryImpl { } } else { - try { - if 
(log.isDebugEnabled()) - log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" - + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + if (log.isDebugEnabled()) + log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - prepareNodeAddedMessage(msg, clientNodeId, null, null, null); + assert topologyInitialized(msg) : msg; - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? - spi.failureDetectionTimeout() : spi.getSocketTimeout()); - } - finally { - clearNodeAddedMessage(msg); - } + writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } catch (IgniteCheckedException | IOException e) { @@ -5325,6 +5403,21 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param msg Message. + * @return {@code True} if topology initialized. + */ + private boolean topologyInitialized(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + if (clientNodeId.equals(addedMsg.node().id())) + return addedMsg.topology() != null; + } + + return true; + } + + /** * @param res Ping result. 
*/ public void pingResult(boolean res) { http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index c50f791..875d18e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -79,6 +79,17 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { } /** + * @param msg Message. + */ + protected TcpDiscoveryAbstractMessage(TcpDiscoveryAbstractMessage msg) { + this.id = msg.id; + this.verifierNodeId = msg.verifierNodeId; + this.topVer = msg.topVer; + this.flags = msg.flags; + this.pendingIdx = msg.pendingIdx; + } + + /** * Gets creator node. * * @return Creator node ID. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 5a7146d..6f8e14e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -55,6 +55,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { @GridToStringInclude private Collection<TcpDiscoveryNode> top; + /** */ + @GridToStringInclude + private transient Collection<TcpDiscoveryNode> clientTop; + /** Topology snapshots history. */ private Map<Long, Collection<ClusterNode>> topHist; @@ -93,6 +97,24 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { } /** + * @param msg Message. + */ + public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { + super(msg); + + this.node = msg.node; + this.msgs = msg.msgs; + this.discardMsgId = msg.discardMsgId; + this.discardCustomMsgId = msg.discardCustomMsgId; + this.top = msg.top; + this.clientTop = msg.clientTop; + this.topHist = msg.topHist; + this.newNodeDiscoData = msg.newNodeDiscoData; + this.oldNodesDiscoData = msg.oldNodesDiscoData; + this.gridStartTime = msg.gridStartTime; + } + + /** * Gets newly added node. * * @return New node. @@ -133,6 +155,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { * * @param msgs Pending messages to send to new node. * @param discardMsgId Discarded message ID. + * @param discardCustomMsgId Discarded custom message ID. 
*/ public void messages( @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @@ -163,6 +186,22 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { } /** + * @param top Topology at the moment when client joined. + */ + public void clientTopology(Collection<TcpDiscoveryNode> top) { + assert top != null && !top.isEmpty() : top; + + this.clientTop = top; + } + + /** + * @return Topology at the moment when client joined. + */ + public Collection<TcpDiscoveryNode> clientTopology() { + return clientTop; + } + + /** * Gets topology snapshots history. * * @return Map with topology snapshots history. http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index edd95e9..6131f54 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -1128,6 +1128,39 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac } /** + * @throws Exception If failed. 
+ */ + public void testReconnectDestroyCache() throws Exception { + clientMode = true; + + Ignite client = startGrid(SRV_CNT); + + CacheConfiguration<Integer, Integer> ccfg1 = new CacheConfiguration<>(); + ccfg1.setName("cache1"); + + CacheConfiguration<Integer, Integer> ccfg2 = new CacheConfiguration<>(); + ccfg2.setName("cache2"); + + final Ignite srv = grid(0); + + srv.createCache(ccfg1); + srv.createCache(ccfg2).put(1, 1); + + IgniteCache<Integer, Integer> cache = client.cache("cache2"); + + reconnectClientNode(client, srv, new Runnable() { + @Override public void run() { + srv.destroyCache("cache1"); + } + }); + + cache.put(2, 2); + + assertEquals(1, (Object)cache.get(1)); + assertEquals(2, (Object)cache.get(2)); + } + + /** * @param client Client. * @param disconnectLatch Disconnect event latch. * @param reconnectLatch Reconnect event latch. http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index a6b5535..530ff61 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -78,6 +78,7 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.resources.LoggerResource; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi; 
import org.apache.ignite.testframework.GridTestUtils; @@ -183,6 +184,8 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); if (memoryMode() == OFFHEAP_TIERED || memoryMode() == OFFHEAP_VALUES) http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java index 51e76f6..659520b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java @@ -32,6 +32,9 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; @@ -41,6 +44,9 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC * Test for 
reproducing problems during simultaneously Ignite instances stopping and cache requests executing. */ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + /** Grid count. */ private static final int GRID_CNT = 8; @@ -55,6 +61,8 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); OptimizedMarshaller marsh = new OptimizedMarshaller(); http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java index 2aa4280..37c5a6b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientReconnectTest.java @@ -94,6 +94,8 @@ public class IgniteCacheClientReconnectTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { super.afterTestsStopped(); + + stopAllGrids(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java 
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 78fc590..242b12d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -113,13 +113,6 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testManyClients() throws Throwable {
-        manyClientsPutGet();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testManyClientsClientDiscovery() throws Throwable {
         clientDiscovery = true;
 
@@ -138,6 +131,13 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testManyClientsForceServerMode() throws Throwable {
+        manyClientsPutGet();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void manyClientsSequentially() throws Exception {
         client = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/6ea3b562/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
new file mode 100644
index 0000000..6f0e887
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/IgniteClientReconnectMassiveShutdownTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.discovery.tcp;
+
+import java.util.HashMap;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteClientDisconnectedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Client reconnect test in multi-threaded mode while cache operations are in progress.
+ */
+public class IgniteClientReconnectMassiveShutdownTest extends GridCommonAbstractTest {
+    /** Number of server nodes started by the test. */
+    private static final int GRID_CNT = 14;
+
+    /** Number of client nodes started after the servers. */
+    private static final int CLIENT_GRID_CNT = 14;
+
+    /** Client mode flag applied to the configuration of nodes started next. */
+    private static volatile boolean clientMode;
+
+    /** IP finder shared by every node this test instance starts. */
+    private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setClientMode(clientMode);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
+
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+
+        Thread.sleep(5000);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 5 * 60 * 1000;
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void _testMassiveServersShutdown1() throws Exception {
+        massiveServersShutdown(StopType.FAIL_EVENT);
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void testMassiveServersShutdown2() throws Exception {
+        massiveServersShutdown(StopType.SIMULATE_FAIL);
+    }
+
+    /**
+     * @throws Exception If any error occurs.
+     */
+    public void _testMassiveServersShutdown3() throws Exception {
+        massiveServersShutdown(StopType.CLOSE);
+    }
+
+    /**
+     * @param stopType How to stop node.
+     * @throws Exception If any error occurs.
+     */
+    private void massiveServersShutdown(final StopType stopType) throws Exception {
+        clientMode = false;
+
+        startGridsMultiThreaded(GRID_CNT);
+
+        clientMode = true;
+
+        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+
+        final AtomicBoolean done = new AtomicBoolean();
+
+        // Starting a cache dynamically.
+        Ignite client = grid(GRID_CNT);
+
+        assertTrue(client.configuration().isClientMode());
+
+        CacheConfiguration<String, Integer> cfg = new CacheConfiguration<>();
+
+        cfg.setCacheMode(PARTITIONED);
+        cfg.setAtomicityMode(TRANSACTIONAL);
+        cfg.setBackups(2);
+        cfg.setOffHeapMaxMemory(0);
+        cfg.setMemoryMode(OFFHEAP_TIERED);
+
+        IgniteCache<String, Integer> cache = client.getOrCreateCache(cfg);
+
+        HashMap<String, Integer> put = new HashMap<>();
+
+        // Load some data.
+        for (int i = 0; i < 10_000; i++)
+            put.put(String.valueOf(i), i);
+
+        cache.putAll(put);
+
+        // Preparing client nodes and starting cache operations from them.
+        final BlockingQueue<Integer> clientIdx = new LinkedBlockingQueue<>();
+
+        for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
+            clientIdx.add(i);
+
+        IgniteInternalFuture<?> clientsFut = multithreadedAsync(
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    int idx = clientIdx.take();
+
+                    Ignite ignite = grid(idx);
+
+                    Thread.currentThread().setName("client-thread-" + ignite.name());
+
+                    assertTrue(ignite.configuration().isClientMode());
+
+                    IgniteCache<String, Integer> cache = ignite.cache(null);
+
+                    IgniteTransactions txs = ignite.transactions();
+
+                    Random rand = new Random();
+
+                    while (!done.get()) {
+                        try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                            cache.put(String.valueOf(rand.nextInt(10_000)), rand.nextInt(50_000));
+
+                            tx.commit();
+                        }
+                        catch (ClusterTopologyException ex) {
+                            ex.retryReadyFuture().get();
+                        }
+                        catch (IgniteException | CacheException e) {
+                            if (X.hasCause(e, IgniteClientDisconnectedException.class)) {
+                                IgniteClientDisconnectedException cause = X.cause(e,
+                                    IgniteClientDisconnectedException.class);
+
+                                assert cause != null;
+
+                                cause.reconnectFuture().get();
+                            }
+                            else if (X.hasCause(e, ClusterTopologyException.class)) {
+                                ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+                                assert cause != null;
+
+                                cause.retryReadyFuture().get();
+                            }
+                            else
+                                throw e;
+                        }
+                    }
+
+                    return null;
+                }
+            },
+            CLIENT_GRID_CNT, "client-thread");
+
+        try {
+            // Killing half of the server nodes.
+            final int srvsToKill = GRID_CNT / 2;
+
+            final BlockingQueue<Integer> victims = new LinkedBlockingQueue<>();
+
+            for (int i = 0; i < srvsToKill; i++)
+                victims.add(i);
+
+            final BlockingQueue<Integer> assassins = new LinkedBlockingQueue<>();
+
+            for (int i = srvsToKill; i < GRID_CNT; i++)
+                assassins.add(i);
+
+            IgniteInternalFuture<?> srvsShutdownFut = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        Thread.sleep(5_000);
+
+                        Ignite assassin = grid(assassins.take());
+
+                        assertFalse(assassin.configuration().isClientMode());
+
+                        Ignite victim = grid(victims.take());
+
+                        assertFalse(victim.configuration().isClientMode());
+
+                        log.info("Kill node [node=" + victim.name() + ", from=" + assassin.name() + ']');
+
+                        switch (stopType) {
+                            case CLOSE:
+                                victim.close();
+
+                                break;
+
+                            case FAIL_EVENT:
+                                UUID nodeId = victim.cluster().localNode().id();
+
+                                assassin.configuration().getDiscoverySpi().failNode(nodeId, null);
+
+                                break;
+
+                            case SIMULATE_FAIL:
+                                ((TcpDiscoverySpi)victim.configuration().getDiscoverySpi()).simulateNodeFailure();
+
+                                break;
+
+                            default:
+                                fail();
+                        }
+
+                        return null;
+                    }
+                },
+                assassins.size(), "kill-thread");
+
+            srvsShutdownFut.get();
+
+            Thread.sleep(15_000);
+
+            done.set(true);
+
+            clientsFut.get();
+
+            awaitPartitionMapExchange();
+
+            for (int k = 0; k < 10_000; k++) {
+                String key = String.valueOf(k);
+
+                Object val = cache.get(key);
+
+                for (int i = srvsToKill; i < GRID_CNT; i++)
+                    assertEquals(val, ignite(i).cache(null).get(key));
+            }
+        }
+        finally {
+            done.set(true);
+        }
+    }
+
+    /**
+     * How a victim server node is stopped during the test.
+     */
+    enum StopType {
+        /** Graceful stop via {@code Ignite.close()} on the victim. */
+        CLOSE,
+
+        /** Failure simulated on the victim via {@code TcpDiscoverySpi.simulateNodeFailure()}. */
+        SIMULATE_FAIL,
+
+        /** Failure reported by another server node via {@code DiscoverySpi.failNode(...)}. */
+        FAIL_EVENT
+    }
+}
