Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-opts2 c604e8cb2 -> 3b0ffee05


conn


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b0ffee0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b0ffee0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b0ffee0

Branch: refs/heads/ignite-comm-opts2
Commit: 3b0ffee055ed843616282f013daa9d0b982e13bf
Parents: c604e8c
Author: sboikov <sboi...@gridgain.com>
Authored: Wed Sep 21 12:54:53 2016 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Wed Sep 21 12:54:53 2016 +0300

----------------------------------------------------------------------
 .../util/nio/GridSelectorNioSessionImpl.java    |  2 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 47 +++++++++++++++-----
 .../IgniteCacheMessageWriteTimeoutTest.java     | 13 ++++--
 3 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index a680a33..88721ff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -303,7 +303,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
         if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
             outRecovery = (GridNioRecoveryDescriptor)val;
 
-            outRecovery.connected();
+            outRecovery.onConnected();
 
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index c9d9bf7..c131cf2 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -424,13 +424,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                 if (msg instanceof NodeIdMessage) {
                     sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0);
-                    connKey = new ConnectionKey(sndId, 0);
+                    connKey = new ConnectionKey(sndId, 0, -1);
                 }
                 else {
                     assert msg instanceof HandshakeMessage : msg;
 
+                    HandshakeMessage msg0 = (HandshakeMessage)msg;
+
                     sndId = ((HandshakeMessage)msg).nodeId();
-                    connKey = new ConnectionKey(sndId, ((HandshakeMessage)msg).connectionIndex());
+                    connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount());
                 }
 
                 if (log.isDebugEnabled())
@@ -470,8 +472,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     if (reserve)
                         connectedNew(recoveryDesc, ses, true);
                     else {
-                        if (c.failed)
-                            ses.close();
+                        if (c.failed) {
+                            ses.send(new RecoveryLastReceivedMessage(-1));
+
+                            for (GridNioSession ses0 : nioSrvr.sessions()) {
+                                ConnectionKey key0 = ses0.meta(CONN_IDX_META);
+
+                                if (ses0.accepted() && key0 != null &&
+                                    key0.nodeId().equals(connKey.nodeId()) &&
+                                    key0.connectionIndex() == connKey.connectionIndex() &&
+                                    key0.connectCount() < connKey.connectCount())
+                                    ses0.close();
+                            }
+                        }
                     }
                 }
                 else {
@@ -2369,7 +2382,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                 // Do not allow concurrent connects.
                 GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture();
 
-                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx);
+                ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1);
 
                 GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut);
 
@@ -2705,7 +2718,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             "(node left topology): " + node);
                     }
 
-                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx);
+                    ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1);
 
                     GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey);
 
@@ -3097,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                             rcvCnt = buf.getLong(1);
                         }
 
-                        if (log.isDebugEnabled())
-                            log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
+                       // if (log.isDebugEnabled())
+                            log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']');
 
                         if (rcvCnt == -1) {
                             if (log.isDebugEnabled())
@@ -3487,7 +3500,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     GridNioRecoveryDescriptor recovery = null;
 
                     if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) {
-                        recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex()));
+                        recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
                         if (recovery != null && recovery.lastAcknowledged() != recovery.received()) {
                             RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received());
@@ -3508,7 +3521,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     if (idleTime >= idleConnTimeout) {
                         if (recovery == null && useMultipleConnections(node))
-                            recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex()));
+                            recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1));
 
                         if (recovery != null &&
                             recovery.nodeAlive(getSpiContext().node(nodeId)) &&
@@ -4273,13 +4286,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
         /** */
         private final int idx;
 
+        /** */
+        private final long connCnt;
+
         /**
          * @param nodeId Node ID.
          * @param idx Connection index.
+         * @param connCnt Connection counter (set only for incoming connections).
          */
-        ConnectionKey(UUID nodeId, int idx) {
+        ConnectionKey(UUID nodeId, int idx, long connCnt) {
             this.nodeId = nodeId;
             this.idx = idx;
+            this.connCnt = connCnt;
+        }
+
+        /**
+         * @return Connection counter.
+         */
+        long connectCount() {
+            return connCnt;
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
index 5b51af8..0dd4079 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
         super.afterTest();
     }
 
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 10 * 60_000;
+    }
+
     /**
      * @throws Exception If failed.
      */
     public void testMessageQueueLimit() throws Exception {
-        startGridsMultiThreaded(3);
-
-        for (int i = 0; i < 15; i++) {
+        for (int i = 0; i < 3; i++) {
             log.info("Iteration: " + i);
 
+            startGridsMultiThreaded(3);
+
             IgniteInternalFuture<?> fut1 = startJobThreads(50);
 
             U.sleep(100);
@@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
 
             fut1.get();
             fut2.get();
+
+            stopAllGrids();
         }
     }
 

Reply via email to