This is an automated email from the ASF dual-hosted git repository. krisden pushed a commit to branch branch_9_0 in repository https://gitbox.apache.org/repos/asf/solr.git
commit a066431bb59e49b32e13e41da2c9a7daf80cab6d Author: Kevin Risden <[email protected]> AuthorDate: Tue Sep 6 10:30:09 2022 -0400 SOLR-16187: ExecutorUtil#awaitTermination shouldn't wait forever (#840) --- .../org/apache/solr/common/util/ExecutorUtil.java | 28 +++++-- .../apache/solr/common/util/ExecutorUtilTest.java | 98 ++++++++++++++++++++++ 2 files changed, 118 insertions(+), 8 deletions(-) diff --git a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java index d4e1673c906..e85470d70fd 100644 --- a/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java +++ b/solr/solrj/src/java/org/apache/solr/common/util/ExecutorUtil.java @@ -96,15 +96,27 @@ public class ExecutorUtil { } public static void awaitTermination(ExecutorService pool) { - boolean shutdown = false; - while (!shutdown) { - try { - // Wait a while for existing tasks to terminate - shutdown = pool.awaitTermination(60, TimeUnit.SECONDS); - } catch (InterruptedException ie) { - // Preserve interrupt status - Thread.currentThread().interrupt(); + awaitTermination(pool, 60, TimeUnit.SECONDS); + } + + // Used in testing to not have to wait the full 60 seconds. + static void awaitTermination(ExecutorService pool, long timeout, TimeUnit unit) { + try { + // Wait a while for existing tasks to terminate. + if (!pool.awaitTermination(timeout, unit)) { + // We want to force shutdown any remaining threads. + pool.shutdownNow(); + // Wait again for forced threads to stop. + if (!pool.awaitTermination(timeout, unit)) { + log.error("Threads from pool {} did not forcefully stop.", pool); + throw new RuntimeException("Timeout waiting for pool " + pool + " to shutdown."); + } } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); } } diff --git a/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java new file mode 100644 index 00000000000..d7cfbe9c321 --- /dev/null +++ b/solr/solrj/src/test/org/apache/solr/common/util/ExecutorUtilTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.common.util; + +import com.carrotsearch.randomizedtesting.annotations.Timeout; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.solr.SolrTestCase; +import org.apache.solr.util.TimeOut; +import org.junit.Test; + +public class ExecutorUtilTest extends SolrTestCase { + @Test + // Must prevent runaway failures so limit this to short timeframe in case of failure + @Timeout(millis = 3000) + public void testExecutorUtilAwaitsTerminationEnds() throws Exception { + final long awaitTerminationTimeout = 100; + final long threadTimeoutDuration = 3 * awaitTerminationTimeout; + final TimeUnit testTimeUnit = TimeUnit.MILLISECONDS; + + // check that if there is a non interruptable thread that awaitTermination eventually returns. + + ExecutorService executorService = + ExecutorUtil.newMDCAwareSingleThreadExecutor( + new SolrNamedThreadFactory(this.getClass().getSimpleName() + "non-interruptable")); + final AtomicInteger interruptCount = new AtomicInteger(); + Future<Boolean> nonInterruptableFuture = + executorService.submit( + () -> getTestThread(threadTimeoutDuration, testTimeUnit, interruptCount, false)); + executorService.shutdownNow(); + assertThrows( + RuntimeException.class, + () -> + ExecutorUtil.awaitTermination(executorService, awaitTerminationTimeout, testTimeUnit)); + + // Thread should not have finished in await termination. + assertFalse(nonInterruptableFuture.isDone()); + assertTrue(interruptCount.get() > 0); + + // Thread should have finished by now. + Thread.sleep(TimeUnit.MILLISECONDS.convert(threadTimeoutDuration, testTimeUnit)); + assertTrue(nonInterruptableFuture.isDone()); + assertTrue(nonInterruptableFuture.get()); + + // check that if there is an interruptable thread that awaitTermination forcefully returns. + + ExecutorService executorService2 = + ExecutorUtil.newMDCAwareSingleThreadExecutor( + new SolrNamedThreadFactory(this.getClass().getSimpleName() + "interruptable")); + interruptCount.set(0); + Future<Boolean> interruptableFuture = + executorService2.submit( + () -> getTestThread(threadTimeoutDuration, testTimeUnit, interruptCount, true)); + executorService2.shutdownNow(); + ExecutorUtil.awaitTermination(executorService2, awaitTerminationTimeout, testTimeUnit); + + // Thread should have been interrupted. + assertTrue(interruptableFuture.isDone()); + assertTrue(interruptCount.get() > 0); + assertFalse(interruptableFuture.get()); + } + + private boolean getTestThread( + long threadTimeoutDuration, + TimeUnit testTimeUnit, + AtomicInteger interruptCount, + boolean interruptable) { + TimeOut threadTimeout = new TimeOut(threadTimeoutDuration, testTimeUnit, TimeSource.NANO_TIME); + while (!threadTimeout.hasTimedOut()) { + try { + threadTimeout.sleep(TimeUnit.MILLISECONDS.convert(threadTimeoutDuration, testTimeUnit)); + } catch (InterruptedException interruptedException) { + interruptCount.incrementAndGet(); + if (interruptable) { + Thread.currentThread().interrupt(); + return false; // didn't run full time + } + } + } + return true; // ran full time + } +}
