This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c2bf345 [FLINK-28662] Compaction should not block job cancelling
7c2bf345 is described below

commit 7c2bf3453c68eabd2935f4a8f0275933779f1dd5
Author: tsreaper <[email protected]>
AuthorDate: Mon Jul 25 14:51:36 2022 +0800

    [FLINK-28662] Compaction should not block job cancelling
    
    This closes #233
---
 .../table/store/file/compact/CompactManager.java   | 23 +++++++++++++++++++++-
 .../table/store/file/data/AppendOnlyWriter.java    |  2 ++
 .../store/file/mergetree/MergeTreeWriter.java      |  3 +++
 3 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index 2063e17b..daf062b8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.table.store.file.compact;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.Optional;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
@@ -26,6 +30,8 @@ import java.util.concurrent.Future;
 /** Manager to submit compaction task. */
 public abstract class CompactManager {
 
+    private static final Logger LOG = LoggerFactory.getLogger(CompactManager.class);
+
     protected final ExecutorService executor;
 
     protected Future<CompactResult> taskFuture;
@@ -45,11 +51,26 @@ public abstract class CompactManager {
             throws ExecutionException, InterruptedException {
         if (taskFuture != null) {
             if (blocking || taskFuture.isDone()) {
-                CompactResult result = taskFuture.get();
+                CompactResult result;
+                try {
+                    result = taskFuture.get();
+                } catch (CancellationException e) {
+                    LOG.info("Compaction future is cancelled", e);
+                    taskFuture = null;
+                    return Optional.empty();
+                }
                 taskFuture = null;
                 return Optional.of(result);
             }
         }
         return Optional.empty();
     }
+
+    public void cancelCompaction() {
+        // TODO this method may leave behind orphan files if compaction is actually finished
+        //  but some CPU work still needs to be done
+        if (taskFuture != null && !taskFuture.isCancelled()) {
+            taskFuture.cancel(true);
+        }
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index f58e9f60..1790e4c0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -144,6 +144,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
 
     @Override
     public List<DataFileMeta> close() throws Exception {
+        // cancel compaction so that it does not block job cancelling
+        compactManager.cancelCompaction();
         sync();
 
         List<DataFileMeta> result = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index c838a253..34f04a71 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -238,7 +238,10 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
 
     @Override
     public List<DataFileMeta> close() throws Exception {
+        // cancel compaction so that it does not block job cancelling
+        compactManager.cancelCompaction();
         sync();
+
         // delete temporary files
         List<DataFileMeta> delete = new ArrayList<>(newFiles);
         for (DataFileMeta file : compactAfter) {

Reply via email to