This is an automated email from the ASF dual-hosted git repository.

hossman pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/main by this push:
     new 70abd8db898 SOLR-17000: Fix ExecutorUtilTest to eliminate concurrency 
assumptions
70abd8db898 is described below

commit 70abd8db8980a91913f28771247a3c8418e2840e
Author: Chris Hostetter <[email protected]>
AuthorDate: Tue Oct 3 13:35:10 2023 -0700

    SOLR-17000: Fix ExecutorUtilTest to eliminate concurrency assumptions
---
 .../apache/solr/common/util/ExecutorUtilTest.java  | 180 ++++++++++++++-------
 1 file changed, 124 insertions(+), 56 deletions(-)

diff --git 
a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java 
b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
index c3fc2137018..a9df98a296b 100644
--- a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
+++ b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -32,74 +33,141 @@ import org.apache.solr.util.TimeOut;
 import org.junit.Test;
 
 public class ExecutorUtilTest extends SolrTestCase {
+
+  private static final long MAX_AWAIT_TERMINATION_ARG_MS = 100;
+
+  /**
+   * The maximum amount of time we're willing to let the test wait for any 
type of blocking action,
+   * no matter ow slow our CPU is. Any thing that exceeds this time is 
presumably a bug
+   */
+  private static final long MAX_SANE_WAIT_DURRATION_MS = 2_000;
+
+  /** Test that if there is a non interruptable thread that awaitTermination 
eventually returns. */
   @Test
   // Must prevent runaway failures so limit this to short timeframe in case of 
failure
-  @Timeout(millis = 3000)
-  public void testExecutorUtilAwaitsTerminationEnds() throws Exception {
-    final long awaitTerminationTimeout = 100;
-    final long threadTimeoutDuration = 3 * awaitTerminationTimeout;
-    final TimeUnit testTimeUnit = TimeUnit.MILLISECONDS;
+  @Timeout(millis = 5_000)
+  public void testExecutorUtilAwaitsTerminationWhenTaskIgnoresInterupt() 
throws Exception {
 
-    // check that if there is a non interruptable thread that awaitTermination 
eventually returns.
-
-    ExecutorService executorService =
+    final Worker w = new Worker(false);
+    final ExecutorService executorService =
         ExecutorUtil.newMDCAwareSingleThreadExecutor(
             new SolrNamedThreadFactory(this.getClass().getSimpleName() + 
"non-interruptable"));
-    final AtomicInteger interruptCount = new AtomicInteger();
-    Future<Boolean> nonInterruptableFuture =
-        executorService.submit(
-            () -> getTestThread(threadTimeoutDuration, testTimeUnit, 
interruptCount, false));
-    executorService.shutdownNow();
-    assertThrows(
-        RuntimeException.class,
-        () ->
-            ExecutorUtil.awaitTermination(executorService, 
awaitTerminationTimeout, testTimeUnit));
-
-    // Thread should not have finished in await termination.
-    assertFalse(nonInterruptableFuture.isDone());
-    assertTrue(interruptCount.get() > 0);
-
-    // Thread should have finished by now.
-    Thread.sleep(TimeUnit.MILLISECONDS.convert(threadTimeoutDuration, 
testTimeUnit));
-    assertTrue(nonInterruptableFuture.isDone());
-    assertTrue(nonInterruptableFuture.get());
-
-    // check that if there is an interruptable thread that awaitTermination 
forcefully returns.
-
-    ExecutorService executorService2 =
+    try {
+      final Future<Boolean> f = executorService.submit(w);
+
+      assertTrue("Worker didn't start in a sane amount of time", 
w.awaitWorkerStart());
+      executorService.shutdownNow();
+      assertTrue(
+          "Worker not interupted by shutdown in a sane amount of time",
+          w.awaitWorkerInteruptedAtLeastOnce());
+      assertThrows(
+          RuntimeException.class,
+          () ->
+              ExecutorUtil.awaitTermination(
+                  executorService, MAX_AWAIT_TERMINATION_ARG_MS, 
TimeUnit.MILLISECONDS));
+
+      assertFalse("Worker should not finish due to shutdown or 
awaitTermination", f.isDone());
+      assertTrue("test sanity check: WTF? how did we get here?", 
w.getNumberOfInterupts() > 0);
+
+      // Worker should finish if we let it
+      w.tellWorkerToFinish();
+      assertTrue(f.get(MAX_SANE_WAIT_DURRATION_MS, TimeUnit.MILLISECONDS));
+    } finally {
+      w.tellWorkerToFinish();
+      ExecutorUtil.shutdownNowAndAwaitTermination(executorService);
+    }
+  }
+
+  /** Test that if there is an interruptable thread that awaitTermination 
forcefully returns. */
+  @Test
+  // Must prevent runaway failures so limit this to short timeframe in case of 
failure
+  @Timeout(millis = 5_000)
+  public void testExecutorUtilAwaitsTerminationWhenTaskRespectsInterupt() 
throws Exception {
+
+    final Worker w = new Worker(true);
+    final ExecutorService executorService =
         ExecutorUtil.newMDCAwareSingleThreadExecutor(
             new SolrNamedThreadFactory(this.getClass().getSimpleName() + 
"interruptable"));
-    interruptCount.set(0);
-    Future<Boolean> interruptableFuture =
-        executorService2.submit(
-            () -> getTestThread(threadTimeoutDuration, testTimeUnit, 
interruptCount, true));
-    executorService2.shutdownNow();
-    ExecutorUtil.awaitTermination(executorService2, awaitTerminationTimeout, 
testTimeUnit);
-
-    // Thread should have been interrupted.
-    assertTrue(interruptableFuture.isDone());
-    assertTrue(interruptCount.get() > 0);
-    assertFalse(interruptableFuture.get());
+    try {
+      final Future<Boolean> f = executorService.submit(w);
+
+      assertTrue("Worker didn't start in a sane amount of time", 
w.awaitWorkerStart());
+      executorService.shutdownNow();
+      ExecutorUtil.awaitTermination(
+          executorService, MAX_AWAIT_TERMINATION_ARG_MS, 
TimeUnit.MILLISECONDS);
+
+      // Worker should finish on it's own after the interupt
+      assertTrue(
+          "Worker not interupted in a sane amount of time", 
w.awaitWorkerInteruptedAtLeastOnce());
+      assertFalse(f.get(MAX_SANE_WAIT_DURRATION_MS, TimeUnit.MILLISECONDS));
+      assertTrue("test sanity check: WTF? how did we get here?", 
w.getNumberOfInterupts() > 0);
+
+    } finally {
+      w.tellWorkerToFinish();
+      ExecutorUtil.shutdownNowAndAwaitTermination(executorService);
+    }
   }
 
-  private boolean getTestThread(
-      long threadTimeoutDuration,
-      TimeUnit testTimeUnit,
-      AtomicInteger interruptCount,
-      boolean interruptable) {
-    TimeOut threadTimeout = new TimeOut(threadTimeoutDuration, testTimeUnit, 
TimeSource.NANO_TIME);
-    while (!threadTimeout.hasTimedOut()) {
-      try {
-        
threadTimeout.sleep(TimeUnit.MILLISECONDS.convert(threadTimeoutDuration, 
testTimeUnit));
-      } catch (InterruptedException interruptedException) {
-        interruptCount.incrementAndGet();
-        if (interruptable) {
-          Thread.currentThread().interrupt();
-          return false; // didn't run full time
+  private static final class Worker implements Callable<Boolean> {
+    // how we communiate out to our caller
+    private final CountDownLatch taskStartedLatch = new CountDownLatch(1);
+    private final CountDownLatch gotFirstInteruptLatch = new CountDownLatch(1);
+    private final AtomicInteger interruptCount = new AtomicInteger(0);
+
+    // how our caller communicates with us
+    private final CountDownLatch allowedToFinishLatch = new CountDownLatch(1);
+    private final boolean interruptable;
+
+    public Worker(final boolean interruptable) {
+      this.interruptable = interruptable;
+    }
+
+    /** Returns false if worker doesn't start in a sane amount of time */
+    public boolean awaitWorkerStart() throws InterruptedException {
+      return taskStartedLatch.await(MAX_SANE_WAIT_DURRATION_MS, 
TimeUnit.MILLISECONDS);
+    }
+
+    /** Returns false if worker didn't recieve interupt in a sane amount of 
time */
+    public boolean awaitWorkerInteruptedAtLeastOnce() throws 
InterruptedException {
+      return gotFirstInteruptLatch.await(MAX_SANE_WAIT_DURRATION_MS, 
TimeUnit.MILLISECONDS);
+    }
+
+    public int getNumberOfInterupts() {
+      return interruptCount.get();
+    }
+
+    public void tellWorkerToFinish() {
+      allowedToFinishLatch.countDown();
+    }
+
+    @Override
+    public Boolean call() {
+      // aboslute last resort timeout to prevent infinite while loop
+      final TimeOut threadTimeout =
+          new TimeOut(MAX_SANE_WAIT_DURRATION_MS, TimeUnit.MILLISECONDS, 
TimeSource.NANO_TIME);
+
+      while (!threadTimeout.hasTimedOut()) {
+        try {
+
+          // this must be inside the try block, so we'll still catch the 
InterruptedException if our
+          // caller shutsdown & awaits termination before we get a chance to 
start await'ing...
+          taskStartedLatch.countDown();
+
+          if (allowedToFinishLatch.await(
+              threadTimeout.timeLeft(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS)) {
+            return true; // ran until we were told to stop
+          }
+        } catch (InterruptedException interruptedException) {
+          interruptCount.incrementAndGet();
+          gotFirstInteruptLatch.countDown();
+          if (interruptable) {
+            Thread.currentThread().interrupt();
+            return false; // was explicitly interupted
+          }
         }
       }
+      throw new RuntimeException("Sane timeout elapsed before worker 
finished");
     }
-    return true; // ran full time
   }
 
   @Test

Reply via email to