Repository: ignite Updated Branches: refs/heads/ignite-1758 52940badd -> b0303d746
ignite-1758 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0303d74 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0303d74 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0303d74 Branch: refs/heads/ignite-1758 Commit: b0303d746894afbc9d3cf7fe87e9dc55f1928376 Parents: 52940ba Author: sboikov <sboi...@gridgain.com> Authored: Thu Oct 29 16:51:07 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Oct 29 17:00:53 2015 +0300 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 104 ++++++++++++++----- .../messages/TcpDiscoveryAbstractMessage.java | 11 ++ .../messages/TcpDiscoveryNodeAddedMessage.java | 43 +++++++- .../distributed/IgniteCacheManyClientsTest.java | 14 +-- 4 files changed, 135 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b0303d74/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index a4fb6fd..b843e10 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -1396,6 +1396,21 @@ class ServerImpl extends TcpDiscoveryImpl { TcpDiscoveryNode node = nodeAddedMsg.node(); if (node.id().equals(destNodeId)) { + Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); + Collection<TcpDiscoveryNode> topToSnd = new ArrayList<>(allNodes.size()); + + for (TcpDiscoveryNode n0 : allNodes) { + assert n0.internalOrder() != 0 : n0; + + // Skip next node and nodes added after next + // in case this message is resent due to failures/leaves. + // There will be separate messages for nodes with greater + // internal order. + if (n0.internalOrder() < nodeAddedMsg.node().internalOrder()) + topToSnd.add(n0); + } + + nodeAddedMsg.topology(topToSnd); nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId); Map<Long, Collection<ClusterNode>> hist; @@ -1417,6 +1432,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Nullify topology before registration. TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg; + nodeAddedMsg.topology(null); nodeAddedMsg.topologyHistory(null); nodeAddedMsg.messages(null, null, null); } @@ -1751,6 +1767,27 @@ class ServerImpl extends TcpDiscoveryImpl { void add(TcpDiscoveryAbstractMessage msg) { assert spi.ensured(msg) && msg.verified() : msg; + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + TcpDiscoveryNode node = addedMsg.node(); + + if (node.isClient() && !msgs.contains(msg)) { + Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); + + Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size()); + + for (TcpDiscoveryNode n0 : allNodes) { + assert n0.internalOrder() > 0 : n0; + + if (n0.internalOrder() < node.internalOrder()) + top.add(n0); + } + + addedMsg.clientTopology(top); + } + } + msgs.add(msg); } @@ -1774,7 +1811,7 @@ class ServerImpl extends TcpDiscoveryImpl { for (TcpDiscoveryAbstractMessage msg : msgs) { if (msg instanceof TcpDiscoveryNodeAddedMessage) { - if (node.id().equals(((TcpDiscoveryNodeAddedMessage) msg).node().id())) + if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id())) res = new ArrayList<>(msgs.size()); } @@ -1827,8 +1864,21 @@ class ServerImpl extends TcpDiscoveryImpl { * @return Prepared message. */ private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) { - if (msg instanceof TcpDiscoveryNodeAddedMessage) - prepareNodeAddedMessage(msg, destNodeId, null, null, null); + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + if (addedMsg.node().id().equals(destNodeId)) { + assert addedMsg.clientTopology() != null : addedMsg; + + TcpDiscoveryNodeAddedMessage msg0 = new TcpDiscoveryNodeAddedMessage(addedMsg); + + prepareNodeAddedMessage(msg0, destNodeId, null, null, null); + + msg0.topology(addedMsg.clientTopology()); + + return msg0; + } + } return msg; } @@ -2175,6 +2225,8 @@ class ServerImpl extends TcpDiscoveryImpl { msgClone = msg; } + prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null, null, null); + clientMsgWorker.addMessage(msgClone); } } @@ -3255,19 +3307,6 @@ class ServerImpl extends TcpDiscoveryImpl { } msg.verify(locNodeId); - - Collection<TcpDiscoveryNode> allNodes = ring.allNodes(); - - Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size()); - - for (TcpDiscoveryNode n0 : allNodes) { - assert n0.internalOrder() > 0 : n0; - - if (n0.internalOrder() < node.internalOrder()) - top.add(n0); - } - - msg.topology(top); } else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) { // Local node already has node from message in local topology. @@ -3431,6 +3470,7 @@ class ServerImpl extends TcpDiscoveryImpl { // Clear data to minimize message size. msg.messages(null, null, null); + msg.topology(null); msg.topologyHistory(null); msg.clearDiscoveryData(); } @@ -5338,19 +5378,14 @@ class ServerImpl extends TcpDiscoveryImpl { } } else { - try { - if (log.isDebugEnabled()) - log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" - + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); + if (log.isDebugEnabled()) + log.debug("Redirecting message to client [sock=" + sock + ", locNodeId=" + + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']'); - prepareNodeAddedMessage(msg, clientNodeId, null, null, null); + assert topologyInitialized(msg) : msg; - writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? - spi.failureDetectionTimeout() : spi.getSocketTimeout()); - } - finally { - clearNodeAddedMessage(msg); - } + writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ? + spi.failureDetectionTimeout() : spi.getSocketTimeout()); } } catch (IgniteCheckedException | IOException e) { @@ -5370,6 +5405,21 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * @param msg Message. + * @return {@code True} if topology initialized. + */ + private boolean topologyInitialized(TcpDiscoveryAbstractMessage msg) { + if (msg instanceof TcpDiscoveryNodeAddedMessage) { + TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg; + + if (clientNodeId.equals(addedMsg.node().id())) + return addedMsg.topology() != null; + } + + return true; + } + + /** * @param res Ping result. */ public void pingResult(boolean res) { http://git-wip-us.apache.org/repos/asf/ignite/blob/b0303d74/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java index c50f791..875d18e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java @@ -79,6 +79,17 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable { } /** + * @param msg Message. + */ + protected TcpDiscoveryAbstractMessage(TcpDiscoveryAbstractMessage msg) { + this.id = msg.id; + this.verifierNodeId = msg.verifierNodeId; + this.topVer = msg.topVer; + this.flags = msg.flags; + this.pendingIdx = msg.pendingIdx; + } + + /** * Gets creator node. * * @return Creator node ID. http://git-wip-us.apache.org/repos/asf/ignite/blob/b0303d74/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java index 6ce362c..6f8e14e 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java @@ -55,6 +55,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { @GridToStringInclude private Collection<TcpDiscoveryNode> top; + /** */ + @GridToStringInclude + private transient Collection<TcpDiscoveryNode> clientTop; + /** Topology snapshots history. */ private Map<Long, Collection<ClusterNode>> topHist; @@ -93,6 +97,24 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { } /** + * @param msg Message. + */ + public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) { + super(msg); + + this.node = msg.node; + this.msgs = msg.msgs; + this.discardMsgId = msg.discardMsgId; + this.discardCustomMsgId = msg.discardCustomMsgId; + this.top = msg.top; + this.clientTop = msg.clientTop; + this.topHist = msg.topHist; + this.newNodeDiscoData = msg.newNodeDiscoData; + this.oldNodesDiscoData = msg.oldNodesDiscoData; + this.gridStartTime = msg.gridStartTime; + } + + /** * Gets newly added node. * * @return New node. @@ -133,6 +155,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { * * @param msgs Pending messages to send to new node. * @param discardMsgId Discarded message ID. + * @param discardCustomMsgId Discarded custom message ID. */ public void messages( @Nullable Collection<TcpDiscoveryAbstractMessage> msgs, @@ -158,13 +181,27 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage { * * @param top Current topology. */ - public void topology(Collection<TcpDiscoveryNode> top) { - assert top != null; - + public void topology(@Nullable Collection<TcpDiscoveryNode> top) { this.top = top; } /** + * @param top Topology at the moment when client joined. + */ + public void clientTopology(Collection<TcpDiscoveryNode> top) { + assert top != null && !top.isEmpty() : top; + + this.clientTop = top; + } + + /** + * @return Topology at the moment when client joined. + */ + public Collection<TcpDiscoveryNode> clientTopology() { + return clientTop; + } + + /** * Gets topology snapshots history. * * @return Map with topology snapshots history. http://git-wip-us.apache.org/repos/asf/ignite/blob/b0303d74/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java index 78fc590..242b12d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java @@ -113,13 +113,6 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void testManyClients() throws Throwable { - manyClientsPutGet(); - } - - /** - * @throws Exception If failed. - */ public void testManyClientsClientDiscovery() throws Throwable { clientDiscovery = true; @@ -138,6 +131,13 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testManyClientsForceServerMode() throws Throwable { + manyClientsPutGet(); + } + + /** + * @throws Exception If failed. + */ private void manyClientsSequentially() throws Exception { client = true;