This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a3caa7148 [flink] Graceful shutdown refreshExecutor in FullCacheLookupTable
a3caa7148 is described below
commit a3caa71484c36c6e20a4a26192ca274d7261f243
Author: Jingsong <[email protected]>
AuthorDate: Tue Jun 25 11:52:59 2024 +0800
[flink] Graceful shutdown refreshExecutor in FullCacheLookupTable
---
.../java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
index 8cb1b5254..28b0da0d1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FullCacheLookupTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.ExecutorUtils;
import org.apache.paimon.utils.FieldsComparator;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.MutableObjectIterator;
@@ -57,6 +58,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -300,7 +302,7 @@ public abstract class FullCacheLookupTable implements LookupTable {
public void close() throws IOException {
try {
if (refreshExecutor != null) {
- refreshExecutor.shutdown();
+ ExecutorUtils.gracefulShutdown(1L, TimeUnit.MINUTES, refreshExecutor);
}
} finally {
stateFactory.close();