This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a6206cdd88ad24731c470cdd779d310b1c2cbffc
Author: Stephan Ewen <[email protected]>
AuthorDate: Sun Nov 22 15:28:29 2020 +0100

    [FLINK-20266][runtime] Replace dedicated thread pool in ComponentClosingUtils with use of 'FutureUtils.orTimeout()'
    
    This removes extra (non-daemon) threads, which previously kept the JVM process from exiting.
---
 .../coordination}/ComponentClosingUtils.java       | 51 ++++++++++------------
 .../RecreateOnResetOperatorCoordinator.java        |  2 +-
 2 files changed, 23 insertions(+), 30 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
similarity index 71%
rename from flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
index 65c83ae..581204a 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
@@ -16,17 +16,12 @@
  limitations under the License.
  */
 
-package org.apache.flink.util;
+package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -34,18 +29,8 @@ import java.util.concurrent.TimeoutException;
  * A util class to help with a clean component shutdown.
  */
 public class ComponentClosingUtils {
-       private static final Logger LOG = 
LoggerFactory.getLogger(ComponentClosingUtils.class);
-       // A shared watchdog executor to handle the timeout closing.
-       private static final ScheduledExecutorService watchDog =
-                       
Executors.newSingleThreadScheduledExecutor((ThreadFactory) r -> {
-                               Thread t = new Thread(r, 
"ComponentClosingUtil");
-                               t.setUncaughtExceptionHandler((thread, 
exception) -> {
-                                       LOG.error("FATAL: The component closing 
util thread caught exception ", exception);
-                                       System.exit(-17);
-                               });
-                               return t;
-                       });
 
+       /** Utility class, not meant to be instantiated. */
        private ComponentClosingUtils() {}
 
        /**
@@ -85,26 +70,34 @@ public class ComponentClosingUtils {
                        long closeTimeoutMs) {
                final CompletableFuture<Void> future = new 
CompletableFuture<>();
                // Start a dedicate thread to close the component.
-               Thread t = new Thread(() -> {
+               final Thread t = new Thread(() -> {
                        closingSequence.run();
                        future.complete(null);
                });
                // Use uncaught exception handler to handle exceptions during 
closing.
                t.setUncaughtExceptionHandler((thread, error) -> 
future.completeExceptionally(error));
                t.start();
-               // Schedule a watch dog job to the watching executor to detect 
timeout when
-               // closing the component.
-               watchDog.schedule(() -> {
-                               if (t.isAlive()) {
-                                       t.interrupt();
-                                       future.completeExceptionally(new 
TimeoutException(
-                                                       String.format("Failed 
to close the %s before timeout of %d milliseconds",
-                                                                       
componentName, closeTimeoutMs)));
-                               }
-                       }, closeTimeoutMs, TimeUnit.MILLISECONDS);
+
+               // if the future fails due to a timeout, we interrupt the thread
+               future.exceptionally((error) -> {
+                       if (error instanceof TimeoutException && t.isAlive()) {
+                               abortThread(t);
+                       }
+                       return null;
+               });
+
+               FutureUtils.orTimeout(
+                               future,
+                               closeTimeoutMs, TimeUnit.MILLISECONDS);
+
                return future;
        }
 
+       static void abortThread(Thread t) {
+               // the abortion strategy is pretty simple here...
+               t.interrupt();
+       }
+
        // ---------------------------
 
        private static class ClosingException extends RuntimeException {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index af19225..c2e9cf5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -33,7 +33,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static 
org.apache.flink.util.ComponentClosingUtils.closeAsyncWithTimeout;
+import static 
org.apache.flink.runtime.operators.coordination.ComponentClosingUtils.closeAsyncWithTimeout;
 
 /**
  * A class that will recreate a new {@link OperatorCoordinator} instance when

Reply via email to