Repository: ignite
Updated Branches:
  refs/heads/master 0efce4bc7 -> 1efec196e


IGNITE-9627 Fixed flaky TcpCommunicationSpiSkipMessageSendTest - Fixes #4790.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1efec196
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1efec196
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1efec196

Branch: refs/heads/master
Commit: 1efec196e7f96325c7e237b87556432984775553
Parents: 0efce4b
Author: NSAmelchev <[email protected]>
Authored: Wed Sep 19 12:04:02 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Wed Sep 19 13:11:50 2018 +0300

----------------------------------------------------------------------
 .../TcpCommunicationSpiSkipMessageSendTest.java | 287 +++++++------------
 1 file changed, 99 insertions(+), 188 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1efec196/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
index c4bc8f2..2c17f95 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiSkipMessageSendTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.spi.communication.tcp;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -29,18 +30,17 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteCompute;
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.Ignition;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.events.EventType;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
 import org.apache.ignite.spi.collision.fifoqueue.FifoQueueCollisionSpi;
@@ -57,80 +57,112 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe
     private static final CountDownLatch COMPUTE_JOB_STARTED = new CountDownLatch(1);
 
     /** */
-    private static final long FAILURE_DETECTION_TIMEOUT = 10000;
+    private static final long FAILURE_DETECTION_TIMEOUT = 1_000;
 
     /** */
-    private static final long JOIN_TIMEOUT = 10000;
+    private static final long JOIN_TIMEOUT = 5_000;
 
     /** */
-    private static final long START_JOB_TIMEOUT = 10000;
-
-    /** */
-    private static final long DISABLE_NETWORK_DELAY = 2000;
+    private static final long START_JOB_TIMEOUT = 10_000;
 
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
         return 2 * 60 * 1000;
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (igniteInstanceName.contains("client"))
+            cfg.setClientMode(true);
+        else {
+            FifoQueueCollisionSpi collisionSpi = new FifoQueueCollisionSpi();
+
+            collisionSpi.setParallelJobsNumber(1);
+
+            cfg.setCollisionSpi(collisionSpi);
+        }
+
+        cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
+
+        cfg.setSegmentationPolicy(SegmentationPolicy.NOOP);
+
+        TcpCommunicationSpi commSpi = new CustomCommunicationSpi();
+
+        cfg.setCommunicationSpi(commSpi);
+
+        TcpDiscoverySpi discoSpi = new CustomDiscoverySpi();
+
+        discoSpi.setIpFinder(LOCAL_IP_FINDER);
+        discoSpi.setJoinTimeout(JOIN_TIMEOUT);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
     /**
      * @throws Exception If failed.
      */
     public void testClientSegmented() throws Exception {
-        Ignite server = null;
-        Ignite client = null;
+        startGrid("server");
 
-        try {
-            server = Ignition.start(getConfig(false));
+        Ignite client = startGrid("client");
 
-            final CountDownLatch clientDisconnected = new CountDownLatch(1);
-            final CountDownLatch clientSegmented = new CountDownLatch(1);
+        CountDownLatch clientDisconnected = new CountDownLatch(1);
+        CountDownLatch clientSegmented = new CountDownLatch(1);
 
-            client = startClient(clientDisconnected, clientSegmented);
+        IgnitePredicate<Event> locLsnr = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                log.info("Client node received event: " + evt.name());
 
-            final IgniteCompute compute = client.compute();
+                if (evt.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED)
+                    clientDisconnected.countDown();
 
-            runJobAsync(compute);
+                if (evt.type() == EventType.EVT_NODE_SEGMENTED)
+                    clientSegmented.countDown();
 
-            if (!COMPUTE_JOB_STARTED.await(START_JOB_TIMEOUT, TimeUnit.MILLISECONDS))
-                fail("Compute job wasn't started.");
+                return true;
+            }
+        };
 
-            disableNetwork(client);
+        client.events().localListen(locLsnr,
+            EventType.EVT_NODE_SEGMENTED,
+            EventType.EVT_CLIENT_NODE_DISCONNECTED);
 
-            if (!clientDisconnected.await(FAILURE_DETECTION_TIMEOUT * 3, TimeUnit.MILLISECONDS))
-                fail("Client wasn't disconnected.");
+        IgniteCompute compute = client.compute();
 
-            if (!clientSegmented.await(JOIN_TIMEOUT * 2, TimeUnit.MILLISECONDS))
-                fail("Client wasn't segmented.");
-        }
-        finally {
-            if (client != null)
-                client.close();
+        runJobAsync(compute);
 
-            if (server != null)
-                server.close();
-        }
+        if (!COMPUTE_JOB_STARTED.await(START_JOB_TIMEOUT, TimeUnit.MILLISECONDS))
+            fail("Compute job wasn't started.");
+
+        disableNetwork(client);
+
+        if (!clientDisconnected.await(JOIN_TIMEOUT * 2, TimeUnit.MILLISECONDS))
+            fail("Client wasn't disconnected.");
+
+        if (!clientSegmented.await(JOIN_TIMEOUT * 2, TimeUnit.MILLISECONDS))
+            fail("Client wasn't segmented.");
     }
 
     /**
      * Simulate network disabling.
      *
      * @param ignite Ignite instance.
-     * @throws IgniteInterruptedCheckedException If thread sleep interrupted.
      * @throws InterruptedException If waiting for network disabled failed (interrupted).
      */
-    private void disableNetwork(Ignite ignite) throws IgniteInterruptedCheckedException, InterruptedException {
-        U.sleep(DISABLE_NETWORK_DELAY);
-
-        CustomCommunicationSpi communicationSpi = (CustomCommunicationSpi)ignite.configuration().getCommunicationSpi();
+    private void disableNetwork(Ignite ignite) throws InterruptedException {
+        CustomCommunicationSpi commSpi = (CustomCommunicationSpi)ignite.configuration().getCommunicationSpi();
 
         CustomDiscoverySpi discoverySpi = (CustomDiscoverySpi)ignite.configuration().getDiscoverySpi();
 
         discoverySpi.disableNetwork();
 
-        communicationSpi.disableNetwork();
+        commSpi.disableNetwork();
 
-        if (!discoverySpi.awaitNetworkDisabled(FAILURE_DETECTION_TIMEOUT * 2))
+        if (!discoverySpi.awaitNetworkDisabled())
             fail("Network wasn't disabled.");
     }
 
@@ -162,144 +194,40 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe
     }
 
     /**
-     * Create Communication Spi instance.
-     *
-     * @param client Is a client node.
-     * @return Communication Spi.
-     */
-    private TcpCommunicationSpi getCommunicationSpi(boolean client) {
-        TcpCommunicationSpi spi = new CustomCommunicationSpi(client);
-
-        spi.setName("CustomCommunicationSpi");
-
-        return spi;
-    }
-
-    /**
-     * Create Discovery Spi instance.
-     *
-     * @return Discovery Spi.
-     */
-    private TcpDiscoverySpi getDiscoverySpi() {
-        TcpDiscoverySpi spi = new CustomDiscoverySpi();
-
-        spi.setName("CustomDiscoverySpi");
-
-        spi.setIpFinder(LOCAL_IP_FINDER);
-
-        return spi;
-    }
-
-    /**
-     * Create Ignite configuration.
-     *
-     * @param clientMode Client mode.
-     * @return Ignite configuration.
-     */
-    private IgniteConfiguration getConfig(boolean clientMode) {
-        IgniteConfiguration cfg = new IgniteConfiguration();
-
-        cfg.setIgniteInstanceName(clientMode ? "client-node" : "server-node");
-
-        cfg.setClientMode(clientMode);
-
-        cfg.setCommunicationSpi(getCommunicationSpi(clientMode));
-
-        if (!clientMode) {
-            cfg.setDiscoverySpi(getDiscoverySpi());
-
-            FifoQueueCollisionSpi collisionSpi = new FifoQueueCollisionSpi();
-
-            collisionSpi.setParallelJobsNumber(1);
-
-            cfg.setCollisionSpi(collisionSpi);
-        }
-        else {
-            cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
-
-            cfg.setDiscoverySpi(getDiscoverySpi().setJoinTimeout(JOIN_TIMEOUT));
-        }
-
-        return cfg;
-    }
-
-    /**
-     * Start client node.
-     *
-     * @param clientDisconnected Client is disconnected.
-     * @param clientSegmented Client is segmented.
-     * @return Ignite instance.
-     */
-    private Ignite startClient(final CountDownLatch clientDisconnected, final CountDownLatch clientSegmented) {
-        Ignite ignite = Ignition.start(getConfig(true));
-
-        IgnitePredicate<Event> locLsnr = new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event event) {
-                log.info("Client node received event: " + event.name());
-
-                if (event.type() == EventType.EVT_CLIENT_NODE_DISCONNECTED)
-                    clientDisconnected.countDown();
-
-                if (event.type() == EventType.EVT_NODE_SEGMENTED)
-                    clientSegmented.countDown();
-
-                return true;
-            }
-        };
-
-        ignite.events().localListen(locLsnr,
-            EventType.EVT_NODE_SEGMENTED,
-            EventType.EVT_CLIENT_NODE_DISCONNECTED);
-
-        return ignite;
-    }
-
-    /**
      * Communication Spi that emulates connection troubles.
      */
     class CustomCommunicationSpi extends TcpCommunicationSpi {
         /** Network is disabled. */
-        private volatile boolean networkDisabled = false;
-
-        /** Additional logging is enabled. */
-        private final boolean logEnabled;
-
-        /**
-         * @param enableLogs Enable additional logging.
-         */
-        CustomCommunicationSpi(boolean enableLogs) {
-            super();
-            this.logEnabled = enableLogs;
-        }
+        private volatile boolean netDisabled;
 
         /** {@inheritDoc} */
         @Override public void sendMessage(ClusterNode node, Message msg,
             IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
-            String message = msg.toString();
+            String msgStr = msg.toString();
 
-            if (logEnabled)
-                log.info("CustomCommunicationSpi.sendMessage: " + message);
+            log.info("CustomCommunicationSpi.sendMessage: " + msgStr);
 
-            if (message.contains("TOPIC_JOB_CANCEL"))
+            if (msgStr.contains("TOPIC_JOB_CANCEL"))
                 closeTcpConnections();
 
             super.sendMessage(node, msg, ackC);
         }
 
         /** {@inheritDoc} */
-        @Override protected GridCommunicationClient createTcpClient(ClusterNode node, int connIdx) throws IgniteCheckedException {
-            if (logEnabled)
-                log.info(String.format("CustomCommunicationSpi.createTcpClient [networkDisabled=%s, node=%s]", networkDisabled, node));
+        @Override protected GridCommunicationClient createTcpClient(ClusterNode node,
+            int connIdx) throws IgniteCheckedException {
+            log.info(String.format("CustomCommunicationSpi.createTcpClient [networkDisabled=%s, node=%s]",
+                netDisabled, node));
 
-            if (networkDisabled) {
-                IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this, !node.isClient());
+            if (netDisabled) {
+                IgniteSpiOperationTimeoutHelper timeoutHelper = new IgniteSpiOperationTimeoutHelper(this,
+                    !node.isClient());
 
                 long timeout = timeoutHelper.nextTimeoutChunk(getConnectTimeout());
 
-                if (logEnabled)
-                    log.info("CustomCommunicationSpi.createTcpClient [timeoutHelper.nextTimeoutChunk=" + timeout + "]");
+                log.info("CustomCommunicationSpi.createTcpClient [timeoutHelper.nextTimeoutChunk=" + timeout + "]");
 
-                sleep(timeout);
+                U.sleep(timeout);
 
                 return null;
             }
@@ -311,7 +239,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe
          * Simulate network disabling.
          */
         void disableNetwork() {
-            networkDisabled = true;
+            netDisabled = true;
         }
 
         /**
@@ -322,7 +250,7 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe
 
             Set<UUID> ids = clients.keySet();
 
-            if (ids.size() > 0) {
+            if (!ids.isEmpty()) {
                 log.info("Close TCP clients: " + ids);
 
                 for (UUID nodeId : ids) {
@@ -346,25 +274,18 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe
      */
     class CustomDiscoverySpi extends TcpDiscoverySpi {
         /** Network is disabled. */
-        private volatile boolean networkDisabled = false;
+        private volatile boolean netDisabled;
 
         /** */
-        private final CountDownLatch networkDisabledLatch = new CountDownLatch(1);
-
-        /** */
-        CustomDiscoverySpi() {
-            super();
-
-            setName("CustomDiscoverySpi");
-        }
+        private final CountDownLatch netDisabledLatch = new CountDownLatch(1);
 
         /** {@inheritDoc} */
         @Override protected <T> T readMessage(Socket sock, @Nullable InputStream in,
             long timeout) throws IOException, IgniteCheckedException {
-            if (networkDisabled) {
-                sleep(timeout);
+            if (netDisabled) {
+                U.sleep(timeout);
 
-                return null;
+                throw new SocketTimeoutException("CustomDiscoverySpi: network is disabled.");
             }
             else
                 return super.readMessage(sock, in, timeout);
@@ -373,12 +294,10 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe
         /** {@inheritDoc} */
         @Override protected void writeToSocket(Socket sock, TcpDiscoveryAbstractMessage msg,
             long timeout) throws IOException, IgniteCheckedException {
-            if (networkDisabled) {
-                sleep(timeout);
-
-                networkDisabledLatch.countDown();
+            if (netDisabled) {
+                netDisabledLatch.countDown();
 
-                throw new IgniteCheckedException("CustomDiscoverySpi: network is disabled.");
+                throw new SocketTimeoutException("CustomDiscoverySpi: network is disabled.");
             }
             else
                 super.writeToSocket(sock, msg, timeout);
@@ -388,27 +307,19 @@ public class TcpCommunicationSpiSkipMessageSendTest extends GridCommonAbstractTe
          * Simulate network disabling.
          */
         void disableNetwork() {
-            networkDisabled = true;
+            netDisabled = true;
         }
 
         /**
          * Wait until the network is disabled.
          */
-        boolean awaitNetworkDisabled(long timeout) throws InterruptedException {
-            return networkDisabledLatch.await(timeout, TimeUnit.MILLISECONDS);
+        boolean awaitNetworkDisabled() throws InterruptedException {
+            return netDisabledLatch.await(FAILURE_DETECTION_TIMEOUT * 2, TimeUnit.MILLISECONDS);
         }
     }
 
-    /**
-     * Sleeps for given number of milliseconds.
-     *
-     * @param timeout Time to sleep (2000 ms by default).
-     * @throws IgniteInterruptedCheckedException If current thread interrupted.
-     */
-    static void sleep(long timeout) throws IgniteInterruptedCheckedException {
-        if (timeout > 0)
-            U.sleep(timeout);
-        else
-            U.sleep(2000);
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() {
+        stopAllGrids();
     }
 }

Reply via email to