This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7c2bf345 [FLINK-28662] Compaction should not block job cancelling
7c2bf345 is described below
commit 7c2bf3453c68eabd2935f4a8f0275933779f1dd5
Author: tsreaper <[email protected]>
AuthorDate: Mon Jul 25 14:51:36 2022 +0800
[FLINK-28662] Compaction should not block job cancelling
This closes #233
---
.../table/store/file/compact/CompactManager.java | 23 +++++++++++++++++++++-
.../table/store/file/data/AppendOnlyWriter.java | 2 ++
.../store/file/mergetree/MergeTreeWriter.java | 3 +++
3 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index 2063e17b..daf062b8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -18,7 +18,11 @@
package org.apache.flink.table.store.file.compact;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Optional;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -26,6 +30,8 @@ import java.util.concurrent.Future;
/** Manager to submit compaction task. */
public abstract class CompactManager {
+ private static final Logger LOG = LoggerFactory.getLogger(CompactManager.class);
+
protected final ExecutorService executor;
protected Future<CompactResult> taskFuture;
@@ -45,11 +51,26 @@ public abstract class CompactManager {
throws ExecutionException, InterruptedException {
if (taskFuture != null) {
if (blocking || taskFuture.isDone()) {
- CompactResult result = taskFuture.get();
+ CompactResult result;
+ try {
+ result = taskFuture.get();
+ } catch (CancellationException e) {
+ LOG.info("Compaction future is cancelled", e);
+ taskFuture = null;
+ return Optional.empty();
+ }
taskFuture = null;
return Optional.of(result);
}
}
return Optional.empty();
}
+
+ public void cancelCompaction() {
+ // TODO this method may leave behind orphan files if compaction is actually finished
+ // but some CPU work still needs to be done
+ if (taskFuture != null && !taskFuture.isCancelled()) {
+ taskFuture.cancel(true);
+ }
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index f58e9f60..1790e4c0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -144,6 +144,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
@Override
public List<DataFileMeta> close() throws Exception {
+ // cancel compaction so that it does not block job cancelling
+ compactManager.cancelCompaction();
sync();
List<DataFileMeta> result = new ArrayList<>();
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index c838a253..34f04a71 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -238,7 +238,10 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
@Override
public List<DataFileMeta> close() throws Exception {
+ // cancel compaction so that it does not block job cancelling
+ compactManager.cancelCompaction();
sync();
+
// delete temporary files
List<DataFileMeta> delete = new ArrayList<>(newFiles);
for (DataFileMeta file : compactAfter) {