Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-opts2 [created] c70fe0793


ignite-3220 Implemented separate in/out connections in communication.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c70fe079
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c70fe079
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c70fe079

Branch: refs/heads/ignite-comm-opts2
Commit: c70fe0793b2b5a12ceef8fcaed11e2ba2ce76fa3
Parents: f30b79c
Author: sboikov <sboi...@gridgain.com>
Authored: Thu Sep 15 10:17:36 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Thu Sep 15 10:17:36 2016 +0300

----------------------------------------------------------------------
 .../rest/protocols/tcp/MockNioSession.java      |  14 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |   1 +
 .../ignite/internal/util/nio/GridNioServer.java |  44 ++-
 .../internal/util/nio/GridNioSession.java       |  14 +-
 .../internal/util/nio/GridNioSessionImpl.java   |  16 +-
 .../util/nio/GridSelectorNioSessionImpl.java    |  46 ++-
 .../communication/tcp/TcpCommunicationSpi.java  | 325 +++++++++++++++----
 .../nio/impl/GridNioFilterChainSelfTest.java    |  14 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   4 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |   2 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |   2 +-
 ...CommunicationRecoveryAckClosureSelfTest.java |   2 +-
 12 files changed, 379 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
 
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
index c82c73e..e848653 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
@@ -131,12 +131,22 @@ public class MockNioSession extends 
GridMetadataAwareAdapter implements GridNioS
     }
 
     /** {@inheritDoc} */
-    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
+    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
         // No-op.
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor 
outRecoveryDescriptor() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor 
inRecoveryDescriptor() {
         return null;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 35480ac..29903d4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -31,6 +31,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Recovery information for single node.
  */
+@Deprecated // To be split into separate classes for in/out data once we do 
not need to maintain backward compatibility.
 public class GridNioRecoveryDescriptor {
     /** Number of acknowledged messages. */
     private long acked;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 24b8fad..ccd0ae4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -37,6 +37,7 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
 import java.util.List;
@@ -504,7 +505,7 @@ public class GridNioServer<T> {
     public void resend(GridNioSession ses) {
         assert ses instanceof GridSelectorNioSessionImpl;
 
-        GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor();
+        GridNioRecoveryDescriptor recoveryDesc = ses.outRecoveryDescriptor();
 
         if (recoveryDesc != null && !recoveryDesc.messagesFutures().isEmpty()) 
{
             Deque<GridNioFuture<?>> futs = recoveryDesc.messagesFutures();
@@ -530,6 +531,13 @@ public class GridNioServer<T> {
     }
 
     /**
+     * @return Sessions.
+     */
+    public Collection<? extends GridNioSession> sessions() {
+        return sessions;
+    }
+
+    /**
      * @param ses Session.
      * @param op Operation.
      * @return Future for operation.
@@ -1463,16 +1471,25 @@ public class GridNioServer<T> {
                                         
.append("rmtAddr=").append(ses.remoteAddress())
                                         .append(", 
locAddr=").append(ses.localAddress());
 
-                                    GridNioRecoveryDescriptor desc = 
ses.recoveryDescriptor();
+                                    GridNioRecoveryDescriptor outDesc = 
ses.outRecoveryDescriptor();
 
-                                    if (desc != null) {
-                                        sb.append(", 
msgsSent=").append(desc.sent())
-                                            .append(", 
msgsAckedByRmt=").append(desc.acked())
-                                            .append(", 
msgsRcvd=").append(desc.received())
-                                            .append(", 
descIdHash=").append(System.identityHashCode(desc));
+                                    if (outDesc != null) {
+                                        sb.append(", 
msgsSent=").append(outDesc.sent())
+                                            .append(", 
msgsAckedByRmt=").append(outDesc.acked())
+                                            .append(", 
descIdHash=").append(System.identityHashCode(outDesc));
                                     }
                                     else
-                                        sb.append(", recoveryDesc=null");
+                                        sb.append(", outRecoveryDesc=null");
+
+                                    GridNioRecoveryDescriptor inDesc = 
ses.inRecoveryDescriptor();
+
+                                    if (inDesc != null) {
+                                        sb.append(", 
msgsRcvd=").append(inDesc.received())
+                                            .append(", 
lastAcked=").append(inDesc.lastAcknowledged())
+                                            .append(", 
descIdHash=").append(System.identityHashCode(inDesc));
+                                    }
+                                    else
+                                        sb.append(", inRecoveryDesc=null");
 
                                     sb.append(", 
bytesRcvd=").append(ses.bytesReceived())
                                         .append(", 
bytesSent=").append(ses.bytesSent())
@@ -1826,9 +1843,10 @@ public class GridNioServer<T> {
                 // Since ses is in closed state, no write requests will be 
added.
                 NioOperationFuture<?> fut = 
ses.removeMeta(NIO_OPERATION.ordinal());
 
-                GridNioRecoveryDescriptor recovery = ses.recoveryDescriptor();
+                GridNioRecoveryDescriptor outRecovery = 
ses.outRecoveryDescriptor();
+                GridNioRecoveryDescriptor inRecovery = 
ses.inRecoveryDescriptor();
 
-                if (recovery != null) {
+                if (outRecovery != null || inRecovery != null) {
                     try {
                         // Poll will update recovery data.
                         while ((fut = (NioOperationFuture<?>)ses.pollFuture()) 
!= null) {
@@ -1837,7 +1855,11 @@ public class GridNioServer<T> {
                         }
                     }
                     finally {
-                        recovery.release();
+                        if (outRecovery != null)
+                            outRecovery.release();
+
+                        if (inRecovery != null && inRecovery != outRecovery)
+                            inRecovery.release();
                     }
                 }
                 else {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
index e4a7225..1e427d8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSession.java
@@ -158,10 +158,20 @@ public interface GridNioSession {
     /**
      * @param recoveryDesc Recovery descriptor.
      */
-    public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+    public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+    /**
+     * @param recoveryDesc Recovery descriptor.
+     */
+    public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc);
+
+    /**
+     * @return Recovery descriptor if recovery is supported, {@code null} 
otherwise.
+     */
+    @Nullable public GridNioRecoveryDescriptor outRecoveryDescriptor();
 
     /**
      * @return Recovery descriptor if recovery is supported, {@code null 
otherwise.}
      */
-    @Nullable public GridNioRecoveryDescriptor recoveryDescriptor();
+    @Nullable public GridNioRecoveryDescriptor inRecoveryDescriptor();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
index 0bcfe64..53a624d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioSessionImpl.java
@@ -296,12 +296,22 @@ public class GridNioSessionImpl implements GridNioSession 
{
     }
 
     /** {@inheritDoc} */
-    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
+    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
         throw new UnsupportedOperationException();
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+    @Nullable @Override public GridNioRecoveryDescriptor 
outRecoveryDescriptor() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor 
inRecoveryDescriptor() {
         return null;
     }
 
@@ -309,4 +319,4 @@ public class GridNioSessionImpl implements GridNioSession {
     @Override public String toString() {
         return S.toString(GridNioSessionImpl.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 0ba6af2..a680a33 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -59,8 +59,11 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Read buffer. */
     private ByteBuffer readBuf;
 
-    /** Recovery data. */
-    private GridNioRecoveryDescriptor recovery;
+    /** Incoming recovery data. */
+    private GridNioRecoveryDescriptor inRecovery;
+
+    /** Outgoing recovery data. */
+    private GridNioRecoveryDescriptor outRecovery;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -124,7 +127,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl 
{
      * @param key Selection key.
      */
     void key(SelectionKey key) {
-        assert this.key == null;
+        assert key != null;
 
         this.key = key;
     }
@@ -225,17 +228,17 @@ class GridSelectorNioSessionImpl extends 
GridNioSessionImpl {
             if (sem != null && !last.messageThread())
                 sem.release();
 
-            if (recovery != null) {
-                if (!recovery.add(last)) {
+            if (outRecovery != null) {
+                if (!outRecovery.add(last)) {
                     LT.warn(log, null, "Unacknowledged messages queue size 
overflow, will attempt to reconnect " +
                         "[remoteAddr=" + remoteAddress() +
-                        ", queueLimit=" + recovery.queueLimit() + ']');
+                        ", queueLimit=" + outRecovery.queueLimit() + ']');
 
                     if (log.isDebugEnabled())
                         log.debug("Unacknowledged messages queue size 
overflow, will attempt to reconnect " +
                             "[remoteAddr=" + remoteAddress() +
-                            ", queueSize=" + recovery.messagesFutures().size() 
+
-                            ", queueLimit=" + recovery.queueLimit() + ']');
+                            ", queueSize=" + 
outRecovery.messagesFutures().size() +
+                            ", queueLimit=" + outRecovery.queueLimit() + ']');
 
                     close();
                 }
@@ -272,24 +275,35 @@ class GridSelectorNioSessionImpl extends 
GridNioSessionImpl {
     }
 
     /** {@inheritDoc} */
-    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
+    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
+        assert recoveryDesc != null;
+
+        outRecovery = recoveryDesc;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor 
outRecoveryDescriptor() {
+        return outRecovery;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
         assert recoveryDesc != null;
 
-        recovery = recoveryDesc;
+        inRecovery = recoveryDesc;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
-        return recovery;
+    @Nullable @Override public GridNioRecoveryDescriptor 
inRecoveryDescriptor() {
+        return inRecovery;
     }
 
     /** {@inheritDoc} */
     @Override public <T> T addMeta(int key, @Nullable T val) {
-        if (val instanceof GridNioRecoveryDescriptor) {
-            recovery = (GridNioRecoveryDescriptor)val;
+        if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
+            outRecovery = (GridNioRecoveryDescriptor)val;
 
-            if (!accepted())
-                recovery.connected();
+            outRecovery.connected();
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 74ecc45..63afb61 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -102,6 +102,7 @@ import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteRunnable;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -236,6 +237,9 @@ import static 
org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 @IgniteSpiConsistencyChecked(optional = false)
 public class TcpCommunicationSpi extends IgniteSpiAdapter
     implements CommunicationSpi<Message>, TcpCommunicationSpiMBean {
+    /** */
+    private static final IgniteProductVersion TWO_CONN_SINCE_VER = 
IgniteProductVersion.fromString("1.7.2");
+
     /** IPC error message. */
     public static final String OUT_OF_RESOURCES_TCP_MSG = "Failed to allocate 
shared memory segment " +
         "(switching to TCP, may be slower).";
@@ -365,7 +369,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (!stopping) {
                         boolean reconnect = false;
 
-                        GridNioRecoveryDescriptor recoveryData = 
ses.recoveryDescriptor();
+                        GridNioRecoveryDescriptor recoveryData = 
ses.outRecoveryDescriptor();
 
                         if (recoveryData != null) {
                             if 
(recoveryData.nodeAlive(getSpiContext().node(id))) {
@@ -432,52 +436,33 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 if (ses.remoteAddress() == null)
                     return;
 
-                GridCommunicationClient oldClient = clients.get(sndId);
-
-                boolean hasShmemClient = false;
+                assert msg instanceof HandshakeMessage : msg;
 
-                if (oldClient != null) {
-                    if (oldClient instanceof GridTcpNioCommunicationClient) {
-                        if (log.isDebugEnabled())
-                            log.debug("Received incoming connection when 
already connected " +
-                                    "to this node, rejecting [locNode=" + 
locNode.id() +
-                                    ", rmtNode=" + sndId + ']');
+                HandshakeMessage msg0 = (HandshakeMessage)msg;
 
-                        ses.send(new RecoveryLastReceivedMessage(-1));
+                if (useTwoConnections(rmtNode)) {
+                    final GridNioRecoveryDescriptor recoveryDesc = 
inRecoveryDescriptor(rmtNode);
 
-                        return;
-                    }
-                    else {
-                        assert oldClient instanceof 
GridShmemCommunicationClient;
+                    boolean reserve = 
recoveryDesc.tryReserve(msg0.connectCount(),
+                        new ConnectClosureNew(ses, recoveryDesc, rmtNode));
 
-                        hasShmemClient = true;
-                    }
+                    if (reserve)
+                        connectedNew(recoveryDesc, ses, true);
                 }
+                else {
+                    GridCommunicationClient oldClient = clients.get(sndId);
 
-                GridFutureAdapter<GridCommunicationClient> fut = new 
GridFutureAdapter<>();
-
-                GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(sndId, fut);
-
-                assert msg instanceof HandshakeMessage : msg;
-
-                HandshakeMessage msg0 = (HandshakeMessage)msg;
-
-                final GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(rmtNode);
-
-                if (oldFut == null) {
-                    oldClient = clients.get(sndId);
+                    boolean hasShmemClient = false;
 
                     if (oldClient != null) {
                         if (oldClient instanceof 
GridTcpNioCommunicationClient) {
                             if (log.isDebugEnabled())
                                 log.debug("Received incoming connection when 
already connected " +
-                                        "to this node, rejecting [locNode=" + 
locNode.id() +
-                                        ", rmtNode=" + sndId + ']');
+                                    "to this node, rejecting [locNode=" + 
locNode.id() +
+                                    ", rmtNode=" + sndId + ']');
 
                             ses.send(new RecoveryLastReceivedMessage(-1));
 
-                            fut.onDone(oldClient);
-
                             return;
                         }
                         else {
@@ -487,43 +472,73 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         }
                     }
 
-                    boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
+                    GridFutureAdapter<GridCommunicationClient> fut = new 
GridFutureAdapter<>();
+
+                    GridFutureAdapter<GridCommunicationClient> oldFut = 
clientFuts.putIfAbsent(sndId, fut);
+
+                    final GridNioRecoveryDescriptor recoveryDesc = 
inRecoveryDescriptor(rmtNode);
+
+                    if (oldFut == null) {
+                        oldClient = clients.get(sndId);
+
+                        if (oldClient != null) {
+                            if (oldClient instanceof 
GridTcpNioCommunicationClient) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Received incoming connection 
when already connected " +
+                                        "to this node, rejecting [locNode=" + 
locNode.id() +
+                                        ", rmtNode=" + sndId + ']');
+
+                                ses.send(new RecoveryLastReceivedMessage(-1));
+
+                                fut.onDone(oldClient);
+
+                                return;
+                            }
+                            else {
+                                assert oldClient instanceof 
GridShmemCommunicationClient;
+
+                                hasShmemClient = true;
+                            }
+                        }
+
+                        boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
                             new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, !hasShmemClient, fut));
 
-                    if (log.isDebugEnabled())
-                        log.debug("Received incoming connection from remote 
node " +
+                        if (log.isDebugEnabled())
+                            log.debug("Received incoming connection from 
remote node " +
                                 "[rmtNode=" + rmtNode.id() + ", reserved=" + 
reserved + ']');
 
-                    if (reserved) {
-                        try {
-                            GridTcpNioCommunicationClient client =
+                        if (reserved) {
+                            try {
+                                GridTcpNioCommunicationClient client =
                                     connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true, !hasShmemClient);
 
-                            fut.onDone(client);
-                        }
-                        finally {
-                            clientFuts.remove(rmtNode.id(), fut);
+                                fut.onDone(client);
+                            }
+                            finally {
+                                clientFuts.remove(rmtNode.id(), fut);
+                            }
                         }
                     }
-                }
-                else {
-                    if (oldFut instanceof ConnectFuture && locNode.order() < 
rmtNode.order()) {
-                        if (log.isDebugEnabled()) {
-                            log.debug("Received incoming connection from 
remote node while " +
+                    else {
+                        if (oldFut instanceof ConnectFuture && locNode.order() 
< rmtNode.order()) {
+                            if (log.isDebugEnabled()) {
+                                log.debug("Received incoming connection from 
remote node while " +
                                     "connecting to this node, rejecting 
[locNode=" + locNode.id() +
                                     ", locNodeOrder=" + locNode.order() + ", 
rmtNode=" + rmtNode.id() +
                                     ", rmtNodeOrder=" + rmtNode.order() + ']');
-                        }
+                            }
 
-                        ses.send(new RecoveryLastReceivedMessage(-1));
-                    }
-                    else {
-                        // The code below causes a race condition between 
shmem and TCP (see IGNITE-1294)
-                        boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+                        }
+                        else {
+                            // The code below causes a race condition between 
shmem and TCP (see IGNITE-1294)
+                            boolean reserved = 
recoveryDesc.tryReserve(msg0.connectCount(),
                                 new ConnectClosure(ses, recoveryDesc, rmtNode, 
msg0, !hasShmemClient, fut));
 
-                        if (reserved)
-                            connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true, !hasShmemClient);
+                            if (reserved)
+                                connected(recoveryDesc, ses, rmtNode, 
msg0.received(), true, !hasShmemClient);
+                        }
                     }
                 }
             }
@@ -553,10 +568,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 else {
                     rcvdMsgsCnt.increment();
 
-                    GridNioRecoveryDescriptor recovery = 
ses.recoveryDescriptor();
+                    if (msg instanceof RecoveryLastReceivedMessage) {
+                        GridNioRecoveryDescriptor recovery = 
ses.outRecoveryDescriptor();
 
-                    if (recovery != null) {
-                        if (msg instanceof RecoveryLastReceivedMessage) {
+                        if (recovery != null) {
                             RecoveryLastReceivedMessage msg0 = 
(RecoveryLastReceivedMessage)msg;
 
                             if (log.isDebugEnabled())
@@ -567,7 +582,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                             return;
                         }
-                        else {
+                    }
+                    else {
+                        GridNioRecoveryDescriptor recovery = 
ses.inRecoveryDescriptor();
+
+                        if (recovery != null) {
                             long rcvCnt = recovery.onReceived();
 
                             if (rcvCnt % ackSndThreshold == 0) {
@@ -623,7 +642,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 boolean createClient) {
                 recovery.onHandshake(rcvCnt);
 
-                ses.recoveryDescriptor(recovery);
+                ses.inRecoveryDescriptor(recovery);
 
                 nioSrvr.resend(ses);
 
@@ -647,6 +666,79 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             }
 
             /**
+             * @param recovery Recovery descriptor.
+             * @param ses Session.
+             * @param sndRes If {@code true} sends response for recovery 
handshake.
+             */
+            private void connectedNew(
+                GridNioRecoveryDescriptor recovery,
+                GridNioSession ses,
+                boolean sndRes) {
+                ses.inRecoveryDescriptor(recovery);
+
+                if (sndRes)
+                    nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recovery.received()));
+
+                recovery.connected();
+            }
+
+            /**
+             *
+             */
+            class ConnectClosureNew implements IgniteInClosure<Boolean> {
+                /** */
+                private static final long serialVersionUID = 0L;
+
+                /** */
+                private final GridNioSession ses;
+
+                /** */
+                private final GridNioRecoveryDescriptor recoveryDesc;
+
+                /** */
+                private final ClusterNode rmtNode;
+
+                /**
+                 * @param ses Incoming session.
+                 * @param recoveryDesc Recovery descriptor.
+                 * @param rmtNode Remote node.
+                 */
+                ConnectClosureNew(GridNioSession ses,
+                    GridNioRecoveryDescriptor recoveryDesc,
+                    ClusterNode rmtNode) {
+                    this.ses = ses;
+                    this.recoveryDesc = recoveryDesc;
+                    this.rmtNode = rmtNode;
+                }
+
+                /** {@inheritDoc} */
+                @Override public void apply(Boolean success) {
+                    if (success) {
+                        IgniteInClosure<IgniteInternalFuture<?>> lsnr = new 
IgniteInClosure<IgniteInternalFuture<?>>() {
+                            @Override public void 
apply(IgniteInternalFuture<?> msgFut) {
+                                try {
+                                    msgFut.get();
+
+                                    connectedNew(recoveryDesc, ses, false);
+                                }
+                                catch (IgniteCheckedException e) {
+                                    if (log.isDebugEnabled())
+                                        log.debug("Failed to send recovery 
handshake " +
+                                            "[rmtNode=" + rmtNode.id() + ", 
err=" + e + ']');
+
+                                    recoveryDesc.release();
+                                }
+                            }
+                        };
+
+                        nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr);
+                    }
+                    else
+                        nioSrvr.sendSystem(ses, new 
RecoveryLastReceivedMessage(-1));
+                }
+            }
+
+            /**
              *
              */
             @SuppressWarnings("PackageVisibleInnerClass")
@@ -867,6 +959,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     /** */
     private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> 
recoveryDescs = GridConcurrentFactory.newMap();
 
+    /** Recovery descriptors for outgoing connections to nodes for which {@code useTwoConnections} is true. */
+    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> 
outRecDescs = GridConcurrentFactory.newMap();
+
+    /** Recovery descriptors for incoming connections from nodes for which {@code useTwoConnections} is true. */
+    private final ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> 
inRecDescs = GridConcurrentFactory.newMap();
+
     /** Discovery listener. */
     private final GridLocalEventListener discoLsnr = new 
GridLocalEventListener() {
         @Override public void onEvent(Event evt) {
@@ -1407,6 +1505,28 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     .append(']').append(U.nl());
             }
 
+            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : 
outRecDescs.entrySet()) {
+                GridNioRecoveryDescriptor desc = entry.getValue();
+
+                sb.append("    [key=").append(entry.getKey())
+                    .append(", msgsSent=").append(desc.sent())
+                    .append(", msgsAckedByRmt=").append(desc.acked())
+                    .append(", reserveCnt=").append(desc.reserveCount())
+                    .append(", 
descIdHash=").append(System.identityHashCode(desc))
+                    .append(']').append(U.nl());
+            }
+
+            for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> entry : 
inRecDescs.entrySet()) {
+                GridNioRecoveryDescriptor desc = entry.getValue();
+
+                sb.append("    [key=").append(entry.getKey())
+                    .append(", msgsRcvd=").append(desc.received())
+                    .append(", lastAcked=").append(desc.lastAcknowledged())
+                    .append(", reserveCnt=").append(desc.reserveCount())
+                    .append(", 
descIdHash=").append(System.identityHashCode(desc))
+                    .append(']').append(U.nl());
+            }
+
             sb.append("Communication SPI clients: ").append(U.nl());
 
             for (Map.Entry<UUID, GridCommunicationClient> entry : 
clients.entrySet()) {
@@ -1881,6 +2001,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
             clientFut.onDone(err);
 
         recoveryDescs.clear();
+        inRecDescs.clear();
+        outRecDescs.clear();
     }
 
     /** {@inheritDoc} */
@@ -2364,7 +2486,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "(node left topology): " + node);
                     }
 
-                    GridNioRecoveryDescriptor recoveryDesc = 
recoveryDescriptor(node);
+                    GridNioRecoveryDescriptor recoveryDesc = 
outRecoveryDescriptor(node);
 
                     if (!recoveryDesc.reserve()) {
                         U.closeQuiet(ch);
@@ -2682,6 +2804,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         else
                             
ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType));
                     }
+
                     if (recovery != null) {
                         if (log.isDebugEnabled())
                             log.debug("Waiting for handshake [rmtNode=" + 
rmtNodeId + ']');
@@ -2807,9 +2930,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
     /**
      * @param node Node.
+     * @return Recovery descriptor for outgoing connection: obtained through
+     *      {@code recoveryDescriptor(map, node)} from {@code outRecDescs} when
+     *      {@code useTwoConnections(node)} is true, otherwise from the shared {@code recoveryDescs} map.
+     */
+    private GridNioRecoveryDescriptor outRecoveryDescriptor(ClusterNode node) {
+        if (useTwoConnections(node))
+            return recoveryDescriptor(outRecDescs, node);
+        else
+            return recoveryDescriptor(recoveryDescs, node);
+    }
+
+    /**
+     * Selects the descriptor map for the incoming direction and delegates to
+     * {@code recoveryDescriptor(map, node)}.
+     *
+     * @param node Node.
+     * @return Recovery descriptor for incoming connection: taken from {@code inRecDescs} when
+     *      {@code useTwoConnections(node)} is true, otherwise from the shared {@code recoveryDescs} map.
+     */
+    private GridNioRecoveryDescriptor inRecoveryDescriptor(ClusterNode node) {
+        if (useTwoConnections(node))
+            return recoveryDescriptor(inRecDescs, node);
+        else
+            return recoveryDescriptor(recoveryDescs, node);
+    }
+
+    /**
+     * Checks the node version against {@code TWO_CONN_SINCE_VER} using {@code compareToIgnoreTimestamp}.
+     *
+     * @param node Node.
+     * @return {@code True} if given node supports two connections per-node for communication, that is, if
+     *      its version compares as equal to or greater than {@code TWO_CONN_SINCE_VER}.
+     */
+    private boolean useTwoConnections(ClusterNode node) {
+        return node.version().compareToIgnoreTimestamp(TWO_CONN_SINCE_VER) >= 
0;
+    }
+
+    /**
+     * @param node Node.
      * @return Recovery receive data for given node.
      */
-    private GridNioRecoveryDescriptor recoveryDescriptor(ClusterNode node) {
+    private GridNioRecoveryDescriptor recoveryDescriptor(
+        ConcurrentMap<ClientKey, GridNioRecoveryDescriptor> recoveryDescs,
+        ClusterNode node) {
         ClientKey id = new ClientKey(node.id(), node.order());
 
         GridNioRecoveryDescriptor recovery = recoveryDescs.get(id);
@@ -3128,7 +3283,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 GridNioRecoveryDescriptor recovery = null;
 
-                if (client instanceof GridTcpNioCommunicationClient) {
+                if (!useTwoConnections(node) && client instanceof 
GridTcpNioCommunicationClient) {
                     recovery = recoveryDescs.get(new ClientKey(node.id(), 
node.order()));
 
                     if (recovery != null && recovery.lastAcknowledged() != 
recovery.received()) {
@@ -3149,6 +3304,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 long idleTime = client.getIdleTime();
 
                 if (idleTime >= idleConnTimeout) {
+                    if (recovery == null && useTwoConnections(node))
+                        recovery = outRecDescs.get(new ClientKey(node.id(), 
node.order()));
+
                     if (recovery != null &&
                         recovery.nodeAlive(getSpiContext().node(nodeId)) &&
                         !recovery.messagesFutures().isEmpty()) {
@@ -3166,12 +3324,51 @@ public class TcpCommunicationSpi extends 
IgniteSpiAdapter
                         clients.remove(nodeId, client);
                 }
             }
+
+            for (GridNioSession ses : nioSrvr.sessions()) {
+                GridNioRecoveryDescriptor recovery = 
ses.inRecoveryDescriptor();
+
+                if (recovery != null && useTwoConnections(recovery.node())) {
+                    assert ses.accepted() : ses;
+
+                    sendAckOnTimeout(recovery, ses);
+                }
+            }
+        }
+
+        /**
+         * Sends a {@code RecoveryLastReceivedMessage} with the current received counter through the given
+         * session when the descriptor's last acknowledged value differs from its received value, then
+         * records that counter as the last acknowledged one. Does nothing if {@code recovery} is
+         * {@code null} or the two values already match.
+         *
+         * @param recovery Recovery descriptor; may be {@code null}.
+         * @param ses Session used to send the acknowledgement.
+         */
+        private void sendAckOnTimeout(GridNioRecoveryDescriptor recovery, 
GridNioSession ses) {
+            if (recovery != null && recovery.lastAcknowledged() != 
recovery.received()) {
+                RecoveryLastReceivedMessage msg = new 
RecoveryLastReceivedMessage(recovery.received());
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Send recovery acknowledgement on timeout 
[rmtNode=" + recovery.node().id() +
+                        ", rcvCnt=" + msg.received() +
+                        ", lastAcked=" + recovery.lastAcknowledged() + ']');
+                }
+
+                nioSrvr.sendSystem(ses, msg);
+
+                recovery.lastAcknowledged(msg.received());
+            }
         }
 
         /**
          * Runs {@code cleanupRecovery(ConcurrentMap)} for the shared, incoming and outgoing descriptor maps.
          */
         private void cleanupRecovery() {
+            cleanupRecovery(recoveryDescs);
+            cleanupRecovery(inRecDescs);
+            cleanupRecovery(outRecDescs);
+        }
+
+        /**
+         *
+         */
+        private void cleanupRecovery(ConcurrentMap<ClientKey, 
GridNioRecoveryDescriptor> recoveryDescs) {
             Set<ClientKey> left = null;
 
             for (Map.Entry<ClientKey, GridNioRecoveryDescriptor> e : 
recoveryDescs.entrySet()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
index 201fd27..58b91e4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java
@@ -369,12 +369,22 @@ public class GridNioFilterChainSelfTest extends 
GridCommonAbstractTest {
         }
 
         /** {@inheritDoc} */
-        @Override public void recoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
+        @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
             // No-op.
         }
 
         /** {@inheritDoc} */
-        @Nullable @Override public GridNioRecoveryDescriptor 
recoveryDescriptor() {
+        @Nullable @Override public GridNioRecoveryDescriptor 
outRecoveryDescriptor() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor 
recoveryDesc) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public GridNioRecoveryDescriptor 
inRecoveryDescriptor() {
             return null;
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
index 97eb34c..c7f7ad4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java
@@ -280,13 +280,13 @@ public class 
GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic
                             @Override public boolean apply() {
                                 Collection sessions = U.field(srv, "sessions");
 
-                                return sessions.size() == 1;
+                                return sessions.size() == 2;
                             }
                         }, 5000);
 
                         Collection sessions = U.field(srv, "sessions");
 
-                        assertEquals(1, sessions.size());
+                        assertEquals(2, sessions.size());
                     }
 
                     assertEquals(expMsgs, lsnr.cntr.get());

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 7bbf531..f210bec 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -370,7 +370,7 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest 
extends GridSpiAbstrac
             Collection<? extends GridNioSession> sessions = 
GridTestUtils.getFieldValue(srv, "sessions");
 
             for (GridNioSession ses : sessions) {
-                final GridNioRecoveryDescriptor snd = ses.recoveryDescriptor();
+                final GridNioRecoveryDescriptor snd = 
ses.outRecoveryDescriptor();
 
                 if (snd != null) {
                     GridTestUtils.waitForCondition(new GridAbsPredicate() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
index 34872c6..fb2dfd7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java
@@ -173,7 +173,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T 
extends CommunicationS
                     boolean found = false;
 
                     for (GridNioSession ses : sessions) {
-                        final GridNioRecoveryDescriptor recoveryDesc = 
ses.recoveryDescriptor();
+                        final GridNioRecoveryDescriptor recoveryDesc = 
ses.outRecoveryDescriptor();
 
                         if (recoveryDesc != null) {
                             found = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c70fe079/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
index 25e3611..e153fe2 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java
@@ -187,7 +187,7 @@ public class 
IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic
                     boolean found = false;
 
                     for (GridNioSession ses : sessions) {
-                        final GridNioRecoveryDescriptor recoveryDesc = 
ses.recoveryDescriptor();
+                        final GridNioRecoveryDescriptor recoveryDesc = 
ses.outRecoveryDescriptor();
 
                         if (recoveryDesc != null) {
                             found = true;

Reply via email to