Repository: ignite
Updated Branches:
  refs/heads/ignite-comm-opts2 c604e8cb2 -> 3b0ffee05
conn Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b0ffee0 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b0ffee0 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b0ffee0 Branch: refs/heads/ignite-comm-opts2 Commit: 3b0ffee055ed843616282f013daa9d0b982e13bf Parents: c604e8c Author: sboikov <sboi...@gridgain.com> Authored: Wed Sep 21 12:54:53 2016 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Sep 21 12:54:53 2016 +0300 ---------------------------------------------------------------------- .../util/nio/GridSelectorNioSessionImpl.java | 2 +- .../communication/tcp/TcpCommunicationSpi.java | 47 +++++++++++++++----- .../IgniteCacheMessageWriteTimeoutTest.java | 13 ++++-- 3 files changed, 47 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java index a680a33..88721ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java @@ -303,7 +303,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl { if (!accepted() && val instanceof GridNioRecoveryDescriptor) { outRecovery = (GridNioRecoveryDescriptor)val; - outRecovery.connected(); + outRecovery.onConnected(); return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index c9d9bf7..c131cf2 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -424,13 +424,15 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (msg instanceof NodeIdMessage) { sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); - connKey = new ConnectionKey(sndId, 0); + connKey = new ConnectionKey(sndId, 0, -1); } else { assert msg instanceof HandshakeMessage : msg; + HandshakeMessage msg0 = (HandshakeMessage)msg; + sndId = ((HandshakeMessage)msg).nodeId(); - connKey = new ConnectionKey(sndId, ((HandshakeMessage)msg).connectionIndex()); + connKey = new ConnectionKey(sndId, msg0.connectionIndex(), msg0.connectCount()); } if (log.isDebugEnabled()) @@ -470,8 +472,19 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (reserve) connectedNew(recoveryDesc, ses, true); else { - if (c.failed) - ses.close(); + if (c.failed) { + ses.send(new RecoveryLastReceivedMessage(-1)); + + for (GridNioSession ses0 : nioSrvr.sessions()) { + ConnectionKey key0 = ses0.meta(CONN_IDX_META); + + if (ses0.accepted() && key0 != null && + key0.nodeId().equals(connKey.nodeId()) && + key0.connectionIndex() == connKey.connectionIndex() && + key0.connectCount() < connKey.connectCount()) + ses0.close(); + } + } } } else { @@ -2369,7 +2382,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter // Do not allow concurrent connects. 
GridFutureAdapter<GridCommunicationClient> fut = new ConnectFuture(); - ConnectionKey connKey = new ConnectionKey(nodeId, connIdx); + ConnectionKey connKey = new ConnectionKey(nodeId, connIdx, -1); GridFutureAdapter<GridCommunicationClient> oldFut = clientFuts.putIfAbsent(connKey, fut); @@ -2705,7 +2718,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter "(node left topology): " + node); } - ConnectionKey connKey = new ConnectionKey(node.id(), connIdx); + ConnectionKey connKey = new ConnectionKey(node.id(), connIdx, -1); GridNioRecoveryDescriptor recoveryDesc = outRecoveryDescriptor(node, connKey); @@ -3097,8 +3110,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter rcvCnt = buf.getLong(1); } - if (log.isDebugEnabled()) - log.debug("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); + // if (log.isDebugEnabled()) + log.info("Received handshake message [rmtNode=" + rmtNodeId + ", rcvCnt=" + rcvCnt + ']'); if (rcvCnt == -1) { if (log.isDebugEnabled()) @@ -3487,7 +3500,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioRecoveryDescriptor recovery = null; if (!useMultipleConnections(node) && client instanceof GridTcpNioCommunicationClient) { - recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex())); + recovery = recoveryDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1)); if (recovery != null && recovery.lastAcknowledged() != recovery.received()) { RecoveryLastReceivedMessage msg = new RecoveryLastReceivedMessage(recovery.received()); @@ -3508,7 +3521,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (idleTime >= idleConnTimeout) { if (recovery == null && useMultipleConnections(node)) - recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex())); + recovery = outRecDescs.get(new ConnectionKey(node.id(), client.connectionIndex(), -1)); if (recovery != null && recovery.nodeAlive(getSpiContext().node(nodeId)) && @@ 
-4273,13 +4286,25 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** */ private final int idx; + /** */ + private final long connCnt; + /** * @param nodeId Node ID. * @param idx Connection index. + * @param connCnt Connection counter (set only for incoming connections). */ - ConnectionKey(UUID nodeId, int idx) { + ConnectionKey(UUID nodeId, int idx, long connCnt) { this.nodeId = nodeId; this.idx = idx; + this.connCnt = connCnt; + } + + /** + * @return Connection counter. + */ + long connectCount() { + return connCnt; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3b0ffee0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java index 5b51af8..0dd4079 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java @@ -66,15 +66,20 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest { super.afterTest(); } + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 10 * 60_000; + } + /** * @throws Exception If failed. 
*/ public void testMessageQueueLimit() throws Exception { - startGridsMultiThreaded(3); - - for (int i = 0; i < 15; i++) { + for (int i = 0; i < 3; i++) { log.info("Iteration: " + i); + startGridsMultiThreaded(3); + IgniteInternalFuture<?> fut1 = startJobThreads(50); U.sleep(100); @@ -83,6 +88,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest { fut1.get(); fut2.get(); + + stopAllGrids(); } }