This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1fc886b5103 [FLINK-36961][state/forst] Wait ForSt state excutor shutdown when disposing (#25854) 1fc886b5103 is described below commit 1fc886b5103d8bb990e8cc476a9564ffe4e05c49 Author: Yanfei Lei <fredia...@gmail.com> AuthorDate: Wed Jan 1 09:52:21 2025 +0800 [FLINK-36961][state/forst] Wait ForSt state excutor shutdown when disposing (#25854) --- .../org/apache/flink/state/forst/ForStStateExecutor.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java index 4099b4bbf5b..6640181eb50 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -237,11 +238,20 @@ public class ForStStateExecutor implements StateExecutor { @Override public void shutdown() { // Coordinator should be shutdown before others, since it submit jobs to others. - coordinatorThread.shutdown(); - readThreads.shutdown(); + shutdownAndWait(coordinatorThread); + shutdownAndWait(readThreads); if (!sharedWriteThread) { - writeThreads.shutdown(); + shutdownAndWait(writeThreads); } LOG.info("Shutting down the ForStStateExecutor."); } + + private void shutdownAndWait(ExecutorService executorService) { + try { + executorService.shutdown(); + while (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {} + } catch (InterruptedException e) { + executorService.shutdownNow(); + } + } }