dawidwys commented on a change in pull request #18745:
URL: https://github.com/apache/flink/pull/18745#discussion_r809111421



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
##########
@@ -95,8 +100,94 @@ private ComponentClosingUtils() {}
         return future;
     }
 
+    /**
+     * A util method that tries to shut down an {@link ExecutorService} elegantly within the given
+     * timeout. If the executor has not been shut down before it hits timeout or the thread is
+     * interrupted when waiting for the termination, a forceful shutdown will be attempted on the
+     * executor.
+     *
+     * @param executor the {@link ExecutorService} to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor has been successfully closed, false otherwise.
+     */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    public static boolean tryShutdownExecutorElegantly(ExecutorService executor, Duration timeout) {
+        try {
+            executor.shutdown();
+            executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException ie) {
+            // Let it go.
+        }
+        if (!executor.isTerminated()) {
+            shutdownExecutorForcefully(executor, Duration.ZERO, false);
+        }
+        return executor.isTerminated();
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout. The method returns if it is
+     * interrupted.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(ExecutorService executor, Duration timeout) {
+        return shutdownExecutorForcefully(executor, timeout, true);
+    }
+
+    /**
+     * Shutdown the given executor forcefully within the given timeout.
+     *
+     * @param executor the executor to shut down.
+     * @param timeout the timeout duration.
+     * @param interruptable when set to true, the method can be interrupted. Otherwise, each
+     *     interruption to the thread results in another {@code ExecutorService.shutdownNow()}
+     *     call to the shutting down executor.
+     * @return true if the given executor is terminated, false otherwise.
+     */
+    public static boolean shutdownExecutorForcefully(
+            ExecutorService executor, Duration timeout, boolean interruptable) {
+        long startingTime = clock.relativeTimeMillis();
+        long timeElapsed = 0L;
+        boolean isInterrupted = false;
+        do {
+            executor.shutdownNow();
+            try {
+                long timeRemaining = timeout.toMillis() - timeElapsed;
+                executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                isInterrupted = interruptable;
+            }
+            timeElapsed = clock.relativeTimeMillis() - startingTime;
+        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis());
+        return executor.isTerminated();
+    }
+
     static void abortThread(Thread t) {
-        // the abortion strategy is pretty simple here...
-        t.interrupt();
+        // Try our best here to ensure the thread is aborted. Keep interrupting the
+        // thread for 10 times with 10 ms intervals. This helps handle the case
+        // where the shutdown sequence consists of a bunch of closeQuietly() calls
+        // that will swallow the InterruptedException so the thread to be aborted
+        // may block multiple times. If the thread is still alive after all the
+        // attempts, just let it go. The caller of closeAsyncWithTimeout() should
+        // have received a TimeoutException in this case.
+        int i = 0;
+        while (t.isAlive() && i < 10) {
+            t.interrupt();
+            i++;
+            try {
+                Thread.sleep(10);
+            } catch (InterruptedException e) {
+                // Let it go.
+            }
+        }
+    }
+
+    // ========= Method visible for testing ========
+
+    @VisibleForTesting
+    static void setClock(Clock clock) {

Review comment:
       My eyes hurt when I see a static variable being assigned, but I won't be stubborn about it ;) Up to you to decide.
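
       For illustration only, one possible way to avoid the mutable static would be to pass the time source in as a parameter. This is a rough sketch and not what the PR does: the class name `ShutdownSketch` and the `LongSupplier` parameter are hypothetical, and the loop body is copied from the diff above.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical sketch: the time source is an argument instead of a static field,
// so tests can supply their own clock without assigning shared state.
final class ShutdownSketch {
    private ShutdownSketch() {}

    // Entry point for regular callers: uses the JVM's monotonic clock.
    static boolean shutdownExecutorForcefully(
            ExecutorService executor, Duration timeout, boolean interruptable) {
        return shutdownExecutorForcefully(
                executor, timeout, interruptable, () -> System.nanoTime() / 1_000_000L);
    }

    // Overload for tests: relativeTimeMillis (an assumed name) stands in for the clock.
    static boolean shutdownExecutorForcefully(
            ExecutorService executor,
            Duration timeout,
            boolean interruptable,
            LongSupplier relativeTimeMillis) {
        long startingTime = relativeTimeMillis.getAsLong();
        long timeElapsed = 0L;
        boolean isInterrupted = false;
        do {
            executor.shutdownNow();
            try {
                long timeRemaining = timeout.toMillis() - timeElapsed;
                executor.awaitTermination(timeRemaining, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Stop looping only if the caller allowed interruption.
                isInterrupted = interruptable;
            }
            timeElapsed = relativeTimeMillis.getAsLong() - startingTime;
        } while (!executor.isTerminated() && !isInterrupted && timeElapsed < timeout.toMillis());
        return executor.isTerminated();
    }
}
```

       A test could then pass something like `() -> fakeNow[0] += 1000` as the last argument to drive the elapsed time by hand. The trade-off is one extra overload in exchange for having no static state to reset between tests.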




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


