IGNITE-6818 Handle half open connection in communication.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/191295d4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/191295d4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/191295d4 Branch: refs/heads/ignite-zk Commit: 191295d45f53225d9e1e214c6fdd85b59e80d0ec Parents: 132ec3f Author: dkarachentsev <[email protected]> Authored: Mon Nov 13 10:35:21 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 13 10:35:21 2017 +0300 ---------------------------------------------------------------------- .../communication/tcp/TcpCommunicationSpi.java | 37 +++-- ...ommunicationSpiHalfOpenedConnectionTest.java | 142 +++++++++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 2 + 3 files changed, 168 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1bff8ee..49425ce 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -539,15 +539,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (c.failed) { ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); - for (GridNioSession ses0 : nioSrvr.sessions()) { - ConnectionKey key0 = ses0.meta(CONN_IDX_META); - - if (ses0.accepted() && key0 != null && - key0.nodeId().equals(connKey.nodeId()) && - key0.connectionIndex() == connKey.connectionIndex() && - key0.connectCount() < connKey.connectCount()) - ses0.close(); - } + closeStaleConnections(connKey); } } } @@ -567,11 +559,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (oldClient instanceof GridTcpNioCommunicationClient) { if (log.isInfoEnabled()) log.info("Received incoming connection when already connected " + - "to this node, rejecting [locNode=" + locNode.id() + - ", rmtNode=" + sndId + ']'); + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); + closeStaleConnections(connKey); + return; } else { @@ -599,11 +593,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (log.isInfoEnabled()) log.info("Received incoming connection when already connected " + - "to this node, rejecting [locNode=" + locNode.id() + - ", rmtNode=" + sndId + ']'); + "to this node, rejecting [locNode=" + locNode.id() + + ", rmtNode=" + sndId + ']'); ses.send(new RecoveryLastReceivedMessage(ALREADY_CONNECTED)); + closeStaleConnections(connKey); + fut.onDone(oldClient); return; @@ -658,6 +654,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati } } + /** + * @param connKey Connection key. + */ + private void closeStaleConnections(ConnectionKey connKey) { + for (GridNioSession ses0 : nioSrvr.sessions()) { + ConnectionKey key0 = ses0.meta(CONN_IDX_META); + + if (ses0.accepted() && key0 != null && + key0.nodeId().equals(connKey.nodeId()) && + key0.connectionIndex() == connKey.connectionIndex() && + key0.connectCount() < connKey.connectCount()) + ses0.close(); + } + } + @Override public void onMessage(final GridNioSession ses, Message msg) { ConnectionKey connKey = ses.meta(CONN_IDX_META); http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java new file mode 100644 index 0000000..3e10f94 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiHalfOpenedConnectionTest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import java.io.IOException; +import java.util.Iterator; +import java.util.UUID; +import java.util.concurrent.ConcurrentMap; +import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterGroup; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.nio.GridCommunicationClient; +import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; +import org.apache.ignite.internal.util.nio.GridNioServerListener; +import org.apache.ignite.internal.util.nio.GridTcpNioCommunicationClient; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Tests case when connection is closed only for one side, when other is not notified. + */ +public class TcpCommunicationSpiHalfOpenedConnectionTest extends GridCommonAbstractTest { + /** Client spi. */ + private TcpCommunicationSpi clientSpi; + + /** Paired connections. */ + private boolean pairedConnections; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (igniteInstanceName.contains("client")) { + cfg.setClientMode(true); + + clientSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi(); + } + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setUsePairedConnections(pairedConnections); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(true); + } + + /** + * @throws Exception If failed. + */ + public void testReconnect() throws Exception { + pairedConnections = false; + + checkReconnect(); + } + + /** + * @throws Exception If failed. + */ + public void testReconnectPaired() throws Exception { + pairedConnections = true; + + checkReconnect(); + } + + /** + * @throws Exception If failed. + */ + private void checkReconnect() throws Exception { + Ignite srv = startGrid("server"); + Ignite client = startGrid("client"); + + UUID nodeId = srv.cluster().localNode().id(); + + System.out.println(">> Server ID: " + nodeId); + + ClusterGroup srvGrp = client.cluster().forNodeId(nodeId); + + System.out.println(">> Send job"); + + // Establish connection + client.compute(srvGrp).run(F.noop()); + + ConcurrentMap<UUID, GridCommunicationClient[]> clients = U.field(clientSpi, "clients"); + ConcurrentMap<?, GridNioRecoveryDescriptor> recoveryDescs = U.field(clientSpi, "recoveryDescs"); + ConcurrentMap<?, GridNioRecoveryDescriptor> outRecDescs = U.field(clientSpi, "outRecDescs"); + ConcurrentMap<?, GridNioRecoveryDescriptor> inRecDescs = U.field(clientSpi, "inRecDescs"); + GridNioServerListener<Message> lsnr = U.field(clientSpi, "srvLsnr"); + + Iterator<GridNioRecoveryDescriptor> it = F.concat( + recoveryDescs.values().iterator(), + outRecDescs.values().iterator(), + inRecDescs.values().iterator() + ); + + while (it.hasNext()) { + GridNioRecoveryDescriptor desc = it.next(); + + // Need to simulate connection close in GridNioServer as it + // releases descriptors on disconnect. + desc.release(); + } + + // Remove client to avoid calling close(), in that case server + // will close connection too, but we want to keep the server + // uninformed and force ping old connection. + GridCommunicationClient[] clients0 = clients.remove(nodeId); + + for (GridCommunicationClient commClient : clients0) + lsnr.onDisconnected(((GridTcpNioCommunicationClient)commClient).session(), new IOException("Test exception")); + + info(">> Removed client"); + + // Reestablish connection + client.compute(srvGrp).run(F.noop()); + + info(">> Sent second job"); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 30_000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/191295d4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index 77de3fc..8e96a3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -38,6 +38,7 @@ import org.apache.ignite.spi.communication.tcp.GridTcpCommunicationSpiTcpSelfTes import org.apache.ignite.spi.communication.tcp.IgniteTcpCommunicationRecoveryAckClosureSelfTest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiDropNodesTest; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFaultyClientTest; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest; /** * Test suite for all communication SPIs. @@ -78,6 +79,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(TcpCommunicationSpiFaultyClientTest.class)); suite.addTest(new TestSuite(TcpCommunicationSpiDropNodesTest.class)); + suite.addTest(new TestSuite(TcpCommunicationSpiHalfOpenedConnectionTest.class)); return suite; }
