This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 88b5f47 IGNITE-13575 Fix invalid blocking thread reporting waiting on
selector.select. Fix infinite loop while only one thread is registered in
WorkersRegistry. (#8354)
88b5f47 is described below
commit 88b5f4798a4a2ff3f5c1e7c981b7927a8a06854b
Author: Ivan Daschinskiy <[email protected]>
AuthorDate: Wed Oct 14 10:25:38 2020 +0300
IGNITE-13575 Fix invalid blocking thread reporting waiting on
selector.select. Fix infinite loop while only one thread is registered in
WorkersRegistry. (#8354)
---
.../ignite/internal/util/nio/GridNioServer.java | 21 +++-
.../ignite/internal/worker/WorkersRegistry.java | 2 +-
.../ignite/failure/SystemWorkersBlockingTest.java | 106 ++++++++++++++++-----
3 files changed, 97 insertions(+), 32 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 41574ee..d52da34 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -2233,15 +2233,21 @@ public class GridNioServer<T> {
if (!changeReqs.isEmpty())
continue;
- updateHeartbeat();
+ blockingSectionBegin();
// Wake up every 2 seconds to check if closed.
- if (selector.select(2000) > 0) {
+ int numKeys = selector.select(2000);
+
+ blockingSectionEnd();
+
+ if (numKeys > 0) {
// Walk through the ready keys collection and
process network events.
if (selectedKeys == null)
processSelectedKeys(selector.selectedKeys());
else
processSelectedKeysOptimized(selectedKeys.flip());
+
+ updateHeartbeat();
}
// select() call above doesn't throw on interruption;
checking it here to propagate timely.
@@ -3037,14 +3043,19 @@ public class GridNioServer<T> {
private void accept() throws IgniteCheckedException {
try {
while (!closed && selector.isOpen() &&
!Thread.currentThread().isInterrupted()) {
- updateHeartbeat();
+ blockingSectionBegin();
// Wake up every 2 seconds to check if closed.
- if (selector.select(2000) > 0)
+ int numKeys = selector.select(2000);
+
+ blockingSectionEnd();
+
+ if (numKeys > 0) {
// Walk through the ready keys collection and process
date requests.
processSelectedKeys(selector.selectedKeys());
- else
+
updateHeartbeat();
+ }
if (balancer != null)
balancer.run();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
index 3cf1d03..5829b3c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/worker/WorkersRegistry.java
@@ -178,7 +178,7 @@ public class WorkersRegistry implements GridWorkerListener {
Thread prevCheckerThread = lastChecker.get();
- if (prevCheckerThread == null ||
+ if (prevCheckerThread == null || registeredWorkers.size() < 2 ||
U.currentTimeMillis() - lastCheckTs <= checkInterval ||
!lastChecker.compareAndSet(prevCheckerThread, null))
return;
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
index 8a84af8..8455f87 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/SystemWorkersBlockingTest.java
@@ -21,10 +21,15 @@ import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.worker.GridWorker;
+import org.apache.ignite.internal.worker.WorkersRegistry;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.thread.IgniteThread;
import org.junit.Test;
@@ -33,23 +38,29 @@ import org.junit.Test;
* Tests the handling of long blocking operations in system-critical workers.
*/
public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
+ /** */
+ private static final long SYSTEM_WORKER_BLOCKED_TIMEOUT = 1_000L;
+
/** Handler latch. */
- private static volatile CountDownLatch hndLatch;
+ private final CountDownLatch hndLatch = new CountDownLatch(1);
- /** */
- private static final long FAILURE_DETECTION_TIMEOUT = 5_000;
+ /** Reference to failure error. */
+ private final AtomicReference<Throwable> failureError = new
AtomicReference<>();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
// Set small value for the test.
- cfg.setSystemWorkerBlockedTimeout(1_000);
+ cfg.setSystemWorkerBlockedTimeout(SYSTEM_WORKER_BLOCKED_TIMEOUT);
AbstractFailureHandler failureHnd = new AbstractFailureHandler() {
@Override protected boolean handle(Ignite ignite, FailureContext
failureCtx) {
- if (failureCtx.type() == FailureType.SYSTEM_WORKER_BLOCKED)
+ if (failureCtx.type() == FailureType.SYSTEM_WORKER_BLOCKED) {
+ failureError.set(failureCtx.error());
+
hndLatch.countDown();
+ }
return false;
}
@@ -63,21 +74,10 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
cfg.setFailureHandler(failureHnd);
- cfg.setFailureDetectionTimeout(FAILURE_DETECTION_TIMEOUT);
-
return cfg;
}
/** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- super.beforeTest();
-
- hndLatch = new CountDownLatch(1);
-
- startGrid(0);
- }
-
- /** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
@@ -89,28 +89,82 @@ public class SystemWorkersBlockingTest extends GridCommonAbstractTest {
*/
@Test
public void testBlockingWorker() throws Exception {
- IgniteEx ignite = grid(0);
+ IgniteEx ignite = startGrid(0);
+
+ CountDownLatch blockLatch = new CountDownLatch(1);
GridWorker worker = new GridWorker(ignite.name(), "test-worker", log) {
@Override protected void body() throws InterruptedException {
- Thread.sleep(Long.MAX_VALUE);
+ blockLatch.await();
}
};
- new IgniteThread(worker).start();
+ IgniteThread runner = null;
+ try {
+ runner = runWorker(worker);
+
+ ignite.context().workersRegistry().register(worker);
+
+ assertTrue(hndLatch.await(SYSTEM_WORKER_BLOCKED_TIMEOUT * 2,
TimeUnit.MILLISECONDS));
+
+ Throwable err = failureError.get();
+
+ assertNotNull(err);
+ assertTrue(err.getMessage() != null &&
err.getMessage().contains("test-worker"));
+ }
+ finally {
+ if (runner != null) {
+ blockLatch.countDown();
+
+ runner.join(SYSTEM_WORKER_BLOCKED_TIMEOUT);
+ }
+ }
+ }
+
+ /**
+ * Tests that repeatedly calling {@link WorkersRegistry#onIdle} in single
registered {@link GridWorker}
+ * doesn't lead to infinite loop.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testSingleWorker_NotInInfiniteLoop() throws Exception {
+ WorkersRegistry registry = new WorkersRegistry((w, e) -> {},
SYSTEM_WORKER_BLOCKED_TIMEOUT, log());
+
+ CountDownLatch finishLatch = new CountDownLatch(1);
- while (worker.runner() == null)
- Thread.sleep(10);
+ GridWorker worker = new GridWorker("test", "test-worker", log,
registry) {
+ @Override protected void body() {
+ while (!Thread.currentThread().isInterrupted()) {
+ onIdle();
- ignite.context().workersRegistry().register(worker);
+ LockSupport.parkNanos(1000);
+ }
-
assertTrue(hndLatch.await(ignite.configuration().getFailureDetectionTimeout() *
2, TimeUnit.MILLISECONDS));
+ finishLatch.countDown();
+ }
+ };
- Thread runner = worker.runner();
+ IgniteThread runner = runWorker(worker);
+
+ Thread.sleep(2 * SYSTEM_WORKER_BLOCKED_TIMEOUT);
runner.interrupt();
- runner.join(1000);
- assertFalse(runner.isAlive());
+ assertTrue(finishLatch.await(SYSTEM_WORKER_BLOCKED_TIMEOUT,
TimeUnit.MILLISECONDS));
+ }
+
+ /**
+ * @param worker Grid worker to run.
+ * @return Thread, running worker.
+ */
+ private IgniteThread runWorker(GridWorker worker) throws
IgniteInterruptedCheckedException {
+ IgniteThread runner = new IgniteThread(worker);
+
+ runner.start();
+
+ GridTestUtils.waitForCondition(() -> worker.runner() != null, 100);
+
+ return runner;
}
}