This is an automated email from the ASF dual-hosted git repository.
hossman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/main by this push:
new 70abd8db898 SOLR-17000: Fix ExecutorUtilTest to eliminate concurrency
assumptions
70abd8db898 is described below
commit 70abd8db8980a91913f28771247a3c8418e2840e
Author: Chris Hostetter <[email protected]>
AuthorDate: Tue Oct 3 13:35:10 2023 -0700
SOLR-17000: Fix ExecutorUtilTest to eliminate concurrency assumptions
---
.../apache/solr/common/util/ExecutorUtilTest.java | 180 ++++++++++++++-------
1 file changed, 124 insertions(+), 56 deletions(-)
diff --git
a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
index c3fc2137018..a9df98a296b 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -32,74 +33,141 @@ import org.apache.solr.util.TimeOut;
import org.junit.Test;
public class ExecutorUtilTest extends SolrTestCase {
+
+ private static final long MAX_AWAIT_TERMINATION_ARG_MS = 100;
+
+ /**
+ * The maximum amount of time we're willing to let the test wait for any
type of blocking action,
+ * no matter how slow our CPU is. Anything that exceeds this time is
presumably a bug
+ */
+ private static final long MAX_SANE_WAIT_DURRATION_MS = 2_000;
+
+ /** Test that if there is a non interruptable thread that awaitTermination
eventually returns. */
@Test
// Must prevent runaway failures so limit this to short timeframe in case of
failure
- @Timeout(millis = 3000)
- public void testExecutorUtilAwaitsTerminationEnds() throws Exception {
- final long awaitTerminationTimeout = 100;
- final long threadTimeoutDuration = 3 * awaitTerminationTimeout;
- final TimeUnit testTimeUnit = TimeUnit.MILLISECONDS;
+ @Timeout(millis = 5_000)
+ public void testExecutorUtilAwaitsTerminationWhenTaskIgnoresInterupt()
throws Exception {
- // check that if there is a non interruptable thread that awaitTermination
eventually returns.
-
- ExecutorService executorService =
+ final Worker w = new Worker(false);
+ final ExecutorService executorService =
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory(this.getClass().getSimpleName() +
"non-interruptable"));
- final AtomicInteger interruptCount = new AtomicInteger();
- Future<Boolean> nonInterruptableFuture =
- executorService.submit(
- () -> getTestThread(threadTimeoutDuration, testTimeUnit,
interruptCount, false));
- executorService.shutdownNow();
- assertThrows(
- RuntimeException.class,
- () ->
- ExecutorUtil.awaitTermination(executorService,
awaitTerminationTimeout, testTimeUnit));
-
- // Thread should not have finished in await termination.
- assertFalse(nonInterruptableFuture.isDone());
- assertTrue(interruptCount.get() > 0);
-
- // Thread should have finished by now.
- Thread.sleep(TimeUnit.MILLISECONDS.convert(threadTimeoutDuration,
testTimeUnit));
- assertTrue(nonInterruptableFuture.isDone());
- assertTrue(nonInterruptableFuture.get());
-
- // check that if there is an interruptable thread that awaitTermination
forcefully returns.
-
- ExecutorService executorService2 =
+ try {
+ final Future<Boolean> f = executorService.submit(w);
+
+ assertTrue("Worker didn't start in a sane amount of time",
w.awaitWorkerStart());
+ executorService.shutdownNow();
+ assertTrue(
+ "Worker not interupted by shutdown in a sane amount of time",
+ w.awaitWorkerInteruptedAtLeastOnce());
+ assertThrows(
+ RuntimeException.class,
+ () ->
+ ExecutorUtil.awaitTermination(
+ executorService, MAX_AWAIT_TERMINATION_ARG_MS,
TimeUnit.MILLISECONDS));
+
+ assertFalse("Worker should not finish due to shutdown or
awaitTermination", f.isDone());
+ assertTrue("test sanity check: WTF? how did we get here?",
w.getNumberOfInterupts() > 0);
+
+ // Worker should finish if we let it
+ w.tellWorkerToFinish();
+ assertTrue(f.get(MAX_SANE_WAIT_DURRATION_MS, TimeUnit.MILLISECONDS));
+ } finally {
+ w.tellWorkerToFinish();
+ ExecutorUtil.shutdownNowAndAwaitTermination(executorService);
+ }
+ }
+
+ /** Test that if there is an interruptable thread that awaitTermination
forcefully returns. */
+ @Test
+ // Must prevent runaway failures so limit this to short timeframe in case of
failure
+ @Timeout(millis = 5_000)
+ public void testExecutorUtilAwaitsTerminationWhenTaskRespectsInterupt()
throws Exception {
+
+ final Worker w = new Worker(true);
+ final ExecutorService executorService =
ExecutorUtil.newMDCAwareSingleThreadExecutor(
new SolrNamedThreadFactory(this.getClass().getSimpleName() +
"interruptable"));
- interruptCount.set(0);
- Future<Boolean> interruptableFuture =
- executorService2.submit(
- () -> getTestThread(threadTimeoutDuration, testTimeUnit,
interruptCount, true));
- executorService2.shutdownNow();
- ExecutorUtil.awaitTermination(executorService2, awaitTerminationTimeout,
testTimeUnit);
-
- // Thread should have been interrupted.
- assertTrue(interruptableFuture.isDone());
- assertTrue(interruptCount.get() > 0);
- assertFalse(interruptableFuture.get());
+ try {
+ final Future<Boolean> f = executorService.submit(w);
+
+ assertTrue("Worker didn't start in a sane amount of time",
w.awaitWorkerStart());
+ executorService.shutdownNow();
+ ExecutorUtil.awaitTermination(
+ executorService, MAX_AWAIT_TERMINATION_ARG_MS,
TimeUnit.MILLISECONDS);
+
+ // Worker should finish on its own after the interrupt
+ assertTrue(
+ "Worker not interupted in a sane amount of time",
w.awaitWorkerInteruptedAtLeastOnce());
+ assertFalse(f.get(MAX_SANE_WAIT_DURRATION_MS, TimeUnit.MILLISECONDS));
+ assertTrue("test sanity check: WTF? how did we get here?",
w.getNumberOfInterupts() > 0);
+
+ } finally {
+ w.tellWorkerToFinish();
+ ExecutorUtil.shutdownNowAndAwaitTermination(executorService);
+ }
}
- private boolean getTestThread(
- long threadTimeoutDuration,
- TimeUnit testTimeUnit,
- AtomicInteger interruptCount,
- boolean interruptable) {
- TimeOut threadTimeout = new TimeOut(threadTimeoutDuration, testTimeUnit,
TimeSource.NANO_TIME);
- while (!threadTimeout.hasTimedOut()) {
- try {
-
threadTimeout.sleep(TimeUnit.MILLISECONDS.convert(threadTimeoutDuration,
testTimeUnit));
- } catch (InterruptedException interruptedException) {
- interruptCount.incrementAndGet();
- if (interruptable) {
- Thread.currentThread().interrupt();
- return false; // didn't run full time
+ private static final class Worker implements Callable<Boolean> {
+ // how we communicate out to our caller
+ private final CountDownLatch taskStartedLatch = new CountDownLatch(1);
+ private final CountDownLatch gotFirstInteruptLatch = new CountDownLatch(1);
+ private final AtomicInteger interruptCount = new AtomicInteger(0);
+
+ // how our caller communicates with us
+ private final CountDownLatch allowedToFinishLatch = new CountDownLatch(1);
+ private final boolean interruptable;
+
+ public Worker(final boolean interruptable) {
+ this.interruptable = interruptable;
+ }
+
+ /** Returns false if worker doesn't start in a sane amount of time */
+ public boolean awaitWorkerStart() throws InterruptedException {
+ return taskStartedLatch.await(MAX_SANE_WAIT_DURRATION_MS,
TimeUnit.MILLISECONDS);
+ }
+
+ /** Returns false if worker didn't receive an interrupt in a sane amount of
time */
+ public boolean awaitWorkerInteruptedAtLeastOnce() throws
InterruptedException {
+ return gotFirstInteruptLatch.await(MAX_SANE_WAIT_DURRATION_MS,
TimeUnit.MILLISECONDS);
+ }
+
+ public int getNumberOfInterupts() {
+ return interruptCount.get();
+ }
+
+ public void tellWorkerToFinish() {
+ allowedToFinishLatch.countDown();
+ }
+
+ @Override
+ public Boolean call() {
+ // absolute last resort timeout to prevent infinite while loop
+ final TimeOut threadTimeout =
+ new TimeOut(MAX_SANE_WAIT_DURRATION_MS, TimeUnit.MILLISECONDS,
TimeSource.NANO_TIME);
+
+ while (!threadTimeout.hasTimedOut()) {
+ try {
+
+ // this must be inside the try block, so we'll still catch the
InterruptedException if our
+ // caller shuts down & awaits termination before we get a chance to
start await'ing...
+ taskStartedLatch.countDown();
+
+ if (allowedToFinishLatch.await(
+ threadTimeout.timeLeft(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS)) {
+ return true; // ran until we were told to stop
+ }
+ } catch (InterruptedException interruptedException) {
+ interruptCount.incrementAndGet();
+ gotFirstInteruptLatch.countDown();
+ if (interruptable) {
+ Thread.currentThread().interrupt();
+ return false; // was explicitly interupted
+ }
}
}
+ throw new RuntimeException("Sane timeout elapsed before worker
finished");
}
- return true; // ran full time
}
@Test