This is an automated email from the ASF dual-hosted git repository. mapohl pushed a commit to branch release-1.18 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0afa89d2840b23b606329e1885c8d622c6960645 Author: Matthias Pohl <[email protected]> AuthorDate: Fri Feb 2 14:34:47 2024 +0100 [hotfix][core] Makes ExecutorUtils#gracefulShutdown return any outstanding tasks --- .../src/main/java/org/apache/flink/util/ExecutorUtils.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java index 939ffd6624c..99e2f3317e5 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExecutorUtils.java @@ -21,6 +21,8 @@ package org.apache.flink.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -38,8 +40,9 @@ public class ExecutorUtils { * @param timeout to wait for the termination of all ExecutorServices * @param unit of the timeout * @param executorServices to shut down + * @return Tasks that were not executed prior to a {@link ExecutorService#shutdownNow()}. */ - public static void gracefulShutdown( + public static List<Runnable> gracefulShutdown( long timeout, TimeUnit unit, ExecutorService... executorServices) { for (ExecutorService executorService : executorServices) { executorService.shutdown(); @@ -50,22 +53,23 @@ public class ExecutorUtils { long timeLeft = unit.toMillis(timeout); boolean hasTimeLeft = timeLeft > 0L; + final List<Runnable> outstandingTasks = new ArrayList<>(); for (ExecutorService executorService : executorServices) { if (wasInterrupted || !hasTimeLeft) { - executorService.shutdownNow(); + outstandingTasks.addAll(executorService.shutdownNow()); } else { try { if (!executorService.awaitTermination(timeLeft, TimeUnit.MILLISECONDS)) { LOG.warn( "ExecutorService did not terminate in time. Shutting it down now."); - executorService.shutdownNow(); + outstandingTasks.addAll(executorService.shutdownNow()); } } catch (InterruptedException e) { LOG.warn( "Interrupted while shutting down executor services. Shutting all " + "remaining ExecutorServices down now.", e); - executorService.shutdownNow(); + outstandingTasks.addAll(executorService.shutdownNow()); wasInterrupted = true; @@ -76,6 +80,8 @@ public class ExecutorUtils { hasTimeLeft = timeLeft > 0L; } } + + return outstandingTasks; } /**
