This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c9f5f82 [FLINK-29714] Merge TableWrite and TableCompact into one 
interface
9c9f5f82 is described below

commit 9c9f5f823ce52a20eb417444b59baac3e96d6a75
Author: tsreaper <[email protected]>
AuthorDate: Tue Oct 25 16:03:22 2022 +0800

    [FLINK-29714] Merge TableWrite and TableCompact into one interface
    
    This closes #328
---
 .../store/connector/sink/StoreCompactOperator.java |  71 ++++++++++---
 .../file/append/AppendOnlyCompactManager.java      |  43 ++++++--
 .../table/store/file/append/AppendOnlyWriter.java  |  14 ++-
 ...mpactManager.java => CompactFutureManager.java} |  29 +-----
 .../table/store/file/compact/CompactManager.java   |  60 +++--------
 .../store/file/compact/NoopCompactManager.java     |  27 +++--
 .../store/file/mergetree/MergeTreeWriter.java      |  13 ++-
 .../file/mergetree/compact/CompactStrategy.java    |   6 ++
 .../mergetree/compact/MergeTreeCompactManager.java |  96 ++++++++++--------
 .../file/operation/AbstractFileStoreWrite.java     |  17 ++++
 .../file/operation/AppendOnlyFileStoreWrite.java   |  43 ++------
 .../table/store/file/operation/FileStoreWrite.java |  32 ++----
 .../file/operation/KeyValueFileStoreWrite.java     |  74 +++++---------
 .../flink/table/store/file/utils/RecordWriter.java |   3 +
 .../table/store/table/AbstractFileStoreTable.java  |   6 --
 .../flink/table/store/table/FileStoreTable.java    |   3 -
 .../flink/table/store/table/sink/TableCompact.java | 112 ---------------------
 .../flink/table/store/table/sink/TableWrite.java   |   3 +
 .../table/store/table/sink/TableWriteImpl.java     |   6 ++
 .../flink/table/store/file/TestFileStore.java      |   4 +-
 .../file/append/AppendOnlyCompactManagerTest.java  |   1 +
 .../store/file/append/AppendOnlyWriterTest.java    |   3 +-
 .../store/file/format/FileFormatSuffixTest.java    |   3 +-
 .../compact/MergeTreeCompactManagerTest.java       |   2 +-
 .../store/file/operation/TestCommitThread.java     |   2 +-
 25 files changed, 289 insertions(+), 384 deletions(-)

diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index 8b4ded82..a703f3fd 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -18,13 +18,21 @@
 
 package org.apache.flink.table.store.connector.sink;
 
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.TableCompact;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -33,11 +41,13 @@ import java.util.stream.Collectors;
 /** A dedicated operator for manual triggered compaction. */
 public class StoreCompactOperator extends PrepareCommitOperator {
 
-    private final FileStoreTable table;
+    private static final Logger LOG = 
LoggerFactory.getLogger(StoreCompactOperator.class);
 
+    private final FileStoreTable table;
     @Nullable private final Map<String, String> compactPartitionSpec;
 
-    private TableCompact compact;
+    private TableScan scan;
+    private TableWrite write;
 
     public StoreCompactOperator(
             FileStoreTable table, @Nullable Map<String, String> 
compactPartitionSpec) {
@@ -48,20 +58,53 @@ public class StoreCompactOperator extends 
PrepareCommitOperator {
     @Override
     public void open() throws Exception {
         super.open();
-        int task = getRuntimeContext().getIndexOfThisSubtask();
-        int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
-        compact = table.newCompact();
-        compact.withPartitions(
-                compactPartitionSpec == null ? Collections.emptyMap() : 
compactPartitionSpec);
-        compact.withFilter(
-                (partition, bucket) -> task == 
Math.abs(Objects.hash(partition, bucket) % numTask));
+
+        scan = table.newScan();
+        if (compactPartitionSpec != null) {
+            scan.withFilter(
+                    PredicateConverter.fromMap(
+                            compactPartitionSpec, 
table.schema().logicalPartitionType()));
+        }
+
+        write = table.newWrite();
     }
 
     @Override
     protected List<Committable> prepareCommit(boolean endOfInput, long 
checkpointId)
             throws IOException {
-        return compact.compact().stream()
-                .map(c -> new Committable(checkpointId, Committable.Kind.FILE, 
c))
-                .collect(Collectors.toList());
+        int task = getRuntimeContext().getIndexOfThisSubtask();
+        int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
+
+        for (Split split : scan.plan().splits) {
+            BinaryRowData partition = split.partition();
+            int bucket = split.bucket();
+            if (Math.abs(Objects.hash(partition, bucket)) % numTask != task) {
+                continue;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                RowType partitionType = table.schema().logicalPartitionType();
+                LOG.debug(
+                        "Do compaction for partition {}, bucket {}",
+                        FileStorePathFactory.getPartitionComputer(
+                                        partitionType,
+                                        
FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue())
+                                .generatePartValues(partition),
+                        bucket);
+            }
+            try {
+                write.compact(partition, bucket);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        try {
+            return write.prepareCommit(true).stream()
+                    .map(c -> new Committable(checkpointId, 
Committable.Kind.FILE, c))
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
index c80c92df..f2af051b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
@@ -19,12 +19,13 @@
 package org.apache.flink.table.store.file.append;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactFutureManager;
 import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.compact.CompactTask;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.DataFilePathFactory;
 import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -37,13 +38,15 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 /** Compact manager for {@link 
org.apache.flink.table.store.file.AppendOnlyFileStore}. */
-public class AppendOnlyCompactManager extends CompactManager {
+public class AppendOnlyCompactManager extends CompactFutureManager {
 
+    private final ExecutorService executor;
+    private final LinkedList<DataFileMeta> toCompact;
     private final int minFileNum;
     private final int maxFileNum;
     private final long targetFileSize;
     private final CompactRewriter rewriter;
-    private final LinkedList<DataFileMeta> toCompact;
+    private final DataFilePathFactory pathFactory;
 
     public AppendOnlyCompactManager(
             ExecutorService executor,
@@ -51,17 +54,43 @@ public class AppendOnlyCompactManager extends 
CompactManager {
             int minFileNum,
             int maxFileNum,
             long targetFileSize,
-            CompactRewriter rewriter) {
-        super(executor);
+            CompactRewriter rewriter,
+            DataFilePathFactory pathFactory) {
+        this.executor = executor;
         this.toCompact = toCompact;
-        this.maxFileNum = maxFileNum;
         this.minFileNum = minFileNum;
+        this.maxFileNum = maxFileNum;
         this.targetFileSize = targetFileSize;
         this.rewriter = rewriter;
+        this.pathFactory = pathFactory;
     }
 
     @Override
-    public void triggerCompaction() {
+    public void triggerCompaction(boolean fullCompaction) {
+        if (fullCompaction) {
+            triggerFullCompaction();
+        } else {
+            triggerCompactionWithBestEffort();
+        }
+    }
+
+    private void triggerFullCompaction() {
+        Preconditions.checkState(
+                taskFuture == null,
+                "A compaction task is still running while the user "
+                        + "forces a new compaction. This is unexpected.");
+        taskFuture =
+                executor.submit(
+                        new AppendOnlyCompactManager.IterativeCompactTask(
+                                toCompact,
+                                targetFileSize,
+                                minFileNum,
+                                maxFileNum,
+                                rewriter,
+                                pathFactory));
+    }
+
+    private void triggerCompactionWithBestEffort() {
         if (taskFuture != null) {
             return;
         }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
index ca73c71b..8c6c9635 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
@@ -90,6 +90,11 @@ public class AppendOnlyWriter implements 
RecordWriter<RowData> {
         writer.write(rowData);
     }
 
+    @Override
+    public void fullCompaction() throws Exception {
+        submitCompaction(true);
+    }
+
     @Override
     public CommitIncrement prepareCommit(boolean endOnfInput) throws Exception 
{
         List<DataFileMeta> newFiles = new ArrayList<>();
@@ -104,7 +109,7 @@ public class AppendOnlyWriter implements 
RecordWriter<RowData> {
         }
         // add new generated files
         newFiles.forEach(compactManager::addNewFile);
-        submitCompaction();
+        submitCompaction(false);
 
         boolean blocking = endOnfInput || forceCompact;
         trySyncLatestCompaction(blocking);
@@ -134,9 +139,10 @@ public class AppendOnlyWriter implements 
RecordWriter<RowData> {
                 schemaId, fileFormat, targetFileSize, writeSchema, 
pathFactory, seqNumCounter);
     }
 
-    private void submitCompaction() throws ExecutionException, 
InterruptedException {
-        trySyncLatestCompaction(false);
-        compactManager.triggerCompaction();
+    private void submitCompaction(boolean forcedFullCompaction)
+            throws ExecutionException, InterruptedException {
+        trySyncLatestCompaction(forcedFullCompaction);
+        compactManager.triggerCompaction(forcedFullCompaction);
     }
 
     private void trySyncLatestCompaction(boolean blocking)
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactFutureManager.java
similarity index 72%
copy from 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
copy to 
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactFutureManager.java
index ef578720..e07e0a97 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactFutureManager.java
@@ -18,43 +18,22 @@
 
 package org.apache.flink.table.store.file.compact;
 
-import org.apache.flink.table.store.file.io.DataFileMeta;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
-/** Manager to submit compaction task. */
-public abstract class CompactManager {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CompactManager.class);
+/** Base implementation of {@link CompactManager} which runs compaction in a 
separate thread. */
+public abstract class CompactFutureManager implements CompactManager {
 
-    protected final ExecutorService executor;
+    private static final Logger LOG = 
LoggerFactory.getLogger(CompactFutureManager.class);
 
     protected Future<CompactResult> taskFuture;
 
-    public CompactManager(ExecutorService executor) {
-        this.executor = executor;
-    }
-
-    /** Should wait compaction finish. */
-    public abstract boolean shouldWaitCompaction();
-
-    /** Add a new file. */
-    public abstract void addNewFile(DataFileMeta file);
-
-    /** Trigger a new compaction task. */
-    public abstract void triggerCompaction();
-
-    /** Get compaction result. Wait finish if {@code blocking} is true. */
-    public abstract Optional<CompactResult> getCompactionResult(boolean 
blocking)
-            throws ExecutionException, InterruptedException;
-
+    @Override
     public void cancelCompaction() {
         // TODO this method may leave behind orphan files if compaction is 
actually finished
         //  but some CPU work still needs to be done
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index ef578720..e82e1748 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -20,65 +20,29 @@ package org.apache.flink.table.store.file.compact;
 
 import org.apache.flink.table.store.file.io.DataFileMeta;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.Optional;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 /** Manager to submit compaction task. */
-public abstract class CompactManager {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(CompactManager.class);
-
-    protected final ExecutorService executor;
-
-    protected Future<CompactResult> taskFuture;
-
-    public CompactManager(ExecutorService executor) {
-        this.executor = executor;
-    }
+public interface CompactManager {
 
     /** Should wait compaction finish. */
-    public abstract boolean shouldWaitCompaction();
+    boolean shouldWaitCompaction();
 
     /** Add a new file. */
-    public abstract void addNewFile(DataFileMeta file);
+    void addNewFile(DataFileMeta file);
 
-    /** Trigger a new compaction task. */
-    public abstract void triggerCompaction();
+    /**
+     * Trigger a new compaction task.
+     *
+     * @param fullCompaction if caller needs a guaranteed full compaction
+     */
+    void triggerCompaction(boolean fullCompaction);
 
     /** Get compaction result. Wait finish if {@code blocking} is true. */
-    public abstract Optional<CompactResult> getCompactionResult(boolean 
blocking)
+    Optional<CompactResult> getCompactionResult(boolean blocking)
             throws ExecutionException, InterruptedException;
 
-    public void cancelCompaction() {
-        // TODO this method may leave behind orphan files if compaction is 
actually finished
-        //  but some CPU work still needs to be done
-        if (taskFuture != null && !taskFuture.isCancelled()) {
-            taskFuture.cancel(true);
-        }
-    }
-
-    protected final Optional<CompactResult> innerGetCompactionResult(boolean 
blocking)
-            throws ExecutionException, InterruptedException {
-        if (taskFuture != null) {
-            if (blocking || taskFuture.isDone()) {
-                CompactResult result;
-                try {
-                    result = taskFuture.get();
-                } catch (CancellationException e) {
-                    LOG.info("Compaction future is cancelled", e);
-                    taskFuture = null;
-                    return Optional.empty();
-                }
-                taskFuture = null;
-                return Optional.of(result);
-            }
-        }
-        return Optional.empty();
-    }
+    /** Cancel currently running compaction task. */
+    void cancelCompaction();
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
index 5ca4b14b..db014c45 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -18,17 +18,17 @@
 
 package org.apache.flink.table.store.file.compact;
 
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
 
-/** A {@link CompactManager} which doesn't do things. */
-public class NoopCompactManager extends CompactManager {
+/** A {@link CompactManager} which never compacts. */
+public class NoopCompactManager implements CompactManager {
 
-    public NoopCompactManager(ExecutorService executor) {
-        super(executor);
-    }
+    public NoopCompactManager() {}
 
     @Override
     public boolean shouldWaitCompaction() {
@@ -39,10 +39,21 @@ public class NoopCompactManager extends CompactManager {
     public void addNewFile(DataFileMeta file) {}
 
     @Override
-    public void triggerCompaction() {}
+    public void triggerCompaction(boolean fullCompaction) {
+        Preconditions.checkArgument(
+                !fullCompaction,
+                "NoopCompactManager does not support user triggered 
compaction.\n"
+                        + "If you really need a guaranteed compaction, please 
set "
+                        + CoreOptions.WRITE_COMPACTION_SKIP.key()
+                        + " property of this table to false.");
+    }
 
     @Override
-    public Optional<CompactResult> getCompactionResult(boolean blocking) {
+    public Optional<CompactResult> getCompactionResult(boolean blocking)
+            throws ExecutionException, InterruptedException {
         return Optional.empty();
     }
+
+    @Override
+    public void cancelCompaction() {}
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 287ac0d1..7f0adfcc 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -137,6 +137,11 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         }
     }
 
+    @Override
+    public void fullCompaction() throws Exception {
+        submitCompaction(true);
+    }
+
     @Override
     public long memoryOccupancy() {
         return writeBuffer.memoryOccupancy();
@@ -187,7 +192,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
             }
 
             writeBuffer.clear();
-            submitCompaction();
+            submitCompaction(false);
         }
     }
 
@@ -252,9 +257,9 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         compactAfter.addAll(result.after());
     }
 
-    private void submitCompaction() throws Exception {
-        trySyncLatestCompaction(false);
-        compactManager.triggerCompaction();
+    private void submitCompaction(boolean forcedFullCompaction) throws 
Exception {
+        trySyncLatestCompaction(forcedFullCompaction);
+        compactManager.triggerCompaction(forcedFullCompaction);
     }
 
     private void trySyncLatestCompaction(boolean blocking) throws Exception {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
index 50addb42..58e5902a 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
@@ -37,4 +37,10 @@ public interface CompactStrategy {
      * </ul>
      */
     Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
+
+    /** Pick a compaction unit consisting of all existing files. */
+    static Optional<CompactUnit> pickFullCompaction(int numLevels, 
List<LevelSortedRun> runs) {
+        int maxLevel = numLevels - 1;
+        return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 15f02594..815edc86 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -20,11 +20,12 @@ package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactFutureManager;
 import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,20 +37,16 @@ import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
 /** Compact manager for {@link 
org.apache.flink.table.store.file.KeyValueFileStore}. */
-public class MergeTreeCompactManager extends CompactManager {
+public class MergeTreeCompactManager extends CompactFutureManager {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MergeTreeCompactManager.class);
 
+    private final ExecutorService executor;
     private final Levels levels;
-
     private final CompactStrategy strategy;
-
     private final Comparator<RowData> keyComparator;
-
     private final long minFileSize;
-
     private final int numSortedRunStopTrigger;
-
     private final CompactRewriter rewriter;
 
     public MergeTreeCompactManager(
@@ -60,7 +57,7 @@ public class MergeTreeCompactManager extends CompactManager {
             long minFileSize,
             int numSortedRunStopTrigger,
             CompactRewriter rewriter) {
-        super(executor);
+        this.executor = executor;
         this.levels = levels;
         this.strategy = strategy;
         this.minFileSize = minFileSize;
@@ -80,43 +77,54 @@ public class MergeTreeCompactManager extends CompactManager 
{
     }
 
     @Override
-    public void triggerCompaction() {
-        if (taskFuture != null) {
-            return;
+    public void triggerCompaction(boolean fullCompaction) {
+        Optional<CompactUnit> optionalUnit;
+        if (fullCompaction) {
+            Preconditions.checkState(
+                    taskFuture == null,
+                    "A compaction task is still running while the user "
+                            + "forces a new compaction. This is unexpected.");
+            optionalUnit =
+                    CompactStrategy.pickFullCompaction(
+                            levels.numberOfLevels(), levels.levelSortedRuns());
+        } else {
+            if (taskFuture != null) {
+                return;
+            }
+            optionalUnit =
+                    strategy.pick(levels.numberOfLevels(), 
levels.levelSortedRuns())
+                            .map(unit -> unit.files().size() < 2 ? null : 
unit);
         }
-        strategy.pick(levels.numberOfLevels(), levels.levelSortedRuns())
-                .ifPresent(
-                        unit -> {
-                            if (unit.files().size() < 2) {
-                                return;
-                            }
-                            /*
-                             * As long as there is no older data, We can drop 
the deletion.
-                             * If the output level is 0, there may be older 
data not involved in compaction.
-                             * If the output level is bigger than 0, as long 
as there is no older data in
-                             * the current levels, the output is the oldest, 
so we can drop the deletion.
-                             * See CompactStrategy.pick.
-                             */
-                            boolean dropDelete =
-                                    unit.outputLevel() != 0
-                                            && unit.outputLevel() >= 
levels.nonEmptyHighestLevel();
-
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug(
-                                        "Submit compaction with files (name, 
level, size): "
-                                                + 
levels.levelSortedRuns().stream()
-                                                        .flatMap(lsr -> 
lsr.run().files().stream())
-                                                        .map(
-                                                                file ->
-                                                                        
String.format(
-                                                                               
 "(%s, %d, %d)",
-                                                                               
 file.fileName(),
-                                                                               
 file.level(),
-                                                                               
 file.fileSize()))
-                                                        
.collect(Collectors.joining(", ")));
-                            }
-                            submitCompaction(unit, dropDelete);
-                        });
+
+        optionalUnit.ifPresent(
+                unit -> {
+                    /*
+                     * As long as there is no older data, We can drop the 
deletion.
+                     * If the output level is 0, there may be older data not 
involved in compaction.
+                     * If the output level is bigger than 0, as long as there 
is no older data in
+                     * the current levels, the output is the oldest, so we can 
drop the deletion.
+                     * See CompactStrategy.pick.
+                     */
+                    boolean dropDelete =
+                            unit.outputLevel() != 0
+                                    && unit.outputLevel() >= 
levels.nonEmptyHighestLevel();
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Submit compaction with files (name, level, 
size): "
+                                        + levels.levelSortedRuns().stream()
+                                                .flatMap(lsr -> 
lsr.run().files().stream())
+                                                .map(
+                                                        file ->
+                                                                String.format(
+                                                                        "(%s, 
%d, %d)",
+                                                                        
file.fileName(),
+                                                                        
file.level(),
+                                                                        
file.fileSize()))
+                                                .collect(Collectors.joining(", 
")));
+                    }
+                    submitCompaction(unit, dropDelete);
+                });
     }
 
     @VisibleForTesting
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index 50334778..0e9ed821 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.io.DataFileMeta;
@@ -97,6 +98,12 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         writer.write(data);
     }
 
+    @Override
+    public void compact(BinaryRowData partition, int bucket) throws Exception {
+        getWriter(partition, bucket).fullCompaction();
+    }
+
+    @Override
     public List<FileCommittable> prepareCommit(boolean endOfInput) throws 
Exception {
         List<FileCommittable> result = new ArrayList<>();
 
@@ -167,4 +174,14 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
     }
 
     protected void notifyNewWriter(RecordWriter<T> writer) {}
+
+    /** Create a {@link RecordWriter} from partition and bucket. */
+    @VisibleForTesting
+    public abstract RecordWriter<T> createWriter(
+            BinaryRowData partition, int bucket, ExecutorService 
compactExecutor);
+
+    /** Create an empty {@link RecordWriter} from partition and bucket. */
+    @VisibleForTesting
+    public abstract RecordWriter<T> createEmptyWriter(
+            BinaryRowData partition, int bucket, ExecutorService 
compactExecutor);
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index d2b482be..627746c2 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.append.AppendOnlyCompactManager;
 import org.apache.flink.table.store.file.append.AppendOnlyWriter;
 import org.apache.flink.table.store.file.compact.CompactManager;
-import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.compact.NoopCompactManager;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.DataFilePathFactory;
@@ -38,12 +37,9 @@ import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
-import javax.annotation.Nullable;
-
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 import static 
org.apache.flink.table.store.file.io.DataFileMeta.getMaxSequenceNumber;
@@ -96,21 +92,6 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<RowData> {
         return createWriter(partition, bucket, Collections.emptyList(), 
compactExecutor);
     }
 
-    @Override
-    public Callable<CompactResult> createCompactWriter(
-            BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> 
compactFiles) {
-        if (compactFiles == null) {
-            compactFiles = scanExistingFileMetas(partition, bucket);
-        }
-        return new AppendOnlyCompactManager.IterativeCompactTask(
-                compactFiles,
-                targetFileSize,
-                compactionMinFileNum,
-                compactionMaxFileNum,
-                compactRewriter(partition, bucket),
-                pathFactory.createDataFilePathFactory(partition, bucket));
-    }
-
     private RecordWriter<RowData> createWriter(
             BinaryRowData partition,
             int bucket,
@@ -120,19 +101,17 @@ public class AppendOnlyFileStoreWrite extends 
AbstractFileStoreWrite<RowData> {
         // and make restore files mutable to update
         LinkedList<DataFileMeta> restored = new LinkedList<>(restoredFiles);
         DataFilePathFactory factory = 
pathFactory.createDataFilePathFactory(partition, bucket);
-        CompactManager compactManager;
-        if (skipCompaction) {
-            compactManager = new NoopCompactManager(compactExecutor);
-        } else {
-            compactManager =
-                    new AppendOnlyCompactManager(
-                            compactExecutor,
-                            restored,
-                            compactionMinFileNum,
-                            compactionMaxFileNum,
-                            targetFileSize,
-                            compactRewriter(partition, bucket));
-        }
+        CompactManager compactManager =
+                skipCompaction
+                        ? new NoopCompactManager()
+                        : new AppendOnlyCompactManager(
+                                compactExecutor,
+                                restored,
+                                compactionMinFileNum,
+                                compactionMaxFileNum,
+                                targetFileSize,
+                                compactRewriter(partition, bucket),
+                                factory);
         return new AppendOnlyWriter(
                 schemaId,
                 fileFormat,
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 85ecdcb7..7f79abef 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -21,17 +21,11 @@ package org.apache.flink.table.store.file.operation;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.table.sink.FileCommittable;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 
-import javax.annotation.Nullable;
-
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 
 /**
  * Write operation which provides {@link RecordWriter} creation and writes 
{@link SinkRecord} to
@@ -43,23 +37,6 @@ public interface FileStoreWrite<T> {
 
     FileStoreWrite<T> withIOManager(IOManager ioManager);
 
-    /** Create a {@link RecordWriter} from partition and bucket. */
-    RecordWriter<T> createWriter(
-            BinaryRowData partition, int bucket, ExecutorService 
compactExecutor);
-
-    /** Create an empty {@link RecordWriter} from partition and bucket. */
-    RecordWriter<T> createEmptyWriter(
-            BinaryRowData partition, int bucket, ExecutorService 
compactExecutor);
-
-    /**
-     * Create a {@link Callable} compactor from partition, bucket.
-     *
-     * @param compactFiles input files of compaction. When it is null, will 
automatically read all
-     *     files of the current bucket.
-     */
-    Callable<CompactResult> createCompactWriter(
-            BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> 
compactFiles);
-
     /**
      * If overwrite is true, the writer will overwrite the store, otherwise it 
won't.
      *
@@ -77,6 +54,15 @@ public interface FileStoreWrite<T> {
      */
     void write(BinaryRowData partition, int bucket, T data) throws Exception;
 
+    /**
+     * Compact data stored in given partition and bucket.
+     *
+     * @param partition the partition to compact
+     * @param bucket the bucket to compact
+     * @throws Exception the thrown exception when compacting the records
+     */
+    void compact(BinaryRowData partition, int bucket) throws Exception;
+
     /**
      * Prepare commit in the write.
      *
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 5bfd1f19..89b038e3 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -23,8 +23,6 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.compact.CompactManager;
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.compact.NoopCompactManager;
 import org.apache.flink.table.store.file.io.DataFileMeta;
 import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
@@ -37,7 +35,6 @@ import 
org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import 
org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
-import 
org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactTask;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -46,12 +43,9 @@ import org.apache.flink.table.store.file.utils.RecordWriter;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.RowType;
 
-import javax.annotation.Nullable;
-
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
@@ -112,21 +106,6 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
         return createMergeTreeWriter(partition, bucket, 
Collections.emptyList(), compactExecutor);
     }
 
-    @Override
-    public Callable<CompactResult> createCompactWriter(
-            BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> 
compactFiles) {
-        if (compactFiles == null) {
-            compactFiles = scanExistingFileMetas(partition, bucket);
-        }
-        Comparator<RowData> keyComparator = keyComparatorSupplier.get();
-        CompactRewriter rewriter = compactRewriter(partition, bucket, 
keyComparator);
-        Levels levels = new Levels(keyComparator, compactFiles, 
options.numLevels());
-        CompactUnit unit =
-                CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, 
levels.levelSortedRuns());
-        return new MergeTreeCompactTask(
-                keyComparator, options.targetFileSize(), rewriter, unit, true);
-    }
-
     private MergeTreeWriter createMergeTreeWriter(
             BinaryRowData partition,
             int bucket,
@@ -134,23 +113,18 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             ExecutorService compactExecutor) {
         KeyValueFileWriterFactory writerFactory = 
writerFactoryBuilder.build(partition, bucket);
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
-        CompactManager compactManager;
-        if (options.writeCompactionSkip()) {
-            compactManager = new NoopCompactManager(compactExecutor);
-        } else {
-            Levels levels = new Levels(keyComparator, restoreFiles, 
options.numLevels());
-            compactManager =
-                    createCompactManager(
-                            partition,
-                            bucket,
-                            new UniversalCompaction(
-                                    options.maxSizeAmplificationPercent(),
-                                    options.sortedRunSizeRatio(),
-                                    options.numSortedRunCompactionTrigger(),
-                                    options.maxSortedRunNum()),
-                            compactExecutor,
-                            levels);
-        }
+        Levels levels = new Levels(keyComparator, restoreFiles, 
options.numLevels());
+        CompactManager compactManager =
+                createCompactManager(
+                        partition,
+                        bucket,
+                        new UniversalCompaction(
+                                options.maxSizeAmplificationPercent(),
+                                options.sortedRunSizeRatio(),
+                                options.numSortedRunCompactionTrigger(),
+                                options.maxSortedRunNum()),
+                        compactExecutor,
+                        levels);
         return new MergeTreeWriter(
                 options.writeBufferSpillable(),
                 options.localSortMaxNumFileHandles(),
@@ -170,16 +144,20 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             CompactStrategy compactStrategy,
             ExecutorService compactExecutor,
             Levels levels) {
-        Comparator<RowData> keyComparator = keyComparatorSupplier.get();
-        CompactRewriter rewriter = compactRewriter(partition, bucket, 
keyComparator);
-        return new MergeTreeCompactManager(
-                compactExecutor,
-                levels,
-                compactStrategy,
-                keyComparator,
-                options.targetFileSize(),
-                options.numSortedRunStopTrigger(),
-                rewriter);
+        if (options.writeCompactionSkip()) {
+            return new NoopCompactManager();
+        } else {
+            Comparator<RowData> keyComparator = keyComparatorSupplier.get();
+            CompactRewriter rewriter = compactRewriter(partition, bucket, 
keyComparator);
+            return new MergeTreeCompactManager(
+                    compactExecutor,
+                    levels,
+                    compactStrategy,
+                    keyComparator,
+                    options.targetFileSize(),
+                    options.numSortedRunStopTrigger(),
+                    rewriter);
+        }
     }
 
     private CompactRewriter compactRewriter(
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index a071f30d..4acc1dd0 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -33,6 +33,9 @@ public interface RecordWriter<T> {
     /** Add a key-value element to the writer. */
     void write(T record) throws Exception;
 
+    /** Compact all files related to the writer. */
+    void fullCompaction() throws Exception;
+
     /**
      * Prepare for a commit.
      *
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 16f2e826..36d954d5 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableCompact;
 
 /** Abstract {@link FileStoreTable}. */
 public abstract class AbstractFileStoreTable implements FileStoreTable {
@@ -65,9 +64,4 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
     public TableCommit newCommit(String user) {
         return new TableCommit(store().newCommit(user), store().newExpire());
     }
-
-    @Override
-    public TableCompact newCompact() {
-        return new TableCompact(store().newScan(), store().newWrite(), 
store().partitionType());
-    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 84188fa7..595a8524 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.schema.TableSchema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableCompact;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
@@ -51,6 +50,4 @@ public interface FileStoreTable extends Serializable {
     TableWrite newWrite();
 
     TableCommit newCommit(String user);
-
-    TableCompact newCompact();
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
deleted file mode 100644
index fc57c0aa..00000000
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.sink;
-
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.io.CompactIncrement;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.io.NewFilesIncrement;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.predicate.PredicateConverter;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.BiPredicate;
-
-/** An abstraction layer above {@link FileStoreWrite#createCompactWriter} to 
provide compaction. */
-public class TableCompact {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(TableCompact.class);
-
-    private final FileStoreScan scan;
-    private final FileStoreWrite<?> write;
-    private final RowType partitionType;
-
-    private BiPredicate<BinaryRowData, Integer> partBucketFilter;
-
-    public TableCompact(FileStoreScan scan, FileStoreWrite<?> write, RowType 
partitionType) {
-        this.scan = scan;
-        this.write = write;
-        this.partitionType = partitionType;
-    }
-
-    public TableCompact withPartitions(Map<String, String> partitionSpec) {
-        scan.withPartitionFilter(PredicateConverter.fromMap(partitionSpec, 
partitionType));
-        return this;
-    }
-
-    public TableCompact withFilter(BiPredicate<BinaryRowData, Integer> 
partBucketFilter) {
-        this.partBucketFilter = partBucketFilter;
-        return this;
-    }
-
-    public List<FileCommittable> compact() {
-        List<FileCommittable> committables = new ArrayList<>();
-        scan.plan()
-                .groupByPartFiles()
-                .forEach(
-                        (partition, buckets) ->
-                                buckets.forEach(
-                                        (bucket, files) ->
-                                                doCompact(partition, bucket, 
files)
-                                                        
.ifPresent(committables::add)));
-        return committables;
-    }
-
-    private Optional<FileCommittable> doCompact(
-            BinaryRowData partition, int bucket, List<DataFileMeta> files) {
-        if (!partBucketFilter.test(partition, bucket)) {
-            return Optional.empty();
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                    "Do compaction for partition {}, bucket {}",
-                    FileStorePathFactory.getPartitionComputer(
-                                    partitionType,
-                                    
FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue())
-                            .generatePartValues(partition),
-                    bucket);
-        }
-        try {
-            CompactResult result =
-                    write.createCompactWriter(partition.copy(), bucket, 
files).call();
-            FileCommittable committable =
-                    new FileCommittable(
-                            partition,
-                            bucket,
-                            NewFilesIncrement.emptyIncrement(),
-                            new CompactIncrement(
-                                    result.before(), result.after(), 
Collections.emptyList()));
-            return Optional.of(committable);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 0c026fbf..647d74e8 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table.sink;
 
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
 
 import java.util.List;
 
@@ -37,6 +38,8 @@ public interface TableWrite {
 
     SinkRecord write(RowData rowData) throws Exception;
 
+    void compact(BinaryRowData partition, int bucket) throws Exception;
+
     List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception;
 
     void close() throws Exception;
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index 7a938162..83abf141 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table.sink;
 
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 
 import java.util.List;
@@ -68,6 +69,11 @@ public class TableWriteImpl<T> implements TableWrite {
         return record;
     }
 
+    @Override
+    public void compact(BinaryRowData partition, int bucket) throws Exception {
+        write.compact(partition, bucket);
+    }
+
     @Override
     public List<FileCommittable> prepareCommit(boolean endOfInput) throws 
Exception {
         return write.prepareCommit(endOfInput);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 42d952e3..e55d225a 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -32,12 +32,12 @@ import 
org.apache.flink.table.store.file.manifest.ManifestList;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
 import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -173,7 +173,7 @@ public class TestFileStore extends KeyValueFileStore {
             String identifier,
             BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
             throws Exception {
-        FileStoreWrite<KeyValue> write = newWrite();
+        AbstractFileStoreWrite<KeyValue> write = newWrite();
         Map<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> writers = new 
HashMap<>();
         for (KeyValue kv : kvs) {
             BinaryRowData partition = partitionCalculator.apply(kv);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManagerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManagerTest.java
index 6449ac87..b8c6e99b 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManagerTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManagerTest.java
@@ -207,6 +207,7 @@ public class AppendOnlyCompactManagerTest {
                         minFileNum,
                         maxFileNum,
                         targetFileSize,
+                        null, // not used
                         null // not used
                         );
         Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
index 31856d35..7accc95c 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
@@ -318,7 +318,8 @@ public class AppendOnlyWriterTest {
                                         compactBefore.isEmpty()
                                                 ? Collections.emptyList()
                                                 : Collections.singletonList(
-                                                        
generateCompactAfter(compactBefore))),
+                                                        
generateCompactAfter(compactBefore)),
+                                pathFactory),
                         forceCompact,
                         pathFactory),
                 toCompact);
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 7e7f8482..54ddf245 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -67,7 +67,8 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         10,
                         SCHEMA,
                         0,
-                        new AppendOnlyCompactManager(null, toCompact, 4, 10, 
10, null), // not used
+                        new AppendOnlyCompactManager(
+                                null, toCompact, 4, 10, 10, null, 
dataFilePathFactory), // not used
                         false,
                         dataFilePathFactory);
         appendOnlyWriter.write(
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
index e964fe01..ace9965b 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -204,7 +204,7 @@ public class MergeTreeCompactManagerTest {
                         2,
                         Integer.MAX_VALUE,
                         testRewriter(expectedDropDelete));
-        manager.triggerCompaction();
+        manager.triggerCompaction(false);
         manager.getCompactionResult(true);
         List<LevelMinMax> outputs =
                 
levels.allFiles().stream().map(LevelMinMax::new).collect(Collectors.toList());
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 5483ca16..a08bda5e 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -61,7 +61,7 @@ public class TestCommitThread extends Thread {
     private final Map<BinaryRowData, List<KeyValue>> result;
     private final Map<BinaryRowData, MergeTreeWriter> writers;
 
-    private final FileStoreWrite<KeyValue> write;
+    private final AbstractFileStoreWrite<KeyValue> write;
     private final FileStoreCommit commit;
 
     public TestCommitThread(

Reply via email to