Repository: ignite Updated Branches: refs/heads/ignite-1758 121d1e0d8 -> 08f9994db
ignite-1758 debug Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/08f9994d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/08f9994d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/08f9994d Branch: refs/heads/ignite-1758 Commit: 08f9994dbc80ef74fd3e9a34fea89f7061f26f7e Parents: 121d1e0 Author: sboikov <[email protected]> Authored: Mon Nov 9 12:11:34 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 9 12:35:22 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 30 ++++++++++++++++++-- .../messages/TcpDiscoveryAbstractMessage.java | 14 +++++++++ .../tcp/TcpDiscoveryMultiThreadedTest.java | 4 +-- 3 files changed, 42 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/08f9994d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 810690f..971e069 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2154,6 +2154,19 @@ class ServerImpl extends TcpDiscoveryImpl { spi.stats.onMessageProcessingStarted(msg); + if (msg.failedNodes() != null) { + for (UUID nodeId : msg.failedNodes()) { + TcpDiscoveryNode failedNode = ring.node(nodeId); + + if (failedNode != null) { + boolean add = failedNodes.add(failedNode); + + if (add) + debugLog(null, "New failed node [node=" + failedNode + ", msg=" + msg + ']'); + } + } + } + if (msg instanceof TcpDiscoveryJoinRequestMessage) processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg); @@ -2334,7 +2347,9 @@ class ServerImpl extends TcpDiscoveryImpl { if (debugMode) debugLog(null, "New next node [newNext=" + newNext + ", formerNext=" + next + - ", ring=" + ring + ", failedNodes=" + failedNodes + ']'); + ", ring=" + ring + + ", msg=" + msg + + ", failedNodes=" + failedNodes + ']'); U.closeQuiet(sock); @@ -2580,6 +2595,15 @@ class ServerImpl extends TcpDiscoveryImpl { if (timeoutHelper == null) timeoutHelper = new IgniteSpiOperationTimeoutHelper(spi); + if (!failedNodes.isEmpty()) { + List<UUID> failedNodeIds = new ArrayList<>(failedNodes.size()); + + for (TcpDiscoveryNode node : failedNodes) + failedNodeIds.add(node.id()); + + msg.failedNodes(failedNodeIds); + } + writeToSocket(sock, msg, timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout())); spi.stats.onMessageSent(msg, U.currentTimeMillis() - tstamp); @@ -4729,10 +4753,10 @@ class ServerImpl extends TcpDiscoveryImpl { synchronized (mux) { readers.add(reader); - - reader.start(); } + reader.start(); + spi.stats.onServerSocketInitialized(U.currentTimeMillis() - tstamp); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/08f9994d/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index 875d18e..6fe9dd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -19,10 +19,13 @@ package org.apache.ignite.spi.discovery.tcp.messages; import java.io.Externalizable; import java.io.Serializable; +import java.util.Collection; +import java.util.List; import java.util.UUID; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; /** * Base class to implement discovery messages. @@ -62,6 +65,9 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { /** Pending message index. */ private short pendingIdx; + /** */ + private Collection<UUID> failedNodes; + /** * Default no-arg constructor for {@link Externalizable} interface. */ @@ -236,6 +242,14 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { return false; } + public void failedNodes(Collection<UUID> failedNodes) { + this.failedNodes = failedNodes; + } + + @Nullable public Collection<UUID> failedNodes() { + return failedNodes; + } + /** {@inheritDoc} */ @Override public final boolean equals(Object obj) { if (this == obj) http://git-wip-us.apache.org/repos/asf/ignite/blob/08f9994d/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java index 70678d7..72549b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java @@ -113,12 +113,10 @@ public class TcpDiscoveryMultiThreadedTest extends GridCommonAbstractTest { if (client()) cfg.setClientMode(true); - cfg.setFailureDetectionTimeout(30_000); - cfg.setDiscoverySpi(new TcpDiscoverySpi(). setIpFinder(ipFinder). setJoinTimeout(60_000). - setNetworkTimeout(30_000)); + setNetworkTimeout(10_000)); int[] evts = {EVT_NODE_FAILED, EVT_NODE_LEFT};
