becketqin commented on a change in pull request #18745:
URL: https://github.com/apache/flink/pull/18745#discussion_r808954136



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} 
elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout 
or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will 
be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false 
otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService 
executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The 
method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, 
Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be 
interrupted. Each interruption to
+     *     the thread results in another {@code ExecutorService.shutdownNow()} 
call to the shutting
+     *     down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) 
{
+        long startingTime = clock.relativeTimeMillis();
+        long timeElapsed = 0L;
+        boolean isInterrupted = false;
+        do {
+            executor.shutdownNow();
+            try {
+                long timeRemaining = timeout.toMillis() - timeElapsed;
+                executor.awaitTermination(timeRemaining, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                isInterrupted = interruptable;
+            }
+            timeElapsed = clock.relativeTimeMillis() - startingTime;
+        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < 
timeout.toMillis());
+        return executor.isTerminated();
+    }
+
     static void abortThread(Thread t) {
-        // the abortion strategy is pretty simple here...
-        t.interrupt();
+        // Try our best here to ensure the thread is aborted. Keep 
interrupting the
+        // thread for 10 times with 10 ms intervals. This helps handle the case
+        // where the shutdown sequence consists of a bunch of closeQuietly() 
calls
+        // that will swallow the InterruptedException so the thread to be 
aborted
+        // may block multiple times. If the thread is still alive after all the
+        // attempts, just let it go. The caller of closeAsyncWithTimeout() 
should
+        // have received a TimeoutException in this case.
+        int i = 0;
+        while (t.isAlive() && i < 10) {
+            t.interrupt();
+            i++;
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                // Let it go.
+            }
+        }
+    }
+
+    // ========= Method visible for testing ========
+
+    @VisibleForTesting
+    static void setClock(Clock clock) {

Review comment:
       This clock exists for testing purposes only. We usually avoid changing 
the primary method signature just for testing, if possible.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -82,7 +84,7 @@ Licensed to the Apache Software Foundation (ASF) under one
 
     private static final Logger LOG = 
LoggerFactory.getLogger(SourceCoordinatorContext.class);
 
-    private final ExecutorService coordinatorExecutor;
+    private final ScheduledExecutorService coordinatorExecutor;

Review comment:
       Makes sense.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} 
elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout 
or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will 
be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false 
otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService 
executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The 
method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, 
Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be 
interrupted. Each interruption to

Review comment:
       Good catch. Yes, the javadoc is incorrect.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
##########
@@ -217,18 +207,11 @@ public void start() throws Exception {
     @Override
     public void close() throws Exception {
         LOG.info("Closing SourceCoordinator for source {}.", operatorName);
-        try {
-            if (started) {
-                context.close();
-                if (enumerator != null) {
-                    enumerator.close();
-                }
+        if (started) {
+            closeQuietly(context);

Review comment:
       We actually do not want exceptions thrown here. An exception may cause 
the rest of the closing sequence to be skipped, which leaks resources. So 
we make a best effort to shut down the `SourceCoordinatorContext`; if it 
cannot be shut down cleanly, we log a warning and move on to close the 
`SplitEnumerator`.
   
   What I haven't done in this patch is add a `closeWithErrorLogging()` util 
method. I was planning to do that after moving some of the methods from 
`IOUtils` to `ComponentClosingUtils`. Also, the `ComponentClosingUtils` 
class should be moved to `flink-core`.
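
   To illustrate the best-effort idea above, here is a minimal sketch. The 
class name, the helper signature and the list-based "log" are assumptions made 
for illustration only; this is not the actual Flink utility and not 
necessarily what `closeWithErrorLogging()` would look like.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; the names here are hypothetical, not Flink APIs. */
final class BestEffortClose {

    /**
     * Closes the resource and records any failure instead of throwing, so the
     * caller can continue with the rest of its closing sequence.
     *
     * @return true if the resource closed without an exception.
     */
    static boolean closeWithErrorLogging(AutoCloseable closeable, String name, List<String> log) {
        if (closeable == null) {
            return true;
        }
        try {
            closeable.close();
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt flag for the caller.
                Thread.currentThread().interrupt();
            }
            log.add("Failed to close " + name + ": " + e);
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<String> closed = new ArrayList<>();

        // Stand-ins for the context and the enumerator (hypothetical).
        AutoCloseable context = () -> {
            throw new IllegalStateException("executor did not terminate");
        };
        AutoCloseable enumerator = () -> closed.add("enumerator");

        // The failure of the first close does not prevent the second one.
        closeWithErrorLogging(context, "context", log);
        closeWithErrorLogging(enumerator, "enumerator", log);

        System.out.println("warnings: " + log);
        System.out.println("closed:   " + closed);
    }
}
```

   The point of returning a boolean rather than throwing is that each step of 
the closing sequence runs regardless of whether an earlier step failed.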

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} 
elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout 
or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will 
be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false 
otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService 
executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The 
method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, 
Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be 
interrupted. Each interruption to
+     *     the thread results in another {@code ExecutorService.shutdownNow()} 
call to the shutting
+     *     down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) 
{
+        long startingTime = clock.relativeTimeMillis();
+        long timeElapsed = 0L;
+        boolean isInterrupted = false;
+        do {
+            executor.shutdownNow();
+            try {
+                long timeRemaining = timeout.toMillis() - timeElapsed;
+                executor.awaitTermination(timeRemaining, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                isInterrupted = interruptable;
+            }
+            timeElapsed = clock.relativeTimeMillis() - startingTime;
+        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < 
timeout.toMillis());
+        return executor.isTerminated();
+    }
+
     static void abortThread(Thread t) {

Review comment:
       Yes, it should be.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} 
elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout 
or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will 
be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false 
otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService 
executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The 
method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, 
Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be 
interrupted. Each interruption to
+     *     the thread results in another {@code ExecutorService.shutdownNow()} 
call to the shutting
+     *     down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) 
{
+        long startingTime = clock.relativeTimeMillis();
+        long timeElapsed = 0L;
+        boolean isInterrupted = false;
+        do {
+            executor.shutdownNow();
+            try {
+                long timeRemaining = timeout.toMillis() - timeElapsed;
+                executor.awaitTermination(timeRemaining, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                isInterrupted = interruptable;
+            }
+            timeElapsed = clock.relativeTimeMillis() - startingTime;
+        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < 
timeout.toMillis());

Review comment:
       The implementation of `ThreadPoolExecutor.isTerminated()` just compares 
an atomic integer, which combines the pool state and the live thread count, 
against an int constant. So it should be quite cheap. In any case, the order 
here probably has negligible performance impact, given that the method is 
rarely called.
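
   For readers unfamiliar with the technique, below is a simplified model of 
such a packed "state + thread count" word. It is a sketch written for this 
discussion, not a copy of the JDK source; the class name and the exact bit 
layout are assumptions.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of a packed control word; hypothetical, not the JDK source. */
final class PackedPoolState {
    // High 3 bits hold the run state, the remaining low bits hold the worker count.
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

    // Run states are ordered numerically, so "at least X" is a plain comparison.
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0;
    static final int STOP = 1 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    private final AtomicInteger ctl = new AtomicInteger(RUNNING);

    /** Packs a run state and a worker count into the single control word. */
    void set(int runState, int workerCount) {
        ctl.set(runState | workerCount);
    }

    int workerCount() {
        return ctl.get() & COUNT_MASK;
    }

    /** One volatile read plus one integer comparison. */
    boolean isTerminated() {
        return ctl.get() >= TERMINATED;
    }
}
```

   With this layout the termination check needs no locking, which is why its 
position in the loop condition hardly matters.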

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
##########
@@ -153,7 +156,6 @@ public void close() throws InterruptedException {
             return;
         }
         // Shutdown the worker executor, so no more worker tasks can run.
-        workerExecutor.shutdownNow();
-        workerExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
+        shutdownExecutorForcefully(workerExecutor, 
Duration.ofMillis(Long.MAX_VALUE));

Review comment:
       That is a good point. I agree we should probably just close all the 
executors in the `SourceCoordinatorContext`.

##########
File path: 
flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/ManuallyTriggeredScheduledExecutorService.java
##########
@@ -113,7 +113,7 @@ public boolean isTerminated() {
     }
 
     @Override
-    public boolean awaitTermination(long timeout, TimeUnit unit) {
+    public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {

Review comment:
       The `MockExecutionService` in `ComponentClosingTestUtilsTest` tests the 
case where the closing sequence is interrupted while awaiting the executor 
shutdown. So that subclass needs to throw `InterruptedException` from this 
method.
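
   The reason the base signature matters: in Java an overriding method cannot 
declare a checked exception that the overridden method does not declare. A 
minimal sketch of such a test double follows; `InterruptingExecutor` is a 
hypothetical name used for illustration, not the actual mock from the test.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical test double that simulates an interruption while awaiting termination. */
final class InterruptingExecutor extends AbstractExecutorService {
    private boolean shutdown;
    int awaitCalls;

    @Override
    public void shutdown() {
        shutdown = true;
    }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return shutdown;
    }

    @Override
    public boolean isTerminated() {
        // Never terminates, so callers have to rely on their timeout handling.
        return false;
    }

    // Legal only because ExecutorService.awaitTermination declares the exception.
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        awaitCalls++;
        throw new InterruptedException("simulated interruption");
    }

    @Override
    public void execute(Runnable command) {
        command.run();
    }
}
```

   If an intermediate base class overrides `awaitTermination` without the 
`throws` clause, a subclass like the one above would no longer compile.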

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} 
elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout 
or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will 
be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false 
otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService 
executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The 
method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, 
Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method cannot be 
interrupted. Each interruption to
+     *     the thread results in another {@code ExecutorService.shutdownNow()} 
call to the shutting
+     *     down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(

Review comment:
       It looks like `Deadline` does not handle the case where people pass in 
`Long.MAX_VALUE`. I fixed that.
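
   For context, the general problem is that "now + timeout" overflows when the 
timeout is close to `Long.MAX_VALUE`. The sketch below shows one way to guard 
against that with a saturating addition. It is an illustration under my own 
assumptions (class and method names are hypothetical) and does not claim to 
mirror the actual change in `Deadline`.

```java
import java.time.Duration;

/** Hypothetical helper showing overflow-safe deadline arithmetic. */
final class SaturatingDeadline {

    /** Returns nowNanos + timeout in nanoseconds, clamped to Long.MAX_VALUE on overflow. */
    static long deadlineNanos(long nowNanos, Duration timeout) {
        long timeoutNanos;
        try {
            // Duration.toNanos() throws ArithmeticException if the value does not fit in a long.
            timeoutNanos = timeout.toNanos();
        } catch (ArithmeticException e) {
            timeoutNanos = Long.MAX_VALUE;
        }
        try {
            // Math.addExact throws instead of silently wrapping around.
            return Math.addExact(nowNanos, timeoutNanos);
        } catch (ArithmeticException e) {
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long now = System.nanoTime();
        // Effectively "wait forever", without wrapping into the past.
        System.out.println(deadlineNanos(now, Duration.ofMillis(Long.MAX_VALUE)));
    }
}
```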

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
##########
@@ -259,9 +265,9 @@ public void runInCoordinatorThread(Runnable runnable) {
     @Override
     public void close() throws InterruptedException {
         closed = true;
-        notifier.close();
-        coordinatorExecutor.shutdown();
-        coordinatorExecutor.awaitTermination(Long.MAX_VALUE, 
TimeUnit.MILLISECONDS);
+        // Close quietly so the closing sequence will be executed completely.
+        closeQuietly(notifier);

Review comment:
       If I understand correctly, `closeAll()` immediately re-throws exceptions 
from the closing sequence of an `AutoCloseable` and skips the rest of the 
shutdown sequence, unless the exception class is explicitly suppressed. This 
may leave the closing sequence incomplete.



