ignite-1758

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0f23a53b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0f23a53b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0f23a53b

Branch: refs/heads/ignite-1758
Commit: 0f23a53b531a187285ed2cc01a2a99e1c0c69abc
Parents: 4917cff
Author: sboikov <[email protected]>
Authored: Tue Oct 27 09:14:50 2015 +0300
Committer: sboikov <[email protected]>
Committed: Wed Oct 28 16:09:25 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ClientImpl.java    |  80 +++---
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 105 +++++---
 .../tcp/TcpDiscoveryMultiThreadedTest.java      | 241 +++++++++++--------
 3 files changed, 260 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0f23a53b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 03ae201..9cadca1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -920,12 +920,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                         boolean ack = msg instanceof 
TcpDiscoveryClientAckResponse;
 
-                        if (!ack) {
-                            if (spi.ensured(msg) && joinLatch.getCount() == 0L)
-                                lastMsgId = msg.id();
-
+                        if (!ack)
                             msgWorker.addMessage(msg);
-                        }
                         else
                             
sockWriter.ackReceived((TcpDiscoveryClientAckResponse)msg);
                     }
@@ -1237,13 +1233,8 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                                         return;
                                     }
-                                    else {
-                                        U.warn(log, "Client failed to 
reconnect because failed to restore discovery " +
-                                            "messages history, consider 
increasing '" +
-                                            IGNITE_DISCOVERY_HISTORY_SIZE + "' 
system property.");
-
+                                    else
                                         return;
-                                    }
                                 }
                             }
                             else if (spi.ensured(msg)) {
@@ -1313,9 +1304,6 @@ class ClientImpl extends TcpDiscoveryImpl {
         /** */
         private SocketStream currSock;
 
-        /** Indicates that pending messages are currently processed. */
-        private boolean pending;
-
         /** */
         private Reconnector reconnector;
 
@@ -1361,11 +1349,13 @@ class ClientImpl extends TcpDiscoveryImpl {
                         }
                     }
                     else if (msg == SPI_STOP) {
+                        boolean connected = state == CONNECTED;
+
                         state = STOPPED;
 
                         assert spi.getSpiContext().isStopping();
 
-                        if (currSock != null) {
+                        if (connected && currSock != null) {
                             TcpDiscoveryAbstractMessage leftMsg = new 
TcpDiscoveryNodeLeftMessage(getLocalNodeId());
 
                             leftMsg.client(true);
@@ -1577,6 +1567,9 @@ class ClientImpl extends TcpDiscoveryImpl {
                 processPingRequest();
 
             spi.stats.onMessageProcessingFinished(msg);
+
+            if (spi.ensured(msg) && state == CONNECTED)
+                lastMsgId = msg.id();
         }
 
         /**
@@ -1630,8 +1623,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         if (msg.topologyHistory() != null)
                             topHist.putAll(msg.topologyHistory());
                     }
-                    else if (log.isDebugEnabled())
-                        log.debug("Discarding node added message with empty 
topology: " + msg);
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Discarding node added message with 
empty topology: " + msg);
+                    }
                 }
                 else if (log.isDebugEnabled())
                     log.debug("Discarding node added message (this message has 
already been processed) " +
@@ -1651,8 +1646,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                             spi.onExchange(newNodeId, newNodeId, data, null);
                     }
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Ignore topology message, local node not added 
to topology: " + msg);
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignore topology message, local node not 
added to topology: " + msg);
+                }
             }
         }
 
@@ -1679,6 +1676,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     locNode.order(topVer);
 
+                    for (Iterator<Long> it = topHist.keySet().iterator(); 
it.hasNext();) {
+                        if (it.next() >= topVer)
+                            it.remove();
+                    }
+
                     Collection<ClusterNode> nodes = 
updateTopologyHistory(topVer, msg);
 
                     notifyDiscovery(EVT_NODE_JOINED, topVer, locNode, nodes);
@@ -1738,7 +1740,7 @@ class ClientImpl extends TcpDiscoveryImpl {
                     assert top != null && top.contains(node) : "Topology does 
not contain node [msg=" + msg +
                         ", node=" + node + ", top=" + top + ']';
 
-                    if (!pending && joinLatch.getCount() > 0) {
+                    if (state != CONNECTED) {
                         if (log.isDebugEnabled())
                             log.debug("Discarding node add finished message 
(join process is not finished): " + msg);
 
@@ -1751,8 +1753,10 @@ class ClientImpl extends TcpDiscoveryImpl {
                         spi.stats.onNodeJoined();
                     }
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Ignore topology message, local node not added 
to topology: " + msg);
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignore topology message, local node not 
added to topology: " + msg);
+                }
             }
         }
 
@@ -1782,7 +1786,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     Collection<ClusterNode> top = 
updateTopologyHistory(msg.topologyVersion(), msg);
 
-                    if (!pending && joinLatch.getCount() > 0) {
+                    if (state != CONNECTED) {
                         if (log.isDebugEnabled())
                             log.debug("Discarding node left message (join 
process is not finished): " + msg);
 
@@ -1793,8 +1797,10 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     spi.stats.onNodeLeft();
                 }
-                else if (log.isDebugEnabled())
-                    log.debug("Ignore topology message, local node not added 
to topology: " + msg);
+                else {
+                    if (log.isDebugEnabled())
+                        log.debug("Ignore topology message, local node not 
added to topology: " + msg);
+                }
             }
         }
 
@@ -1835,7 +1841,7 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                 Collection<ClusterNode> top = 
updateTopologyHistory(msg.topologyVersion(), msg);
 
-                if (!pending && joinLatch.getCount() > 0) {
+                if (state != CONNECTED) {
                     if (log.isDebugEnabled())
                         log.debug("Discarding node failed message (join 
process is not finished): " + msg);
 
@@ -1908,18 +1914,11 @@ class ClientImpl extends TcpDiscoveryImpl {
 
                     reconnector = null;
 
-                    pending = true;
-
-                    try {
-                        for (TcpDiscoveryAbstractMessage pendingMsg : 
msg.pendingMessages()) {
-                            if (log.isDebugEnabled())
-                                log.debug("Process pending message on 
reconnect [msg=" + pendingMsg + ']');
+                    for (TcpDiscoveryAbstractMessage pendingMsg : 
msg.pendingMessages()) {
+                        if (log.isDebugEnabled())
+                            log.debug("Process pending message on reconnect 
[msg=" + pendingMsg + ']');
 
-                            processDiscoveryMessage(pendingMsg);
-                        }
-                    }
-                    finally {
-                        pending = false;
+                        processDiscoveryMessage(pendingMsg);
                     }
                 }
                 else {
@@ -1947,7 +1946,7 @@ class ClientImpl extends TcpDiscoveryImpl {
          * @param msg Message.
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg) {
-            if (msg.verified() && state == CONNECTED) {
+            if (state == CONNECTED) {
                 DiscoverySpiListener lsnr = spi.lsnr;
 
                 if (lsnr != null) {
@@ -2120,6 +2119,11 @@ class ClientImpl extends TcpDiscoveryImpl {
         InputStream stream() {
             return in;
         }
+
+        /** {@inheritDoc} */
+        public String toString() {
+            return sock.toString();
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f23a53b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8c63eb2..94653fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1253,9 +1253,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             lsnr.onDiscovery(type, topVer, node, top, hist, null);
         }
-        else if (log.isDebugEnabled())
-            log.debug("Skipped discovery notification [node=" + node + ", 
spiState=" + spiState +
-                ", type=" + U.gridEventName(type) + ", topVer=" + topVer + 
']');
+        else {
+            if (log.isDebugEnabled())
+                log.debug("Skipped discovery notification [node=" + node + ", 
spiState=" + spiState +
+                    ", type=" + U.gridEventName(type) + ", topVer=" + topVer + 
']');
+        }
     }
 
     /**
@@ -1393,7 +1395,11 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             if (node.id().equals(destNodeId)) {
                 Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
-                Collection<TcpDiscoveryNode> topToSnd = new 
ArrayList<>(allNodes.size());
+
+                Collection<TcpDiscoveryNode> topToSnd = 
nodeAddedMsg.topology();
+
+                if (topToSnd == null)
+                    topToSnd = new ArrayList<>(allNodes.size());
 
                 for (TcpDiscoveryNode n0 : allNodes) {
                     assert n0.internalOrder() != 0 : n0;
@@ -2165,19 +2171,9 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
-         * Sends message across the ring.
-         *
-         * @param msg Message to send
+         * @param msg Message.
          */
-        @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", 
"ContinueStatementWithLabel"})
-        private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
-            assert msg != null;
-
-            assert ring.hasRemoteNodes();
-
-            for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : 
spi.sendMsgLsnrs)
-                msgLsnr.apply(msg);
-
+        private void sendMessageToClients(TcpDiscoveryAbstractMessage msg) {
             if (redirectToClients(msg)) {
                 byte[] marshalledMsg = null;
 
@@ -2200,6 +2196,23 @@ class ServerImpl extends TcpDiscoveryImpl {
                     clientMsgWorker.addMessage(msgClone);
                 }
             }
+        }
+
+        /**
+         * Sends message across the ring.
+         *
+         * @param msg Message to send
+         */
+        @SuppressWarnings({"BreakStatementWithLabel", "LabeledStatement", 
"ContinueStatementWithLabel"})
+        private void sendMessageAcrossRing(TcpDiscoveryAbstractMessage msg) {
+            assert msg != null;
+
+            assert ring.hasRemoteNodes();
+
+            for (IgniteInClosure<TcpDiscoveryAbstractMessage> msgLsnr : 
spi.sendMsgLsnrs)
+                msgLsnr.apply(msg);
+
+            sendMessageToClients(msg);
 
             Collection<TcpDiscoveryNode> failedNodes;
 
@@ -2814,7 +2827,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                                     "[clientNode=" + existingNode + ", msg=" + 
reconMsg + ']');
                         }
                         else {
-                            if (ring.hasRemoteNodes())
+                            if (sendMessageToRemotes(reconMsg))
                                 sendMessageAcrossRing(reconMsg);
                         }
                     }
@@ -3006,8 +3019,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                 nodeAddedMsg.client(msg.client());
 
                 processNodeAddedMessage(nodeAddedMsg);
+
+                if (nodeAddedMsg.verified())
+                    msgHist.add(nodeAddedMsg);
             }
-            else if (ring.hasRemoteNodes())
+            else if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
         }
 
@@ -3126,12 +3142,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                                 locNodeId + ", clientNodeId=" + nodeId + ']');
                     }
                     else {
-                        if (ring.hasRemoteNodes())
+                        if (sendMessageToRemotes(msg))
                             sendMessageAcrossRing(msg);
                     }
                 }
                 else {
-                    if (ring.hasRemoteNodes())
+                    if (sendMessageToRemotes(msg))
                         sendMessageAcrossRing(msg);
                 }
             }
@@ -3193,17 +3209,35 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                     processNodeAddFinishedMessage(addFinishMsg);
 
+                    if (addFinishMsg.verified())
+                        msgHist.add(addFinishMsg);
+
                     addMessage(new TcpDiscoveryDiscardMessage(locNodeId, 
msg.id(), false));
 
                     return;
                 }
 
                 msg.verify(locNodeId);
+
+                if (node.isClient()) {
+                    Collection<TcpDiscoveryNode> allNodes = ring.allNodes();
+
+                    Collection<TcpDiscoveryNode> top = new 
ArrayList<>(allNodes.size());
+
+                    for (TcpDiscoveryNode n0 : allNodes) {
+                        assert n0.internalOrder() > 0 : n0;
+
+                        if (n0.internalOrder() < node.internalOrder())
+                            top.add(n0);
+                    }
+
+                    msg.topology(top);
+                }
             }
             else if (!locNodeId.equals(node.id()) && ring.node(node.id()) != 
null) {
                 // Local node already has node from message in local topology.
                 // Just pass it to coordinator via the ring.
-                if (ring.hasRemoteNodes())
+                if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
 
                 if (log.isDebugEnabled())
@@ -3391,7 +3425,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (ring.hasRemoteNodes())
+            if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
         }
 
@@ -3526,7 +3560,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 notifyDiscovery(EVT_NODE_JOINED, topVer, locNode);
             }
 
-            if (ring.hasRemoteNodes())
+            if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
 
             checkPendingCustomMessages();
@@ -3694,7 +3728,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (ring.hasRemoteNodes()) {
+            if (sendMessageToRemotes(msg)) {
                 try {
                     sendMessageAcrossRing(msg);
                 }
@@ -3715,6 +3749,19 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * @param msg Message to send.
+         * @return {@code True} if message should be send across the ring.
+         */
+        private boolean sendMessageToRemotes(TcpDiscoveryAbstractMessage msg) {
+            if (ring.hasRemoteNodes())
+                return true;
+
+            sendMessageToClients(msg);
+
+            return false;
+        }
+
+        /**
          * Processes node failed message.
          *
          * @param msg Node failed message.
@@ -3846,7 +3893,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 spi.stats.onNodeFailed();
             }
 
-            if (ring.hasRemoteNodes())
+            if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
             else {
                 if (log.isDebugEnabled())
@@ -3986,7 +4033,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (ring.hasRemoteNodes())
+            if (sendMessageToRemotes(msg))
                 sendMessageAcrossRing(msg);
         }
 
@@ -4052,7 +4099,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                 }
             }
 
-            if (ring.hasRemoteNodes()) {
+            if (sendMessageToRemotes(msg)) {
                 if ((locNodeId.equals(msg.creatorNodeId()) && 
msg.senderNodeId() == null ||
                     !hasMetrics(msg, locNodeId)) && spiStateCopy() == 
CONNECTED) {
                     // Message is on its first ring or just created on 
coordinator.
@@ -4098,7 +4145,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     }
                 }
 
-                if (ring.hasRemoteNodes())
+                if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
             }
             else {
@@ -4305,7 +4352,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                     notifyDiscoveryListener(msg);
                 }
 
-                if (ring.hasRemoteNodes())
+                if (sendMessageToRemotes(msg))
                     sendMessageAcrossRing(msg);
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/0f23a53b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
index 7c6a960..fcb0116 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoveryMultiThreadedTest.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -85,7 +86,10 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
         if (client())
             cfg.setClientMode(true);
 
-        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(ipFinder));
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().
+            setIpFinder(ipFinder).
+            setJoinTimeout(60_000).
+            setNetworkTimeout(60_000));
 
         cfg.setCacheConfiguration();
 
@@ -112,164 +116,203 @@ public class TcpDiscoveryMultiThreadedTest extends 
GridCommonAbstractTest {
      * @throws Exception If any error occurs.
      */
     public void testMultiThreadedClientsRestart() throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-1123");
+        final AtomicBoolean done = new AtomicBoolean();
 
-        clientFlagGlobal = false;
+        try {
+            clientFlagGlobal = false;
 
-        info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
+            info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " 
min.");
 
-        startGridsMultiThreaded(GRID_CNT);
+            startGridsMultiThreaded(GRID_CNT);
 
-        clientFlagGlobal = true;
+            clientFlagGlobal = true;
 
-        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+            startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
 
-        final AtomicBoolean done = new AtomicBoolean();
+            final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
 
-        final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
+            IgniteInternalFuture<?> fut1 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        clientFlagPerThread.set(true);
 
-        IgniteInternalFuture<?> fut1 = multithreadedAsync(
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    clientFlagPerThread.set(true);
+                        int idx = clientIdx.getAndIncrement();
 
-                    int idx = clientIdx.getAndIncrement();
+                        while (!done.get()) {
+                            stopGrid(idx, true);
+                            startGrid(idx);
+                        }
 
-                    while (!done.get()) {
-                        stopGrid(idx, true);
-                        startGrid(idx);
+                        return null;
                     }
+                },
+                CLIENT_GRID_CNT
+            );
 
-                    return null;
-                }
-            },
-            CLIENT_GRID_CNT
-        );
-
-        Thread.sleep(getTestTimeout() - 60 * 1000);
+            Thread.sleep(getTestTimeout() - 60 * 1000);
 
-        done.set(true);
+            done.set(true);
 
-        fut1.get();
+            fut1.get();
+        }
+        finally {
+            done.set(true);
+        }
     }
 
     /**
      * @throws Exception If any error occurs.
      */
     public void testMultiThreadedClientsServersRestart() throws Throwable {
-        clientFlagGlobal = false;
+        final AtomicBoolean done = new AtomicBoolean();
 
-        info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");
+        try {
+            clientFlagGlobal = false;
 
-        startGridsMultiThreaded(GRID_CNT);
+            info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " 
min.");
 
-        clientFlagGlobal = true;
+            startGridsMultiThreaded(GRID_CNT);
 
-        startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
+            clientFlagGlobal = true;
 
-        final AtomicBoolean done = new AtomicBoolean();
+            startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);
 
-        final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);
+            final AtomicReference<Throwable> error = new AtomicReference<>();
 
-        final AtomicReference<Throwable> error = new AtomicReference<>();
+            final BlockingQueue<Integer> clientStopIdxs = new 
LinkedBlockingQueue<>();
 
-        IgniteInternalFuture<?> fut1 = multithreadedAsync(
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    try {
-                        clientFlagPerThread.set(true);
+            for (int i = GRID_CNT; i < GRID_CNT + CLIENT_GRID_CNT; i++)
+                clientStopIdxs.add(i);
 
-                        int idx = clientIdx.getAndIncrement();
+            final AtomicInteger clientStartIdx = new AtomicInteger(9000);
 
-                        while (!done.get() && error.get() == null) {
-                            stopGrid(idx);
+            IgniteInternalFuture<?> fut1 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        try {
+                            clientFlagPerThread.set(true);
 
-                            try {
-                                startGrid(idx);
-                            }
-                            catch (Exception e) {
-                                if (X.hasCause(e, 
IgniteClientDisconnectedCheckedException.class))
-                                    log.info("Client disconnected: " + e);
-                                else
-                                    throw e;
+                            while (!done.get() && error.get() == null) {
+                                Integer stopIdx = clientStopIdxs.take();
+
+                                log.info("Stop client: " + stopIdx);
+
+                                stopGrid(stopIdx);
+
+                                while (!done.get() && error.get() == null) {
+                                    // Generate unique name to simplify 
debugging.
+                                    int startIdx = 
clientStartIdx.getAndIncrement();
+
+                                    log.info("Start client: " + startIdx);
+
+                                    try {
+                                        Ignite ignite = startGrid(startIdx);
+
+                                        
assertTrue(ignite.configuration().isClientMode());
+
+                                        clientStopIdxs.add(startIdx);
+
+                                        break;
+                                    }
+                                    catch (Exception e) {
+                                        if (X.hasCause(e, 
IgniteClientDisconnectedCheckedException.class) ||
+                                            X.hasCause(e, 
IgniteClientDisconnectedException.class))
+                                            log.info("Client disconnected: " + 
e);
+                                        else
+                                            throw e;
+                                    }
+                                }
                             }
                         }
-                    }
-                    catch (Throwable e) {
-                        log.error("Unexpected error: " + e, e);
+                        catch (Throwable e) {
+                            log.error("Unexpected error: " + e, e);
+
+                            error.compareAndSet(null, e);
 
-                        error.compareAndSet(null, e);
+                            return null;
+                        }
 
                         return null;
                     }
+                },
+                CLIENT_GRID_CNT,
+                "client-restart");
 
-                    return null;
-                }
-            },
-            CLIENT_GRID_CNT
-        );
+            final BlockingQueue<Integer> srvStopIdxs = new 
LinkedBlockingQueue<>();
 
-        final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>();
+            for (int i = 0; i < GRID_CNT; i++)
+                srvStopIdxs.add(i);
 
-        for (int i = 0; i < GRID_CNT; i++)
-            srvIdx.add(i);
+            final AtomicInteger srvStartIdx = new AtomicInteger(GRID_CNT + 
CLIENT_GRID_CNT);
 
-        IgniteInternalFuture<?> fut2 = multithreadedAsync(
-            new Callable<Object>() {
-                @Override public Object call() throws Exception {
-                    try {
-                        clientFlagPerThread.set(false);
+            IgniteInternalFuture<?> fut2 = multithreadedAsync(
+                new Callable<Object>() {
+                    @Override public Object call() throws Exception {
+                        try {
+                            clientFlagPerThread.set(false);
 
-                        while (!done.get() && error.get() == null) {
-                            int idx = srvIdx.take();
+                            while (!done.get() && error.get() == null) {
+                                int stopIdx = srvStopIdxs.take();
 
-                            stopGrid(idx);
-                            startGrid(idx);
+                                log.info("Stop server: " + stopIdx);
+
+                                stopGrid(stopIdx);
+
+                                // Generate unique name to simplify debugging.
+                                int startIdx = srvStartIdx.getAndIncrement();
+
+                                log.info("Start server: " + startIdx);
 
-                            srvIdx.add(idx);
+                                Ignite ignite = startGrid(startIdx);
+
+                                
assertFalse(ignite.configuration().isClientMode());
+
+                                srvStopIdxs.add(startIdx);
+                            }
                         }
-                    }
-                    catch (Throwable e) {
-                        log.error("Unexpected error: " + e, e);
+                        catch (Throwable e) {
+                            log.error("Unexpected error: " + e, e);
 
-                        error.compareAndSet(null, e);
+                            error.compareAndSet(null, e);
+
+                            return null;
+                        }
 
                         return null;
                     }
+                },
+                GRID_CNT - 1,
+                "server-restart");
 
-                    return null;
-                }
-            },
-            GRID_CNT - 1
-        );
-
-        long timeToExec = getTestTimeout() - 60 * 1000;
+            final long timeToExec = 2 * 60 * 1000;
 
-        while (timeToExec > 0) {
-            long start = System.currentTimeMillis();
+            final long endTime = System.currentTimeMillis() + timeToExec;
 
-            Thread.sleep(3000);
+            while (System.currentTimeMillis() < endTime) {
+                Thread.sleep(3000);
 
-            timeToExec -= (System.currentTimeMillis() - start);
+                if (error.get() != null) {
+                    Throwable err = error.get();
 
-            if (error.get() != null) {
-                Throwable err = error.get();
+                    U.error(log, "Test failed: " + err.getMessage());
 
-                U.error(log, "Test failed: " + err.getMessage());
+                    done.set(true);
 
-                done.set(true);
+                    fut1.cancel();
+                    fut2.cancel();
 
-                fut1.cancel();
-                fut2.cancel();
-
-                throw err;
+                    throw err;
+                }
             }
-        }
 
-        done.set(true);
+            done.set(true);
 
-        fut1.get();
-        fut2.get();
+            fut1.get();
+            fut2.get();
+        }
+        finally {
+            done.set(true);
+        }
     }
 
     /**

Reply via email to