This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a3caa7148 [flink] Graceful shutdown refreshExecutor in 
FullCacheLookupTable
a3caa7148 is described below

commit a3caa71484c36c6e20a4a26192ca274d7261f243
Author: Jingsong <[email protected]>
AuthorDate: Tue Jun 25 11:52:59 2024 +0800

    [flink] Graceful shutdown refreshExecutor in FullCacheLookupTable
---
 .../java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 8cb1b5254..28b0da0d1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.ExecutorUtils;
 import org.apache.paimon.utils.FieldsComparator;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.MutableObjectIterator;
@@ -57,6 +58,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -300,7 +302,7 @@ public abstract class FullCacheLookupTable implements 
LookupTable {
     public void close() throws IOException {
         try {
             if (refreshExecutor != null) {
-                refreshExecutor.shutdown();
+                ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, 
refreshExecutor);
             }
         } finally {
             stateFactory.close();

Reply via email to