Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-opts2 f27d7471f -> 301338960


conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30133896
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30133896
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30133896

Branch: refs/heads/ignite-comm-opts2
Commit: 3013389609de337cfebeaea8be5a34cdd93136b9
Parents: f27d747
Author: sboikov <sboi...@gridgain.com>
Authored: Mon Sep 19 20:54:35 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Mon Sep 19 20:54:35 2016 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   5 +
 .../nio/GridAbstractCommunicationClient.java    |  11 +-
 .../util/nio/GridCommunicationClient.java       |   5 +
 .../util/nio/GridShmemCommunicationClient.java  |   6 +-
 .../util/nio/GridTcpNioCommunicationClient.java |   8 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 702 ++++++++++++-------
 6 files changed, 476 insertions(+), 261 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 1eebfd4..908543c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -195,6 +195,11 @@ public class GridIoMessageFactory implements 
MessageFactory {
 
                 break;
 
+            case TcpCommunicationSpi.HANDSHAKE_MSG_TYPE2:
+                msg = new TcpCommunicationSpi.HandshakeMessage2();
+
+                break;
+
             case 0:
                 msg = new GridJobCancelRequest();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 9b014ec..37bc170 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -35,14 +35,23 @@ public abstract class GridAbstractCommunicationClient 
implements GridCommunicati
     /** Metrics listener. */
     protected final GridNioMetricsListener metricsLsnr;
 
+    /** */
+    private final int connIdx;
+
     /**
      * @param metricsLsnr Metrics listener.
      */
-    protected GridAbstractCommunicationClient(@Nullable GridNioMetricsListener 
metricsLsnr) {
+    protected GridAbstractCommunicationClient(int connIdx, @Nullable 
GridNioMetricsListener metricsLsnr) {
+        this.connIdx = connIdx;
         this.metricsLsnr = metricsLsnr;
     }
 
     /** {@inheritDoc} */
+    @Override public int connectionIndex() {
+        return connIdx;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean close() {
         return reserves.compareAndSet(0, -1);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
index 0de54e9..312a20e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridCommunicationClient.java
@@ -107,4 +107,9 @@ public interface GridCommunicationClient {
      * @return {@code True} if send is asynchronous.
      */
     public boolean async();
+
+    /**
+     * @return Connection index.
+     */
+    public int connectionIndex();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index ebe86fb..74d58b2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -55,14 +55,16 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
      * @param formatter Message formatter.
      * @throws IgniteCheckedException If failed.
      */
-    public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr,
+    public GridShmemCommunicationClient(
+        int connIdx,
+        GridNioMetricsListener metricsLsnr,
         int port,
         long connTimeout,
         IgniteLogger log,
         MessageFormatter formatter)
         throws IgniteCheckedException
     {
-        super(metricsLsnr);
+        super(connIdx, metricsLsnr);
 
         assert metricsLsnr != null;
         assert port > 0 && port < 0xffff;

http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 5fe521d..90f17b9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -46,10 +46,14 @@ public class GridTcpNioCommunicationClient extends 
GridAbstractCommunicationClie
 
     /**
      * @param ses Session.
+     * @param connIdx Connection index.
      * @param log Logger.
      */
-    public GridTcpNioCommunicationClient(GridNioSession ses, IgniteLogger log) 
{
-        super(null);
+    public GridTcpNioCommunicationClient(
+        int connIdx,
+        GridNioSession ses,
+        IgniteLogger log) {
+        super(connIdx, null);
 
         assert ses != null;
         assert log != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/30133896/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 9031247..7d91120 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -44,6 +44,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.net.ssl.SSLEngine;
@@ -178,6 +179,7 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_LEFT;
  * <li>Node local IP address (see {@link #setLocalAddress(String)})</li>
  * <li>Node local port number (see {@link #setLocalPort(int)})</li>
  * <li>Local port range (see {@link #setLocalPortRange(int)}</li>
+ * <li>Connections per node (see {@link #setConnectionsPerNode(int)})</li>
  * <li>Connection buffer flush frequency (see {@link 
#setConnectionBufferFlushFrequency(long)})</li>
  * <li>Connection buffer size (see {@link #setConnectionBufferSize(int)})</li>
  * <li>Idle connection timeout (see {@link 
#setIdleConnectionTimeout(long)})</li>
@@ -238,7 +240,7 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 public class TcpCommunicationSpi extends IgniteSpiAdapter
     implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
     /** */
-    private static final IgniteProductVersion TWO_CONN_SINCE_VER = 
IgniteProductVersion.fromString("1.7.2");
+    private static final IgniteProductVersion MULTIPLE_CONN_SINCE_VER = 
IgniteProductVersion.fromString("1.7.2");
 
     /** IPC error message. */
     public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate 
shared memory segment " +
@@ -289,8 +291,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     public static final int DFLT_SELECTORS_CNT = Math.min(4, 
Runtime.getRuntime().availableProcessors());
 
-    /** Node ID meta for session. */
-    private static final int NODE_ID_META = 
GridNioSessionMetaKey.nextUniqueKey();
+    /** Connection ID meta for session. */
+    private static final int CONN_ID_META = 
GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = 
GridNioSessionMetaKey.nextUniqueKey();
@@ -310,6 +312,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default socket write timeout. */
     public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
 
+    /** */
+    public static final int DFLT_CONN_PER_NODE = 2;
+
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {
         @Override public void run() {
@@ -327,6 +332,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final byte HANDSHAKE_MSG_TYPE = -3;
 
     /** */
+    public static final byte HANDSHAKE_MSG_TYPE2 = -4;
+
+    /** */
     private ConnectGateway connectGate;
 
     /** Server listener. */
@@ -354,41 +362,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
-                UUID id = ses.meta(NODE_ID_META);
+                ConnectionKey connId = ses.meta(CONN_ID_META);
 
-                if (id != null) {
-                    GridCommunicationClient client = clients.get(id);
+                if (connId != null) {
+                    UUID id = connId.nodeId();
 
-                    if (client instanceof GridTcpNioCommunicationClient &&
-                        ((GridTcpNioCommunicationClient) client).session() == 
ses) {
-                        client.close();
+                    GridCommunicationClient[] nodeClients = clients.get(id);
 
-                        clients.remove(id, client);
+                    if (nodeClients != null) {
+                        for (GridCommunicationClient client : nodeClients) {
+                            if (client instanceof 
GridTcpNioCommunicationClient &&
+                                ((GridTcpNioCommunicationClient) 
client).session() == ses) {
+                                client.close();
+
+                                removeNodeClient(id, client);
+                            }
+                        }
                     }
 
                     if (!stopping) {
-                        boolean reconnect = false;
-
                         GridNioRecoveryDescriptor outDesc = 
ses.outRecoveryDescriptor();
 
                         if (outDesc != null) {
                             if (outDesc.nodeAlive(getSpiContext().node(id))) {
                                 if (!outDesc.messagesFutures().isEmpty()) {
-                                    reconnect = true;
-
                                     if (log.isDebugEnabled())
                                         log.debug("Session was closed but 
there are unacknowledged messages, " +
                                             "will try to reconnect [rmtNode=" 
+ outDesc.node().id() + ']');
+
+                                    DisconnectedSessionInfo disconnectData =
+                                        new DisconnectedSessionInfo(outDesc, 
connId.connectionIndex());
+
+                                    
commWorker.addProcessDisconnectRequest(disconnectData);
                                 }
                             }
                             else
                                 outDesc.onNodeLeft();
                         }
-
-                        DisconnectedSessionInfo disconnectData = new 
DisconnectedSessionInfo(outDesc,
-                            reconnect);
-
-                        commWorker.addProcessDisconnectRequest(disconnectData);
                     }
 
                     CommunicationListener<Message> lsnr0 = lsnr;
@@ -405,21 +415,22 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             private void onFirstMessage(GridNioSession ses, Message msg) {
                 UUID sndId;
 
-                if (msg instanceof NodeIdMessage)
-                    sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes, 0);
+                ConnectionKey connKey;
+
+                if (msg instanceof NodeIdMessage) {
+                    sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 
0);
+                    connKey = new ConnectionKey(sndId, 0);
+                }
                 else {
                     assert msg instanceof HandshakeMessage : msg;
 
                     sndId = ((HandshakeMessage)msg).nodeId();
+                    connKey = new ConnectionKey(sndId, 
((HandshakeMessage)msg).connectionIndex());
                 }
 
                 if (log.isDebugEnabled())
                     log.debug("Remote node ID received: " + sndId);
 
-                final UUID old = ses.addMeta(NODE_ID_META, sndId);
-
-                assert old == null;
-
                 final ClusterNode rmtNode = getSpiContext().node(sndId);
 
                 if (rmtNode == null) {
@@ -431,6 +442,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     return;
                 }
 
+                final ConnectionKey old = ses.addMeta(CONN_ID_META, connKey);
+
+                assert old == null;
+
                 ClusterNode locNode = getSpiContext().localNode();
 
                 if (ses.remoteAddress() == null)
@@ -440,8 +455,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                if (useTwoConnections(rmtNode)) {
-                    final GridNioRecoveryDescriptor recoveryDesc = 
inRecoveryDescriptor(rmtNode);
+                if (useMultipleConnections(rmtNode)) {
+                    final GridNioRecoveryDescriptor recoveryDesc = 
inRecoveryDescriptor(rmtNode, connKey);
 
                     ConnectClosureNew c = new ConnectClosureNew(ses, 
recoveryDesc, rmtNode);
 
@@ -455,7 +470,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 }
                 else {
-                    GridCommunicationClient oldClient = clients.get(sndId);
+                    GridCommunicationClient[] curClients = clients.get(sndId);
+
+                    GridCommunicationClient oldClient = curClients != null ? 
curClients[0] : null;
 
                     boolean hasShmemClient = false;
 
@@ -479,12 +496,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     GridFutureAdapter<GridCommunicationClient> fut = new 
GridFutureAdapter<>();
 
-                    GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(sndId, fut);
+                    GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(connKey, fut);
 
-                    final GridNioRecoveryDescriptor recoveryDesc = 
inRecoveryDescriptor(rmtNode);
+                    final GridNioRecoveryDescriptor recoveryDesc = 
inRecoveryDescriptor(rmtNode, connKey);
 
                     if (oldFut == null) {
-                        oldClient = clients.get(sndId);
+                        curClients = clients.get(sndId);
+
+                        oldClient = curClients != null ? curClients[0] : null;
 
                         if (oldClient != null) {
                             if (oldClient instanceof 
GridTcpNioCommunicationClient) {
@@ -521,7 +540,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                 fut.onDone(client);
                             }
                             finally {
-                                clientFuts.remove(rmtNode.id(), fut);
+                                clientFuts.remove(connKey, fut);
                             }
                         }
                     }
@@ -549,9 +568,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             @Override public void onMessage(GridNioSession ses, Message msg) {
-                UUID sndId = ses.meta(NODE_ID_META);
+                ConnectionKey connKey = ses.meta(CONN_ID_META);
 
-                if (sndId == null) {
+                if (connKey == null) {
                     assert ses.accepted() : ses;
 
                     if (!connectGate.tryEnter()) {
@@ -579,9 +598,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         if (recovery != null) {
                             RecoveryLastReceivedMessage msg0 = 
(RecoveryLastReceivedMessage)msg;
 
-                            if (log.isDebugEnabled())
-                                log.debug("Received recovery acknowledgement 
[rmtNode=" + sndId +
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received recovery acknowledgement 
[rmtNode=" + connKey.nodeId() +
+                                    ", connIdx=" + connKey.connectionIndex() +
                                     ", rcvCnt=" + msg0.received() + ']');
+                            }
 
                             recovery.ackReceived(msg0.received());
 
@@ -595,9 +616,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             long rcvCnt = recovery.onReceived();
 
                             if (rcvCnt % ackSndThreshold == 0) {
-                                if (log.isDebugEnabled())
-                                    log.debug("Send recovery acknowledgement 
[rmtNode=" + sndId +
+                                if (log.isDebugEnabled()) {
+                                    log.debug("Send recovery acknowledgement 
[rmtNode=" + connKey.nodeId() +
+                                        ", connIdx=" + 
connKey.connectionIndex() +
                                         ", rcvCnt=" + rcvCnt + ']');
+                                }
 
                                 nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(rcvCnt));
 
@@ -625,7 +648,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     else
                         c = NOOP;
 
-                    notifyListener(sndId, msg, c);
+                    notifyListener(connKey.nodeId(), msg, c);
                 }
             }
 
@@ -659,12 +682,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 GridTcpNioCommunicationClient client = null;
 
                 if (createClient) {
-                    client = new GridTcpNioCommunicationClient(ses, log);
-
-                    GridCommunicationClient oldClient = 
clients.putIfAbsent(node.id(), client);
+                    client = new GridTcpNioCommunicationClient(0, ses, log);
 
-                    assert oldClient == null : "Client already created [node=" 
+ node + ", client=" + client +
-                        ", oldClient=" + oldClient + ", recoveryDesc=" + 
recovery + ']';
+                    addNodeClient(node.id(), 0, client);
                 }
 
                 return client;
@@ -894,6 +914,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Shared memory server. */
     private IpcSharedMemoryServerEndpoint shmemSrv;
 
+    /** */
+    private int connectionsPerNode = DFLT_CONN_PER_NODE;
+
     /** {@code TCP_NODELAY} option value for created sockets. */
     private boolean tcpNoDelay = DFLT_TCP_NODELAY;
 
@@ -916,7 +939,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     private final Collection<ShmemWorker> shmemWorkers = new 
ConcurrentLinkedDeque8<>();
 
     /** Clients. */
-    private final ConcurrentMap<UUID, GridCommunicationClient> clients = 
GridConcurrentFactory.newMap();
+    private final ConcurrentMap<UUID, GridCommunicationClient[]> clients = 
GridConcurrentFactory.newMap();
 
     /** SPI listener. */
     private volatile CommunicationListener<Message> lsnr;
@@ -963,17 +986,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     };
 
     /** Client connect futures. */
-    private final ConcurrentMap<UUID, 
GridFutureAdapter<GridCommunicationClient>> clientFuts =
+    private final ConcurrentMap<ConnectionKey, 
GridFutureAdapter<GridCommunicationClient>> clientFuts =
         GridConcurrentFactory.newMap();
 
     /** */
-    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> 
recoveryDescs = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> 
recoveryDescs = GridConcurrentFactory.newMap();
 
     /** */
-    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> 
outRecDescs = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> 
outRecDescs = GridConcurrentFactory.newMap();
 
     /** */
-    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> 
inRecDescs = GridConcurrentFactory.newMap();
+    private final ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> 
inRecDescs = GridConcurrentFactory.newMap();
 
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
@@ -1082,6 +1105,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return locPortRange;
     }
 
+    public void setConnectionsPerNode(int maxConnectionsPerNode) {
+        this.connectionsPerNode = maxConnectionsPerNode;
+    }
+
+    public int getConnectionsPerNode() {
+        return connectionsPerNode;
+    }
+
     /**
      * Sets local port to accept shared memory connections.
      * <p>
@@ -1502,7 +1533,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         if (log != null) {
             StringBuilder sb = new StringBuilder("Communication SPI recovery 
descriptors: ").append(U.nl());
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : 
recoveryDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : 
recoveryDescs.entrySet()) {
                 GridNioRecoveryDescriptor desc = entry.getValue();
 
                 sb.append("    [key=").append(entry.getKey())
@@ -1515,7 +1546,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(']').append(U.nl());
             }
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : 
outRecDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : 
outRecDescs.entrySet()) {
                 GridNioRecoveryDescriptor desc = entry.getValue();
 
                 sb.append("    [key=").append(entry.getKey())
@@ -1526,7 +1557,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(']').append(U.nl());
             }
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : 
inRecDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> entry : 
inRecDescs.entrySet()) {
                 GridNioRecoveryDescriptor desc = entry.getValue();
 
                 sb.append("    [key=").append(entry.getKey())
@@ -1542,10 +1573,15 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
 
             sb.append("Communication SPI clients: ").append(U.nl());
 
-            for (Map.Entry<UUID, GridCommunicationClient> entry : 
clients.entrySet()) {
-                sb.append("    [node=").append(entry.getKey())
-                    .append(", client=").append(entry.getValue())
-                    .append(']').append(U.nl());
+            for (Map.Entry<UUID, GridCommunicationClient[]> entry : 
clients.entrySet()) {
+                UUID nodeId = entry.getKey();
+                GridCommunicationClient[] clients0 = entry.getValue();
+
+                for (GridCommunicationClient client : clients0) {
+                    sb.append("    [node=").append(nodeId)
+                        .append(", client=").append(client)
+                        .append(']').append(U.nl());
+                }
             }
 
             U.warn(log, sb.toString());
@@ -1773,9 +1809,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey key = ses.meta(CONN_ID_META);
 
-                        return rmtNodeId != null ? formatter.reader(rmtNodeId, 
msgFactory) : null;
+                        return key != null ? formatter.reader(key.nodeId(), 
msgFactory) : null;
                     }
                 };
 
@@ -1788,9 +1824,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey key = ses.meta(CONN_ID_META);
 
-                        return rmtNodeId != null ? formatter.writer(rmtNodeId) 
: null;
+                        return key != null ? formatter.writer(key.nodeId()) : 
null;
                     }
                 };
 
@@ -1966,8 +2002,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         shmemWorkers.clear();
 
         // Force closing on stop (safety).
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0)
+                client.forceClose();
+        }
 
         // Clear resources.
         nioSrvr = null;
@@ -1992,8 +2030,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             connectGate.stopped();
 
         // Force closing.
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0)
+                client.forceClose();
+        }
 
         getSpiContext().deregisterPorts();
 
@@ -2004,8 +2044,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     @Override public void onClientDisconnected(IgniteFuture<?> reconnectFut) {
         connectGate.disconnected(reconnectFut);
 
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0)
+                client.forceClose();
+        }
 
         IgniteClientDisconnectedCheckedException err = new 
IgniteClientDisconnectedCheckedException(reconnectFut,
             "Failed to connect client node disconnected.");
@@ -2029,16 +2071,16 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
     void onNodeLeft(UUID nodeId) {
         assert nodeId != null;
 
-        GridCommunicationClient client = clients.get(nodeId);
-
-        if (client != null) {
-            if (log.isDebugEnabled())
-                log.debug("Forcing NIO client close since node has left 
[nodeId=" + nodeId +
-                    ", client=" + client + ']');
+        GridCommunicationClient[] clients0 = clients.remove(nodeId);
 
-            client.forceClose();
+        if (clients0 != null) {
+            for (GridCommunicationClient client : clients0) {
+                if (log.isDebugEnabled())
+                    log.debug("Forcing NIO client close since node has left 
[nodeId=" + nodeId +
+                        ", client=" + client + ']');
 
-            clients.remove(nodeId, client);
+                client.forceClose();
+            }
         }
     }
 
@@ -2087,6 +2129,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     * TODO
+     * @return
+     */
+    private int connectionIndex() {
+        return ThreadLocalRandom.current().nextInt(connectionsPerNode);
+    }
+
+    /**
      * @param node Destination node.
      * @param msg Message to send.
      * @param ackC Ack closure.
@@ -2113,11 +2163,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         else {
             GridCommunicationClient client = null;
 
+            int connIdx = useMultipleConnections(node) ? connectionIndex() : 0;
+
             try {
                 boolean retry;
 
                 do {
-                    client = reserveClient(node);
+                    client = reserveClient(node, connIdx);
 
                     UUID nodeId = null;
 
@@ -2131,7 +2183,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (!retry)
                         sentMsgsCnt.increment();
                     else {
-                        clients.remove(node.id(), client);
+                        removeNodeClient(node.id(), client);
 
                         ClusterNode node0 = getSpiContext().node(node.id());
 
@@ -2148,26 +2200,85 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
                 throw new IgniteSpiException("Failed to send message to remote 
node: " + node, e);
             }
             finally {
-                if (client != null && clients.remove(node.id(), client))
+                if (client != null && removeNodeClient(node.id(), client))
                     client.forceClose();
             }
         }
     }
 
     /**
+     * @param nodeId Node ID.
+     * @param rmvClient Client to remove.
+     * @return {@code True} if client was removed.
+     */
+    private boolean removeNodeClient(UUID nodeId, GridCommunicationClient 
rmvClient) {
+        for (;;) {
+            GridCommunicationClient[] curClients = clients.get(nodeId);
+
+            if (curClients == null)
+                return false;
+
+            if (curClients[rmvClient.connectionIndex()] == rmvClient) {
+                GridCommunicationClient[] newClients = 
Arrays.copyOf(curClients, curClients.length);
+
+                newClients[rmvClient.connectionIndex()] = null;
+
+                if (clients.replace(nodeId, curClients, newClients))
+                    return true;
+            }
+            else
+                return false;
+        }
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param connIdx Connection index.
+     * @param addClient Client to add.
+     */
+    private void addNodeClient(UUID nodeId, int connIdx, 
GridCommunicationClient addClient) {
+        for (;;) {
+            GridCommunicationClient[] curClients = clients.get(nodeId);
+
+            assert curClients == null || curClients[connIdx] == null : "Client 
already created " +
+                "[node=" + nodeId + ", client=" + addClient + ", oldClient=" + 
curClients[connIdx] + ']';
+
+            GridCommunicationClient[] newClients;
+
+            if (curClients == null) {
+                newClients = new GridCommunicationClient[connectionsPerNode];
+                newClients[connIdx] = addClient;
+
+                if (clients.putIfAbsent(nodeId, newClients) == null)
+                    break;
+            }
+            else {
+                newClients = Arrays.copyOf(curClients, curClients.length);
+                newClients[connIdx] = addClient;
+
+                if (clients.replace(nodeId, curClients, newClients))
+                    break;
+            }
+        }
+    }
+
+    /**
      * Returns existing or just created client to node.
      *
      * @param node Node to which client should be open.
+     * @param connIdx Connection index.
      * @return The existing or just created client.
      * @throws IgniteCheckedException Thrown if any exception occurs.
      */
-    private GridCommunicationClient reserveClient(ClusterNode node) throws 
IgniteCheckedException {
+    private GridCommunicationClient reserveClient(ClusterNode node, int 
connIdx) throws IgniteCheckedException {
         assert node != null;
 
         UUID nodeId = node.id();
 
         while (true) {
-            GridCommunicationClient client = clients.get(nodeId);
+            GridCommunicationClient[] curClients = clients.get(nodeId);
+
+            GridCommunicationClient client = curClients != null ? 
curClients[connIdx] : null;
 
             if (client == null) {
                 if (stopping)
@@ -2176,25 +2287,26 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
                 // Do not allow concurrent connects.
                 GridFutureAdapter<GridCommunicationClient> fut = new 
ConnectFuture();
 
-                GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(nodeId, fut);
+                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx);
+
+                GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(connKey, fut);
 
                 if (oldFut == null) {
                     try {
-                        GridCommunicationClient client0 = clients.get(nodeId);
+                        GridCommunicationClient[] curClients0 = 
clients.get(nodeId);
+
+                        GridCommunicationClient client0 = curClients0 != null 
? curClients0[connIdx] : null;
 
                         if (client0 == null) {
-                            client0 = createNioClient(node);
+                            client0 = createNioClient(node, connIdx);
 
                             if (client0 != null) {
-                                GridCommunicationClient old = 
clients.put(nodeId, client0);
-
-                                assert old == null : "Client already created " 
+
-                                    "[node=" + node + ", client=" + client0 + 
", oldClient=" + old + ']';
+                                addNodeClient(nodeId, connIdx, client0);
 
                                 if (client0 instanceof 
GridTcpNioCommunicationClient) {
                                     GridTcpNioCommunicationClient tcpClient = 
((GridTcpNioCommunicationClient)client0);
 
-                                    if (tcpClient.session().closeTime() > 0 && 
clients.remove(nodeId, client0)) {
+                                    if (tcpClient.session().closeTime() > 0 && 
removeNodeClient(nodeId, client0)) {
                                         if (log.isDebugEnabled())
                                             log.debug("Session was closed 
after client creation, will retry " +
                                                 "[node=" + node + ", client=" 
+ client0 + ']');
@@ -2228,7 +2340,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     continue;
 
                 if (getSpiContext().node(nodeId) == null) {
-                    if (clients.remove(nodeId, client))
+                    if (removeNodeClient(nodeId, client))
                         client.forceClose();
 
                     throw new IgniteSpiException("Destination node is not in 
topology: " + node.id());
@@ -2239,16 +2351,18 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
                 return client;
             else
                 // Client has just been closed by idle worker. Help it and try 
again.
-                clients.remove(nodeId, client);
+                removeNodeClient(nodeId, client);
         }
     }
 
     /**
      * @param node Node to create client for.
+     * @param connIdx Connection index.
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    @Nullable protected GridCommunicationClient createNioClient(ClusterNode 
node) throws IgniteCheckedException {
+    @Nullable private GridCommunicationClient createNioClient(ClusterNode 
node, int connIdx)
+        throws IgniteCheckedException {
         assert node != null;
 
         Integer shmemPort = 
node.attribute(createSpiAttributeName(ATTR_SHMEM_PORT));
@@ -2267,6 +2381,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             try {
                 GridCommunicationClient client = createShmemClient(
                     node,
+                    connIdx,
                     shmemPort);
 
                 if (log.isDebugEnabled())
@@ -2289,7 +2404,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         connectGate.enter();
 
         try {
-            GridCommunicationClient client = createTcpClient(node);
+            GridCommunicationClient client = createTcpClient(node, connIdx);
 
             if (log.isDebugEnabled())
                 log.debug("TCP client created: " + client);
@@ -2304,10 +2419,12 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
     /**
      * @param node Node.
      * @param port Port.
+     * @param connIdx Connection index.
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
     @Nullable protected GridCommunicationClient createShmemClient(ClusterNode 
node,
+        int connIdx,
         Integer port) throws IgniteCheckedException {
         int attempt = 1;
 
@@ -2321,7 +2438,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             GridCommunicationClient client;
 
             try {
-                client = new GridShmemCommunicationClient(metricsLsnr,
+                client = new GridShmemCommunicationClient(
+                    connIdx,
+                    metricsLsnr,
                     port,
                     timeoutHelper.nextTimeoutChunk(connTimeout),
                     log,
@@ -2342,7 +2461,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             try {
-                safeHandshake(client, null, node.id(), 
timeoutHelper.nextTimeoutChunk(connTimeout0), null);
+                safeHandshake(client,
+                    null,
+                    node.id(),
+                    timeoutHelper.nextTimeoutChunk(connTimeout0),
+                    null,
+                    null);
             }
             catch (HandshakeTimeoutException | 
IgniteSpiOperationTimeoutException e) {
                 client.forceClose();
@@ -2401,10 +2525,10 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
      */
     private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
         if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
-            UUID id = ses.meta(NODE_ID_META);
+            ConnectionKey id = ses.meta(CONN_ID_META);
 
             if (id != null) {
-                ClusterNode node = getSpiContext().node(id);
+                ClusterNode node = getSpiContext().node(id.nodeId);
 
                 if (node != null && node.isClient()) {
                     String msg = "Client node outbound message queue size 
exceeded slowClientQueueLimit, " +
@@ -2418,7 +2542,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         log,
                         msg);
 
-                    getSpiContext().failNode(id, msg);
+                    getSpiContext().failNode(id.nodeId(), msg);
                 }
             }
         }
@@ -2431,7 +2555,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      * @return Client.
      * @throws IgniteCheckedException If failed.
      */
-    protected GridCommunicationClient createTcpClient(ClusterNode node) throws 
IgniteCheckedException {
+    protected GridCommunicationClient createTcpClient(ClusterNode node, int 
connIdx) throws IgniteCheckedException {
         Collection<String> rmtAddrs0 = 
node.attribute(createSpiAttributeName(ATTR_ADDRS));
         Collection<String> rmtHostNames0 = 
node.attribute(createSpiAttributeName(ATTR_HOST_NAMES));
         Integer boundPort = node.attribute(createSpiAttributeName(ATTR_PORT));
@@ -2499,7 +2623,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "(node left topology): " + node);
                     }
 
-                    GridNioRecoveryDescriptor recoveryDesc = 
outRecoveryDescriptor(node);
+                    ConnectionKey connKey = new ConnectionKey(node.id(), 
connIdx);
+
+                    GridNioRecoveryDescriptor recoveryDesc = 
outRecoveryDescriptor(node, connKey);
 
                     if (!recoveryDesc.reserve()) {
                         U.closeQuiet(ch);
@@ -2520,8 +2646,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             sslEngine.setUseClientMode(true);
                         }
 
-                        rcvCnt = safeHandshake(ch, recoveryDesc, node.id(),
-                            timeoutHelper.nextTimeoutChunk(connTimeout0), 
sslEngine);
+                        Integer handshakeConnIdx = 
useMultipleConnections(node) ? connIdx : null;
+
+                        rcvCnt = safeHandshake(ch,
+                            recoveryDesc,
+                            node.id(),
+                            timeoutHelper.nextTimeoutChunk(connTimeout0),
+                            sslEngine,
+                            handshakeConnIdx);
 
                         if (rcvCnt == -1)
                             return null;
@@ -2534,7 +2666,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     try {
                         Map<Integer, Object> meta = new HashMap<>();
 
-                        meta.put(NODE_ID_META, node.id());
+                        meta.put(CONN_ID_META, connKey);
 
                         if (isSslEnabled()) {
                             assert sslEngine != null;
@@ -2550,7 +2682,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         GridNioSession ses = nioSrvr.createSession(ch, 
meta).get();
 
-                        client = new GridTcpNioCommunicationClient(ses, log);
+                        client = new GridTcpNioCommunicationClient(connIdx, 
ses, log);
 
                         conn = true;
                     }
@@ -2703,7 +2835,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         @Nullable GridNioRecoveryDescriptor recovery,
         UUID rmtNodeId,
         long timeout,
-        @Nullable SSLEngine ssl
+        @Nullable SSLEngine ssl,
+        @Nullable Integer handshakeConnIdx
     ) throws IgniteCheckedException {
         HandshakeTimeoutObject<T> obj = new HandshakeTimeoutObject<>(client, 
U.currentTimeMillis() + timeout);
 
@@ -2783,14 +2916,28 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
                             "fully initialized [isStopping=" + 
getSpiContext().isStopping() + ']');
 
                     if (recovery != null) {
-                        HandshakeMessage msg = new 
HandshakeMessage(locNode.id(),
-                            recovery.incrementConnectCount(),
-                            recovery.received());
+                        HandshakeMessage msg;
+
+                        int msgSize = 33;
+
+                        if (handshakeConnIdx != null) {
+                            msg = new HandshakeMessage2(locNode.id(),
+                                recovery.incrementConnectCount(),
+                                recovery.received(),
+                                handshakeConnIdx);
+
+                            msgSize += 4;
+                        }
+                        else {
+                            msg = new HandshakeMessage(locNode.id(),
+                                recovery.incrementConnectCount(),
+                                recovery.received());
+                        }
 
                         if (log.isDebugEnabled())
                             log.debug("Write handshake message [rmtNode=" + 
rmtNodeId + ", msg=" + msg + ']');
 
-                        buf = ByteBuffer.allocate(33);
+                        buf = ByteBuffer.allocate(msgSize);
 
                         buf.order(ByteOrder.nativeOrder());
 
@@ -2937,51 +3084,55 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
 
         U.join(commWorker, log);
 
-        for (GridCommunicationClient client : clients.values())
-            client.forceClose();
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0)
+                client.forceClose();
+        }
     }
 
     /**
      * @param node Node.
+     * @param key Connection key.
      * @return Recovery descriptor for outgoing connection.
      */
-    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node) {
-        if (useTwoConnections(node))
-            return recoveryDescriptor(outRecDescs, node);
+    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node, 
ConnectionKey key) {
+        if (useMultipleConnections(node))
+            return recoveryDescriptor(outRecDescs, node, key);
         else
-            return recoveryDescriptor(recoveryDescs, node);
+            return recoveryDescriptor(recoveryDescs, node, key);
     }
 
     /**
      * @param node Node.
+     * @param key Connection key.
      * @return Recovery descriptor for incoming connection.
      */
-    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node) {
-        if (useTwoConnections(node))
-            return recoveryDescriptor(inRecDescs, node);
+    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node, 
ConnectionKey key) {
+        if (useMultipleConnections(node))
+            return recoveryDescriptor(inRecDescs, node, key);
         else
-            return recoveryDescriptor(recoveryDescs, node);
+            return recoveryDescriptor(recoveryDescs, node, key);
     }
 
     /**
      * @param node Node.
-     * @return {@code True} if given node supports two connections per-node 
for communication.
+     * @return {@code True} if given node supports multiple connections 
per-node for communication.
      */
-    private boolean useTwoConnections(ClusterNode node) {
-        return node.version().compareToIgnoreTimestamp(TWO_CONN_SINCE_VER) >= 
0;
+    private boolean useMultipleConnections(ClusterNode node) {
+        return 
node.version().compareToIgnoreTimestamp(MULTIPLE_CONN_SINCE_VER) >= 0;
     }
 
     /**
      * @param recoveryDescs Descriptors map.
      * @param node Node.
+     * @param key Connection key.
      * @return Recovery receive data for given node.
      */
     private GridNioRecoveryDescriptor recoveryDescriptor(
-        ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs,
-        ClusterNode node) {
-        ClientKey id = new ClientKey(node.id(), node.order());
-
-        GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
+        ConcurrentMap<ConnectionKey, GridNioRecoveryDescriptor> recoveryDescs,
+        ClusterNode node,
+        ConnectionKey key) {
+        GridNioRecoveryDescriptor recovery = recoveryDescs.get(key);
 
         if (recovery == null) {
             int maxSize = Math.max(msgQueueLimit, ackSndThreshold);
@@ -2989,7 +3140,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             int queueLimit = unackedMsgsBufSize != 0 ? unackedMsgsBufSize : 
(maxSize * 5);
 
             GridNioRecoveryDescriptor old =
-                recoveryDescs.putIfAbsent(id, recovery = new 
GridNioRecoveryDescriptor(queueLimit, node, log));
+                recoveryDescs.putIfAbsent(key, recovery = new 
GridNioRecoveryDescriptor(queueLimit, node, log));
 
             if (old != null)
                 recovery = old;
@@ -3031,54 +3182,6 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         return S.toString(TcpCommunicationSpi.class, this);
     }
 
-    /**
-     *
-     */
-    private static class ClientKey {
-        /** */
-        private UUID nodeId;
-
-        /** */
-        private long order;
-
-        /**
-         * @param nodeId Node ID.
-         * @param order Node order.
-         */
-        private ClientKey(UUID nodeId, long order) {
-            this.nodeId = nodeId;
-            this.order = order;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-
-            if (obj == null || getClass() != obj.getClass())
-                return false;
-
-            ClientKey other = (ClientKey)obj;
-
-            return order == other.order && nodeId.equals(other.nodeId);
-
-        }
-
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            int res = nodeId.hashCode();
-
-            res = 31 * res + (int)(order ^ (order >>> 32));
-
-            return res;
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(ClientKey.class, this);
-        }
-    }
-
     /** Internal exception class for proper timeout handling. */
     private static class HandshakeTimeoutException extends 
IgniteCheckedException {
         /** */
@@ -3178,9 +3281,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey connKey = ses.meta(CONN_ID_META);
 
-                        return rmtNodeId != null ? formatter.writer(rmtNodeId) 
: null;
+                        return connKey != null ? 
formatter.writer(connKey.nodeId()) : null;
                     }
                 };
 
@@ -3194,9 +3297,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+                        ConnectionKey connKey = ses.meta(CONN_ID_META);
 
-                        return rmtNodeId != null ? formatter.reader(rmtNodeId, 
msgFactory) : null;
+                        return connKey != null ? 
formatter.reader(connKey.nodeId(), msgFactory) : null;
                     }
                 };
 
@@ -3277,72 +3380,72 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         private void processIdle() {
             cleanupRecovery();
 
-            for (Map.Entry<UUID, GridCommunicationClient> e : 
clients.entrySet()) {
+            for (Map.Entry<UUID, GridCommunicationClient[]> e : 
clients.entrySet()) {
                 UUID nodeId = e.getKey();
 
-                GridCommunicationClient client = e.getValue();
-
-                ClusterNode node = getSpiContext().node(nodeId);
+                for (GridCommunicationClient client : e.getValue()) {
+                    ClusterNode node = getSpiContext().node(nodeId);
 
-                if (node == null) {
-                    if (log.isDebugEnabled())
-                        log.debug("Forcing close of non-existent node 
connection: " + nodeId);
+                    if (node == null) {
+                        if (log.isDebugEnabled())
+                            log.debug("Forcing close of non-existent node 
connection: " + nodeId);
 
-                    client.forceClose();
+                        client.forceClose();
 
-                    clients.remove(nodeId, client);
+                        removeNodeClient(nodeId, client);
 
-                    continue;
-                }
+                        continue;
+                    }
 
-                GridNioRecoveryDescriptor recovery = null;
+                    GridNioRecoveryDescriptor recovery = null;
 
-                if (!useTwoConnections(node) && client instanceof 
GridTcpNioCommunicationClient) {
-                    recovery = recoveryDescs.get(new ClientKey(node.id(), 
node.order()));
+                    if (!useMultipleConnections(node) && client instanceof 
GridTcpNioCommunicationClient) {
+                        recovery = recoveryDescs.get(new 
ConnectionKey(node.id(), client.connectionIndex()));
 
-                    if (recovery != null && recovery.lastAcknowledged() != 
recovery.received()) {
-                        RecoveryLastReceivedMessage msg = new 
RecoveryLastReceivedMessage(recovery.received());
+                        if (recovery != null && recovery.lastAcknowledged() != 
recovery.received()) {
+                            RecoveryLastReceivedMessage msg = new 
RecoveryLastReceivedMessage(recovery.received());
 
-                        if (log.isDebugEnabled())
-                            log.debug("Send recovery acknowledgement on 
timeout [rmtNode=" + nodeId +
-                                ", rcvCnt=" + msg.received() + ']');
+                            if (log.isDebugEnabled())
+                                log.debug("Send recovery acknowledgement on 
timeout [rmtNode=" + nodeId +
+                                    ", rcvCnt=" + msg.received() + ']');
 
-                        
nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
+                            
nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg);
 
-                        recovery.lastAcknowledged(msg.received());
+                            recovery.lastAcknowledged(msg.received());
 
-                        continue;
+                            continue;
+                        }
                     }
-                }
 
-                long idleTime = client.getIdleTime();
+                    long idleTime = client.getIdleTime();
 
-                if (idleTime >= idleConnTimeout) {
-                    if (recovery == null && useTwoConnections(node))
-                        recovery = outRecDescs.get(new ClientKey(node.id(), 
node.order()));
+                    if (idleTime >= idleConnTimeout) {
+                        if (recovery == null && useMultipleConnections(node))
+                            recovery = outRecDescs.get(new 
ConnectionKey(node.id(), client.connectionIndex()));
 
-                    if (recovery != null &&
-                        recovery.nodeAlive(getSpiContext().node(nodeId)) &&
-                        !recovery.messagesFutures().isEmpty()) {
-                        if (log.isDebugEnabled())
-                            log.debug("Node connection is idle, but there are 
unacknowledged messages, " +
-                                "will wait: " + nodeId);
+                        if (recovery != null &&
+                            recovery.nodeAlive(getSpiContext().node(nodeId)) &&
+                            !recovery.messagesFutures().isEmpty()) {
+                            if (log.isDebugEnabled())
+                                log.debug("Node connection is idle, but there 
are unacknowledged messages, " +
+                                    "will wait: " + nodeId);
 
-                        continue;
-                    }
+                            continue;
+                        }
 
-                    if (log.isDebugEnabled())
-                        log.debug("Closing idle node connection: " + nodeId);
+                        if (log.isDebugEnabled())
+                            log.debug("Closing idle node connection: " + 
nodeId);
 
-                    if (client.close() || client.closed())
-                        clients.remove(nodeId, client);
+                        if (client.close() || client.closed())
+                            removeNodeClient(nodeId, client);
+                    }
                 }
             }
 
             for (GridNioSession ses : nioSrvr.sessions()) {
                 GridNioRecoveryDescriptor recovery = 
ses.inRecoveryDescriptor();
 
-                if (recovery != null && useTwoConnections(recovery.node())) {
+                if (recovery != null && 
useMultipleConnections(recovery.node())) {
                     assert ses.accepted() : ses;
 
                     sendAckOnTimeout(recovery, ses);
@@ -3382,10 +3485,10 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         /**
          *
          */
-        private void cleanupRecovery(ConcurrentMap<ClientKey, 
GridNioRecoveryDescriptor> recoveryDescs) {
-            Set<ClientKey> left = null;
+        private void cleanupRecovery(ConcurrentMap<ConnectionKey, 
GridNioRecoveryDescriptor> recoveryDescs) {
+            Set<ConnectionKey> left = null;
 
-            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : 
recoveryDescs.entrySet()) {
+            for (Map.Entry<ConnectionKey, GridNioRecoveryDescriptor> e : 
recoveryDescs.entrySet()) {
                 if (left != null && left.contains(e.getKey()))
                     continue;
 
@@ -3402,7 +3505,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             if (left != null) {
                 assert !left.isEmpty();
 
-                for (ClientKey id : left) {
+                for (ConnectionKey id : left) {
                     GridNioRecoveryDescriptor recoverySnd = 
recoveryDescs.get(id);
 
                     if (recoverySnd != null && recoverySnd.onNodeLeft())
@@ -3415,45 +3518,43 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
          * @param sesInfo Disconnected session information.
          */
         private void processDisconnect(DisconnectedSessionInfo sesInfo) {
-            if (sesInfo.reconnect) {
-                GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
-
-                ClusterNode node = recoveryDesc.node();
+            GridNioRecoveryDescriptor recoveryDesc = sesInfo.recoveryDesc;
 
-                if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
-                    return;
+            ClusterNode node = recoveryDesc.node();
 
-                try {
-                    if (log.isDebugEnabled())
-                        log.debug("Recovery reconnect [rmtNode=" + 
recoveryDesc.node().id() + ']');
+            if (!recoveryDesc.nodeAlive(getSpiContext().node(node.id())))
+                return;
 
-                    GridCommunicationClient client = reserveClient(node);
+            try {
+                if (log.isDebugEnabled())
+                    log.debug("Recovery reconnect [rmtNode=" + 
recoveryDesc.node().id() + ']');
 
-                    client.release();
-                }
-                catch (IgniteCheckedException | IgniteException e) {
-                    try {
-                        if 
(recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && 
getSpiContext().pingNode(node.id())) {
-                            if (log.isDebugEnabled())
-                                log.debug("Recovery reconnect failed, will 
retry " +
-                                    "[rmtNode=" + recoveryDesc.node().id() + 
", err=" + e + ']');
+                GridCommunicationClient client = reserveClient(node, 
sesInfo.connIdx);
 
-                            addProcessDisconnectRequest(sesInfo);
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Recovery reconnect failed, " +
-                                    "node left [rmtNode=" + 
recoveryDesc.node().id() + ", err=" + e + ']');
+                client.release();
+            }
+            catch (IgniteCheckedException | IgniteException e) {
+                try {
+                    if 
(recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && 
getSpiContext().pingNode(node.id())) {
+                        if (log.isDebugEnabled())
+                            log.debug("Recovery reconnect failed, will retry " 
+
+                                "[rmtNode=" + recoveryDesc.node().id() + ", 
err=" + e + ']');
 
-                            onException("Recovery reconnect failed, node left 
[rmtNode=" + recoveryDesc.node().id() + "]",
-                                e);
-                        }
+                        addProcessDisconnectRequest(sesInfo);
                     }
-                    catch (IgniteClientDisconnectedException e0) {
+                    else {
                         if (log.isDebugEnabled())
-                            log.debug("Failed to ping node, client 
disconnected.");
+                            log.debug("Recovery reconnect failed, " +
+                                "node left [rmtNode=" + 
recoveryDesc.node().id() + ", err=" + e + ']');
+
+                        onException("Recovery reconnect failed, node left 
[rmtNode=" + recoveryDesc.node().id() + "]",
+                            e);
                     }
                 }
+                catch (IgniteClientDisconnectedException e0) {
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to ping node, client disconnected.");
+                }
             }
         }
 
@@ -3657,6 +3758,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /**
+         * @return Connection index.
+         */
+        public int connectionIndex() {
+            return 0;
+        }
+
+        /**
          * @return Connect count.
          */
         public long connectCount() {
@@ -3737,6 +3845,50 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     }
 
     /**
+     *
+     */
+    public static class HandshakeMessage2 extends HandshakeMessage {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private int connIdx;
+
+        /**
+         *
+         */
+        public HandshakeMessage2() {
+            // No-op.
+        }
+
+        /**
+         * @param nodeId Node ID.
+         * @param connectCnt Connect count.
+         * @param rcvCnt Number of received messages.
+         * @param connIdx Connection index.
+         */
+        public HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, 
int connIdx) {
+            super(nodeId, connectCnt, rcvCnt);
+            this.connIdx = connIdx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public byte directType() {
+            return HANDSHAKE_MSG_TYPE2;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int connectionIndex() {
+            return connIdx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(HandshakeMessage2.class, this);
+        }
+    }
+
+    /**
      * Recovery acknowledgment message.
      */
     @SuppressWarnings("PublicInnerClass")
@@ -3981,16 +4133,15 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         private final GridNioRecoveryDescriptor recoveryDesc;
 
         /** */
-        private final boolean reconnect;
+        private int connIdx;
 
         /**
          * @param recoveryDesc Recovery descriptor.
-         * @param reconnect Reconnect flag.
+         * @param connIdx Connection index.
          */
-        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor 
recoveryDesc,
-            boolean reconnect) {
+        DisconnectedSessionInfo(@Nullable GridNioRecoveryDescriptor 
recoveryDesc, int connIdx) {
             this.recoveryDesc = recoveryDesc;
-            this.reconnect = reconnect;
+            this.connIdx = connIdx;
         }
 
         /** {@inheritDoc} */
@@ -3998,4 +4149,43 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             return S.toString(DisconnectedSessionInfo.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private static class ConnectionKey {
+        /** */
+        private final UUID nodeId;
+
+        /** */
+        private final int idx;
+
+        /**
+         * @param nodeId Node ID.
+         * @param idx Connection index.
+         */
+        ConnectionKey(UUID nodeId, int idx) {
+            this.nodeId = nodeId;
+            this.idx = idx;
+        }
+
+        /**
+         * @return Node ID.
+         */
+        UUID nodeId() {
+            return nodeId;
+        }
+
+        /**
+         * @return Connection index.
+         */
+        int connectionIndex() {
+            return idx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ConnectionKey.class, this);
+        }
+    }
 }

Reply via email to