This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8e99cc5 IGNITE-12252 Unchecked exceptions during rebalancing should
be handled (#6965)
8e99cc5 is described below
commit 8e99cc5b5baf21fe8b2a1fdfca25e2c523889ed4
Author: Nikolai Kulagin <[email protected]>
AuthorDate: Thu Apr 30 18:39:39 2020 +0300
IGNITE-12252 Unchecked exceptions during rebalancing should be handled
(#6965)
---
.../org/apache/ignite/internal/IgnitionEx.java | 11 ++++--
.../failure/FailureHandlerTriggeredTest.java | 40 ++++++++++++++++++++++
2 files changed, 49 insertions(+), 2 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 5ea5baa..ff6ddfb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1752,6 +1752,13 @@ public class IgnitionEx {
}
};
+ UncaughtExceptionHandler excHnd = new UncaughtExceptionHandler() {
+ @Override public void uncaughtException(Thread t, Throwable e)
{
+ if (grid != null)
+ grid.context().failure().process(new
FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+ };
+
execSvc = new IgniteThreadPoolExecutor(
"pub",
cfg.getIgniteInstanceName(),
@@ -2007,7 +2014,7 @@ public class IgnitionEx {
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<>(),
GridIoPolicy.UNDEFINED,
- oomeHnd);
+ excHnd);
rebalanceExecSvc.allowCoreThreadTimeOut(true);
@@ -2015,7 +2022,7 @@ public class IgnitionEx {
cfg.getRebalanceThreadPoolSize(),
cfg.getIgniteInstanceName(),
"rebalance-striped",
- oomeHnd,
+ excHnd,
true,
DFLT_THREAD_KEEP_ALIVE_TIME);
diff --git
a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
index 67ec311..2abdd58 100644
---
a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
@@ -19,6 +19,9 @@ package org.apache.ignite.failure;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import
org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
@@ -31,6 +34,8 @@ import
org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
+
/**
* Test of triggering of failure handler.
*/
@@ -68,6 +73,41 @@ public class FailureHandlerTriggeredTest extends
GridCommonAbstractTest {
}
/**
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testFailureHandlerTriggeredOnUncheckedErrorOnRebalancing()
throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+
+ TestFailureHandler hnd = new TestFailureHandler(false, latch);
+
+ IgniteEx grid0 = startGrid(getConfiguration(testNodeName(0))
+ .setIncludeEventTypes(EVT_CACHE_REBALANCE_OBJECT_LOADED)
+ .setFailureHandler(hnd));
+
+ grid0.getOrCreateCache(new CacheConfiguration<>()
+ .setName(DEFAULT_CACHE_NAME)
+ .setCacheMode(CacheMode.REPLICATED))
+ .put(1,1);
+
+ grid0.cluster().baselineAutoAdjustEnabled(false);
+
+ IgniteEx grid1 = startGrid(getConfiguration(testNodeName(1))
+ .setIncludeEventTypes(EVT_CACHE_REBALANCE_OBJECT_LOADED)
+ .setFailureHandler(hnd));
+
+ grid1.events().localListen(e -> { throw new Error(); },
EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED);
+
+ grid1.cluster().setBaselineTopology(grid1.cluster().topologyVersion());
+
+ assertTrue(latch.await(3, TimeUnit.SECONDS));
+
+ assertNotNull(hnd.failureCtx);
+
+ assertEquals(hnd.failureCtx.type(), FailureType.CRITICAL_ERROR);
+ }
+
+ /**
* Custom exchange worker task implementation for delaying exchange worker
processing.
*/
static class ExchangeWorkerFailureTask extends SchemaExchangeWorkerTask
implements CachePartitionExchangeWorkerTask {