Repository: ignite
Updated Branches:
  refs/heads/ignite-801 1635748ad -> d99233d66


IGNITE-1911: Discovery MessageWorker thread moves node to "zombie" state when it fails


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0d3ef8af
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0d3ef8af
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0d3ef8af

Branch: refs/heads/ignite-801
Commit: 0d3ef8af2de53a8a658e1101ef327e90da9dcea5
Parents: 967cfcb
Author: dmagda <[email protected]>
Authored: Mon Nov 16 17:58:35 2015 +0300
Committer: dmagda <[email protected]>
Committed: Mon Nov 16 17:58:35 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 18 +++++
 .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 79 ++++++++++++++++++--
 2 files changed, 91 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/0d3ef8af/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 0fe2881..c1ecd50 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2101,6 +2101,24 @@ class ServerImpl extends TcpDiscoveryImpl {
             initConnectionCheckFrequency();
         }
 
+        /** {@inheritDoc} Closes the local Ignite instance if the parent body throws while the node is not stopping. */
+        @Override protected void body() throws InterruptedException {
+            try {
+                super.body();
+            }
+            catch (Throwable e) {
+                if (!spi.isNodeStopping0() && spi.ignite() != null) {
+                    log.error("TcpDiscoverSpi's message worker thread failed 
abnormally. Stopping the grid in order " +
+                        "to prevent cluster wide instability.", e);
+
+                    spi.ignite().close();
+                }
+
+                // Rethrown after the optional close above: must be processed by IgniteSpiThread as well.
+                throw e;
+            }
+        }
+
         /**
          * Initializes connection check frequency. Used only when failure 
detection timeout is enabled.
          */

http://git-wip-us.apache.org/repos/asf/ignite/blob/0d3ef8af/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 51d8a2d..874ae41 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -65,7 +65,6 @@ import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryCustomEventMessa
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
 import 
org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage;
-import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
@@ -355,6 +354,8 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     private void testFailureDetectionOnNodePing(Ignite pingingNode, Ignite 
failedNode) throws Exception {
         final CountDownLatch cnt = new CountDownLatch(1);
 
+        final UUID failedNodeId = failedNode.cluster().localNode().id();
+
         pingingNode.events().localListen(
             new IgnitePredicate<Event>() {
                 @Override public boolean apply(Event evt) {
@@ -372,9 +373,9 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
         TcpDiscoverySpi spi = discoMap.get(pingingNode.name());
 
-        boolean res = spi.pingNode(failedNode.cluster().localNode().id());
+        boolean res = spi.pingNode(failedNodeId);
 
-        assertFalse("Ping is ok for node " + 
failedNode.cluster().localNode().id() + ", but had to fail.", res);
+        assertFalse("Ping is ok for node " + failedNodeId + ", but had to 
fail.", res);
 
         // Heartbeat interval is 40 seconds, but we should detect node failure 
faster.
         assert cnt.await(7, SECONDS);
@@ -391,6 +392,8 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
 
             
((TestTcpDiscoverySpi)failedNode.configuration().getDiscoverySpi()).ignorePingResponse
 = true;
 
+            final UUID failedNodeId = failedNode.cluster().localNode().id();
+
             final CountDownLatch pingLatch = new CountDownLatch(1);
 
             final CountDownLatch eventLatch = new CountDownLatch(1);
@@ -404,7 +407,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             pingingNode.events().localListen(
                 new IgnitePredicate<Event>() {
                     @Override public boolean apply(Event event) {
-                        if 
(((DiscoveryEvent)event).eventNode().id().equals(failedNode.cluster().localNode().id()))
 {
+                        if 
(((DiscoveryEvent)event).eventNode().id().equals(failedNodeId)) {
                             failRes.set(true);
                             eventLatch.countDown();
                         }
@@ -420,7 +423,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
                         pingLatch.countDown();
 
                         
pingRes.set(pingingNode.configuration().getDiscoverySpi().pingNode(
-                            failedNode.cluster().localNode().id()));
+                            failedNodeId));
 
                         return null;
                     }
@@ -1148,7 +1151,7 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
             for (IgniteKernal grid : grids)
                 assertEquals(startTime, 
(Long)grid.context().discovery().gridStartTime());
 
-            grids.add((IgniteKernal) startGrid(5));
+            grids.add((IgniteKernal)startGrid(5));
 
             for (IgniteKernal grid : grids)
                 assertEquals(startTime, 
(Long)grid.context().discovery().gridStartTime());
@@ -1308,6 +1311,51 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception {
+        try {
+            TestMessageWorkerFailureSpi spi0 = new 
TestMessageWorkerFailureSpi();
+            // NOTE(review): nodeSpi is presumably picked up by the configuration of the next started grid - confirm.
+            nodeSpi = spi0;
+
+            final Ignite ignite0 = startGrid(0);
+            // The second node is started with a plain TcpDiscoverySpi assigned to nodeSpi.
+            nodeSpi = new TcpDiscoverySpi();
+
+            Ignite ignite1 = startGrid(1);
+            // Set by the listener below only for an EVT_NODE_FAILED event whose node ID equals failedNodeId.
+            final AtomicBoolean disconnected = new AtomicBoolean();
+
+            final CountDownLatch latch = new CountDownLatch(1);
+            // The ID is read before the failure is triggered; the listener compares against this captured value.
+            final UUID failedNodeId = ignite0.cluster().localNode().id();
+
+            ignite1.events().localListen(new IgnitePredicate<Event>() {
+                @Override public boolean apply(Event event) {
+                    if (event.type() == EventType.EVT_NODE_FAILED &&
+                        
failedNodeId.equals(((DiscoveryEvent)event).eventNode().id()))
+                        disconnected.set(true);
+                    // Outside the 'if': the latch is released by every event delivered to this listener.
+                    latch.countDown();
+
+                    return false;
+                }
+            }, EventType.EVT_NODE_FAILED);
+            // From now on spi0.writeToSocket() throws RuntimeException (see TestMessageWorkerFailureSpi).
+            spi0.stop = true;
+            // The boolean result of await() is not checked; a timeout is caught by the assertion below.
+            latch.await(15, TimeUnit.SECONDS);
+
+            assertTrue(disconnected.get());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+
+    /**
      * @param twoNodes If {@code true} starts two nodes, otherwise three.
      * @throws Exception If failed
      */
@@ -1458,6 +1506,25 @@ public class TcpDiscoverySelfTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     * Discovery SPI whose {@code writeToSocket} throws {@link RuntimeException} once the {@code stop} flag is set.
+     */
+    private static class TestMessageWorkerFailureSpi extends TcpDiscoverySpi {
+        /** When {@code true}, {@code writeToSocket} throws instead of delegating to the parent implementation. */
+        private volatile boolean stop;
+
+
+        /** {@inheritDoc} */
+        @Override protected void writeToSocket(Socket sock, 
TcpDiscoveryAbstractMessage msg,
+            GridByteArrayOutputStream bout, long timeout) throws IOException, 
IgniteCheckedException {
+            // Unchecked exception on purpose: it is not one of the checked types declared by this method.
+            if (stop)
+                throw new RuntimeException("Failing ring message worker 
explicitly");
+
+            super.writeToSocket(sock, msg, bout, timeout);
+        }
+    }
+
+    /**
      * Starts new grid with given index. Method optimize is not invoked.
      *
      * @param idx Index of the grid to start.

Reply via email to