Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-opts2 6e72f5198 -> 4030ef886


conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4030ef88
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4030ef88
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4030ef88

Branch: refs/heads/ignite-comm-opts2
Commit: 4030ef886043360e846926f2a72d5a8a81393e21
Parents: 6e72f51
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Sep 20 11:49:10 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Sep 20 14:37:06 2016 +0300

----------------------------------------------------------------------
 .../nio/GridAbstractCommunicationClient.java    |   1 +
 .../util/nio/GridShmemCommunicationClient.java  |   1 +
 .../util/nio/GridTcpNioCommunicationClient.java |   2 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 244 ++++++++++++++-----
 ...eAtomicMessageRecovery10ConnectionsTest.java |  28 +++
 .../IgniteCacheMessageRecoveryAbstractTest.java |  24 +-
 .../spi/GridTcpSpiForwardingSelfTest.java       |   5 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |  17 +-
 .../GridTcpCommunicationSpiConfigSelfTest.java  |   3 +
 ...cpCommunicationSpiMultithreadedSelfTest.java |  12 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   1 +
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   1 +
 ...CommunicationRecoveryAckClosureSelfTest.java |   1 +
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 14 files changed, 268 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
index 37bc170..f2ab932 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridAbstractCommunicationClient.java
@@ -39,6 +39,7 @@ public abstract class GridAbstractCommunicationClient 
implements GridCommunicati
     private final int connIdx;
 
     /**
+     * @param connIdx Connection index.
      * @param metricsLsnr Metrics listener.
      */
     protected GridAbstractCommunicationClient(int connIdx, @Nullable 
GridNioMetricsListener metricsLsnr) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 74d58b2..d941bae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -48,6 +48,7 @@ public class GridShmemCommunicationClient extends 
GridAbstractCommunicationClien
     private final MessageFormatter formatter;
 
     /**
+     * @param connIdx Connection index.
      * @param metricsLsnr Metrics listener.
      * @param port Shared memory IPC server port.
      * @param connTimeout Connection timeout.

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 90f17b9..fcb40c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -45,8 +45,8 @@ public class GridTcpNioCommunicationClient extends 
GridAbstractCommunicationClie
     private final IgniteLogger log;
 
     /**
-     * @param ses Session.
      * @param connIdx Connection index.
+     * @param ses Session.
      * @param log Logger.
      */
     public GridTcpNioCommunicationClient(

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7d91120..2d1a2b2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -47,6 +47,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLException;
 import org.apache.ignite.Ignite;
@@ -54,6 +55,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.AddressResolver;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -291,8 +293,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     public static final int DFLT_SELECTORS_CNT = Math.min(4, 
Runtime.getRuntime().availableProcessors());
 
-    /** Connection ID meta for session. */
-    private static final int CONN_ID_META = 
GridNioSessionMetaKey.nextUniqueKey();
+    /** Connection index meta for session. */
+    private static final int CONN_IDX_META = 
GridNioSessionMetaKey.nextUniqueKey();
 
     /** Message tracker meta for session. */
     private static final int TRACKER_META = 
GridNioSessionMetaKey.nextUniqueKey();
@@ -312,8 +314,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** Default socket write timeout. */
     public static final long DFLT_SOCK_WRITE_TIMEOUT = 2000;
 
-    /** */
-    public static final int DFLT_CONN_PER_NODE = 2;
+    /** Default connections per node. */
+    public static final int DFLT_CONN_PER_NODE = 1;
 
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {
@@ -362,7 +364,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             @Override public void onDisconnected(GridNioSession ses, @Nullable 
Exception e) {
-                ConnectionKey connId = ses.meta(CONN_ID_META);
+                ConnectionKey connId = ses.meta(CONN_IDX_META);
 
                 if (connId != null) {
                     UUID id = connId.nodeId();
@@ -442,7 +444,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     return;
                 }
 
-                final ConnectionKey old = ses.addMeta(CONN_ID_META, connKey);
+                final ConnectionKey old = ses.addMeta(CONN_IDX_META, connKey);
 
                 assert old == null;
 
@@ -526,7 +528,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
 
                         boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                            new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, !hasShmemClient, fut));
+                            new ConnectClosure(ses, recoveryDesc, rmtNode, 
connKey, msg0, !hasShmemClient, fut));
 
                         if (log.isDebugEnabled())
                             log.debug("Received incoming connection from 
remote node " +
@@ -558,7 +560,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         else {
                             // The code below causes a race condition between 
shmem and TCP (see IGNITE-1294)
                             boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
-                                new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, !hasShmemClient, fut));
+                                new ConnectClosure(ses, recoveryDesc, rmtNode, 
connKey, msg0, !hasShmemClient, fut));
 
                             if (reserved)
                                 connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true, !hasShmemClient);
@@ -568,7 +570,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             @Override public void onMessage(GridNioSession ses, Message msg) {
-                ConnectionKey connKey = ses.meta(CONN_ID_META);
+                ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
                 if (connKey == null) {
                     assert ses.accepted() : ses;
@@ -684,7 +686,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (createClient) {
                     client = new GridTcpNioCommunicationClient(0, ses, log);
 
-                    addNodeClient(node.id(), 0, client);
+                    addNodeClient(node, 0, client);
                 }
 
                 return client;
@@ -794,10 +796,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 /** */
                 private final boolean createClient;
 
+                /** Connection key used to remove the connect future from {@code clientFuts}. */
+                private final ConnectionKey connKey;
+
                 /**
                  * @param ses Incoming session.
                  * @param recoveryDesc Recovery descriptor.
                  * @param rmtNode Remote node.
+                 * @param connKey Connection key.
                  * @param msg Handshake message.
                  * @param createClient If {@code true} creates NIO 
communication client..
                  * @param fut Connect future.
@@ -805,12 +811,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 ConnectClosure(GridNioSession ses,
                     GridNioRecoveryDescriptor recoveryDesc,
                     ClusterNode rmtNode,
+                    ConnectionKey connKey,
                     HandshakeMessage msg,
                     boolean createClient,
                     GridFutureAdapter<GridCommunicationClient> fut) {
                     this.ses = ses;
                     this.recoveryDesc = recoveryDesc;
                     this.rmtNode = rmtNode;
+                    this.connKey = connKey;
                     this.msg = msg;
                     this.createClient = createClient;
                     this.fut = fut;
@@ -839,7 +847,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                                     fut.onDone();
                                 }
                                 finally {
-                                    clientFuts.remove(rmtNode.id(), fut);
+                                    clientFuts.remove(connKey, fut);
                                 }
                             }
                         };
@@ -851,7 +859,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             fut.onDone();
                         }
                         finally {
-                            clientFuts.remove(rmtNode.id(), fut);
+                            clientFuts.remove(connKey, fut);
                         }
                     }
                 }
@@ -1105,10 +1113,20 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         return locPortRange;
     }
 
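+    /**
+     * Sets connections per node; parameter assertions in this class require a value in range 1..1024.
+     *
+     * @param maxConnectionsPerNode New value for {@code connectionsPerNode}.
+     */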
     public void setConnectionsPerNode(int maxConnectionsPerNode) {
         this.connectionsPerNode = maxConnectionsPerNode;
     }
 
+    /**
+     * Gets the configured connections per node.
+     *
+     * @return Current {@code connectionsPerNode} value.
+     */
     public int getConnectionsPerNode() {
         return connectionsPerNode;
     }
@@ -1553,6 +1571,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(", msgsSent=").append(desc.sent())
                     .append(", msgsAckedByRmt=").append(desc.acked())
                     .append(", reserveCnt=").append(desc.reserveCount())
+                    .append(", connected=").append(desc.connected())
+                    .append(", reserved=").append(desc.reserved())
                     .append(", 
descIdHash=").append(System.identityHashCode(desc))
                     .append(']').append(U.nl());
             }
@@ -1578,9 +1598,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 GridCommunicationClient[] clients0 = entry.getValue();
 
                 for (GridCommunicationClient client : clients0) {
-                    sb.append("    [node=").append(nodeId)
-                        .append(", client=").append(client)
-                        .append(']').append(U.nl());
+                    if (client != null) {
+                        sb.append("    [node=").append(nodeId)
+                            .append(", client=").append(client)
+                            .append(']').append(U.nl());
+                    }
                 }
             }
 
@@ -1606,6 +1628,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         assertParameter(msgQueueLimit >= 0, "msgQueueLimit >= 0");
         assertParameter(shmemPort > 0 || shmemPort == -1, "shmemPort > 0 || 
shmemPort == -1");
         assertParameter(selectorsCnt > 0, "selectorsCnt > 0");
+        assertParameter(connectionsPerNode > 0, "connectionsPerNode > 0");
+        assertParameter(connectionsPerNode <= 1024, "connectionsPerNode <= 
1024");
 
         if (!failureDetectionTimeoutEnabled()) {
             assertParameter(reconCnt > 0, "reconnectCnt > 0");
@@ -1691,6 +1715,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             log.debug(configInfo("sockRcvBuf", sockRcvBuf));
             log.debug(configInfo("shmemPort", shmemPort));
             log.debug(configInfo("msgQueueLimit", msgQueueLimit));
+            log.debug(configInfo("connectionsPerNode", connectionsPerNode));
 
             if (failureDetectionTimeoutEnabled()) {
                 log.debug(configInfo("connTimeout", connTimeout));
@@ -1809,7 +1834,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        ConnectionKey key = ses.meta(CONN_ID_META);
+                        ConnectionKey key = ses.meta(CONN_IDX_META);
 
                         return key != null ? formatter.reader(key.nodeId(), 
msgFactory) : null;
                     }
@@ -1824,7 +1849,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        ConnectionKey key = ses.meta(CONN_ID_META);
+                        ConnectionKey key = ses.meta(CONN_IDX_META);
 
                         return key != null ? formatter.writer(key.nodeId()) : 
null;
                     }
@@ -2003,8 +2028,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         // Force closing on stop (safety).
         for (GridCommunicationClient[] clients0 : clients.values()) {
-            for (GridCommunicationClient client : clients0)
-                client.forceClose();
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
         }
 
         // Clear resources.
@@ -2031,8 +2058,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
         // Force closing.
         for (GridCommunicationClient[] clients0 : clients.values()) {
-            for (GridCommunicationClient client : clients0)
-                client.forceClose();
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
         }
 
         getSpiContext().deregisterPorts();
@@ -2045,8 +2074,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         connectGate.disconnected(reconnectFut);
 
         for (GridCommunicationClient[] clients0 : clients.values()) {
-            for (GridCommunicationClient client : clients0)
-                client.forceClose();
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
         }
 
         IgniteClientDisconnectedCheckedException err = new 
IgniteClientDisconnectedCheckedException(reconnectFut,
@@ -2075,11 +2106,13 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
 
         if (clients0 != null) {
             for (GridCommunicationClient client : clients0) {
-                if (log.isDebugEnabled())
-                    log.debug("Forcing NIO client close since node has left 
[nodeId=" + nodeId +
-                        ", client=" + client + ']');
+                if (client != null) {
+                    if (log.isDebugEnabled())
+                        log.debug("Forcing NIO client close since node has 
left [nodeId=" + nodeId +
+                            ", client=" + client + ']');
 
-                client.forceClose();
+                    client.forceClose();
+                }
             }
         }
     }
@@ -2128,12 +2161,49 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         sendMessage0(node, msg, ackC);
     }
 
+    /** Index selection mode ({@code CONN_IDX_MODE} property): 0 - thread ID, 1 - round-robin, 2 - random. */
+    private final int idxMode = 
IgniteSystemProperties.getInteger("CONN_IDX_MODE", 0);
+
+    /** Connection index remembered for the current thread (used when {@code idxMode} is 1). */
+    private final ThreadLocal<Integer> threadConnIdx = new ThreadLocal<>();
+
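+    /** Round-robin counter for handing out connection indexes to threads (used when {@code idxMode} is 1). */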
+    private final AtomicInteger connIdx = new AtomicInteger();
+
     /**
      * TODO
      * @return
      */
     private int connectionIndex() {
-        return ThreadLocalRandom.current().nextInt(connectionsPerNode);
+        switch (idxMode) {
+            case 0: {
+                return (int)(Thread.currentThread().getId() % 
connectionsPerNode);
+            }
+
+            case 1: {
+                Integer threadIdx = threadConnIdx.get();
+
+                if (threadIdx != null)
+                    return threadIdx;
+
+                for (;;) {
+                    int idx = connIdx.get();
+                    int nextIdx = idx == connectionsPerNode - 1 ? 0 : idx + 1;
+
+                    if (connIdx.compareAndSet(idx, nextIdx)) {
+                        threadConnIdx.set(idx);
+
+                        return idx;
+                    }
+                }
+            }
+
+            case 2:
+                return ThreadLocalRandom.current().nextInt(connectionsPerNode);
+
+            default:
+                throw new IgniteException("Invalid connection index mode: " + 
idxMode);
+        }
     }
 
     /**
@@ -2215,48 +2285,48 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
         for (;;) {
             GridCommunicationClient[] curClients = clients.get(nodeId);
 
-            if (curClients == null)
+            if (curClients == null || curClients[rmvClient.connectionIndex()] 
!= rmvClient)
                 return false;
 
-            if (curClients[rmvClient.connectionIndex()] == rmvClient) {
-                GridCommunicationClient[] newClients = 
Arrays.copyOf(curClients, curClients.length);
+            GridCommunicationClient[] newClients = Arrays.copyOf(curClients, 
curClients.length);
 
-                newClients[rmvClient.connectionIndex()] = null;
+            newClients[rmvClient.connectionIndex()] = null;
 
-                if (clients.replace(nodeId, curClients, newClients))
-                    return true;
-            }
-            else
-                return false;
+            if (clients.replace(nodeId, curClients, newClients))
+                return true;
         }
     }
 
     /**
-     * @param nodeId Node ID.
+     * @param node Node.
      * @param connIdx Connection index.
      * @param addClient Client to add.
      */
-    private void addNodeClient(UUID nodeId, int connIdx, 
GridCommunicationClient addClient) {
+    private void addNodeClient(ClusterNode node, int connIdx, 
GridCommunicationClient addClient) {
+        assert connectionsPerNode > 0 : connectionsPerNode;
+
         for (;;) {
-            GridCommunicationClient[] curClients = clients.get(nodeId);
+            GridCommunicationClient[] curClients = clients.get(node.id());
 
-            assert curClients == null || curClients[connIdx] == null : "Client 
already created " +
-                "[node=" + nodeId + ", client=" + addClient + ", oldClient=" + 
curClients[connIdx] + ']';
+            assert curClients == null || curClients[connIdx] == null : "Client 
already created [node=" + node.id() +
+                ", connIdx=" + connIdx +
+                ", client=" + addClient +
+                ", oldClient=" + curClients[connIdx] + ']';
 
             GridCommunicationClient[] newClients;
 
             if (curClients == null) {
-                newClients = new GridCommunicationClient[connectionsPerNode];
+                newClients = new 
GridCommunicationClient[useMultipleConnections(node) ? connectionsPerNode : 1];
                 newClients[connIdx] = addClient;
 
-                if (clients.putIfAbsent(nodeId, newClients) == null)
+                if (clients.putIfAbsent(node.id(), newClients) == null)
                     break;
             }
             else {
                 newClients = Arrays.copyOf(curClients, curClients.length);
                 newClients[connIdx] = addClient;
 
-                if (clients.replace(nodeId, curClients, newClients))
+                if (clients.replace(node.id(), curClients, newClients))
                     break;
             }
         }
@@ -2272,6 +2342,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private GridCommunicationClient reserveClient(ClusterNode node, int 
connIdx) throws IgniteCheckedException {
         assert node != null;
+        assert connIdx >= 0 && connIdx < connectionsPerNode : connIdx;
 
         UUID nodeId = node.id();
 
@@ -2301,7 +2372,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             client0 = createNioClient(node, connIdx);
 
                             if (client0 != null) {
-                                addNodeClient(nodeId, connIdx, client0);
+                                addNodeClient(node, connIdx, client0);
 
                                 if (client0 instanceof 
GridTcpNioCommunicationClient) {
                                     GridTcpNioCommunicationClient tcpClient = 
((GridTcpNioCommunicationClient)client0);
@@ -2328,7 +2399,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             throw (Error)e;
                     }
                     finally {
-                        clientFuts.remove(nodeId, fut);
+                        clientFuts.remove(connKey, fut);
                     }
                 }
                 else
@@ -2525,7 +2596,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
      */
     private void checkClientQueueSize(GridNioSession ses, int msgQueueSize) {
         if (slowClientQueueLimit > 0 && msgQueueSize > slowClientQueueLimit) {
-            ConnectionKey id = ses.meta(CONN_ID_META);
+            ConnectionKey id = ses.meta(CONN_IDX_META);
 
             if (id != null) {
                 ClusterNode node = getSpiContext().node(id.nodeId);
@@ -2666,7 +2737,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     try {
                         Map<Integer, Object> meta = new HashMap<>();
 
-                        meta.put(CONN_ID_META, connKey);
+                        meta.put(CONN_IDX_META, connKey);
 
                         if (isSslEnabled()) {
                             assert sslEngine != null;
@@ -3085,8 +3156,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         U.join(commWorker, log);
 
         for (GridCommunicationClient[] clients0 : clients.values()) {
-            for (GridCommunicationClient client : clients0)
-                client.forceClose();
+            for (GridCommunicationClient client : clients0) {
+                if (client != null)
+                    client.forceClose();
+            }
         }
     }
 
@@ -3281,7 +3354,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        ConnectionKey connKey = ses.meta(CONN_ID_META);
+                        ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
                         return connKey != null ? 
formatter.writer(connKey.nodeId()) : null;
                     }
@@ -3297,7 +3370,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         assert formatter != null;
 
-                        ConnectionKey connKey = ses.meta(CONN_ID_META);
+                        ConnectionKey connKey = ses.meta(CONN_IDX_META);
 
                         return connKey != null ? 
formatter.reader(connKey.nodeId(), msgFactory) : null;
                     }
@@ -3384,6 +3457,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 UUID nodeId = e.getKey();
 
                 for (GridCommunicationClient client : e.getValue()) {
+                    if (client == null)
+                        continue;
+
                     ClusterNode node = getSpiContext().node(nodeId);
 
                     if (node == null) {
@@ -3483,7 +3559,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /**
-         *
+         * @param recoveryDescs Recovery descriptors to cleanup.
          */
         private void cleanupRecovery(ConcurrentMap<ConnectionKey, 
GridNioRecoveryDescriptor> recoveryDescs) {
             Set<ConnectionKey> left = null;
@@ -3492,9 +3568,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (left != null && left.contains(e.getKey()))
                     continue;
 
-                GridNioRecoveryDescriptor recoverySnd = e.getValue();
+                GridNioRecoveryDescriptor recoveryDesc = e.getValue();
 
-                if 
(!recoverySnd.nodeAlive(getSpiContext().node(recoverySnd.node().id()))) {
+                if 
(!recoveryDesc.nodeAlive(getSpiContext().node(e.getKey().nodeId()))) {
                     if (left == null)
                         left = new HashSet<>();
 
@@ -3506,10 +3582,10 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
                 assert !left.isEmpty();
 
                 for (ConnectionKey id : left) {
-                    GridNioRecoveryDescriptor recoverySnd = 
recoveryDescs.get(id);
+                    GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescs.get(id);
 
-                    if (recoverySnd != null && recoverySnd.onNodeLeft())
-                        recoveryDescs.remove(id);
+                    if (recoveryDesc != null && recoveryDesc.onNodeLeft())
+                        recoveryDescs.remove(id, recoveryDesc);
                 }
             }
         }
@@ -3795,7 +3871,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             if (buf.remaining() < 33)
                 return false;
 
-            buf.put(HANDSHAKE_MSG_TYPE);
+            buf.put(directType());
 
             byte[] bytes = U.uuidToBytes(nodeId);
 
@@ -3867,7 +3943,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
          * @param rcvCnt Number of received messages.
          * @param connIdx Connection index.
          */
-        public HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, 
int connIdx) {
+        HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int 
connIdx) {
             super(nodeId, connectCnt, rcvCnt);
             this.connIdx = connIdx;
         }
@@ -3883,6 +3959,32 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /** {@inheritDoc} */
+        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) 
{
+            if (!super.writeTo(buf, writer))
+                return false;
+
+            if (buf.remaining() < 4)
+                return false;
+
+            buf.putInt(connIdx);
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
+            if (!super.readFrom(buf, reader))
+                return false;
+
+            if (buf.remaining() < 4)
+                return false;
+
+            connIdx = buf.getInt();
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(HandshakeMessage2.class, this);
         }
@@ -4184,6 +4286,26 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         }
 
         /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            ConnectionKey key = (ConnectionKey) o;
+
+            return idx == key.idx && nodeId.equals(key.nodeId);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = nodeId.hashCode();
+            res = 31 * res + idx;
+            return res;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(ConnectionKey.class, this);
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
new file mode 100644
index 0000000..30fc9ef
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheAtomicMessageRecovery10ConnectionsTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+/**
+ * Variant of {@link IgniteCacheAtomicMessageRecoveryTest} that overrides {@link #connectionsPerNode()} to return 10.
+ */
+public class IgniteCacheAtomicMessageRecovery10ConnectionsTest extends 
IgniteCacheAtomicMessageRecoveryTest {
+    /** {@inheritDoc} */
+    @Override protected int connectionsPerNode() {
+        return 10;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
index 0460a8f..1bfd727 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageRecoveryAbstractTest.java
@@ -58,6 +58,7 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
 
         commSpi.setSocketWriteTimeout(1000);
         commSpi.setSharedMemoryPort(-1);
+        commSpi.setConnectionsPerNode(connectionsPerNode());
 
         cfg.setCommunicationSpi(commSpi);
 
@@ -76,6 +77,13 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
     }
 
     /**
+     * @return Value for {@link TcpCommunicationSpi#setConnectionsPerNode(int)}.
+     */
+    protected int connectionsPerNode() {
+        return TcpCommunicationSpi.DFLT_CONN_PER_NODE;
+    }
+
+    /**
      * @return Cache atomicity mode.
      */
     protected abstract CacheAtomicityMode atomicityMode();
@@ -174,18 +182,22 @@ public abstract class IgniteCacheMessageRecoveryAbstractTest extends GridCommonA
     static boolean closeSessions(Ignite ignite) throws Exception {
         TcpCommunicationSpi commSpi = 
(TcpCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
-        Map<UUID, GridCommunicationClient> clients = U.field(commSpi, 
"clients");
+        Map<UUID, GridCommunicationClient[]> clients = U.field(commSpi, 
"clients");
 
         boolean closed = false;
 
-        for (GridCommunicationClient client : clients.values()) {
-            GridTcpNioCommunicationClient client0 = 
(GridTcpNioCommunicationClient)client;
+        for (GridCommunicationClient[] clients0 : clients.values()) {
+            for (GridCommunicationClient client : clients0) {
+                if (client != null) {
+                    GridTcpNioCommunicationClient client0 = 
(GridTcpNioCommunicationClient)client;
 
-            GridNioSession ses = client0.session();
+                    GridNioSession ses = client0.session();
 
-            ses.close();
+                    ses.close();
 
-            closed = true;
+                    closed = true;
+                }
+            }
         }
 
         return closed;

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
index 652e47f..deda313 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java
@@ -111,14 +111,15 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest {
         cfg.setConnectorConfiguration(null);
 
         TcpCommunicationSpi commSpi = new TcpCommunicationSpi() {
-            @Override protected GridCommunicationClient 
createTcpClient(ClusterNode node) throws IgniteCheckedException {
+            @Override protected GridCommunicationClient 
createTcpClient(ClusterNode node, int connIdx)
+                throws IgniteCheckedException {
                 Map<String, Object> attrs = new HashMap<>(node.attributes());
 
                 attrs.remove(createSpiAttributeName(ATTR_PORT));
 
                 ((TcpDiscoveryNode)node).setAttributes(attrs);
 
-                return super.createTcpClient(node);
+                return super.createTcpClient(node, connIdx);
             }
         };
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index c7f7ad4..bd66319 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -78,6 +78,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     /** */
     private static int port = 60_000;
 
+    /** */
+    private int connectionsPerNode = 1;
+
     /**
      *
      */
@@ -158,6 +161,17 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
     /**
      * @throws Exception If failed.
      */
+    public void testMultithreaded_10Connections() throws Exception {
+        connectionsPerNode = 10; // Field is passed to setConnectionsPerNode() when the test SPI is configured.
+
+        int threads = Runtime.getRuntime().availableProcessors() * 5;
+
+        concurrentConnect(threads, 10, 10, false, false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
     public void testWithLoad() throws Exception {
         int threads = Runtime.getRuntime().availableProcessors() * 5;
 
@@ -286,7 +300,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
 
                         Collection sessions = U.field(srv, "sessions");
 
-                        assertEquals(2, sessions.size());
+                        assertEquals(2 * connectionsPerNode, sessions.size());
                     }
 
                     assertEquals(expMsgs, lsnr.cntr.get());
@@ -315,6 +329,7 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
         spi.setIdleConnectionTimeout(60_000);
         spi.setConnectTimeout(10_000);
         spi.setSharedMemoryPort(-1);
+        spi.setConnectionsPerNode(connectionsPerNode);
 
         return spi;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
index b0353a6..c84ee32 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConfigSelfTest.java
@@ -47,6 +47,9 @@ public class GridTcpCommunicationSpiConfigSelfTest extends 
GridSpiAbstractConfig
         checkNegativeSpiProperty(new TcpCommunicationSpi(), 
"ackSendThreshold", 0);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), 
"ackSendThreshold", -1);
         checkNegativeSpiProperty(new TcpCommunicationSpi(), 
"unacknowledgedMessagesBufferSize", -1);
+        checkNegativeSpiProperty(new TcpCommunicationSpi(), 
"connectionsPerNode", 0);
+        checkNegativeSpiProperty(new TcpCommunicationSpi(), 
"connectionsPerNode", -1);
+        checkNegativeSpiProperty(new TcpCommunicationSpi(), 
"connectionsPerNode", Integer.MAX_VALUE);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index f210bec..74be68d 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -109,7 +109,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest 
extends GridSpiAbstrac
     /**
      * @param useShmem Use shared mem.
      */
-    protected GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
+    GridTcpCommunicationSpiMultithreadedSelfTest(boolean useShmem) {
         super(false);
 
         this.useShmem = useShmem;
@@ -549,11 +549,17 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest 
extends GridSpiAbstrac
         }
 
         for (CommunicationSpi spi : spis.values()) {
-            final ConcurrentMap<UUID, GridCommunicationClient> clients = 
U.field(spi, "clients");
+            final ConcurrentMap<UUID, GridCommunicationClient[]> clients = 
U.field(spi, "clients");
 
             assert GridTestUtils.waitForCondition(new PA() {
                 @Override public boolean apply() {
-                    return clients.isEmpty();
+                    for (GridCommunicationClient[] clients0 : 
clients.values()) {
+                        for (GridCommunicationClient client : clients0) {
+                            if (client != null)
+                                return false;
+                        }
+                    }
+                    return true;
                 }
             }, getTestTimeout()) : "Clients: " + clients;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index fb2dfd7..e0478da 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -361,6 +361,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T 
extends CommunicationS
         spi.setAckSendThreshold(ackCnt);
         spi.setMessageQueueLimit(queueLimit);
         spi.setSharedMemoryPort(-1);
+        spi.setConnectionsPerNode(1);
 
         return spi;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
index bec6e00..dfa8a54 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java
@@ -644,6 +644,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T 
extends CommunicationSpi>
         spi.setSocketWriteTimeout(1000);
         spi.setSocketSendBuffer(512);
         spi.setSocketReceiveBuffer(512);
+        spi.setConnectionsPerNode(1);
 
         return spi;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index e153fe2..9928d93 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -397,6 +397,7 @@ public class 
IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
         spi.setAckSendThreshold(ackCnt);
         spi.setMessageQueueLimit(queueLimit);
         spi.setSharedMemoryPort(-1);
+        spi.setConnectionsPerNode(1);
 
         return spi;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/4030ef88/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index dff9fc7..2fd84ab 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -124,6 +124,7 @@ import 
org.apache.ignite.internal.processors.cache.context.IgniteCacheTxExecutio
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUpdateTopologyChangeTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
+import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecovery10ConnectionsTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheConnectionRecoveryTest;
 import 
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageRecoveryIdleConnectionTest;
@@ -282,6 +283,7 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheEntrySetIterationPreloadingSelfTest.class);
         suite.addTestSuite(GridCacheMixedPartitionExchangeSelfTest.class);
         suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
+        
suite.addTestSuite(IgniteCacheAtomicMessageRecovery10ConnectionsTest.class);
         suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
         suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
         suite.addTestSuite(IgniteCacheMessageRecoveryIdleConnectionTest.class);

Reply via email to