Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-opts-2conn [created] c9f62b815


ignite-comm-opts2


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9f62b81
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9f62b81
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9f62b81

Branch: refs/heads/ignite-comm-opts-2conn
Commit: c9f62b815571ab6192736154ef7b90e8e46e95b3
Parents: c81e0d9
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Sep 16 12:33:21 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Sep 16 12:33:21 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   |  40 +--
 .../managers/communication/GridIoMessage.java   |   4 +
 .../ignite/internal/util/nio/GridNioServer.java |   3 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 244 +++++++++++++------
 .../spi/GridTcpSpiForwardingSelfTest.java       |   4 +-
 5 files changed, 193 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 9e547ca..f869c5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -529,26 +529,26 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             }
         }
 
-        Thread t = new Thread(
-            new Runnable() {
-                @Override public void run() {
-                    for (;;) {
-                        try {
-                            Thread.sleep(5000);
-                        }
-                        catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-
-                        dumpStats();
-                    }
-                }
-            }
-        );
-
-        t.setDaemon(true);
-
-        t.start();
+//        Thread t = new Thread(
+//            new Runnable() {
+//                @Override public void run() {
+//                    for (;;) {
+//                        try {
+//                            Thread.sleep(5000);
+//                        }
+//                        catch (InterruptedException e) {
+//                            e.printStackTrace();
+//                        }
+//
+//                        dumpStats();
+//                    }
+//                }
+//            }
+//        );
+//
+//        t.setDaemon(true);
+//
+//        t.start();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index b28ced2..bce4f7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -161,6 +161,10 @@ public class GridIoMessage implements Message {
         return skipOnTimeout;
     }
 
+    public int connectionIndex() { // Non-negative: used as a modulo operand.
+        return super.hashCode() & Integer.MAX_VALUE;
+    }
+
     /**
      * @return {@code True} if message is ordered, {@code false} otherwise.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index a2449f8..fbfb5cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -1529,7 +1529,8 @@ public class GridNioServer<T> {
 
                                     sb.append("    Connection info [")
                                         .append("rmtAddr=").append(ses.remoteAddress())
-                                        .append(", locAddr=").append(ses.localAddress());
+                                        .append(", locAddr=").append(ses.localAddress())
+                                        .append(", in=").append(ses.accepted());
 
                                     GridNioRecoveryDescriptor desc = ses.recoveryDescriptor();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 3292412..9110add 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.GridConcurrentFactory;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
@@ -350,7 +351,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
-                UUID id = ses.meta(NODE_ID_META);
+                ConnectionId id = ses.meta(NODE_ID_META);
 
                 if (id != null) {
                     GridCommunicationClient client = clients.get(id);
@@ -368,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         GridNioRecoveryDescriptor recoveryData = 
ses.recoveryDescriptor();
 
                         if (recoveryData != null) {
-                            if 
(recoveryData.nodeAlive(getSpiContext().node(id))) {
+                            if 
(recoveryData.nodeAlive(getSpiContext().node(id.id))) {
                                 if (!recoveryData.messagesFutures().isEmpty()) 
{
                                     reconnect = true;
 
@@ -381,7 +382,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 recoveryData.onNodeLeft();
                         }
 
-                        DisconnectedSessionInfo disconnectData = new 
DisconnectedSessionInfo(recoveryData,
+                        DisconnectedSessionInfo disconnectData = new 
DisconnectedSessionInfo(id,
+                            recoveryData,
                             reconnect);
 
                         commWorker.addProcessDisconnectRequest(disconnectData);
@@ -390,7 +392,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     CommunicationListener<Message> lsnr0 = lsnr;
 
                     if (lsnr0 != null)
-                        lsnr0.onDisconnected(id);
+                        lsnr0.onDisconnected(id.id);
                 }
             }
 
@@ -399,24 +401,24 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
              * @param msg Message.
              */
             private void onFirstMessage(GridNioSession ses, Message msg) {
-                UUID sndId;
+                ConnectionId sndId;
 
                 if (msg instanceof NodeIdMessage)
-                    sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+                    sndId = new ConnectionId(-1, 
U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0));
                 else {
                     assert msg instanceof HandshakeMessage : msg;
 
-                    sndId = ((HandshakeMessage)msg).nodeId();
+                    sndId = new ConnectionId(((HandshakeMessage)msg).idx, 
((HandshakeMessage)msg).nodeId());
                 }
 
                 if (log.isDebugEnabled())
                     log.debug("Remote node ID received: " + sndId);
 
-                final UUID old = ses.addMeta(NODE_ID_META, sndId);
+                final ConnectionId old = ses.addMeta(NODE_ID_META, sndId);
 
                 assert old == null;
 
-                final ClusterNode rmtNode = getSpiContext().node(sndId);
+                final ClusterNode rmtNode = getSpiContext().node(sndId.id);
 
                 if (rmtNode == null) {
                     if (log.isDebugEnabled())
@@ -462,7 +464,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 HandshakeMessage msg0 = (HandshakeMessage)msg;
 //
-                final GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(rmtNode);
+                final GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(rmtNode, sndId);
 //
 //                if (oldFut == null) {
 //                    oldClient = clients.get(sndId);
@@ -520,16 +522,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 //                    else {
                         // The code below causes a race condition between 
shmem and TCP (see IGNITE-1294)
                         boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                                new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, !hasShmemClient, fut));
+                                new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, !hasShmemClient, fut, sndId));
 
                         if (reserved)
-                            connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true, false);
+                            connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true, false, sndId);
 //                    }
 //                }
             }
 
             @Override public void onMessage(GridNioSession ses, Message msg) {
-                UUID sndId = ses.meta(NODE_ID_META);
+                ConnectionId sndId = ses.meta(NODE_ID_META);
 
                 if (sndId == null) {
                     assert ses.accepted() : ses;
@@ -601,7 +603,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 //                    else
                         c = NOOP;
 
-                    notifyListener(sndId, msg, c);
+                    notifyListener(sndId.id, msg, c);
                 }
             }
 
@@ -620,7 +622,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 ClusterNode node,
                 long rcvCnt,
                 boolean sndRes,
-                boolean createClient) {
+                boolean createClient,
+                ConnectionId connId) {
+                assert node.id().equals(connId.id);
+
                 recovery.onHandshake(rcvCnt);
 
                 ses.recoveryDescriptor(recovery);
@@ -637,7 +642,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (createClient) {
                     client = new GridTcpNioCommunicationClient(ses, log);
 
-                    GridCommunicationClient oldClient = 
clients.putIfAbsent(node.id(), client);
+                    GridCommunicationClient oldClient = 
clients.putIfAbsent(connId, client);
 
                     assert oldClient == null : "Client already created [node=" 
+ node + ", client=" + client +
                         ", oldClient=" + oldClient + ", recoveryDesc=" + 
recovery + ']';
@@ -672,6 +677,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final boolean createClient;
 
+                private final ConnectionId connId;
+
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
@@ -685,13 +692,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     ClusterNode rmtNode,
                     HandshakeMessage msg,
                     boolean createClient,
-                    GridFutureAdapter<GridCommunicationClient> fut) {
+                    GridFutureAdapter<GridCommunicationClient> fut,
+                    ConnectionId connId) {
                     this.ses = ses;
                     this.recoveryDesc = recoveryDesc;
                     this.rmtNode = rmtNode;
                     this.msg = msg;
                     this.createClient = createClient;
                     this.fut = fut;
+                    this.connId = connId;
                 }
 
                 /** {@inheritDoc} */
@@ -703,7 +712,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                     msgFut.get();
 
                                     GridTcpNioCommunicationClient client =
-                                        connected(recoveryDesc, ses, rmtNode, 
msg.received(), false, createClient);
+                                        connected(recoveryDesc, ses, rmtNode, 
msg.received(), false, createClient, connId);
 
                                     fut.onDone(client);
                                 }
@@ -717,7 +726,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                     fut.onDone();
                                 }
                                 finally {
-                                    clientFuts.remove(rmtNode.id(), fut);
+                                    clientFuts.remove(connId, fut);
                                 }
                             }
                         };
@@ -729,7 +738,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             fut.onDone();
                         }
                         finally {
-                            clientFuts.remove(rmtNode.id(), fut);
+                            clientFuts.remove(connId, fut);
                         }
                     }
                 }
@@ -814,7 +823,60 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private final Collection<ShmemWorker> shmemWorkers = new 
ConcurrentLinkedDeque8<>();
 
     /** Clients. */
-    private final ConcurrentMap<UUID, GridCommunicationClient> clients = 
GridConcurrentFactory.newMap();
+    private final ConcurrentMap<ConnectionId, GridCommunicationClient> clients 
= GridConcurrentFactory.newMap();
+
+    /** Connections per node; used as a modulo divisor, so must not be 0. */
+    private int connectionsPerNode = 2;
+
+    public int getConnectionsPerNode() {
+        return connectionsPerNode;
+    }
+
+    public void setConnectionsPerNode(int connectionsPerNode) {
+        this.connectionsPerNode = connectionsPerNode;
+    }
+
+    /**
+     * Identifies one connection to a node: node ID plus connection index.
+     */
+    public static final class ConnectionId {
+        /** Connection index, {@code -1} if peer sent a NodeIdMessage. */
+        private final int idx;
+
+        /** Remote node ID. */
+        private final UUID id;
+
+        /**
+         * @param idx Connection index.
+         * @param id Node ID.
+         */
+        ConnectionId(int idx, UUID id) {
+            this.idx = idx;
+            this.id = id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (!(o instanceof ConnectionId)) // Also rejects null.
+                return false;
+
+            ConnectionId other = (ConnectionId)o;
+            return idx == other.idx && id.equals(other.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return 31 * idx + id.hashCode();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ConnectionId.class, this);
+        }
+    }
 
     /** SPI listener. */
     private volatile CommunicationListener<Message> lsnr;
@@ -861,7 +923,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     };
 
     /** Client connect futures. */
-    private final ConcurrentMap<UUID, 
GridFutureAdapter<GridCommunicationClient>> clientFuts =
+    private final ConcurrentMap<ConnectionId, 
GridFutureAdapter<GridCommunicationClient>> clientFuts =
         GridConcurrentFactory.newMap();
 
     /** */
@@ -1409,7 +1471,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             sb.append("Communication SPI clients: ").append(U.nl());
 
-            for (Map.Entry<UUID, GridCommunicationClient> entry : 
clients.entrySet()) {
+            for (Map.Entry<ConnectionId, GridCommunicationClient> entry : 
clients.entrySet()) {
                 sb.append("    [node=").append(entry.getKey())
                     .append(", client=").append(entry.getValue())
                     .append(']').append(U.nl());
@@ -1640,9 +1702,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionId rmtNodeId = ses.meta(NODE_ID_META);
 
-                        return rmtNodeId != null ? formatter.reader(rmtNodeId, 
msgFactory) : null;
+                        return rmtNodeId != null ? 
formatter.reader(rmtNodeId.id, msgFactory) : null;
                     }
                 };
 
@@ -1655,9 +1717,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionId rmtNodeId = ses.meta(NODE_ID_META);
 
-                        return rmtNodeId != null ? formatter.writer(rmtNodeId) 
: null;
+                        return rmtNodeId != null ? 
formatter.writer(rmtNodeId.id) : null;
                     }
                 };
 
@@ -1894,16 +1956,18 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
     void onNodeLeft(UUID nodeId) {
         assert nodeId != null;
 
-        GridCommunicationClient client = clients.get(nodeId);
+        for (Map.Entry<ConnectionId, GridCommunicationClient> client : 
clients.entrySet()) {
+            ConnectionId id = client.getKey();
 
-        if (client != null) {
-            if (log.isDebugEnabled())
-                log.debug("Forcing NIO client close since node has left 
[nodeId=" + nodeId +
-                    ", client=" + client + ']');
+            if (id.id.equals(nodeId)) {
+                if (log.isDebugEnabled())
+                    log.debug("Forcing NIO client close since node has left 
[nodeId=" + nodeId +
+                        ", client=" + client + ']');
 
-            client.forceClose();
+                client.getValue().forceClose();
 
-            clients.remove(nodeId, client);
+                clients.remove(id, client.getValue());
+            }
         }
     }
 
@@ -1978,11 +2042,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         else {
             GridCommunicationClient client = null;
 
+            ConnectionId id = new 
ConnectionId(((GridIoMessage)msg).connectionIndex() % connectionsPerNode, 
node.id());
+
             try {
                 boolean retry;
 
                 do {
-                    client = reserveClient(node);
+                    client = reserveClient(node, id);
 
                     UUID nodeId = null;
 
@@ -2013,7 +2079,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 throw new IgniteSpiException("Failed to send message to remote 
node: " + node, e);
             }
             finally {
-                if (client != null && clients.remove(node.id(), client))
+                if (client != null && clients.remove(id, client))
                     client.forceClose();
             }
         }
@@ -2026,13 +2092,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
      * @return The existing or just created client.
      * @throws IgniteCheckedException Thrown if any exception occurs.
      */
-    private GridCommunicationClient reserveClient(ClusterNode node) throws 
IgniteCheckedException {
+    private GridCommunicationClient reserveClient(ClusterNode node, 
ConnectionId id) throws IgniteCheckedException {
         assert node != null;
 
         UUID nodeId = node.id();
 
         while (true) {
-            GridCommunicationClient client = clients.get(nodeId);
+            GridCommunicationClient client = clients.get(id);
 
             if (client == null) {
                 if (stopping)
@@ -2041,17 +2107,17 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
                 // Do not allow concurrent connects.
                 GridFutureAdapter<GridCommunicationClient> fut = new 
ConnectFuture();
 
-                GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(nodeId, fut);
+                GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(id, fut);
 
                 if (oldFut == null) {
                     try {
-                        GridCommunicationClient client0 = clients.get(nodeId);
+                        GridCommunicationClient client0 = clients.get(id);
 
                         if (client0 == null) {
-                            client0 = createNioClient(node);
+                            client0 = createNioClient(node, id);
 
                             if (client0 != null) {
-                                GridCommunicationClient old = 
clients.put(nodeId, client0);
+                                GridCommunicationClient old = clients.put(id, 
client0);
 
                                 assert old == null : "Client already created " 
+
                                     "[node=" + node + ", client=" + client0 + 
", oldClient=" + old + ']';
@@ -2059,7 +2125,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 if (client0 instanceof 
GridTcpNioCommunicationClient) {
                                     GridTcpNioCommunicationClient tcpClient = 
((GridTcpNioCommunicationClient)client0);
 
-                                    if (tcpClient.session().closeTime() > 0 && 
clients.remove(nodeId, client0)) {
+                                    if (tcpClient.session().closeTime() > 0 && 
clients.remove(id, client0)) {
                                         if (log.isDebugEnabled())
                                             log.debug("Session was closed 
after client creation, will retry " +
                                                 "[node=" + node + ", client=" 
+ client0 + ']');
@@ -2081,7 +2147,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             throw (Error)e;
                     }
                     finally {
-                        clientFuts.remove(nodeId, fut);
+                        clientFuts.remove(id, fut);
                     }
                 }
                 else
@@ -2093,7 +2159,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     continue;
 
                 if (getSpiContext().node(nodeId) == null) {
-                    if (clients.remove(nodeId, client))
+                    if (clients.remove(id, client))
                         client.forceClose();
 
                     throw new IgniteSpiException("Destination node is not in 
topology: " + node.id());
@@ -2104,7 +2170,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 return client;
             else
                 // Client has just been closed by idle worker. Help it and try 
again.
-                clients.remove(nodeId, client);
+                clients.remove(id, client);
         }
     }
 
@@ -2113,7 +2179,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected GridCommunicationClient createNioClient(ClusterNode 
node) throws IgniteCheckedException {
+    @Nullable protected GridCommunicationClient createNioClient(ClusterNode 
node, ConnectionId id) throws IgniteCheckedException {
         assert node != null;
 
         Integer shmemPort = 
node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2154,7 +2220,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         connectGate.enter();
 
         try {
-            GridCommunicationClient client = createTcpClient(node);
+            GridCommunicationClient client = createTcpClient(node, id);
 
             if (log.isDebugEnabled())
                 log.debug("TCP client created: " + client);
@@ -2207,7 +2273,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), 
timeoutHelper.nextTimeoutChunk(connTimeout0), null);
+                safeHandshake(client, null, node.id(), 
timeoutHelper.nextTimeoutChunk(connTimeout0), null, null);
             }
             catch (HandshakeTimeoutException | 
IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -2266,10 +2332,10 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
      */
     private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
         if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
-            UUID id = ses.meta(NODE_ID_META);
+            ConnectionId id = ses.meta(NODE_ID_META);
 
             if (id != null) {
-                ClusterNode node = getSpiContext().node(id);
+                ClusterNode node = getSpiContext().node(id.id);
 
                 if (node != null && node.isClient()) {
                     String msg = "Client node outbound message queue size 
exceeded slowClientQueueLimit, " +
@@ -2283,7 +2349,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         log,
                         msg);
 
-                    getSpiContext().failNode(id, msg);
+                    getSpiContext().failNode(id.id, msg);
                 }
             }
         }
@@ -2296,7 +2362,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    protected GridCommunicationClient createTcpClient(ClusterNode node) throws 
IgniteCheckedException {
+    protected GridCommunicationClient createTcpClient(ClusterNode node, 
ConnectionId id) throws IgniteCheckedException {
         Collection<String> rmtAddrs0 = 
node.attribute(createSpiAttributeName(ATTR_ADDRS));
         Collection<String> rmtHostNames0 = 
node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
         Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -2364,7 +2430,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "(node left topology): " + node);
                     }
 
-                    GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(node);
+                    GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(node, id);
 
                     if (!recoveryDesc.reserve()) {
                         U.closeQuiet(ch);
@@ -2386,7 +2452,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
 
                         rcvCnt = safeHandshake(ch, recoveryDesc, node.id(),
-                            timeoutHelper.nextTimeoutChunk(connTimeout0), 
sslEngine);
+                            timeoutHelper.nextTimeoutChunk(connTimeout0), 
sslEngine, id);
 
                         if (rcvCnt == -1)
                             return null;
@@ -2399,7 +2465,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     try {
                         Map<Integer, Object> meta = new HashMap<>();
 
-                        meta.put(NODE_ID_META, node.id());
+                        meta.put(NODE_ID_META, id);
 
                         if (isSslEnabled()) {
                             assert sslEngine != null;
@@ -2568,7 +2634,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
-        @Nullable SSLEngine ssl
+        @Nullable SSLEngine ssl,
+        ConnectionId id
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, 
U.currentTimeMillis() + timeout);
 
@@ -2650,12 +2717,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
                     if (recovery != null) {
                         HandshakeMessage msg = new 
HandshakeMessage(locNode.id(),
                             recovery.incrementConnectCount(),
-                            recovery.received());
+                            recovery.received(),
+                            id.idx);
 
                         if (log.isDebugEnabled())
                             log.debug("Write handshake message [rmtNode=" + 
rmtNodeId + ", msg=" + msg + ']');
 
-                        buf = ByteBuffer.allocate(33);
+                        buf = ByteBuffer.allocate(37);
 
                         buf.order(ByteOrder.nativeOrder());
 
@@ -2810,8 +2878,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @param node Node.
      * @return Recovery receive data for given node.
      */
-    private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
-        ClientKey id = new ClientKey(node.id(), node.order());
+    private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node, 
ConnectionId clientId) {
+        ClientKey id = new ClientKey(node.id(), node.order(), clientId.idx);
 
         GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
 
@@ -2874,18 +2942,22 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
      */
     private static class ClientKey {
         /** */
-        private UUID nodeId;
+        private final UUID nodeId;
+
+        /** */
+        private final long order;
 
         /** */
-        private long order;
+        private final int idx;
 
         /**
          * @param nodeId Node ID.
          * @param order Node order.
          */
-        private ClientKey(UUID nodeId, long order) {
+        private ClientKey(UUID nodeId, long order, int idx) {
             this.nodeId = nodeId;
             this.order = order;
+            this.idx = idx;
         }
 
         /** {@inheritDoc} */
@@ -2898,7 +2970,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             ClientKey other = (ClientKey)obj;
 
-            return order == other.order && nodeId.equals(other.nodeId);
+            return idx == other.idx && order == other.order && 
nodeId.equals(other.nodeId);
 
         }
 
@@ -2906,7 +2978,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         @Override public int hashCode() {
             int res = nodeId.hashCode();
 
-            res = 31 * res + (int)(order ^ (order >>> 32));
+            res = 31 * res + (int)(order ^ (order >>> 32)) + idx;
 
             return res;
         }
@@ -3016,9 +3088,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionId rmtNodeId = ses.meta(NODE_ID_META);
 
-                        return rmtNodeId != null ? formatter.writer(rmtNodeId) 
: null;
+                        return rmtNodeId != null ? 
formatter.writer(rmtNodeId.id) : null;
                     }
                 };
 
@@ -3032,9 +3104,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionId rmtNodeId = ses.meta(NODE_ID_META);
 
-                        return rmtNodeId != null ? formatter.reader(rmtNodeId, 
msgFactory) : null;
+                        return rmtNodeId != null ? 
formatter.reader(rmtNodeId.id, msgFactory) : null;
                     }
                 };
 
@@ -3115,8 +3187,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         private void processIdle() {
             cleanupRecovery();
 
-            for (Map.Entry<UUID, GridCommunicationClient> e : 
clients.entrySet()) {
-                UUID nodeId = e.getKey();
+            for (Map.Entry<ConnectionId, GridCommunicationClient> e : 
clients.entrySet()) {
+                UUID nodeId = e.getKey().id;
 
                 GridCommunicationClient client = e.getValue();
 
@@ -3128,7 +3200,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     client.forceClose();
 
-                    clients.remove(nodeId, client);
+                    clients.remove(e.getKey(), client);
 
                     continue;
                 }
@@ -3136,7 +3208,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 GridNioRecoveryDescriptor recovery = null;
 
                 if (client instanceof GridTcpNioCommunicationClient) {
-                    recovery = recoveryDescs.get(new ClientKey(node.id(), 
node.order()));
+                    recovery = recoveryDescs.get(new ClientKey(node.id(), 
node.order(), e.getKey().idx));
 
                     if (recovery != null && recovery.lastAcknowledged() != 
recovery.received()) {
                         RecoveryLastReceivedMessage msg = new 
RecoveryLastReceivedMessage(recovery.received());
@@ -3170,7 +3242,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         log.debug("Closing idle node connection: " + nodeId);
 
                     if (client.close() || client.closed())
-                        clients.remove(nodeId, client);
+                        clients.remove(e.getKey(), client);
                 }
             }
         }
@@ -3223,7 +3295,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (log.isDebugEnabled())
                         log.debug("Recovery reconnect [rmtNode=" + 
recoveryDesc.node().id() + ']');
 
-                    GridCommunicationClient client = reserveClient(node);
+                    GridCommunicationClient client = reserveClient(node, 
sesInfo.connId);
 
                     client.release();
                 }
@@ -3431,6 +3503,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         /** */
         private long connectCnt;
 
+        /** */
+        private int idx;
+
         /**
          * Default constructor required by {@link Message}.
          */
@@ -3443,13 +3518,14 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
          * @param connectCnt Connect count.
          * @param rcvCnt Number of received messages.
          */
-        public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) {
+        public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt, int 
idx) {
             assert nodeId != null;
             assert rcvCnt >= 0 : rcvCnt;
 
             this.nodeId = nodeId;
             this.connectCnt = connectCnt;
             this.rcvCnt = rcvCnt;
+            this.idx = idx;
         }
 
         /**
@@ -3480,7 +3556,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         /** {@inheritDoc} */
         @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) 
{
-            if (buf.remaining() < 33)
+            if (buf.remaining() < 37)
                 return false;
 
             buf.put(HANDSHAKE_MSG_TYPE);
@@ -3495,6 +3571,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             buf.putLong(connectCnt);
 
+            buf.putInt(idx);
+
             return true;
         }
 
@@ -3513,6 +3591,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
             connectCnt = buf.getLong();
 
+            idx = buf.getInt();
+
             return true;
         }
 
@@ -3774,17 +3854,23 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
      */
     private static class DisconnectedSessionInfo {
         /** */
+        private final ConnectionId connId;
+
+        /** */
         private final GridNioRecoveryDescriptor recoveryDesc;
 
         /** */
         private final boolean reconnect;
 
         /**
+         * @param connId Connection ID.
          * @param recoveryDesc Recovery descriptor.
          * @param reconnect Reconnect flag.
          */
-        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor 
recoveryDesc,
+        public DisconnectedSessionInfo(ConnectionId connId,
+            @Nullable GridNioRecoveryDescriptor recoveryDesc,
             boolean reconnect) {
+            this.connId = connId;
             this.recoveryDesc = recoveryDesc;
             this.reconnect = reconnect;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9f62b81/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 652e47f..4c46be9 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -111,14 +111,14 @@ public class GridTcpSpiForwardingSelfTest extends 
GridCommonAbstractTest {
         cfg.setConnectorConfiguration(null);
 
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
-            @Override protected GridCommunicationClient 
createTcpClient(ClusterNode node) throws IgniteCheckedException {
+            @Override protected GridCommunicationClient 
createTcpClient(ClusterNode node, ConnectionId id) throws 
IgniteCheckedException {
                 Map<String, Object> attrs = new HashMap<>(node.attributes());
 
                 attrs.remove(createSpiAttributeName(ATTR_PORT));
 
                 ((TcpDiscoveryNode)node).setAttributes(attrs);
 
-                return super.createTcpClient(node);
+                return super.createTcpClient(node, id);
             }
         };
 

Reply via email to