This is an automated email from the ASF dual-hosted git repository.

mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0afa89d2840b23b606329e1885c8d622c6960645
Author: Matthias Pohl <[email protected]>
AuthorDate: Fri Feb 2 14:34:47 2024 +0100

    [hotfix][core] Makes ExecutorUtils#gracefulShutdown return any outstanding tasks
---
 .../src/main/java/org/apache/flink/util/ExecutorUtils.java | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
index 939ffd6624c..99e2f3317e5 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java
@@ -21,6 +21,8 @@ package org.apache.flink.util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -38,8 +40,9 @@ public class ExecutorUtils {
      * @param timeout to wait for the termination of all ExecutorServices
      * @param unit of the timeout
      * @param executorServices to shut down
+     * @return Tasks that were not executed prior to a {@link ExecutorService#shutdownNow()}.
      */
-    public static void gracefulShutdown(
+    public static List<Runnable> gracefulShutdown(
             long timeout, TimeUnit unit, ExecutorService... executorServices) {
         for (ExecutorService executorService : executorServices) {
             executorService.shutdown();
@@ -50,22 +53,23 @@ public class ExecutorUtils {
         long timeLeft = unit.toMillis(timeout);
         boolean hasTimeLeft = timeLeft > 0L;
 
+        final List<Runnable> outstandingTasks = new ArrayList<>();
         for (ExecutorService executorService : executorServices) {
             if (wasInterrupted || !hasTimeLeft) {
-                executorService.shutdownNow();
+                outstandingTasks.addAll(executorService.shutdownNow());
             } else {
                 try {
                     if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) {
                         LOG.warn(
                                 "ExecutorService did not terminate in time. Shutting it down now.");
-                        executorService.shutdownNow();
+                        outstandingTasks.addAll(executorService.shutdownNow());
                     }
                 } catch (InterruptedException e) {
                     LOG.warn(
                             "Interrupted while shutting down executor services. Shutting all "
                                     + "remaining ExecutorServices down now.",
                             e);
-                    executorService.shutdownNow();
+                    outstandingTasks.addAll(executorService.shutdownNow());
 
                     wasInterrupted = true;
 
@@ -76,6 +80,8 @@ public class ExecutorUtils {
                 hasTimeLeft = timeLeft > 0L;
             }
         }
+
+        return outstandingTasks;
     }
 
     /**

Reply via email to