This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fc886b5103 [FLINK-36961][state/forst] Wait ForSt state excutor 
shutdown when disposing (#25854)
1fc886b5103 is described below

commit 1fc886b5103d8bb990e8cc476a9564ffe4e05c49
Author: Yanfei Lei <fredia...@gmail.com>
AuthorDate: Wed Jan 1 09:52:21 2025 +0800

    [FLINK-36961][state/forst] Wait ForSt state excutor shutdown when disposing 
(#25854)
---
 .../org/apache/flink/state/forst/ForStStateExecutor.java | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
index 4099b4bbf5b..6640181eb50 100644
--- 
a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
+++ 
b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateExecutor.java
@@ -36,6 +36,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -237,11 +238,20 @@ public class ForStStateExecutor implements StateExecutor {
     @Override
     public void shutdown() {
         // Coordinator should be shutdown before others, since it submit jobs 
to others.
-        coordinatorThread.shutdown();
-        readThreads.shutdown();
+        shutdownAndWait(coordinatorThread);
+        shutdownAndWait(readThreads);
         if (!sharedWriteThread) {
-            writeThreads.shutdown();
+            shutdownAndWait(writeThreads);
         }
         LOG.info("Shutting down the ForStStateExecutor.");
     }
+
+    private void shutdownAndWait(ExecutorService executorService) {
+        try {
+            executorService.shutdown();
+            while (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {}
+        } catch (InterruptedException e) {
+            executorService.shutdownNow();
+        }
+    }
 }

Reply via email to