ignite-3418 Avoid unnecessary discovery messages (cherry picked from #b0a5128)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2e65beb4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2e65beb4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2e65beb4 Branch: refs/heads/ignite-1232 Commit: 2e65beb4f628e0ba138abc2dfab707bc9a5e965e Parents: 955a5f8 Author: sboikov <[email protected]> Authored: Wed Jul 6 10:43:38 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Jul 6 10:43:38 2016 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ClientImpl.java | 2 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 109 ++++++--- .../tcp/internal/TcpDiscoveryStatistics.java | 45 +++- .../tcp/TcpDiscoveryMultiThreadedTest.java | 222 +++++++++++++++++++ .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 63 +++++- 5 files changed, 395 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2e65beb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java index 31d614f..9821134 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java @@ -616,7 +616,7 @@ class ClientImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp, 0); if (log.isDebugEnabled()) log.debug("Message has been sent to address [msg=" + msg + ", addr=" + addr + 
http://git-wip-us.apache.org/repos/asf/ignite/blob/2e65beb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index b1c56c0..6db1e87 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -206,7 +206,10 @@ class ServerImpl extends TcpDiscoveryImpl { private StatisticsPrinter statsPrinter; /** Failed nodes (but still in topology). */ - private final Collection<TcpDiscoveryNode> failedNodes = new HashSet<>(); + private final Map<TcpDiscoveryNode, UUID> failedNodes = new HashMap<>(); + + /** */ + private final Collection<UUID> failedNodesMsgSent = new HashSet<>(); /** Leaving nodes (but still in topology). 
*/ private final Collection<TcpDiscoveryNode> leavingNodes = new HashSet<>(); @@ -782,7 +785,7 @@ class ServerImpl extends TcpDiscoveryImpl { if (nodeAlive) { synchronized (mux) { - nodeAlive = !F.transform(failedNodes, F.node2id()).contains(nodeId) && + nodeAlive = !F.transform(failedNodes.keySet(), F.node2id()).contains(nodeId) && !F.transform(leavingNodes, F.node2id()).contains(nodeId); } } @@ -1107,7 +1110,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean ignore = false; synchronized (failedNodes) { - for (TcpDiscoveryNode failedNode : failedNodes) { + for (TcpDiscoveryNode failedNode : failedNodes.keySet()) { if (failedNode.id().equals(res.creatorNodeId())) { if (log.isDebugEnabled()) log.debug("Ignore response from node from failed list: " + res); @@ -1137,7 +1140,7 @@ class ServerImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + long tstamp0 = U.currentTimeMillis(); if (debugMode) debugLog(msg, "Message has been sent directly to address [msg=" + msg + ", addr=" + addr + @@ -1152,7 +1155,11 @@ class ServerImpl extends TcpDiscoveryImpl { // E.g. due to class not found issue. 
joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage; - return spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + + spi.stats.onMessageSent(msg, tstamp0 - tstamp, U.currentTimeMillis() - tstamp0); + + return receipt; } catch (ClassCastException e) { // This issue is rarely reproducible on AmazonEC2, but never @@ -1386,7 +1393,7 @@ class ServerImpl extends TcpDiscoveryImpl { @Nullable private TcpDiscoveryNode resolveCoordinator( @Nullable Collection<TcpDiscoveryNode> filter) { synchronized (mux) { - Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes, leavingNodes); + Collection<TcpDiscoveryNode> excluded = F.concat(false, failedNodes.keySet(), leavingNodes); if (!F.isEmpty(filter)) excluded = F.concat(false, excluded, filter); @@ -1545,7 +1552,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryNode next; synchronized (mux) { - next = ring.nextNode(failedNodes); + next = ring.nextNode(failedNodes.keySet()); } if (next != null) @@ -1618,7 +1625,7 @@ class ServerImpl extends TcpDiscoveryImpl { b.append("Failed nodes: ").append(U.nl()); - for (TcpDiscoveryNode node : failedNodes) + for (TcpDiscoveryNode node : failedNodes.keySet()) b.append(" ").append(node.id()).append(U.nl()); b.append(U.nl()); @@ -1814,10 +1821,14 @@ class ServerImpl extends TcpDiscoveryImpl { if (failedNode != null) { if (!failedNode.isLocal()) { - boolean added; + boolean added = false; synchronized (mux) { - added = failedNodes.add(failedNode); + if (!failedNodes.containsKey(failedNode)) { + failedNodes.put(failedNode, msg.senderNodeId() != null ? 
msg.senderNodeId() : getLocalNodeId()); + + added = true; + } } if (added && log.isDebugEnabled()) @@ -2426,7 +2437,7 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoverySpiState state; synchronized (mux) { - failedNodes = U.arrayList(ServerImpl.this.failedNodes); + failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet()); state = spiState; } @@ -2655,12 +2666,7 @@ class ServerImpl extends TcpDiscoveryImpl { assert !forceSndPending || msg instanceof TcpDiscoveryNodeLeftMessage; - boolean sndPending= - (newNextNode && ring.minimumNodeVersion().compareTo(CUSTOM_MSG_ALLOW_JOINING_FOR_VERIFIED_SINCE) >= 0) || - failure || - forceSndPending; - - if (sndPending) { + if (failure || forceSndPending) { if (log.isDebugEnabled()) log.debug("Pending messages will be sent [failure=" + failure + ", newNextNode=" + newNextNode + @@ -2688,10 +2694,12 @@ class ServerImpl extends TcpDiscoveryImpl { clearNodeAddedMessage(pendingMsg); } - spi.stats.onMessageSent(pendingMsg, U.currentTimeMillis() - tstamp); + long tstamp0 = U.currentTimeMillis(); int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.stats.onMessageSent(pendingMsg, tstamp0 - tstamp, U.currentTimeMillis() - tstamp0); + if (log.isDebugEnabled()) log.debug("Pending message has been sent to next node [msgId=" + msg.id() + ", pendingMsgId=" + pendingMsg.id() + ", next=" + next.id() + @@ -2735,10 +2743,12 @@ class ServerImpl extends TcpDiscoveryImpl { spi.writeToSocket(sock, out, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); - spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); + long tstamp0 = U.currentTimeMillis(); int res = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0)); + spi.stats.onMessageSent(msg, tstamp0 - tstamp, U.currentTimeMillis() - tstamp0); + onMessageExchanged(); if (log.isDebugEnabled()) { @@ -2840,7 +2850,7 @@ class ServerImpl extends TcpDiscoveryImpl { } synchronized (mux) { - 
failedNodes.removeAll(ServerImpl.this.failedNodes); + failedNodes.removeAll(ServerImpl.this.failedNodes.keySet()); } if (!failedNodes.isEmpty()) { @@ -2854,7 +2864,13 @@ class ServerImpl extends TcpDiscoveryImpl { } synchronized (mux) { - ServerImpl.this.failedNodes.addAll(failedNodes); + for (TcpDiscoveryNode failedNode : failedNodes) { + if (!ServerImpl.this.failedNodes.containsKey(failedNode)) + ServerImpl.this.failedNodes.put(failedNode, locNodeId); + } + + for (TcpDiscoveryNode failedNode : failedNodes) + failedNodesMsgSent.add(failedNode.id()); } for (TcpDiscoveryNode n : failedNodes) @@ -4233,6 +4249,8 @@ class ServerImpl extends TcpDiscoveryImpl { failedNodes.remove(leftNode); leavingNodes.remove(leftNode); + + failedNodesMsgSent.remove(leftNode.id()); } } @@ -4292,7 +4310,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean contains; synchronized (mux) { - contains = failedNodes.contains(sndNode); + contains = failedNodes.containsKey(sndNode); } if (contains) { @@ -4321,7 +4339,8 @@ class ServerImpl extends TcpDiscoveryImpl { assert !node.isLocal() || !msg.verified() : msg; synchronized (mux) { - failedNodes.add(node); + if (!failedNodes.containsKey(node)) + failedNodes.put(node, msg.senderNodeId() != null ? msg.senderNodeId() : getLocalNodeId()); } } else { @@ -4382,6 +4401,8 @@ class ServerImpl extends TcpDiscoveryImpl { leavingNodes.remove(node); + failedNodesMsgSent.remove(node.id()); + ClientMessageWorker worker = clientMsgWorkers.remove(node.id()); if (worker != null) @@ -4657,7 +4678,7 @@ class ServerImpl extends TcpDiscoveryImpl { boolean failedNode; synchronized (mux) { - failedNode = failedNodes.contains(clientNode); + failedNode = failedNodes.containsKey(clientNode); } if (!failedNode) { @@ -4896,24 +4917,44 @@ class ServerImpl extends TcpDiscoveryImpl { } /** - * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node - * is still in the ring. 
+ * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node is still in the + * ring and node detected failure left ring. */ private void checkFailedNodesList() { List<TcpDiscoveryNodeFailedMessage> msgs = null; synchronized (mux) { - for (Iterator<TcpDiscoveryNode> it = failedNodes.iterator(); it.hasNext();) { - TcpDiscoveryNode node = it.next(); + if (!failedNodes.isEmpty()) { + for (Iterator<Map.Entry<TcpDiscoveryNode, UUID>> it = failedNodes.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<TcpDiscoveryNode, UUID> e = it.next(); - if (ring.node(node.id()) != null) { - if (msgs == null) - msgs = new ArrayList<>(failedNodes.size()); + TcpDiscoveryNode node = e.getKey(); + UUID failSndNode = e.getValue(); - msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder())); + if (ring.node(node.id()) == null) { + it.remove(); + + continue; + } + + if (!nodeAlive(failSndNode) && !failedNodesMsgSent.contains(node.id())) { + if (msgs == null) + msgs = new ArrayList<>(); + + msgs.add(new TcpDiscoveryNodeFailedMessage(getLocalNodeId(), node.id(), node.internalOrder())); + + failedNodesMsgSent.add(node.id()); + } + } + } + + if (!failedNodesMsgSent.isEmpty()) { + for (Iterator<UUID> it = failedNodesMsgSent.iterator(); it.hasNext(); ) { + UUID nodeId = it.next(); + + if (ring.node(nodeId) == null) + it.remove(); } - else - it.remove(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2e65beb4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java index f6232ba..9e73632 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java @@ -89,6 +89,14 @@ public class TcpDiscoveryStatistics { /** Ring messages sent timestamps. */ private final Map<IgniteUuid, Long> ringMsgsSndTs = new GridBoundedLinkedHashMap<>(1024); + /** Average times to receive message acknowledgement (grouped by message type). */ + @GridToStringInclude + private final Map<String, Long> avgMsgsAckTimes = new HashMap<>(); + + /** Max times to receive message acknowledgement (grouped by message type). */ + @GridToStringInclude + private final Map<String, Long> maxMsgsAckTimes = new HashMap<>(); + /** Average time messages is in queue. */ private long avgMsgQueueTime; @@ -302,8 +310,9 @@ public class TcpDiscoveryStatistics { * * @param msg Sent message. * @param time Time taken to serialize message. + * @param ackTime Time taken to receive message acknowledgement. */ - public synchronized void onMessageSent(TcpDiscoveryAbstractMessage msg, long time) { + public synchronized void onMessageSent(TcpDiscoveryAbstractMessage msg, long time, long ackTime) { assert msg != null; assert time >= 0 : time; @@ -326,7 +335,24 @@ public class TcpDiscoveryStatistics { sentMsgs.put(msg.getClass().getSimpleName(), ++cnt); - Long avgTime = F.addIfAbsent(avgMsgsSndTimes, msg.getClass().getSimpleName(), new Callable<Long>() { + addTimeInfo(avgMsgsSndTimes, maxMsgsSndTimes, msg, cnt, time); + + addTimeInfo(avgMsgsAckTimes, maxMsgsAckTimes, msg, cnt, ackTime); + } + + /** + * @param avgTimes Average times. + * @param maxTimes Max times. + * @param msg Message. + * @param cnt Total message count. + * @param time Time.
+ */ + private void addTimeInfo(Map<String, Long> avgTimes, + Map<String, Long> maxTimes, + TcpDiscoveryAbstractMessage msg, + int cnt, + long time) { + Long avgTime = F.addIfAbsent(avgTimes, msg.getClass().getSimpleName(), new Callable<Long>() { @Override public Long call() { return 0L; } @@ -336,9 +362,9 @@ public class TcpDiscoveryStatistics { avgTime = (avgTime * (cnt - 1) + time) / cnt; - avgMsgsSndTimes.put(msg.getClass().getSimpleName(), avgTime); + avgTimes.put(msg.getClass().getSimpleName(), avgTime); - Long maxTime = F.addIfAbsent(maxMsgsSndTimes, msg.getClass().getSimpleName(), new Callable<Long>() { + Long maxTime = F.addIfAbsent(maxTimes, msg.getClass().getSimpleName(), new Callable<Long>() { @Override public Long call() { return 0L; } @@ -347,7 +373,7 @@ public class TcpDiscoveryStatistics { assert maxTime != null; if (time > maxTime) - maxMsgsSndTimes.put(msg.getClass().getSimpleName(), time); + maxTimes.put(msg.getClass().getSimpleName(), time); } /** @@ -474,6 +500,13 @@ public class TcpDiscoveryStatistics { } /** + * @return Sent messages counts (grouped by type). + */ + public synchronized Map<String, Integer> sentMessages() { + return new HashMap<>(sentMsgs); + } + + /** * Gets max messages send time (grouped by type). * * @return Map containing messages types and max send times. 
@@ -621,6 +654,7 @@ public class TcpDiscoveryStatistics { avgClientSockInitTime = 0; avgMsgProcTime = 0; avgMsgQueueTime = 0; + avgMsgsAckTimes.clear(); avgMsgsSndTimes.clear(); avgRingMsgTime = 0; avgSrvSockInitTime = 0; @@ -634,6 +668,7 @@ public class TcpDiscoveryStatistics { maxClientSockInitTime = 0; maxMsgProcTime = 0; maxMsgQueueTime = 0; + maxMsgsAckTimes.clear(); maxMsgsSndTimes.clear(); maxProcTimeMsgCls = null; maxRingMsgTime = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/2e65beb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 5053c2d..28b527e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -28,9 +28,17 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import javax.cache.Cache; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import 
org.apache.ignite.events.Event; @@ -42,6 +50,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -483,4 +492,217 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { stopAllGrids(); } } + + /** + * @throws Exception If failed. + */ + public void _testCustomEventOnJoinCoordinatorStop() throws Exception { + for (int k = 0; k < 10; k++) { + log.info("Iteration: " + k); + + clientFlagGlobal = false; + + final int START_NODES = 5; + final int JOIN_NODES = 5; + + startGrids(START_NODES); + + final AtomicInteger startIdx = new AtomicInteger(START_NODES); + + final AtomicBoolean stop = new AtomicBoolean(); + + IgniteInternalFuture<?> fut1 = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + CacheConfiguration ccfg = new CacheConfiguration(); + + Ignite ignite = ignite(START_NODES - 1); + + while (!stop.get()) { + ignite.createCache(ccfg); + + ignite.destroyCache(ccfg.getName()); + } + + return null; + } + }); + + try { + final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1); + + IgniteInternalFuture<?> fut2 = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = startIdx.getAndIncrement(); + + Thread.currentThread().setName("start-thread-" + idx); + + barrier.await(); + + Ignite ignite = startGrid(idx); + + assertFalse(ignite.configuration().isClientMode()); + + log.info("Started node: " + ignite.name()); + + IgniteCache<Object, Object> cache = ignite.getOrCreateCache((String)null); + + ContinuousQuery<Object, Object> qry = new 
ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + // No-op. + } + }); + + QueryCursor<Cache.Entry<Object, Object>> cur = cache.query(qry); + + cur.close(); + + return null; + } + }, JOIN_NODES, "start-thread"); + + barrier.await(); + + U.sleep(ThreadLocalRandom.current().nextInt(10, 100)); + + for (int i = 0; i < START_NODES - 1; i++) { + GridTestUtils.invoke(ignite(i).configuration().getDiscoverySpi(), "simulateNodeFailure"); + + stopGrid(i); + } + + stop.set(true); + + fut1.get(); + fut2.get(); + } + finally { + stop.set(true); + + fut1.get(); + } + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void _testClientContinuousQueryCoordinatorStop() throws Exception { + for (int k = 0; k < 10; k++) { + log.info("Iteration: " + k); + + clientFlagGlobal = false; + + final int START_NODES = 5; + final int JOIN_NODES = 5; + + startGrids(START_NODES); + + ignite(0).createCache(new CacheConfiguration<>()); + + final AtomicInteger startIdx = new AtomicInteger(START_NODES); + + final CyclicBarrier barrier = new CyclicBarrier(JOIN_NODES + 1); + + clientFlagGlobal = true; + + IgniteInternalFuture<?> fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + int idx = startIdx.getAndIncrement(); + + Thread.currentThread().setName("start-thread-" + idx); + + barrier.await(); + + Ignite ignite = startGrid(idx); + assertTrue(ignite.configuration().isClientMode()); + + log.info("Started node: " + ignite.name()); + + IgniteCache<Object, Object> cache = ignite.getOrCreateCache((String)null); + + for (int i = 0; i < 10; i++) { + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + // No-op. 
+ } + }); + + cache.query(qry); + } + + return null; + } + }, JOIN_NODES, "start-thread"); + + barrier.await(); + + U.sleep(ThreadLocalRandom.current().nextInt(100, 500)); + + for (int i = 0; i < START_NODES - 1; i++) { + GridTestUtils.invoke(ignite(i).configuration().getDiscoverySpi(), "simulateNodeFailure"); + + stopGrid(i); + } + + fut.get(); + + stopAllGrids(); + } + } + + /** + * @throws Exception If failed. + */ + public void _testCustomEventNodeRestart() throws Exception { + clientFlagGlobal = false; + + Ignite ignite = startGrid(0); + + ignite.getOrCreateCache(new CacheConfiguration<>()); + + final long stopTime = System.currentTimeMillis() + 60_000; + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + try { + while (System.currentTimeMillis() < stopTime) { + Ignite ignite = startGrid(idx + 1); + + IgniteCache<Object, Object> cache = ignite.cache(null); + + int qryCnt = ThreadLocalRandom.current().nextInt(10) + 1; + + for (int i = 0; i < qryCnt; i++) { + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + // No-op. 
+ } + }); + + QueryCursor<Cache.Entry<Object, Object>> cur = cache.query(qry); + + cur.close(); + } + + GridTestUtils.invoke(ignite.configuration().getDiscoverySpi(), "simulateNodeFailure"); + + ignite.close(); + } + } + catch (Exception e) { + log.error("Unexpected error: " + e, e); + + throw new IgniteException(e); + } + } + }, 5, "node-restart"); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2e65beb4/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 96c0e66..d5ea46c 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -58,6 +58,7 @@ import org.apache.ignite.internal.processors.port.GridPortRecord; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; @@ -68,6 +69,7 @@ import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; +import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics; import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryAbstractMessage; @@ -109,7 +111,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** */ private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>(); - private GridStringLogger strLogger; + /** */ + private GridStringLogger strLog; /** * @throws Exception If fails. @@ -197,10 +200,10 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureSegmentedNode")) { cfg.setFailureDetectionTimeout(6_000); - cfg.setGridLogger(strLogger = new GridStringLogger()); + cfg.setGridLogger(strLog = new GridStringLogger()); } else if (gridName.contains("testNodeShutdownOnRingMessageWorkerFailureFailedNode")) - cfg.setGridLogger(strLogger = new GridStringLogger()); + cfg.setGridLogger(strLog = new GridStringLogger()); return cfg; } @@ -1438,7 +1441,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { assertTrue(disconnected.get()); - String result = strLogger.toString(); + String result = strLog.toString(); assert result.contains("TcpDiscoverSpi's message worker thread failed abnormally") : result; } @@ -1520,7 +1523,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { Thread.sleep(10_000); - String result = strLogger.toString(); + String result = strLog.toString(); assert result.contains("Local node SEGMENTED") && !result.contains("TcpDiscoverSpi's message worker thread failed abnormally") : result; @@ -1910,6 +1913,54 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } /** + * @throws Exception If failed. 
+ */ + public void testNoExtraNodeFailedMessage() throws Exception { + try { + final int NODES = 10; + + startGridsMultiThreaded(NODES); + + int stopIdx = 5; + + Ignite failIgnite = ignite(stopIdx); + + ((TcpDiscoverySpi)failIgnite.configuration().getDiscoverySpi()).simulateNodeFailure(); + + for (int i = 0; i < NODES; i++) { + if (i != stopIdx) { + final Ignite ignite = ignite(i); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return ignite.cluster().topologyVersion() >= NODES + 1; + } + }, 10_000); + + TcpDiscoverySpi spi = (TcpDiscoverySpi)ignite.configuration().getDiscoverySpi(); + + TcpDiscoveryStatistics stats = GridTestUtils.getFieldValue(spi, "stats"); + + Integer cnt = stats.sentMessages().get(TcpDiscoveryNodeFailedMessage.class.getSimpleName()); + + log.info("Count1: " + cnt); + + assertTrue("Invalid message count: " + cnt, cnt == null || cnt <= 2); + + cnt = stats.receivedMessages().get(TcpDiscoveryNodeFailedMessage.class.getSimpleName()); + + log.info("Count2: " + cnt); + + assertTrue("Invalid message count: " + cnt, cnt == null || cnt <= 2); + } + } + } + finally { + stopAllGrids(); + } + } + + /** * @param nodeName Node name. * @throws Exception If failed. */ @@ -1925,7 +1976,7 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { return true; } } - }, 10_000); + }, 30_000); if (!wait) U.dumpThreads(log);
