Repository: ignite
Updated Branches:
  refs/heads/ignite-1758 52940badd -> b0303d746


ignite-1758


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b0303d74
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b0303d74
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b0303d74

Branch: refs/heads/ignite-1758
Commit: b0303d746894afbc9d3cf7fe87e9dc55f1928376
Parents: 52940ba
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Oct 29 16:51:07 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Oct 29 17:00:53 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 104 ++++++++++++++-----
 .../messages/TcpDiscoveryAbstractMessage.java   |  11 ++
 .../messages/TcpDiscoveryNodeAddedMessage.java  |  43 +++++++-
 .../distributed/IgniteCacheManyClientsTest.java |  14 +--
 4 files changed, 135 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b0303d74/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index a4fb6fd..b843e10 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1396,6 +1396,21 @@ class ServerImpl extends TcpDiscoveryImpl {
             TcpDiscoveryNode node = nodeAddedMsg.node();
 
             if (node.id().equals(destNodeId)) {
+                Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
+                Collection<TcpDiscoveryNode> topToSnd = new ArrayList<>(allNodes.size());
+
+                for (TcpDiscoveryNode n0 : allNodes) {
+                    assert n0.internalOrder() != 0 : n0;
+
+                    // Skip next node and nodes added after next
+                    // in case this message is resent due to failures/leaves.
+                    // There will be separate messages for nodes with greater
+                    // internal order.
+                    if (n0.internalOrder() < nodeAddedMsg.node().internalOrder())
+                        topToSnd.add(n0);
+                }
+
+                nodeAddedMsg.topology(topToSnd);
                 nodeAddedMsg.messages(msgs, discardMsgId, discardCustomMsgId);
 
                 Map<Long, Collection<ClusterNode>> hist;
@@ -1417,6 +1432,7 @@ class ServerImpl extends TcpDiscoveryImpl {
             // Nullify topology before registration.
             TcpDiscoveryNodeAddedMessage nodeAddedMsg = (TcpDiscoveryNodeAddedMessage)msg;
 
+            nodeAddedMsg.topology(null);
             nodeAddedMsg.topologyHistory(null);
             nodeAddedMsg.messages(null, null, null);
         }
@@ -1751,6 +1767,27 @@ class ServerImpl extends TcpDiscoveryImpl {
         void add(TcpDiscoveryAbstractMessage msg) {
             assert spi.ensured(msg) && msg.verified() : msg;
 
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+                TcpDiscoveryNode node = addedMsg.node();
+
+                if (node.isClient() && !msgs.contains(msg)) {
+                    Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
+
+                    Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size());
+
+                    for (TcpDiscoveryNode n0 : allNodes) {
+                        assert n0.internalOrder() > 0 : n0;
+
+                        if (n0.internalOrder() < node.internalOrder())
+                            top.add(n0);
+                    }
+
+                    addedMsg.clientTopology(top);
+                }
+            }
+
             msgs.add(msg);
         }
 
@@ -1774,7 +1811,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 for (TcpDiscoveryAbstractMessage msg : msgs) {
                     if (msg instanceof TcpDiscoveryNodeAddedMessage) {
-                        if (node.id().equals(((TcpDiscoveryNodeAddedMessage) msg).node().id()))
+                        if (node.id().equals(((TcpDiscoveryNodeAddedMessage)msg).node().id()))
                             res = new ArrayList<>(msgs.size());
                     }
 
@@ -1827,8 +1864,21 @@ class ServerImpl extends TcpDiscoveryImpl {
          * @return Prepared message.
          */
         private TcpDiscoveryAbstractMessage prepare(TcpDiscoveryAbstractMessage msg, UUID destNodeId) {
-            if (msg instanceof TcpDiscoveryNodeAddedMessage)
-                prepareNodeAddedMessage(msg, destNodeId, null, null, null);
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+                if (addedMsg.node().id().equals(destNodeId)) {
+                    assert addedMsg.clientTopology() != null : addedMsg;
+
+                    TcpDiscoveryNodeAddedMessage msg0 = new TcpDiscoveryNodeAddedMessage(addedMsg);
+
+                    prepareNodeAddedMessage(msg0, destNodeId, null, null, null);
+
+                    msg0.topology(addedMsg.clientTopology());
+
+                    return msg0;
+                }
+            }
 
             return msg;
         }
@@ -2175,6 +2225,8 @@ class ServerImpl extends TcpDiscoveryImpl {
                         msgClone = msg;
                     }
 
+                    prepareNodeAddedMessage(msgClone, clientMsgWorker.clientNodeId, null, null, null);
+
                     clientMsgWorker.addMessage(msgClone);
                 }
             }
@@ -3255,19 +3307,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
 
                 msg.verify(locNodeId);
-
-                Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
-
-                Collection<TcpDiscoveryNode> top = new ArrayList<>(allNodes.size());
-
-                for (TcpDiscoveryNode n0 : allNodes) {
-                    assert n0.internalOrder() > 0 : n0;
-
-                    if (n0.internalOrder() < node.internalOrder())
-                        top.add(n0);
-                }
-
-                msg.topology(top);
             }
             else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != null) {
                 // Local node already has node from message in local topology.
@@ -3431,6 +3470,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                             // Clear data to minimize message size.
                             msg.messages(null, null, null);
+                            msg.topology(null);
                             msg.topologyHistory(null);
                             msg.clearDiscoveryData();
                         }
@@ -5338,19 +5378,14 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
                 else {
-                    try {
-                        if (log.isDebugEnabled())
-                            log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
-                                + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
+                    if (log.isDebugEnabled())
+                        log.debug("Redirecting message to client [sock=" + sock + ", locNodeId="
+                            + getLocalNodeId() + ", rmtNodeId=" + clientNodeId + ", msg=" + msg + ']');
 
-                        prepareNodeAddedMessage(msg, clientNodeId, null, null, null);
+                    assert topologyInitialized(msg) : msg;
 
-                        writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
-                            spi.failureDetectionTimeout() : spi.getSocketTimeout());
-                    }
-                    finally {
-                        clearNodeAddedMessage(msg);
-                    }
+                    writeToSocket(sock, msg, spi.failureDetectionTimeoutEnabled() ?
+                        spi.failureDetectionTimeout() : spi.getSocketTimeout());
                 }
             }
             catch (IgniteCheckedException | IOException e) {
@@ -5370,6 +5405,21 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param msg Message.
+         * @return {@code True} if topology initialized.
+         */
+        private boolean topologyInitialized(TcpDiscoveryAbstractMessage msg) {
+            if (msg instanceof TcpDiscoveryNodeAddedMessage) {
+                TcpDiscoveryNodeAddedMessage addedMsg = (TcpDiscoveryNodeAddedMessage)msg;
+
+                if (clientNodeId.equals(addedMsg.node().id()))
+                    return addedMsg.topology() != null;
+            }
+
+            return true;
+        }
+
+        /**
          * @param res Ping result.
          */
         public void pingResult(boolean res) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0303d74/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
index c50f791..875d18e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryAbstractMessage.java
@@ -79,6 +79,17 @@ public abstract class TcpDiscoveryAbstractMessage implements Serializable {
     }
 
     /**
+     * @param msg Message.
+     */
+    protected TcpDiscoveryAbstractMessage(TcpDiscoveryAbstractMessage msg) {
+        this.id = msg.id;
+        this.verifierNodeId = msg.verifierNodeId;
+        this.topVer = msg.topVer;
+        this.flags = msg.flags;
+        this.pendingIdx = msg.pendingIdx;
+    }
+
+    /**
      * Gets creator node.
      *
      * @return Creator node ID.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0303d74/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
index 6ce362c..6f8e14e 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddedMessage.java
@@ -55,6 +55,10 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     @GridToStringInclude
     private Collection<TcpDiscoveryNode> top;
 
+    /** */
+    @GridToStringInclude
+    private transient Collection<TcpDiscoveryNode> clientTop;
+
     /** Topology snapshots history. */
     private Map<Long, Collection<ClusterNode>> topHist;
 
@@ -93,6 +97,24 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
     }
 
     /**
+     * @param msg Message.
+     */
+    public TcpDiscoveryNodeAddedMessage(TcpDiscoveryNodeAddedMessage msg) {
+        super(msg);
+
+        this.node = msg.node;
+        this.msgs = msg.msgs;
+        this.discardMsgId = msg.discardMsgId;
+        this.discardCustomMsgId = msg.discardCustomMsgId;
+        this.top = msg.top;
+        this.clientTop = msg.clientTop;
+        this.topHist = msg.topHist;
+        this.newNodeDiscoData = msg.newNodeDiscoData;
+        this.oldNodesDiscoData = msg.oldNodesDiscoData;
+        this.gridStartTime = msg.gridStartTime;
+    }
+
+    /**
      * Gets newly added node.
      *
      * @return New node.
@@ -133,6 +155,7 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
      *
      * @param msgs Pending messages to send to new node.
      * @param discardMsgId Discarded message ID.
+     * @param discardCustomMsgId Discarded custom message ID.
      */
     public void messages(
         @Nullable Collection<TcpDiscoveryAbstractMessage> msgs,
@@ -158,13 +181,27 @@ public class TcpDiscoveryNodeAddedMessage extends TcpDiscoveryAbstractMessage {
      *
      * @param top Current topology.
      */
-    public void topology(Collection<TcpDiscoveryNode> top) {
-        assert top != null;
-
+    public void topology(@Nullable Collection<TcpDiscoveryNode> top) {
         this.top = top;
     }
 
     /**
+     * @param top Topology at the moment when client joined.
+     */
+    public void clientTopology(Collection<TcpDiscoveryNode> top) {
+        assert top != null && !top.isEmpty() : top;
+
+        this.clientTop = top;
+    }
+
+    /**
+     * @return Topology at the moment when client joined.
+     */
+    public Collection<TcpDiscoveryNode> clientTopology() {
+        return clientTop;
+    }
+
+    /**
      * Gets topology snapshots history.
      *
      * @return Map with topology snapshots history.

http://git-wip-us.apache.org/repos/asf/ignite/blob/b0303d74/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
index 78fc590..242b12d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheManyClientsTest.java
@@ -113,13 +113,6 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    public void testManyClients() throws Throwable {
-        manyClientsPutGet();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
     public void testManyClientsClientDiscovery() throws Throwable {
         clientDiscovery = true;
 
@@ -138,6 +131,13 @@ public class IgniteCacheManyClientsTest extends GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    public void testManyClientsForceServerMode() throws Throwable {
+        manyClientsPutGet();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     private void manyClientsSequentially() throws Exception {
         client = true;
 

Reply via email to