http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java index b3b5d1a..3ba319b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java @@ -50,8 +50,8 @@ public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest { // Try provoke connection close on socket writeTimeout. commSpi.setSharedMemoryPort(-1); commSpi.setMessageQueueLimit(10); - commSpi.setSocketReceiveBuffer(40); - commSpi.setSocketSendBuffer(40); + commSpi.setSocketReceiveBuffer(64); + commSpi.setSocketSendBuffer(64); commSpi.setSocketWriteTimeout(100); commSpi.setUnacknowledgedMessagesBufferSize(1000); commSpi.setConnectTimeout(10_000);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java index 4dbb7ce..e623467 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioSelfTest.java @@ -678,7 +678,7 @@ public class GridNioSelfTest extends GridCommonAbstractTest { try { SocketChannel ch = SocketChannel.open(new InetSocketAddress(U.getLocalHost(), srvr2.port())); - GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null); + GridNioFuture<GridNioSession> fut = srvr1.createSession(ch, null, false, null); ses = fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java index 1e25003..bee63b3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/GridTcpSpiForwardingSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.AddressResolver; import org.apache.ignite.configuration.BasicAddressResolver; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgniteCallable; @@ -111,7 +112,7 @@ public class GridTcpSpiForwardingSelfTest extends GridCommonAbstractTest { cfg.setConnectorConfiguration(null); TcpCommunicationSpi commSpi = new TcpCommunicationSpi() { - @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) + @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { Map<String, Object> attrs = new HashMap<>(node.attributes()); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java index 88276c2..07edc86 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/GridAbstractCommunicationSelfTest.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.typedef.CO; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; @@ -39,6 +40,7 @@ import org.apache.ignite.spi.IgniteSpiAdapter; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -70,6 +72,9 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS private static final Object mux = new Object(); /** */ + private static GridTimeoutProcessor timeoutProcessor; + + /** */ protected boolean useSsl = false; /** @@ -289,6 +294,12 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < getSpiCount(); i++) { CommunicationSpi<Message> spi = getSpi(i); @@ -298,18 +309,20 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS GridTestNode node = new GridTestNode(rsrcs.getNodeId()); - node.order(i); - GridSpiTestContext ctx = initSpiContext(); ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + info(">>> Initialized context: nodeId=" + ctx.localNode().id()); spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -324,6 +337,8 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS node.setAttributes(spi.getNodeAttributes()); node.setAttribute(ATTR_MACS, F.concat(U.allLocalMACs(), ", ")); + node.order(i + 1); + nodes.add(node); spi.spiStart(getTestIgniteInstanceName() + (i + 1)); @@ -346,6 +361,14 @@ public abstract class GridAbstractCommunicationSelfTest<T extends CommunicationS /** {@inheritDoc} */ @Override protected void afterTestsStopped() throws Exception { + if (timeoutProcessor != null) { + timeoutProcessor.onKernalStop(true); + + timeoutProcessor.stop(true); + + timeoutProcessor = null; + } + for (CommunicationSpi<Message> spi : spis.values()) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java index 78bf869..39ecd8e 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiConcurrentConnectSelfTest.java @@ -36,7 +36,9 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -51,6 +53,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -79,6 +82,9 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic protected static final List<ClusterNode> nodes = new ArrayList<>(); /** */ + private static GridTimeoutProcessor timeoutProcessor; + + /** */ private static int port = 60_000; /** Use ssl. */ @@ -407,27 +413,37 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < SPI_CNT; i++) { CommunicationSpi<Message> spi = createSpi(); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + node.setAttribute(IgniteNodeAttributes.ATTR_CLIENT_MODE, false); + node.order(i + 1); GridSpiTestContext ctx = initSpiContext(); ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + info(">>> Initialized context: nodeId=" + ctx.localNode().id()); spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -494,6 +510,14 @@ public class GridTcpCommunicationSpiConcurrentConnectSelfTest<T extends Communic * @throws Exception If failed. */ private void stopSpis() throws Exception { + if (timeoutProcessor != null) { + timeoutProcessor.onKernalStop(true); + + timeoutProcessor.stop(true); + + timeoutProcessor = null; + } + for (CommunicationSpi<Message> spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java index feaae11..f87ff09 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckSelfTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -44,6 +45,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import org.apache.ignite.testframework.junits.spi.GridSpiTest; @@ -64,11 +66,11 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS protected static final List<ClusterNode> nodes = new ArrayList<>(); /** */ + private static GridTimeoutProcessor timeoutProcessor; + + /** */ private static final int SPI_CNT = 2; - /** - * - */ static { GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { @Override public Message apply() { @@ -159,6 +161,8 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0)); } + U.sleep(500); + expMsgs += msgPerIter; final long totAcked0 = totAcked; @@ -166,9 +170,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS for (TcpCommunicationSpi spi : spis) { GridNioServer srv = U.field(spi, "nioSrvr"); - Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + final Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return !sessions.isEmpty(); + } + }, 5_000); - assertFalse(sessions.isEmpty()); boolean found = false; @@ -268,21 +277,21 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS ClusterNode node0 = nodes.get(0); ClusterNode node1 = nodes.get(1); - final GridNioServer srv1 = U.field(spi1, "nioSrvr"); - int msgId = 0; // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); + U.sleep(1000); + // Prevent node1 from send - GridTestUtils.setFieldValue(srv1, "skipWrite", true); + GridTestUtils.setFieldValue(spi1, "skipAck", true); final GridNioSession ses0 = communicationSession(spi0); int sentMsgs = 1; - for (int i = 0; i < 150; i++) { + for (int i = 0; i < 1280; i++) { try { spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); @@ -304,7 +313,7 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS assertTrue("Failed to wait for session close", ses0.closeTime() != 0); - GridTestUtils.setFieldValue(srv1, "skipWrite", false); + GridTestUtils.setFieldValue(spi1, "skipAck", false); for (int i = 0; i < 100; i++) spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0)); @@ -379,11 +388,15 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); @@ -392,14 +405,20 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + spi.setListener(new TestListener()); node.setAttributes(spi.getNodeAttributes()); + node.order(i); + nodes.add(node); spi.spiStart(getTestIgniteInstanceName() + (i + 1)); @@ -455,6 +474,14 @@ public class GridTcpCommunicationSpiRecoveryAckSelfTest<T extends CommunicationS * @throws Exception If failed. */ private void stopSpis() throws Exception { + if (timeoutProcessor != null) { + timeoutProcessor.onKernalStop(true); + + timeoutProcessor.stop(true); + + timeoutProcessor = null; + } + for (CommunicationSpi<Message> spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java index 2a043ee..46d2d1d 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoverySelfTest.java @@ -32,6 +32,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioServer; import org.apache.ignite.internal.util.nio.GridNioSession; @@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteMock; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; @@ -80,6 +82,9 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> /** Use ssl. */ protected boolean useSsl; + /** */ + private static GridTimeoutProcessor timeoutProcessor; + /** * */ @@ -115,7 +120,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> /** {@inheritDoc} */ @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { - // info("Test listener received message: " + msg); + //info("Test listener received message: " + msg); assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); @@ -186,7 +191,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> * @return Timeout. */ protected long awaitForSocketWriteTimeout() { - return 8000; + return 20000; } /** @@ -298,6 +303,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + final AtomicInteger sentCnt = new AtomicInteger(1); int errCnt = 0; @@ -413,6 +424,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + expCnt1.incrementAndGet(); int errCnt = 0; @@ -451,7 +468,7 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> ses1.resumeReads().get(); } catch (IgniteCheckedException ignore) { - // Can fail is ses1 was closed. + // Can fail if ses1 was closed. } // Wait when session is closed, then try to open new connection from node1. @@ -534,6 +551,12 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> // Send message to establish connection. spi0.sendMessage(node1, new GridTestMessage(node0.id(), msgId.incrementAndGet(), 0)); + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + final AtomicInteger sentCnt = new AtomicInteger(1); int errCnt = 0; @@ -686,11 +709,15 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(i); - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); - IgniteTestResources rsrcs = new IgniteTestResources(); GridTestNode node = new GridTestNode(rsrcs.getNodeId()); @@ -701,10 +728,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + spiRsrcs.add(rsrcs); rsrcs.inject(spi); + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "igniteInstanceName", "grid-" + i); + if (useSsl) { IgniteMock ignite = GridTestUtils.getFieldValue(spi, IgniteSpiAdapter.class, "ignite"); @@ -770,6 +801,14 @@ public class GridTcpCommunicationSpiRecoverySelfTest<T extends CommunicationSpi> * @throws Exception If failed. */ private void stopSpis() throws Exception { + if (timeoutProcessor != null) { + timeoutProcessor.onKernalStop(true); + + timeoutProcessor.stop(true); + + timeoutProcessor = null; + } + for (CommunicationSpi<Message> spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java index 3f58055..7b59da3 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor; import org.apache.ignite.internal.util.nio.GridNioServer; @@ -47,6 +48,7 @@ import org.apache.ignite.spi.communication.GridTestMessage; import org.apache.ignite.testframework.GridSpiTestContext; import org.apache.ignite.testframework.GridTestNode; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.GridTestKernalContext; import org.apache.ignite.testframework.junits.IgniteTestResources; import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest; import org.apache.ignite.testframework.junits.spi.GridSpiTest; @@ -70,6 +72,9 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic /** */ private static final int SPI_CNT = 2; + /** */ + private static GridTimeoutProcessor timeoutProcessor; + /** * */ @@ -98,8 +103,6 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic /** {@inheritDoc} */ @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { - info("Test listener received message: " + msg); - assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); GridTestMessage msg0 = (GridTestMessage)msg; @@ -171,6 +174,17 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackC); spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackC); + + if (j == 0) { + final TestListener lsnr0 = (TestListener)spi0.getListener(); + final TestListener lsnr1 = (TestListener)spi1.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr0.rcvCnt.get() >= 1 && lsnr1.rcvCnt.get() >= 1; + } + }, 1000); + } } expMsgs += msgPerIter; @@ -415,6 +429,12 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log)); + + timeoutProcessor.start(); + + timeoutProcessor.onKernalStart(); + for (int i = 0; i < SPI_CNT; i++) { TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); @@ -428,6 +448,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic ctx.setLocalNode(node); + ctx.timeoutProcessor(timeoutProcessor); + spiRsrcs.add(rsrcs); rsrcs.inject(spi); @@ -436,6 +458,8 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic node.setAttributes(spi.getNodeAttributes()); + node.order(i); + nodes.add(node); spi.spiStart(getTestIgniteInstanceName() + (i + 1)); @@ -491,6 +515,14 @@ public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends Communic * @throws Exception If failed. */ private void stopSpis() throws Exception { + if (timeoutProcessor != null) { + timeoutProcessor.onKernalStop(true); + + timeoutProcessor.stop(true); + + timeoutProcessor = null; + } + for (CommunicationSpi<Message> spi : spis) { spi.onContextDestroyed(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java index 2b49d53..9c59cb2 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiDropNodesTest.java @@ -188,8 +188,7 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { final CountDownLatch latch = new CountDownLatch(1); grid(0).events().localListen(new IgnitePredicate<Event>() { - @Override - public boolean apply(Event event) { + @Override public boolean apply(Event evt) { latch.countDown(); return true; @@ -239,14 +238,14 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { }, 5000); try { - fut1.get(); + fut1.get(1000); } catch (IgniteCheckedException e) { // No-op. } try { - fut2.get(); + fut2.get(1000); } catch (IgniteCheckedException e) { // No-op. @@ -297,8 +296,9 @@ public class TcpCommunicationSpiDropNodesTest extends GridCommonAbstractTest { */ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) - throws IgniteCheckedException { + @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException + { if (pred.apply(getLocalNode(), node)) { Map<String, Object> attrs = new HashMap<>(node.attributes()); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java index 4fe67c1..baa1270 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiFaultyClientTest.java @@ -240,8 +240,9 @@ public class TcpCommunicationSpiFaultyClientTest extends GridCommonAbstractTest */ private static class TestCommunicationSpi extends TcpCommunicationSpi { /** {@inheritDoc} */ - @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) - throws IgniteCheckedException { + @Override protected IgniteInternalFuture<GridCommunicationClient> createTcpClient(ClusterNode node, int connIdx) + throws IgniteCheckedException + { if (PRED.apply(node)) { Map<String, Object> attrs = new HashMap<>(node.attributes()); http://git-wip-us.apache.org/repos/asf/ignite/blob/3f6b88a2/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java index 8a20eec..a241a04 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/communication/HadoopExternalCommunication.java @@ -1004,7 +1004,10 @@ public class HadoopExternalCommunication { HandshakeFinish fin = new HandshakeFinish(); - GridNioSession ses = nioSrvr.createSession(ch, F.asMap(HANDSHAKE_FINISH_META, fin)).get(); + GridNioFuture<GridNioSession> sesFut = + nioSrvr.createSession(ch, F.<Integer, Object>asMap(HANDSHAKE_FINISH_META, fin), false, null); + + GridNioSession ses = sesFut.get(); client = new HadoopTcpNioCommunicationClient(ses);
