This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a17655ebe7e3b2870b7616f1c2b640fcb3154187 Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Fri Feb 11 16:13:50 2022 +0800 [FLINK-24607] Add util methods to shutdown executor services. --- .../coordination/ComponentClosingUtils.java | 95 ++++++++++- .../coordination/ComponentClosingUtilsTest.java | 173 +++++++++++++++++++++ .../ManuallyTriggeredScheduledExecutorService.java | 2 +- 3 files changed, 266 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java index deed49e..4bfe302 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java @@ -18,16 +18,22 @@ limitations under the License. package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.util.clock.Clock; +import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; import java.time.Duration; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** A util class to help with a clean component shutdown. */ public class ComponentClosingUtils { + private static Clock clock = SystemClock.getInstance(); /** Utility class, not meant to be instantiated. */ private ComponentClosingUtils() {} @@ -95,8 +101,91 @@ public class ComponentClosingUtils { return future; } - static void abortThread(Thread t) { - // the abortion strategy is pretty simple here... - t.interrupt(); + /** + * A util method that tries to shut down an {@link ExecutorService} elegantly within the given + * timeout. If the executor has not been shut down before it hits timeout or the thread is + * interrupted when waiting for the termination, a forceful shutdown will be attempted on the + * executor. + * + * @param executor the {@link ExecutorService} to shut down. + * @param timeout the timeout duration. + * @return true if the given executor has been successfully closed, false otherwise. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) { + try { + executor.shutdown(); + executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) { + // Let it go. + } + if (!executor.isTerminated()) { + shutdownExecutorForcefully(executor, Duration.ZERO, false); + } + return executor.isTerminated(); + } + + /** + * Shutdown the given executor forcefully within the given timeout. The method returns if it is + * interrupted. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) { + return shutdownExecutorForcefully(executor, timeout, true); + } + + /** + * Shutdown the given executor forcefully within the given timeout. + * + * @param executor the executor to shut down. + * @param timeout the timeout duration. + * @param interruptable when set to true, the method can be interrupted. Each interruption to + * the thread results in another {@code ExecutorService.shutdownNow()} call to the shutting + * down executor. + * @return true if the given executor is terminated, false otherwise. + */ + public static boolean shutdownExecutorForcefully( + ExecutorService executor, Duration timeout, boolean interruptable) { + Deadline deadline = Deadline.fromNowWithClock(timeout, clock); + boolean isInterrupted = false; + do { + executor.shutdownNow(); + try { + executor.awaitTermination(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + isInterrupted = interruptable; + } + } while (!isInterrupted && deadline.hasTimeLeft() && !executor.isTerminated()); + return executor.isTerminated(); + } + + private static void abortThread(Thread t) { + // Try our best here to ensure the thread is aborted. Keep interrupting the + // thread for 10 times with 10 ms intervals. This helps handle the case + // where the shutdown sequence consists of a bunch of closeQuietly() calls + // that will swallow the InterruptedException so the thread to be aborted + // may block multiple times. If the thread is still alive after all the + // attempts, just let it go. The caller of closeAsyncWithTimeout() should + // have received a TimeoutException in this case. + int i = 0; + while (t.isAlive() && i < 10) { + t.interrupt(); + i++; + try { + Thread.sleep(10); + } catch (InterruptedException e) { + // Let it go. + } + } + } + + // ========= Method visible for testing ======== + + @VisibleForTesting + static void setClock(Clock clock) { + ComponentClosingUtils.clock = clock; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java new file mode 100644 index 0000000..d43089d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtilsTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.coordination; + +import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; +import org.apache.flink.util.clock.ManualClock; + +import org.jetbrains.annotations.NotNull; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** The unit test class for {@link ComponentClosingUtils}. */ +public class ComponentClosingUtilsTest { + private ManualClock clock; + + @Before + public void setup() { + clock = new ManualClock(); + ComponentClosingUtils.setClock(clock); + } + + @Test + public void testTryShutdownExecutorElegantlyWithoutForcefulShutdown() { + MockExecutorService executor = new MockExecutorService(0); + assertTrue( + ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1))); + assertEquals(0, executor.forcefullyShutdownCount); + } + + @Test + public void testTryShutdownExecutorElegantlyWithForcefulShutdown() { + MockExecutorService executor = new MockExecutorService(5); + assertFalse( + ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1))); + assertEquals(1, executor.forcefullyShutdownCount); + } + + @Test + public void testTryShutdownExecutorElegantlyTimeoutWithForcefulShutdown() { + MockExecutorService executor = new MockExecutorService(5); + executor.timeoutAfterNumForcefulShutdown(clock, 0); + assertFalse( + ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1))); + assertEquals(1, executor.forcefullyShutdownCount); + } + + @Test + public void testTryShutdownExecutorElegantlyInterruptedWithForcefulShutdown() { + MockExecutorService executor = new MockExecutorService(5); + executor.interruptAfterNumForcefulShutdown(0); + assertFalse( + ComponentClosingUtils.tryShutdownExecutorElegantly(executor, Duration.ofDays(1))); + assertEquals(1, executor.forcefullyShutdownCount); + } + + @Test + public void testShutdownExecutorForcefully() { + MockExecutorService executor = new MockExecutorService(5); + assertTrue( + ComponentClosingUtils.shutdownExecutorForcefully( + executor, Duration.ofDays(1), false)); + assertEquals(5, executor.forcefullyShutdownCount); + } + + @Test + public void testShutdownExecutorForcefullyReachesTimeout() { + MockExecutorService executor = new MockExecutorService(5); + executor.timeoutAfterNumForcefulShutdown(clock, 1); + assertFalse( + ComponentClosingUtils.shutdownExecutorForcefully( + executor, Duration.ofDays(1), false)); + assertEquals(1, executor.forcefullyShutdownCount); + } + + @Test + public void testShutdownExecutorForcefullyNotInterruptable() { + MockExecutorService executor = new MockExecutorService(5); + executor.interruptAfterNumForcefulShutdown(1); + assertTrue( + ComponentClosingUtils.shutdownExecutorForcefully( + executor, Duration.ofDays(1), false)); + assertEquals(5, executor.forcefullyShutdownCount); + } + + @Test + public void testShutdownExecutorForcefullyInterruptable() { + MockExecutorService executor = new MockExecutorService(5); + executor.interruptAfterNumForcefulShutdown(1); + assertFalse( + ComponentClosingUtils.shutdownExecutorForcefully( + executor, Duration.ofDays(1), true)); + assertEquals(1, executor.forcefullyShutdownCount); + } + + // ============== private class for testing =============== + + /** An executor class that behaves in an orchestrated way. */ + private static final class MockExecutorService + extends ManuallyTriggeredScheduledExecutorService { + private final int numRequiredForcefullyShutdown; + private ManualClock clock; + private int forcefullyShutdownCount; + private int interruptAfterNumForcefulShutdown = Integer.MAX_VALUE; + private int timeoutAfterNumForcefulShutdown = Integer.MAX_VALUE; + + private MockExecutorService(int numRequiredForcefullyShutdown) { + this.numRequiredForcefullyShutdown = numRequiredForcefullyShutdown; + forcefullyShutdownCount = 0; + } + + @Override + public @NotNull List<Runnable> shutdownNow() { + forcefullyShutdownCount++; + return super.shutdownNow(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + if (forcefullyShutdownCount < numRequiredForcefullyShutdown) { + if (forcefullyShutdownCount >= timeoutAfterNumForcefulShutdown) { + clock.advanceTime(Duration.ofDays(100)); + } + if (forcefullyShutdownCount >= interruptAfterNumForcefulShutdown) { + throw new InterruptedException(); + } + } + return super.awaitTermination(timeout, unit) && reachedForcefulShutdownCount(); + } + + @Override + public boolean isTerminated() { + return super.isTerminated() && reachedForcefulShutdownCount(); + } + + public void interruptAfterNumForcefulShutdown(int interruptAfterNumForcefulShutdown) { + this.interruptAfterNumForcefulShutdown = interruptAfterNumForcefulShutdown; + } + + public void timeoutAfterNumForcefulShutdown( + ManualClock clock, int timeoutAfterNumForcefulShutdown) { + this.clock = clock; + this.timeoutAfterNumForcefulShutdown = timeoutAfterNumForcefulShutdown; + } + + private boolean reachedForcefulShutdownCount() { + return forcefullyShutdownCount >= numRequiredForcefullyShutdown; + } + } +} diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java index 15c1f7b..0ae1d28 100644 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java @@ -113,7 +113,7 @@ public class ManuallyTriggeredScheduledExecutorService implements ScheduledExecu } @Override - public boolean awaitTermination(long timeout, TimeUnit unit) { + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return true; }
