This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a6206cdd88ad24731c470cdd779d310b1c2cbffc Author: Stephan Ewen <[email protected]> AuthorDate: Sun Nov 22 15:28:29 2020 +0100 [FLINK-20266][runtime] Replace dedicated thread pool in ComponentClosingUtils with use of 'FutureUtils.orTimeout()' This removes extra (non-daemon) threads, which were previously keeping the JVM alive. --- .../coordination}/ComponentClosingUtils.java | 51 ++++++++++------------ .../RecreateOnResetOperatorCoordinator.java | 2 +- 2 files changed, 23 insertions(+), 30 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java similarity index 71% rename from flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java index 65c83ae..581204a 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java @@ -16,17 +16,12 @@ limitations under the License. */ -package org.apache.flink.util; +package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -34,18 +29,8 @@ import java.util.concurrent.TimeoutException; * A util class to help with a clean component shutdown. 
*/ public class ComponentClosingUtils { - private static final Logger LOG = LoggerFactory.getLogger(ComponentClosingUtils.class); - // A shared watchdog executor to handle the timeout closing. - private static final ScheduledExecutorService watchDog = - Executors.newSingleThreadScheduledExecutor((ThreadFactory) r -> { - Thread t = new Thread(r, "ComponentClosingUtil"); - t.setUncaughtExceptionHandler((thread, exception) -> { - LOG.error("FATAL: The component closing util thread caught exception ", exception); - System.exit(-17); - }); - return t; - }); + /** Utility class, not meant to be instantiated. */ private ComponentClosingUtils() {} /** @@ -85,26 +70,34 @@ public class ComponentClosingUtils { long closeTimeoutMs) { final CompletableFuture<Void> future = new CompletableFuture<>(); // Start a dedicate thread to close the component. - Thread t = new Thread(() -> { + final Thread t = new Thread(() -> { closingSequence.run(); future.complete(null); }); // Use uncaught exception handler to handle exceptions during closing. t.setUncaughtExceptionHandler((thread, error) -> future.completeExceptionally(error)); t.start(); - // Schedule a watch dog job to the watching executor to detect timeout when - // closing the component. - watchDog.schedule(() -> { - if (t.isAlive()) { - t.interrupt(); - future.completeExceptionally(new TimeoutException( - String.format("Failed to close the %s before timeout of %d milliseconds", - componentName, closeTimeoutMs))); - } - }, closeTimeoutMs, TimeUnit.MILLISECONDS); + + // if the future fails due to a timeout, we interrupt the thread + future.exceptionally((error) -> { + if (error instanceof TimeoutException && t.isAlive()) { + abortThread(t); + } + return null; + }); + + FutureUtils.orTimeout( + future, + closeTimeoutMs, TimeUnit.MILLISECONDS); + return future; } + static void abortThread(Thread t) { + // the abortion strategy is pretty simple here... 
+ t.interrupt(); + } + // --------------------------- private static class ClosingException extends RuntimeException { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index af19225..c2e9cf5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -33,7 +33,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; -import static org.apache.flink.util.ComponentClosingUtils.closeAsyncWithTimeout; +import static org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.closeAsyncWithTimeout; /** * A class that will recreate a new {@link OperatorCoordinator} instance when
