IGNITE-2688: fixed InterruptedException processing in RingMessageWorker caused by segmentation
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/fd636e99 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/fd636e99 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/fd636e99 Branch: refs/heads/ignite-2788 Commit: fd636e9958d53e11c881dadd4935afe24862ac48 Parents: 9aab9e1 Author: Denis Magda <[email protected]> Authored: Wed Apr 20 17:52:51 2016 +0300 Committer: shtykh_roman <[email protected]> Committed: Fri May 13 16:11:14 2016 +0900 ---------------------------------------------------------------------- .../ignite/spi/discovery/tcp/ServerImpl.java | 4 +- .../spi/discovery/tcp/TcpDiscoverySelfTest.java | 148 +++++++++++++++++-- 2 files changed, 136 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/fd636e99/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 572f540..3283d99 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2179,8 +2179,8 @@ class ServerImpl extends TcpDiscoveryImpl { super.body(); } catch (Throwable e) { - if (!spi.isNodeStopping0()) { - final Ignite ignite = spi.ignite(); + if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) { + final Ignite ignite = spi.ignite(); if (ignite != null) { U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. 
" + http://git-wip-us.apache.org/repos/asf/ignite/blob/fd636e99/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java index 7efaca0..45cd276 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java @@ -77,6 +77,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedM import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeFailedMessage; import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeLeftMessage; +import org.apache.ignite.testframework.GridStringLogger; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.eclipse.jetty.util.ConcurrentHashSet; @@ -108,6 +109,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { /** */ private static ThreadLocal<TcpDiscoverySpi> nodeSpi = new ThreadLocal<>(); + private GridStringLogger strLogger; + /** * @throws Exception If fails. 
*/ @@ -188,6 +191,16 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { } else if (gridName.contains("testPingInterruptedOnNodeFailedPingingNode")) cfg.setFailureDetectionTimeout(30_000); + else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureNormalNode")) { + cfg.setFailureDetectionTimeout(3_000); + } + else if (gridName.contains("testNoRingMessageWorkerAbnormalFailureSegmentedNode")) { + cfg.setFailureDetectionTimeout(6_000); + + cfg.setGridLogger(strLogger = new GridStringLogger()); + } + else if (gridName.contains("testNodeShutdownOnRingMessageWorkerFailureFailedNode")) + cfg.setGridLogger(strLogger = new GridStringLogger()); return cfg; } @@ -1355,11 +1368,12 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { */ public void testNodeShutdownOnRingMessageWorkerFailure() throws Exception { try { - TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1(); + final TestMessageWorkerFailureSpi1 spi0 = new TestMessageWorkerFailureSpi1( + TestMessageWorkerFailureSpi1.EXCEPTION_MODE); nodeSpi.set(spi0); - final Ignite ignite0 = startGrid(0); + final Ignite ignite0 = startGrid("testNodeShutdownOnRingMessageWorkerFailureFailedNode"); nodeSpi.set(new TcpDiscoverySpi()); @@ -1374,10 +1388,11 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { ignite1.events().localListen(new IgnitePredicate<Event>() { @Override public boolean apply(Event evt) { if (evt.type() == EventType.EVT_NODE_FAILED && - failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id())) + failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id())) { disconnected.set(true); - latch.countDown(); + latch.countDown(); + } return false; } @@ -1389,20 +1404,98 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { assertTrue(disconnected.get()); - try { - ignite0.cluster().localNode().id(); - } - catch (IllegalStateException e) { - if (e.getMessage().contains("Grid is in invalid state to perform this operation")) - 
return; - } + String result = strLogger.toString(); - fail(); + assert result.contains("TcpDiscoverSpi's message worker thread failed abnormally") : result; } finally { stopAllGrids(); } } + + /** + * @throws Exception If failed + */ + public void testNoRingMessageWorkerAbnormalFailureOnSegmentation() throws Exception { + try { + TestMessageWorkerFailureSpi1 spi1 = new TestMessageWorkerFailureSpi1( + TestMessageWorkerFailureSpi1.SEGMENTATION_MODE); + + nodeSpi.set(spi1); + + Ignite ignite1 = startGrid("testNoRingMessageWorkerAbnormalFailureNormalNode"); + + + nodeSpi.set(new TcpDiscoverySpi()); + + final Ignite ignite2 = startGrid("testNoRingMessageWorkerAbnormalFailureSegmentedNode"); + + + final AtomicBoolean disconnected = new AtomicBoolean(); + + final AtomicBoolean segmented = new AtomicBoolean(); + + final CountDownLatch disLatch = new CountDownLatch(1); + + final CountDownLatch segLatch = new CountDownLatch(1); + + final UUID failedNodeId = ignite2.cluster().localNode().id(); + + ignite1.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (evt.type() == EventType.EVT_NODE_FAILED && + failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id())) + disconnected.set(true); + + disLatch.countDown(); + + return false; + } + }, EventType.EVT_NODE_FAILED); + + ignite2.events().localListen(new IgnitePredicate<Event>() { + @Override public boolean apply(Event evt) { + if (!failedNodeId.equals(((DiscoveryEvent)evt).eventNode().id())) + return true; + + if (evt.type() == EventType.EVT_NODE_SEGMENTED) { + segmented.set(true); + + segLatch.countDown(); + } + + return true; + } + }, EventType.EVT_NODE_SEGMENTED); + + + spi1.stop = true; + + disLatch.await(15, TimeUnit.SECONDS); + + assertTrue(disconnected.get()); + + + spi1.stop = false; + + segLatch.await(15, TimeUnit.SECONDS); + + assertTrue(segmented.get()); + + + Thread.sleep(10_000); + + + String result = strLogger.toString(); + + assert result.contains("Local node 
SEGMENTED") && + !result.contains("TcpDiscoverSpi's message worker thread failed abnormally") : result; + } + finally { + stopAllGrids(); + } + } + /** * @throws Exception If failed */ @@ -2071,14 +2164,41 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest { */ private static class TestMessageWorkerFailureSpi1 extends TcpDiscoverySpi { /** */ + private static int EXCEPTION_MODE = 0; + + /** */ + private static int SEGMENTATION_MODE = 1; + + /** */ + private final int failureMode; + + /** */ private volatile boolean stop; + /** + * @param failureMode Failure mode to use during the test. + */ + public TestMessageWorkerFailureSpi1(int failureMode) { + this.failureMode = failureMode; + } + /** {@inheritDoc} */ @Override protected void writeToSocket(Socket sock, OutputStream out, TcpDiscoveryAbstractMessage msg, long timeout) throws IOException, IgniteCheckedException { - if (stop) - throw new RuntimeException("Failing ring message worker explicitly"); + if (stop) { + if (failureMode == EXCEPTION_MODE) + throw new RuntimeException("Failing ring message worker explicitly"); + else { + try { + Thread.sleep(5_000); + } + catch (InterruptedException e) { + // Ignore. + } + } + + } super.writeToSocket(sock, out, msg, timeout); }
