Repository: incubator-ignite Updated Branches: refs/heads/ignite-1169 381773897 -> 383b97fbc
IGNITE-1169 Fixed review notes. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/383b97fb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/383b97fb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/383b97fb Branch: refs/heads/ignite-1169 Commit: 383b97fbcbf95bfba25446a560045e4b39f4f87b Parents: 3817738 Author: nikolay_tikhonov <ntikho...@gridgain.com> Authored: Fri Jul 31 10:03:53 2015 +0300 Committer: nikolay_tikhonov <ntikho...@gridgain.com> Committed: Fri Jul 31 10:03:53 2015 +0300 ---------------------------------------------------------------------- .../util/nio/GridNioRecoveryDescriptor.java | 1 - .../ignite/internal/util/nio/GridNioServer.java | 1 - ...mmunicationSpiRecoveryAckFutureSelfTest.java | 464 ------------------- ...CommunicationRecoveryAckClosureSelfTest.java | 464 +++++++++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 2 +- 5 files changed, 465 insertions(+), 467 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/383b97fb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java index a21600b..a7ed02a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.internal.*; import org.apache.ignite.lang.*; import org.jetbrains.annotations.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/383b97fb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java index 5c4916e..0ce6c30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java @@ -21,7 +21,6 @@ import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.*; import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.nio.ssl.*; import org.apache.ignite.internal.util.tostring.*; import org.apache.ignite.internal.util.typedef.*; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/383b97fb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java deleted file mode 100644 index c082b4f..0000000 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiRecoveryAckFutureSelfTest.java +++ /dev/null @@ -1,464 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.communication.tcp; - -import org.apache.ignite.*; -import org.apache.ignite.cluster.*; -import org.apache.ignite.internal.managers.communication.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.nio.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.plugin.extensions.communication.*; -import org.apache.ignite.spi.*; -import org.apache.ignite.spi.communication.*; -import org.apache.ignite.testframework.*; -import org.apache.ignite.testframework.junits.*; -import org.apache.ignite.testframework.junits.spi.*; - -import org.eclipse.jetty.util.*; - -import java.net.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * - */ -@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") -public class GridTcpCommunicationSpiRecoveryAckFutureSelfTest<T extends CommunicationSpi> extends GridSpiAbstractTest<T> { - /** */ - private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>(); - - /** */ - protected static final List<TcpCommunicationSpi> spis = new ArrayList<>(); - - /** */ - protected static final List<ClusterNode> nodes = new ArrayList<>(); - - /** */ - private static final int SPI_CNT = 2; - - /** - * - */ - static { - GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { - @Override public Message apply() { - return new GridTestMessage(); - } - }); - } - - /** - * Disable SPI auto-start. - */ - public GridTcpCommunicationSpiRecoveryAckFutureSelfTest() { - super(false); - } - - /** */ - @SuppressWarnings({"deprecation"}) - private class TestListener implements CommunicationListener<Message> { - /** */ - private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>(); - - /** */ - private AtomicInteger rcvCnt = new AtomicInteger(); - - /** {@inheritDoc} */ - @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { - info("Test listener received message: " + msg); - - assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); - - GridTestMessage msg0 = (GridTestMessage)msg; - - assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId())); - - rcvCnt.incrementAndGet(); - - msgC.run(); - } - - /** {@inheritDoc} */ - @Override public void onDisconnected(UUID nodeId) { - // No-op. - } - } - - /** - * @throws Exception If failed. - */ - public void testAckOnIdle() throws Exception { - checkAck(10, 2000, 9); - } - - /** - * @throws Exception If failed. - */ - public void testAckOnCount() throws Exception { - checkAck(10, 60_000, 10); - } - - /** - * @param ackCnt Recovery acknowledgement count. - * @param idleTimeout Idle connection timeout. - * @param msgPerIter Messages per iteration. - * @throws Exception If failed. - */ - private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { - createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); - - try { - TcpCommunicationSpi spi0 = spis.get(0); - TcpCommunicationSpi spi1 = spis.get(1); - - ClusterNode node0 = nodes.get(0); - ClusterNode node1 = nodes.get(1); - - int msgId = 0; - - int expMsgs = 0; - - for (int i = 0; i < 5; i++) { - info("Iteration: " + i); - - final AtomicInteger ackMsgs = new AtomicInteger(0); - - IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { - @Override public void apply(IgniteException o) { - assert o == null; - - ackMsgs.incrementAndGet(); - } - }; - - for (int j = 0; j < msgPerIter; j++) { - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); - - spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure); - } - - expMsgs += msgPerIter; - - for (TcpCommunicationSpi spi : spis) { - GridNioServer srv = U.field(spi, "nioSrvr"); - - Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); - - assertFalse(sessions.isEmpty()); - - boolean found = false; - - for (GridNioSession ses : sessions) { - final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor(); - - if (recoveryDesc != null) { - found = true; - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return recoveryDesc.messagesFutures().isEmpty(); - } - }, 10_000); - - assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, - recoveryDesc.messagesFutures().size()); - - break; - } - } - - assertTrue(found); - } - - final int expMsgs0 = expMsgs; - - for (TcpCommunicationSpi spi : spis) { - final TestListener lsnr = (TestListener)spi.getListener(); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { - return lsnr.rcvCnt.get() >= expMsgs0; - } - }, 5000); - - assertEquals(expMsgs, lsnr.rcvCnt.get()); - } - - assertEquals(msgPerIter * 2, ackMsgs.get()); - } - } - finally { - stopSpis(); - } - } - - /** - * @throws Exception If failed. - */ - public void testQueueOverflow() throws Exception { - for (int i = 0; i < 3; i++) { - try { - startSpis(5, 60_000, 10); - - checkOverflow(); - - break; - } - catch (IgniteCheckedException e) { - if (e.hasCause(BindException.class)) { - if (i < 2) { - info("Got exception caused by BindException, will retry after delay: " + e); - - stopSpis(); - - U.sleep(10_000); - } - else - throw e; - } - else - throw e; - } - finally { - stopSpis(); - } - } - } - - /** - * @throws Exception If failed. - */ - private void checkOverflow() throws Exception { - TcpCommunicationSpi spi0 = spis.get(0); - TcpCommunicationSpi spi1 = spis.get(1); - - ClusterNode node0 = nodes.get(0); - ClusterNode node1 = nodes.get(1); - - final GridNioServer srv1 = U.field(spi1, "nioSrvr"); - - final AtomicInteger ackMsgs = new AtomicInteger(0); - - IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { - @Override public void apply(IgniteException o) { - assert o == null; - - ackMsgs.incrementAndGet(); - } - }; - - int msgId = 0; - - // Send message to establish connection. - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); - - // Prevent node1 from send - GridTestUtils.setFieldValue(srv1, "skipWrite", true); - - final GridNioSession ses0 = communicationSession(spi0); - - for (int i = 0; i < 150; i++) - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); - - // Wait when session is closed because of queue overflow. - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return ses0.closeTime() != 0; - } - }, 5000); - - assertTrue("Failed to wait for session close", ses0.closeTime() != 0); - - GridTestUtils.setFieldValue(srv1, "skipWrite", false); - - for (int i = 0; i < 100; i++) - spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); - - final int expMsgs = 251; - - final TestListener lsnr = (TestListener)spi1.getListener(); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return lsnr.rcvCnt.get() >= expMsgs; - } - }, 5000); - - assertEquals(expMsgs, lsnr.rcvCnt.get()); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return expMsgs == ackMsgs.get(); - } - }, 5000); - } - - /** - * @param spi SPI. - * @return Session. - * @throws Exception If failed. - */ - @SuppressWarnings("unchecked") - private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception { - final GridNioServer srv = U.field(spi, "nioSrvr"); - - GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override - public boolean apply() { - Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); - - return !sessions.isEmpty(); - } - }, 5000); - - Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); - - assertEquals(1, sessions.size()); - - return sessions.iterator().next(); - } - - /** - * @param ackCnt Recovery acknowledgement count. - * @param idleTimeout Idle connection timeout. - * @param queueLimit Message queue limit. - * @return SPI instance. - */ - protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) { - TcpCommunicationSpi spi = new TcpCommunicationSpi(); - - spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); - spi.setIdleConnectionTimeout(idleTimeout); - spi.setTcpNoDelay(true); - spi.setAckSendThreshold(ackCnt); - spi.setMessageQueueLimit(queueLimit); - spi.setSharedMemoryPort(-1); - - return spi; - } - - /** - * @param ackCnt Recovery acknowledgement count. - * @param idleTimeout Idle connection timeout. - * @param queueLimit Message queue limit. - * @throws Exception If failed. - */ - private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { - spis.clear(); - nodes.clear(); - spiRsrcs.clear(); - - Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); - - for (int i = 0; i < SPI_CNT; i++) { - TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); - - GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); - - IgniteTestResources rsrcs = new IgniteTestResources(); - - GridTestNode node = new GridTestNode(rsrcs.getNodeId()); - - GridSpiTestContext ctx = initSpiContext(); - - ctx.setLocalNode(node); - - spiRsrcs.add(rsrcs); - - rsrcs.inject(spi); - - spi.setListener(new TestListener()); - - node.setAttributes(spi.getNodeAttributes()); - - nodes.add(node); - - spi.spiStart(getTestGridName() + (i + 1)); - - spis.add(spi); - - spi.onContextInitialized(ctx); - - ctxs.put(node, ctx); - } - - // For each context set remote nodes. - for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) { - for (ClusterNode n : nodes) { - if (!n.equals(e.getKey())) - e.getValue().remoteNodes().add(n); - } - } - } - - /** - * @param ackCnt Recovery acknowledgement count. - * @param idleTimeout Idle connection timeout. - * @param queueLimit Message queue limit. - * @throws Exception If failed. - */ - private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { - for (int i = 0; i < 3; i++) { - try { - startSpis(ackCnt, idleTimeout, queueLimit); - - break; - } - catch (IgniteCheckedException e) { - if (e.hasCause(BindException.class)) { - if (i < 2) { - info("Failed to start SPIs because of BindException, will retry after delay."); - - stopSpis(); - - U.sleep(10_000); - } - else - throw e; - } - else - throw e; - } - } - } - - /** - * @throws Exception If failed. - */ - private void stopSpis() throws Exception { - for (CommunicationSpi<Message> spi : spis) { - spi.onContextDestroyed(); - - spi.setListener(null); - - spi.spiStop(); - } - - for (IgniteTestResources rsrcs : spiRsrcs) - rsrcs.stopThreads(); - - spis.clear(); - nodes.clear(); - spiRsrcs.clear(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/383b97fb/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java new file mode 100644 index 0000000..e353f2d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/IgniteTcpCommunicationRecoveryAckClosureSelfTest.java @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.cluster.*; +import org.apache.ignite.internal.managers.communication.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.plugin.extensions.communication.*; +import org.apache.ignite.spi.*; +import org.apache.ignite.spi.communication.*; +import org.apache.ignite.testframework.*; +import org.apache.ignite.testframework.junits.*; +import org.apache.ignite.testframework.junits.spi.*; + +import org.eclipse.jetty.util.*; + +import java.net.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * + */ +public class IgniteTcpCommunicationRecoveryAckClosureSelfTest<T extends CommunicationSpi> + extends GridSpiAbstractTest<T> { + /** */ + private static final Collection<IgniteTestResources> spiRsrcs = new ArrayList<>(); + + /** */ + protected static final List<TcpCommunicationSpi> spis = new ArrayList<>(); + + /** */ + protected static final List<ClusterNode> nodes = new ArrayList<>(); + + /** */ + private static final int SPI_CNT = 2; + + /** + * + */ + static { + GridIoMessageFactory.registerCustom(GridTestMessage.DIRECT_TYPE, new CO<Message>() { + @Override public Message apply() { + return new GridTestMessage(); + } + }); + } + + /** + * Disable SPI auto-start. + */ + public IgniteTcpCommunicationRecoveryAckClosureSelfTest() { + super(false); + } + + /** */ + @SuppressWarnings({"deprecation"}) + private class TestListener implements CommunicationListener<Message> { + /** */ + private ConcurrentHashSet<Long> msgIds = new ConcurrentHashSet<>(); + + /** */ + private AtomicInteger rcvCnt = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Message msg, IgniteRunnable msgC) { + info("Test listener received message: " + msg); + + assertTrue("Unexpected message: " + msg, msg instanceof GridTestMessage); + + GridTestMessage msg0 = (GridTestMessage)msg; + + assertTrue("Duplicated message received: " + msg0, msgIds.add(msg0.getMsgId())); + + rcvCnt.incrementAndGet(); + + msgC.run(); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(UUID nodeId) { + // No-op. + } + } + + /** + * @throws Exception If failed. + */ + public void testAckOnIdle() throws Exception { + checkAck(10, 2000, 9); + } + + /** + * @throws Exception If failed. + */ + public void testAckOnCount() throws Exception { + checkAck(10, 60_000, 10); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param msgPerIter Messages per iteration. + * @throws Exception If failed. + */ + private void checkAck(int ackCnt, int idleTimeout, int msgPerIter) throws Exception { + createSpis(ackCnt, idleTimeout, TcpCommunicationSpi.DFLT_MSG_QUEUE_LIMIT); + + try { + TcpCommunicationSpi spi0 = spis.get(0); + TcpCommunicationSpi spi1 = spis.get(1); + + ClusterNode node0 = nodes.get(0); + ClusterNode node1 = nodes.get(1); + + int msgId = 0; + + int expMsgs = 0; + + for (int i = 0; i < 5; i++) { + info("Iteration: " + i); + + final AtomicInteger ackMsgs = new AtomicInteger(0); + + IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + @Override public void apply(IgniteException o) { + assert o == null; + + ackMsgs.incrementAndGet(); + } + }; + + for (int j = 0; j < msgPerIter; j++) { + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + + spi1.sendMessage(node0, new GridTestMessage(node1.id(), ++msgId, 0), ackClosure); + } + + expMsgs += msgPerIter; + + for (TcpCommunicationSpi spi : spis) { + GridNioServer srv = U.field(spi, "nioSrvr"); + + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + assertFalse(sessions.isEmpty()); + + boolean found = false; + + for (GridNioSession ses : sessions) { + final GridNioRecoveryDescriptor recoveryDesc = ses.recoveryDescriptor(); + + if (recoveryDesc != null) { + found = true; + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return recoveryDesc.messagesFutures().isEmpty(); + } + }, 10_000); + + assertEquals("Unexpected messages: " + recoveryDesc.messagesFutures(), 0, + recoveryDesc.messagesFutures().size()); + + break; + } + } + + assertTrue(found); + } + + final int expMsgs0 = expMsgs; + + for (TcpCommunicationSpi spi : spis) { + final TestListener lsnr = (TestListener)spi.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + return lsnr.rcvCnt.get() >= expMsgs0; + } + }, 5000); + + assertEquals(expMsgs, lsnr.rcvCnt.get()); + } + + assertEquals(msgPerIter * 2, ackMsgs.get()); + } + } + finally { + stopSpis(); + } + } + + /** + * @throws Exception If failed. + */ + public void testQueueOverflow() throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(5, 60_000, 10); + + checkOverflow(); + + break; + } + catch (IgniteCheckedException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Got exception caused by BindException, will retry after delay: " + e); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + finally { + stopSpis(); + } + } + } + + /** + * @throws Exception If failed. + */ + private void checkOverflow() throws Exception { + TcpCommunicationSpi spi0 = spis.get(0); + TcpCommunicationSpi spi1 = spis.get(1); + + ClusterNode node0 = nodes.get(0); + ClusterNode node1 = nodes.get(1); + + final GridNioServer srv1 = U.field(spi1, "nioSrvr"); + + final AtomicInteger ackMsgs = new AtomicInteger(0); + + IgniteInClosure<IgniteException> ackClosure = new CI1<IgniteException>() { + @Override public void apply(IgniteException o) { + assert o == null; + + ackMsgs.incrementAndGet(); + } + }; + + int msgId = 0; + + // Send message to establish connection. + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + + // Prevent node1 from send + GridTestUtils.setFieldValue(srv1, "skipWrite", true); + + final GridNioSession ses0 = communicationSession(spi0); + + for (int i = 0; i < 150; i++) + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + + // Wait when session is closed because of queue overflow. + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return ses0.closeTime() != 0; + } + }, 5000); + + assertTrue("Failed to wait for session close", ses0.closeTime() != 0); + + GridTestUtils.setFieldValue(srv1, "skipWrite", false); + + for (int i = 0; i < 100; i++) + spi0.sendMessage(node1, new GridTestMessage(node0.id(), ++msgId, 0), ackClosure); + + final int expMsgs = 251; + + final TestListener lsnr = (TestListener)spi1.getListener(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return lsnr.rcvCnt.get() >= expMsgs; + } + }, 5000); + + assertEquals(expMsgs, lsnr.rcvCnt.get()); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return expMsgs == ackMsgs.get(); + } + }, 5000); + } + + /** + * @param spi SPI. + * @return Session. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private GridNioSession communicationSession(TcpCommunicationSpi spi) throws Exception { + final GridNioServer srv = U.field(spi, "nioSrvr"); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override + public boolean apply() { + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + return !sessions.isEmpty(); + } + }, 5000); + + Collection<? extends GridNioSession> sessions = GridTestUtils.getFieldValue(srv, "sessions"); + + assertEquals(1, sessions.size()); + + return sessions.iterator().next(); + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @return SPI instance. + */ + protected TcpCommunicationSpi getSpi(int ackCnt, int idleTimeout, int queueLimit) { + TcpCommunicationSpi spi = new TcpCommunicationSpi(); + + spi.setLocalPort(GridTestUtils.getNextCommPort(getClass())); + spi.setIdleConnectionTimeout(idleTimeout); + spi.setTcpNoDelay(true); + spi.setAckSendThreshold(ackCnt); + spi.setMessageQueueLimit(queueLimit); + spi.setSharedMemoryPort(-1); + + return spi; + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @throws Exception If failed. + */ + private void startSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + + Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>(); + + for (int i = 0; i < SPI_CNT; i++) { + TcpCommunicationSpi spi = getSpi(ackCnt, idleTimeout, queueLimit); + + GridTestUtils.setFieldValue(spi, IgniteSpiAdapter.class, "gridName", "grid-" + i); + + IgniteTestResources rsrcs = new IgniteTestResources(); + + GridTestNode node = new GridTestNode(rsrcs.getNodeId()); + + GridSpiTestContext ctx = initSpiContext(); + + ctx.setLocalNode(node); + + spiRsrcs.add(rsrcs); + + rsrcs.inject(spi); + + spi.setListener(new TestListener()); + + node.setAttributes(spi.getNodeAttributes()); + + nodes.add(node); + + spi.spiStart(getTestGridName() + (i + 1)); + + spis.add(spi); + + spi.onContextInitialized(ctx); + + ctxs.put(node, ctx); + } + + // For each context set remote nodes. + for (Map.Entry<ClusterNode, GridSpiTestContext> e : ctxs.entrySet()) { + for (ClusterNode n : nodes) { + if (!n.equals(e.getKey())) + e.getValue().remoteNodes().add(n); + } + } + } + + /** + * @param ackCnt Recovery acknowledgement count. + * @param idleTimeout Idle connection timeout. + * @param queueLimit Message queue limit. + * @throws Exception If failed. + */ + private void createSpis(int ackCnt, int idleTimeout, int queueLimit) throws Exception { + for (int i = 0; i < 3; i++) { + try { + startSpis(ackCnt, idleTimeout, queueLimit); + + break; + } + catch (IgniteCheckedException e) { + if (e.hasCause(BindException.class)) { + if (i < 2) { + info("Failed to start SPIs because of BindException, will retry after delay."); + + stopSpis(); + + U.sleep(10_000); + } + else + throw e; + } + else + throw e; + } + } + } + + /** + * @throws Exception If failed. + */ + private void stopSpis() throws Exception { + for (CommunicationSpi<Message> spi : spis) { + spi.onContextDestroyed(); + + spi.setListener(null); + + spi.spiStop(); + } + + for (IgniteTestResources rsrcs : spiRsrcs) + rsrcs.stopThreads(); + + spis.clear(); + nodes.clear(); + spiRsrcs.clear(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/383b97fb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index dcb8058..e9aaf0c 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -32,7 +32,7 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { TestSuite suite = new TestSuite("Communication SPI Test Suite"); suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckSelfTest.class)); - suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoveryAckFutureSelfTest.class)); + suite.addTest(new TestSuite(IgniteTcpCommunicationRecoveryAckClosureSelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiRecoverySelfTest.class)); suite.addTest(new TestSuite(GridTcpCommunicationSpiConcurrentConnectSelfTest.class));