Repository: ignite Updated Branches: refs/heads/master 0efce4bc7 -> 1efec196e
IGNITE-9627 Fixed flaky TcpCommunicationSpiSkipMessageSendTest - Fixes #4790. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1efec196 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1efec196 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1efec196 Branch: refs/heads/master Commit: 1efec196e7f96325c7e237b87556432984775553 Parents: 0efce4b Author: NSAmelchev <[email protected]> Authored: Wed Sep 19 12:04:02 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Sep 19 13:11:50 2018 +0300 ---------------------------------------------------------------------- .../TcpCommunicationSpiSkipMessageSendTest.java | 287 +++++++------------ 1 file changed, 99 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1efec196/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java index c4bc8f2..2c17f95 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.spi.communication.tcp; import java.io.IOException; import java.io.InputStream; import java.net.Socket; +import java.net.SocketTimeoutException; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -29,18 +30,17 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; -import org.apache.ignite.Ignition; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.util.nio.GridCommunicationClient; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteCallable; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.segmentation.SegmentationPolicy; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper; import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi; @@ -57,80 +57,112 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe private static final CountDownLatch COMPUTE_JOB_STARTED = new CountDownLatch(1); /** */ - private static final long FAILURE_DETECTION_TIMEOUT = 10000; + private static final long FAILURE_DETECTION_TIMEOUT = 1_000; /** */ - private static final long JOIN_TIMEOUT = 10000; + private static final long JOIN_TIMEOUT = 5_000; /** */ - private static final long START_JOB_TIMEOUT = 10000; - - /** */ - private static final long DISABLE_NETWORK_DELAY = 2000; + private static final long START_JOB_TIMEOUT = 10_000; /** {@inheritDoc} */ @Override protected long getTestTimeout() { return 2 * 60 * 1000; } + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (igniteInstanceName.contains("client")) + cfg.setClientMode(true); + else { + FifoQueueCollisionSpi collisionSpi = new FifoQueueCollisionSpi(); + + collisionSpi.setParallelJobsNumber(1); + + cfg.setCollisionSpi(collisionSpi); + } + + cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT); + + cfg.setSegmentationPolicy(SegmentationPolicy.NOOP); + + TcpCommunicationSpi commSpi = new CustomCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + TcpDiscoverySpi discoSpi = new CustomDiscoverySpi(); + + discoSpi.setIpFinder(LOCAL_IP_FINDER); + discoSpi.setJoinTimeout(JOIN_TIMEOUT); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + /** * @throws Exception If failed. */ public void testClientSegmented() throws Exception { - Ignite server = null; - Ignite client = null; + startGrid("server"); - try { - server = Ignition.start(getConfig(false)); + Ignite client = startGrid("client"); - final CountDownLatch clientDisconnected = new CountDownLatch(1); - final CountDownLatch clientSegmented = new CountDownLatch(1); + CountDownLatch clientDisconnected = new CountDownLatch(1); + CountDownLatch clientSegmented = new CountDownLatch(1); - client = startClient(clientDisconnected, clientSegmented); + IgnitePredicate<Event> locLsnr = new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + log.info("Client node received event: " + evt.name()); - final IgniteCompute compute = client.compute(); + if (evt.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED) + clientDisconnected.countDown(); - runJobAsync(compute); + if (evt.type() == EventType.EVT_NODE_SEGMENTED) + clientSegmented.countDown(); - if (!COMPUTE_JOB_STARTED.await(START_JOB_TIMEOUT, TimeUnit.MILLISECONDS)) - fail("Compute job wasn't started."); + return true; + } + }; - disableNetwork(client); + client.events().localListen(locLsnr, + EventType.EVT_NODE_SEGMENTED, + EventType.EVT_CLIENT_NODE_DISCONNECTED); - if (!clientDisconnected.await(FAILURE_DETECTION_TIMEOUT * 3, TimeUnit.MILLISECONDS)) - fail("Client wasn't disconnected."); + IgniteCompute compute = client.compute(); - if (!clientSegmented.await(JOIN_TIMEOUT * 2, TimeUnit.MILLISECONDS)) - fail("Client wasn't segmented."); - } - finally { - if (client != null) - client.close(); + runJobAsync(compute); - if (server != null) - server.close(); - } + if (!COMPUTE_JOB_STARTED.await(START_JOB_TIMEOUT, TimeUnit.MILLISECONDS)) + fail("Compute job wasn't started."); + + disableNetwork(client); + + if (!clientDisconnected.await(JOIN_TIMEOUT * 2, TimeUnit.MILLISECONDS)) + fail("Client wasn't disconnected."); + + if (!clientSegmented.await(JOIN_TIMEOUT * 2, TimeUnit.MILLISECONDS)) + fail("Client wasn't segmented."); } /** * Simulate network disabling. * * @param ignite Ignite instance. - * @throws IgniteInterruptedCheckedException If thread sleep interrupted. * @throws InterruptedException If waiting for network disabled failed (interrupted). */ - private void disableNetwork(Ignite ignite) throws IgniteInterruptedCheckedException, InterruptedException { - U.sleep(DISABLE_NETWORK_DELAY); - - CustomCommunicationSpi communicationSpi = (CustomCommunicationSpi)ignite.configuration().getCommunicationSpi(); + private void disableNetwork(Ignite ignite) throws InterruptedException { + CustomCommunicationSpi commSpi = (CustomCommunicationSpi)ignite.configuration().getCommunicationSpi(); CustomDiscoverySpi discoverySpi = (CustomDiscoverySpi)ignite.configuration().getDiscoverySpi(); discoverySpi.disableNetwork(); - communicationSpi.disableNetwork(); + commSpi.disableNetwork(); - if (!discoverySpi.awaitNetworkDisabled(FAILURE_DETECTION_TIMEOUT * 2)) + if (!discoverySpi.awaitNetworkDisabled()) fail("Network wasn't disabled."); } @@ -162,144 +194,40 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe } /** - * Create Communication Spi instance. - * - * @param client Is a client node. - * @return Communication Spi. - */ - private TcpCommunicationSpi getCommunicationSpi(boolean client) { - TcpCommunicationSpi spi = new CustomCommunicationSpi(client); - - spi.setName("CustomCommunicationSpi"); - - return spi; - } - - /** - * Create Discovery Spi instance. - * - * @return Discovery Spi. - */ - private TcpDiscoverySpi getDiscoverySpi() { - TcpDiscoverySpi spi = new CustomDiscoverySpi(); - - spi.setName("CustomDiscoverySpi"); - - spi.setIpFinder(LOCAL_IP_FINDER); - - return spi; - } - - /** - * Create Ignite configuration. - * - * @param clientMode Client mode. - * @return Ignite configuration. - */ - private IgniteConfiguration getConfig(boolean clientMode) { - IgniteConfiguration cfg = new IgniteConfiguration(); - - cfg.setIgniteInstanceName(clientMode ? "client-node" : "server-node"); - - cfg.setClientMode(clientMode); - - cfg.setCommunicationSpi(getCommunicationSpi(clientMode)); - - if (!clientMode) { - cfg.setDiscoverySpi(getDiscoverySpi()); - - FifoQueueCollisionSpi collisionSpi = new FifoQueueCollisionSpi(); - - collisionSpi.setParallelJobsNumber(1); - - cfg.setCollisionSpi(collisionSpi); - } - else { - cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT); - - cfg.setDiscoverySpi(getDiscoverySpi().setJoinTimeout(JOIN_TIMEOUT)); - } - - return cfg; - } - - /** - * Start client node. - * - * @param clientDisconnected Client is disconnected. - * @param clientSegmented Client is segmented. - * @return Ignite instance. - */ - private Ignite startClient(final CountDownLatch clientDisconnected, final CountDownLatch clientSegmented) { - Ignite ignite = Ignition.start(getConfig(true)); - - IgnitePredicate<Event> locLsnr = new IgnitePredicate<Event>() { - @Override public boolean apply(Event event) { - log.info("Client node received event: " + event.name()); - - if (event.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED) - clientDisconnected.countDown(); - - if (event.type() == EventType.EVT_NODE_SEGMENTED) - clientSegmented.countDown(); - - return true; - } - }; - - ignite.events().localListen(locLsnr, - EventType.EVT_NODE_SEGMENTED, - EventType.EVT_CLIENT_NODE_DISCONNECTED); - - return ignite; - } - - /** * Communication Spi that emulates connection troubles. */ class CustomCommunicationSpi extends TcpCommunicationSpi { /** Network is disabled. */ - private volatile boolean networkDisabled = false; - - /** Additional logging is enabled. */ - private final boolean logEnabled; - - /** - * @param enableLogs Enable additional logging. - */ - CustomCommunicationSpi(boolean enableLogs) { - super(); - this.logEnabled = enableLogs; - } + private volatile boolean netDisabled; /** {@inheritDoc} */ @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) throws IgniteSpiException { - String message = msg.toString(); + String msgStr = msg.toString(); - if (logEnabled) - log.info("CustomCommunicationSpi.sendMessage: " + message); + log.info("CustomCommunicationSpi.sendMessage: " + msgStr); - if (message.contains("TOPIC_JOB_CANCEL")) + if (msgStr.contains("TOPIC_JOB_CANCEL")) closeTcpConnections(); super.sendMessage(node, msg, ackC); } /** {@inheritDoc} */ - @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException { - if (logEnabled) - log.info(String.format("CustomCommunicationSpi.createTcpClient [networkDisabled=%s, node=%s]", networkDisabled, node)); + @Override protected GridCommunicationClient createTcpClient(ClusterNode node, + int connIdx) throws IgniteCheckedException { + log.info(String.format("CustomCommunicationSpi.createTcpClient [networkDisabled=%s, node=%s]", + netDisabled, node)); - if (networkDisabled) { - IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, !node.isClient()); + if (netDisabled) { + IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, + !node.isClient()); long timeout = timeoutHelper.nextTimeoutChunk(getConnectTimeout()); - if (logEnabled) - log.info("CustomCommunicationSpi.createTcpClient [timeoutHelper.nextTimeoutChunk=" + timeout + "]"); + log.info("CustomCommunicationSpi.createTcpClient [timeoutHelper.nextTimeoutChunk=" + timeout + "]"); - sleep(timeout); + U.sleep(timeout); return null; } @@ -311,7 +239,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe * Simulate network disabling. */ void disableNetwork() { - networkDisabled = true; + netDisabled = true; } /** @@ -322,7 +250,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe Set<UUID> ids = clients.keySet(); - if (ids.size() > 0) { + if (!ids.isEmpty()) { log.info("Close TCP clients: " + ids); for (UUID nodeId : ids) { @@ -346,25 +274,18 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe */ class CustomDiscoverySpi extends TcpDiscoverySpi { /** Network is disabled. */ - private volatile boolean networkDisabled = false; + private volatile boolean netDisabled; /** */ - private final CountDownLatch networkDisabledLatch = new CountDownLatch(1); - - /** */ - CustomDiscoverySpi() { - super(); - - setName("CustomDiscoverySpi"); - } + private final CountDownLatch netDisabledLatch = new CountDownLatch(1); /** {@inheritDoc} */ @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in, long timeout) throws IOException, IgniteCheckedException { - if (networkDisabled) { - sleep(timeout); + if (netDisabled) { + U.sleep(timeout); - return null; + throw new SocketTimeoutException("CustomDiscoverySpi: network is disabled."); } else return super.readMessage(sock, in, timeout); @@ -373,12 +294,10 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - if (networkDisabled) { - sleep(timeout); - - networkDisabledLatch.countDown(); + if (netDisabled) { + netDisabledLatch.countDown(); - throw new IgniteCheckedException("CustomDiscoverySpi: network is disabled."); + throw new SocketTimeoutException("CustomDiscoverySpi: network is disabled."); } else super.writeToSocket(sock, msg, timeout); @@ -388,27 +307,19 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe * Simulate network disabling. */ void disableNetwork() { - networkDisabled = true; + netDisabled = true; } /** * Wait until the network is disabled. */ - boolean awaitNetworkDisabled(long timeout) throws InterruptedException { - return networkDisabledLatch.await(timeout, TimeUnit.MILLISECONDS); + boolean awaitNetworkDisabled() throws InterruptedException { + return netDisabledLatch.await(FAILURE_DETECTION_TIMEOUT * 2, TimeUnit.MILLISECONDS); } } - /** - * Sleeps for given number of milliseconds. - * - * @param timeout Time to sleep (2000 ms by default). - * @throws IgniteInterruptedCheckedException If current thread interrupted. - */ - static void sleep(long timeout) throws IgniteInterruptedCheckedException { - if (timeout > 0) - U.sleep(timeout); - else - U.sleep(2000); + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() { + stopAllGrids(); } }
