Repository: ignite
Updated Branches:
  refs/heads/ignite-1758 f96781d8b -> 7661e3ee1


ignite-1758 debug


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7661e3ee
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7661e3ee
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7661e3ee

Branch: refs/heads/ignite-1758
Commit: 7661e3ee1d6651cef8399cb05fedae0663b7d327
Parents: f96781d
Author: sboikov <[email protected]>
Authored: Mon Nov 9 16:53:39 2015 +0300
Committer: sboikov <[email protected]>
Committed: Mon Nov 9 16:53:39 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  67 +++++----
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 142 +++++++++++++++++--
 2 files changed, 173 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7661e3ee/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 5db1e34..af385e4 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -1087,9 +1087,20 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 openSock = true;
 
+                TcpDiscoveryHandshakeRequest req = new 
TcpDiscoveryHandshakeRequest(locNodeId);
+
+                if (msg instanceof TcpDiscoveryJoinRequestMessage) {
+                    synchronized (failedNodes) {
+                        for (TcpDiscoveryNode node : failedNodes) {
+                            debugLog(null, "Add failed node [node=" + node + 
", msg=" + req + ']');
+
+                            req.addFailedNode(node);
+                        }
+                    }
+                }
+
                 // Handshake.
-                spi.writeToSocket(sock, new 
TcpDiscoveryHandshakeRequest(locNodeId), timeoutHelper.nextTimeoutChunk(
-                    spi.getSocketTimeout()));
+                spi.writeToSocket(sock, req, 
timeoutHelper.nextTimeoutChunk(spi.getSocketTimeout()));
 
                 TcpDiscoveryHandshakeResponse res = spi.readMessage(sock, 
null, timeoutHelper.nextTimeoutChunk(
                     ackTimeout0));
@@ -1760,6 +1771,27 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
     }
 
+    /**
+     * Adds to the local failed nodes collection every node which the given message lists as failed.
+     * IDs for which the ring returns no node are only logged.
+     *
+     * @param msg Message whose {@code failedNodes()} (possibly {@code null}) should be processed.
+     */
+    private void processMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
+        if (msg.failedNodes() == null)
+            return;
+
+        for (UUID failedId : msg.failedNodes()) {
+            TcpDiscoveryNode ringNode = ring.node(failedId);
+
+            if (ringNode == null) {
+                debugLog(null, "Unknown failed node [nodeId=" + failedId + ", msg=" + msg + ']');
+
+                continue;
+            }
+
+            boolean firstSeen;
+
+            // Only the collection update is guarded by the mutex; logging happens after it is released.
+            synchronized (mux) {
+                firstSeen = failedNodes.add(ringNode);
+            }
+
+            if (firstSeen)
+                debugLog(null, "New failed node [node=" + ringNode + ", msg=" + msg + ']');
+        }
+    }
+
     /**
      * Discovery messages history used for client reconnect.
      */
@@ -2141,27 +2173,6 @@ class ServerImpl extends TcpDiscoveryImpl {
                 log.debug("Connection check frequency is calculated: " + 
connCheckFreq);
         }
 
-        private void addMessageFailedNodes(TcpDiscoveryAbstractMessage msg) {
-            if (msg.failedNodes() != null) {
-                for (UUID nodeId : msg.failedNodes()) {
-                    TcpDiscoveryNode failedNode = ring.node(nodeId);
-
-                    if (failedNode != null) {
-                        boolean add;
-
-                        synchronized (mux) {
-                            add = failedNodes.add(failedNode);
-                        }
-
-                        if (add)
-                            debugLog(null, "New failed node [node=" + 
failedNode + ", msg=" + msg + ']');
-                    }
-                    else
-                        debugLog(null, "Unknown failed node [nodeId=" + nodeId 
+ ", msg=" + msg + ']');
-                }
-            }
-        }
-
         /**
          * @param msg Message to process.
          */
@@ -2175,7 +2186,7 @@ class ServerImpl extends TcpDiscoveryImpl {
 
                 spi.stats.onMessageProcessingStarted(msg);
 
-                addMessageFailedNodes(msg);
+                processMessageFailedNodes(msg);
 
                 if (msg instanceof TcpDiscoveryJoinRequestMessage)
                     
processJoinRequestMessage((TcpDiscoveryJoinRequestMessage)msg);
@@ -3589,7 +3600,7 @@ class ServerImpl extends TcpDiscoveryImpl {
                         spi.onExchange(node.id(), entry.getKey(), 
entry.getValue(), U.gridClassLoader());
                 }
 
-                addMessageFailedNodes(msg);
+                processMessageFailedNodes(msg);
             }
 
             if (sendMessageToRemotes(msg))
@@ -4916,6 +4927,12 @@ class ServerImpl extends TcpDiscoveryImpl {
                     // Handshake.
                     TcpDiscoveryHandshakeRequest req = 
(TcpDiscoveryHandshakeRequest)msg;
 
+                    if (req.failedNodes() != null && 
req.failedNodes().contains(getLocalNodeId())) {
+                        debugLog(msg, "Ignore handshake request: " + msg);
+
+                        return;
+                    }
+
                     UUID nodeId = req.creatorNodeId();
 
                     this.nodeId = nodeId;

http://git-wip-us.apache.org/repos/asf/ignite/blob/7661e3ee/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 0280e9c..29cc169 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -21,6 +21,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -94,7 +95,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     private UUID nodeId;
 
     /** */
-    private TcpDiscoverySpi nodeSpi;
+    private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>();
 
     /**
      * @throws Exception If fails.
@@ -108,11 +109,14 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
 
-        TcpDiscoverySpi spi = nodeSpi;
+        TcpDiscoverySpi spi = nodeSpi.get();
 
-        if (spi == null)
+        if (spi == null) {
             spi = 
gridName.contains("testPingInterruptedOnNodeFailedFailingNode") ?
                 new TestTcpDiscoverySpi() : new TcpDiscoverySpi();
+        }
+        else
+            nodeSpi.set(null);
 
         discoMap.put(gridName, spi);
 
@@ -1219,11 +1223,11 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     private void customEventRace1(final boolean cacheStartFrom1, boolean 
stopCrd) throws Exception {
         TestCustomEventRaceSpi spi0 = new TestCustomEventRaceSpi();
 
-        nodeSpi = spi0;
+        nodeSpi.set(spi0);
 
         final Ignite ignite0 = startGrid(0);
 
-        nodeSpi = new TestCustomEventRaceSpi();
+        nodeSpi.set(new TestCustomEventRaceSpi());
 
         final Ignite ignite1 = startGrid(1);
 
@@ -1238,7 +1242,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             @Override public Void call() throws Exception {
                 log.info("Start 2");
 
-                nodeSpi = new TestCustomEventRaceSpi();
+                nodeSpi.set(new TestCustomEventRaceSpi());
 
                 Ignite ignite2 = startGrid(2);
 
@@ -1288,7 +1292,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         assertEquals(1, cache.get(1));
 
-        nodeSpi = new TestCustomEventRaceSpi();
+        nodeSpi.set(new TestCustomEventRaceSpi());
 
         Ignite ignite = startGrid(3);
 
@@ -1331,15 +1335,15 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     private void customEventCoordinatorFailure(boolean twoNodes) throws 
Exception {
         TestCustomEventCoordinatorFailureSpi spi0 = new 
TestCustomEventCoordinatorFailureSpi();
 
-        nodeSpi = spi0;
+        nodeSpi.set(spi0);
 
         Ignite ignite0 = startGrid(0);
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite1 = startGrid(1);
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite2 = twoNodes ? null : startGrid(2);
 
@@ -1383,7 +1387,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         log.info("Try start one more node.");
 
-        nodeSpi = new TestCustomEventCoordinatorFailureSpi();
+        nodeSpi.set(new TestCustomEventCoordinatorFailureSpi());
 
         Ignite ignite = startGrid(twoNodes ? 2 : 3);
 
@@ -1398,6 +1402,122 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes1() throws Exception {
+        final int failOrder = 3;
+        final int nodesCnt = 3;
+
+        try {
+            // Nodes are started one by one; each one gets its own SPI instance passed through 'nodeSpi'.
+            for (int idx = 0; idx < nodesCnt; idx++) {
+                nodeSpi.set(new TestFailedNodesSpi(failOrder));
+
+                startGrid(idx);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes2() throws Exception {
+        final int failOrder = 3;
+
+        try {
+            // First two nodes are started sequentially from the test thread.
+            for (int i = 0; i < 2; i++) {
+                nodeSpi.set(new TestFailedNodesSpi(failOrder));
+
+                startGrid(i);
+            }
+
+            // Index of the last started node; each starter thread takes the next one.
+            final AtomicInteger lastIdx = new AtomicInteger(1);
+
+            Callable<Void> starter = new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    int gridIdx = lastIdx.incrementAndGet();
+
+                    // 'nodeSpi' is thread-local, so SPI is set from the same thread which starts the node.
+                    nodeSpi.set(new TestFailedNodesSpi(failOrder));
+
+                    startGrid(gridIdx);
+
+                    return null;
+                }
+            };
+
+            GridTestUtils.runMultiThreaded(starter, 3, "start-node");
+
+            waitForRemoteNodes(ignite(2), 3);
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testFailedNodes3() throws Exception {
+        // Per-node fail orders: SPI of node 0 is created with -1, SPI of node 1 with 2.
+        final int[] failOrders = {-1, 2};
+
+        try {
+            for (int idx = 0; idx < failOrders.length; idx++) {
+                nodeSpi.set(new TestFailedNodesSpi(failOrders[idx]));
+
+                startGrid(idx);
+            }
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Simulates the scenario when a node detects failure of another node while trying to send a message,
+     * although that node is still alive.
+     */
+    private static class TestFailedNodesSpi extends TcpDiscoverySpi {
+        /** Flipped by the first simulated failure, so that the failure is simulated only once per SPI. */
+        private final AtomicBoolean failMsg = new AtomicBoolean();
+
+        /** Local node order for which the send failure is simulated. */
+        private final int failOrder;
+
+        /**
+         * @param failOrder Spi fails connection if local node order equals to this order.
+         */
+        TestFailedNodesSpi(int failOrder) {
+            this.failOrder = failOrder;
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock,
+            TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout,
+            long timeout) throws IOException, IgniteCheckedException {
+            // Fail only the first node added message written while local node has the configured order.
+            // The compare-and-set is evaluated last, so the flag is consumed only by a matching message.
+            if (locNode.internalOrder() == failOrder &&
+                (msg instanceof TcpDiscoveryNodeAddedMessage) &&
+                failMsg.compareAndSet(false, true)) {
+                log.info("IO error on message send [locNode=" + locNode + ", msg=" + msg + ']');
+
+                sock.close();
+
+                throw new SocketTimeoutException();
+            }
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
      *
      */
     private static class TestCustomEventCoordinatorFailureSpi extends 
TcpDiscoverySpi {

Reply via email to