This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new c44ed5bb [FLINK-28098] Refactor table store compactor
c44ed5bb is described below

commit c44ed5bb50f06eb94f18e8ae5d8c09ee2b909d10
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 17 14:36:01 2022 +0800

    [FLINK-28098] Refactor table store compactor
    
    This closes #160
---
 .../table/store/connector/sink/StoreSink.java      |  23 ++-
 .../store/connector/sink/StoreSinkCompactor.java   | 114 ++-----------
 .../table/store/connector/sink/StoreSinkTest.java  |   8 -
 .../table/store/connector/sink/TestFileStore.java  |  10 +-
 .../store/connector/sink/TestFileStoreTable.java   |   5 +-
 .../store/file/mergetree/MergeTreeWriter.java      |   6 +-
 .../file/mergetree/compact/CompactManager.java     | 179 +-------------------
 .../file/mergetree/compact/CompactResult.java      |  31 ++++
 .../file/mergetree/compact/CompactRewriter.java    |  32 ++++
 .../store/file/mergetree/compact/CompactTask.java  | 182 +++++++++++++++++++++
 .../file/operation/AppendOnlyFileStoreWrite.java   |  11 +-
 .../table/store/file/operation/FileStoreWrite.java |  18 +-
 .../file/operation/KeyValueFileStoreWrite.java     |  61 +++----
 .../table/store/file/writer/CompactWriter.java     |  81 ---------
 .../table/store/table/AbstractFileStoreTable.java  |   9 +
 .../flink/table/store/table/FileStoreTable.java    |   4 +-
 .../flink/table/store/table/sink/TableCompact.java | 111 +++++++++++++
 .../table/store/file/mergetree/MergeTreeTest.java  |   3 +-
 .../file/mergetree/compact/CompactManagerTest.java |   2 +-
 19 files changed, 469 insertions(+), 421 deletions(-)

diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index e24a81c5..eb60b409 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.store.log.LogSinkProvider;
 import org.apache.flink.table.store.log.LogWriteCallback;
 import org.apache.flink.table.store.table.FileStoreTable;
 import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.TableCompact;
 
 import javax.annotation.Nullable;
 
@@ -45,6 +46,7 @@ import java.io.UncheckedIOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.Callable;
 import java.util.function.Consumer;
 
@@ -92,19 +94,11 @@ public class StoreSink<WriterStateT, LogCommT>
         return restoreWriter(initContext, null);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public StatefulPrecommittingSinkWriter<WriterStateT> restoreWriter(
             InitContext initContext, Collection<WriterStateT> states) throws IOException {
         if (compactionTask) {
-            return (StatefulPrecommittingSinkWriter<WriterStateT>)
-                    new StoreSinkCompactor(
-                            initContext.getSubtaskId(),
-                            initContext.getNumberOfParallelSubtasks(),
-                            table.store(),
-                            compactPartitionSpec == null
-                                    ? Collections.emptyMap()
-                                    : compactPartitionSpec);
+            return createCompactWriter(initContext);
         }
         SinkWriter<SinkRecord> logWriter = null;
         LogWriteCallback logCallback = null;
@@ -123,6 +117,17 @@ public class StoreSink<WriterStateT, LogCommT>
                 table.newWrite().withOverwrite(overwritePartition != null), logWriter, logCallback);
     }
 
+    private StoreSinkCompactor<WriterStateT> createCompactWriter(InitContext initContext) {
+        int task = initContext.getSubtaskId();
+        int numTask = initContext.getNumberOfParallelSubtasks();
+        TableCompact tableCompact = table.newCompact();
+        tableCompact.withPartitions(
+                compactPartitionSpec == null ? Collections.emptyMap() : compactPartitionSpec);
+        tableCompact.withFilter(
+                (partition, bucket) -> task == Math.abs(Objects.hash(partition, bucket) % numTask));
+        return new StoreSinkCompactor<>(tableCompact);
+    }
+
     @Override
     public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
         return logSinkProvider == null
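
The new createCompactWriter above assigns each (partition, bucket) pair to exactly one sink subtask by hashing the pair. A minimal standalone sketch of that assignment rule follows; the class and variable names are illustrative and not part of the commit.

import java.util.Objects;

public class AssignmentSketch {

    // Mirrors the filter passed to TableCompact.withFilter above: a pair is
    // handled by subtask 'task' iff its hash folds onto that subtask id, so
    // each pair lands on exactly one of the parallel subtasks.
    static boolean select(Object partition, int bucket, int task, int numTasks) {
        return task == Math.abs(Objects.hash(partition, bucket) % numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4;
        for (int bucket = 0; bucket < 8; bucket++) {
            for (int task = 0; task < numTasks; task++) {
                if (select("dt=2022-06-17", bucket, task, numTasks)) {
                    System.out.printf("bucket %d -> subtask %d%n", bucket, task);
                }
            }
        }
    }
}
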
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
index a0f7e1a3..4a79a8c6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
@@ -19,61 +19,30 @@
 package org.apache.flink.table.store.connector.sink;
 
 import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.predicate.PredicateConverter;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.sink.FileCommittable;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.table.store.table.sink.TableCompact;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
 
 /** A dedicated {@link SinkWriter} for manual triggered compaction. */
-public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+public class StoreSinkCompactor<WriterStateT>
+        implements StatefulPrecommittingSinkWriter<WriterStateT> {
 
-    private final int subTaskId;
-    private final int numOfParallelInstances;
+    private final TableCompact tableCompact;
 
-    private final FileStore<?> fileStore;
-    private final Map<String, String> partitionSpec;
-    private final ExecutorService compactExecutor;
-
-    public StoreSinkCompactor(
-            int subTaskId,
-            int numOfParallelInstances,
-            FileStore<?> fileStore,
-            Map<String, String> partitionSpec) {
-        this.subTaskId = subTaskId;
-        this.numOfParallelInstances = numOfParallelInstances;
-        this.fileStore = fileStore;
-        this.partitionSpec = partitionSpec;
-        this.compactExecutor =
-                Executors.newSingleThreadScheduledExecutor(
-                        new ExecutorThreadFactory(
-                                String.format("compaction-subtask-%d", subTaskId)));
+    public StoreSinkCompactor(TableCompact tableCompact) {
+        this.tableCompact = tableCompact;
     }
 
     @Override
-    public void flush(boolean endOfInput) {}
+    public void flush(boolean endOfInput) {
+        // nothing to flush
+    }
 
     @Override
     public void write(RowData element, Context context) throws IOException, InterruptedException {
@@ -82,68 +51,19 @@ public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void>
 
     @Override
     public void close() throws Exception {
-        compactExecutor.shutdownNow();
+        // nothing to close
     }
 
     @Override
-    public List<Void> snapshotState(long checkpointId) {
+    public List<WriterStateT> snapshotState(long checkpointId) {
+        // nothing to snapshot
         return Collections.emptyList();
     }
 
     @Override
-    public Collection<Committable> prepareCommit() throws IOException {
-        List<Committable> committables = new ArrayList<>();
-
-        FileStoreScan.Plan plan =
-                fileStore
-                        .newScan()
-                        .withPartitionFilter(
-                                PredicateConverter.CONVERTER.fromMap(
-                                        partitionSpec, fileStore.partitionType()))
-                        .plan();
-        for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
-                plan.groupByPartFiles().entrySet()) {
-            BinaryRowData partition = partEntry.getKey();
-            for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
-                    partEntry.getValue().entrySet()) {
-                int bucket = bucketEntry.getKey();
-                List<DataFileMeta> restoredFiles = bucketEntry.getValue();
-                if (select(partition, bucket)) {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug(
-                                "Assign partition {}, bucket {} to subtask {}",
-                                FileStorePathFactory.getPartitionComputer(
-                                                fileStore.partitionType(),
-                                                FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
-                                                        .defaultValue())
-                                        .generatePartValues(partition),
-                                bucket,
-                                subTaskId);
-                    }
-                    RecordWriter<?> writer =
-                            fileStore
-                                    .newWrite()
-                                    .createCompactWriter(
-                                            partition.copy(),
-                                            bucket,
-                                            compactExecutor,
-                                            restoredFiles);
-                    FileCommittable committable;
-                    try {
-                        committable =
-                                new FileCommittable(
-                                        partition, bucketEntry.getKey(), writer.prepareCommit());
-                        committables.add(new Committable(Committable.Kind.FILE, committable));
-                    } catch (Exception e) {
-                        throw new IOException(e);
-                    }
-                }
-            }
-        }
-        return committables;
-    }
-
-    private boolean select(BinaryRowData partition, int bucket) {
-        return subTaskId == Math.abs(Objects.hash(partition, bucket) % numOfParallelInstances);
+    public Collection<Committable> prepareCommit() {
+        return tableCompact.compact().stream()
+                .map(c -> new Committable(Committable.Kind.FILE, c))
+                .collect(Collectors.toList());
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index edd19a5a..4efcc09f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -200,14 +200,6 @@ public class StoreSinkTest {
                 .isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
     }
 
-    @Test
-    public void testCreateCompactor() throws Exception {
-        StoreSink<?, ?> sink =
-                new StoreSink<>(identifier, table, true, null, () -> lock, null, null);
-        StatefulPrecommittingSinkWriter<?> writer = sink.createWriter(initContext());
-        assertThat(writer).isInstanceOf(StoreSinkCompactor.class);
-    }
-
     private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
         writeAndCommit(
                 sink,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 109c692c..d048eff6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestCommittable;
 import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.operation.FileStoreCommit;
 import org.apache.flink.table.store.file.operation.FileStoreExpire;
 import org.apache.flink.table.store.file.operation.FileStoreRead;
@@ -33,10 +34,11 @@ import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.stats.StatsTestUtils;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.CompactWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,6 +46,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.stream.Collectors;
 
@@ -89,11 +92,10 @@ public class TestFileStore implements FileStore<KeyValue> {
             }
 
             @Override
-            public CompactWriter createCompactWriter(
+            public Callable<CompactResult> createCompactWriter(
                     BinaryRowData partition,
                     int bucket,
-                    ExecutorService compactExecutor,
-                    List<DataFileMeta> restoreFiles) {
+                    @Nullable List<DataFileMeta> compactFiles) {
                 throw new UnsupportedOperationException();
             }
         };
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
index 49a387ac..6003d55d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.store.table.sink.AbstractTableWrite;
 import org.apache.flink.table.store.table.sink.SinkRecord;
 import org.apache.flink.table.store.table.sink.SinkRecordConverter;
 import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableCompact;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
@@ -100,7 +101,7 @@ public class TestFileStoreTable implements FileStoreTable {
     }
 
     @Override
-    public TestFileStore store() {
-        return store;
+    public TableCompact newCompact() {
+        throw new UnsupportedOperationException();
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 677b02ff..6dd1d04a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.file.KeyValue;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.util.CloseableIterator;
@@ -153,7 +154,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
         return increment;
     }
 
-    private void updateCompactResult(CompactManager.CompactResult result) {
+    private void updateCompactResult(CompactResult result) {
         Set<String> afterFiles =
                 result.after().stream().map(DataFileMeta::fileName).collect(Collectors.toSet());
         for (DataFileMeta file : result.before()) {
@@ -182,8 +183,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
     }
 
     private void finishCompaction(boolean blocking) throws Exception {
-        Optional<CompactManager.CompactResult> result =
-                compactManager.finishCompaction(levels, blocking);
+        Optional<CompactResult> result = compactManager.finishCompaction(levels, blocking);
         result.ifPresent(this::updateCompactResult);
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
index 133e2a12..294e54fd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -19,25 +19,18 @@
 package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.Levels;
-import org.apache.flink.table.store.file.mergetree.SortedRun;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Comparator;
-import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.singletonList;
-
 /** Manager to submit compaction task. */
 public class CompactManager {
 
@@ -51,7 +44,7 @@ public class CompactManager {
 
     private final long minFileSize;
 
-    private final Rewriter rewriter;
+    private final CompactRewriter rewriter;
 
     private Future<CompactResult> taskFuture;
 
@@ -60,7 +53,7 @@ public class CompactManager {
             CompactStrategy strategy,
             Comparator<RowData> keyComparator,
             long minFileSize,
-            Rewriter rewriter) {
+            CompactRewriter rewriter) {
         this.executor = executor;
         this.minFileSize = minFileSize;
         this.keyComparator = keyComparator;
@@ -113,8 +106,8 @@ public class CompactManager {
                         });
     }
 
-    public void submitCompaction(CompactUnit unit, boolean dropDelete) {
-        CompactTask task = new CompactTask(unit, dropDelete);
+    private void submitCompaction(CompactUnit unit, boolean dropDelete) {
+        CompactTask task = new CompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete);
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Pick these files (name, level, size) for compaction: {}",
@@ -137,7 +130,7 @@ public class CompactManager {
         return result;
     }
 
-    public Optional<CompactResult> finishCompaction(boolean blocking)
+    private Optional<CompactResult> finishCompaction(boolean blocking)
             throws ExecutionException, InterruptedException {
         if (taskFuture != null) {
             if (blocking || taskFuture.isDone()) {
@@ -148,166 +141,4 @@ public class CompactManager {
         }
         return Optional.empty();
     }
-
-    /** Rewrite sections to the files. */
-    @FunctionalInterface
-    public interface Rewriter {
-
-        List<DataFileMeta> rewrite(
-                int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
-                throws Exception;
-    }
-
-    /** Result of compaction. */
-    public interface CompactResult {
-
-        List<DataFileMeta> before();
-
-        List<DataFileMeta> after();
-    }
-
-    // --------------------------------------------------------------------------------------------
-    // Internal classes
-    // --------------------------------------------------------------------------------------------
-
-    /** Compaction task. */
-    private class CompactTask implements Callable<CompactResult> {
-
-        private final int outputLevel;
-
-        private final List<List<SortedRun>> partitioned;
-
-        private final boolean dropDelete;
-
-        // metrics
-        private long rewriteInputSize;
-        private long rewriteOutputSize;
-        private int rewriteFilesNum;
-        private int upgradeFilesNum;
-
-        private CompactTask(CompactUnit unit, boolean dropDelete) {
-            this.outputLevel = unit.outputLevel();
-            this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
-            this.dropDelete = dropDelete;
-
-            this.rewriteInputSize = 0;
-            this.rewriteOutputSize = 0;
-            this.rewriteFilesNum = 0;
-            this.upgradeFilesNum = 0;
-        }
-
-        @Override
-        public CompactResult call() throws Exception {
-            return compact();
-        }
-
-        private CompactResult compact() throws Exception {
-            long startMillis = System.currentTimeMillis();
-
-            List<List<SortedRun>> candidate = new ArrayList<>();
-            List<DataFileMeta> before = new ArrayList<>();
-            List<DataFileMeta> after = new ArrayList<>();
-
-            // Checking the order and compacting adjacent and contiguous files
-            // Note: can't skip an intermediate file to compact, this will destroy the overall
-            // orderliness
-            for (List<SortedRun> section : partitioned) {
-                if (section.size() > 1) {
-                    candidate.add(section);
-                } else {
-                    SortedRun run = section.get(0);
-                    // No overlapping:
-                    // We can just upgrade the large file and just change the level instead of
-                    // rewriting it
-                    // But for small files, we will try to compact it
-                    for (DataFileMeta file : run.files()) {
-                        if (file.fileSize() < minFileSize) {
-                            // Smaller files are rewritten along with the previous files
-                            candidate.add(singletonList(SortedRun.fromSingle(file)));
-                        } else {
-                            // Large file appear, rewrite previous and upgrade it
-                            rewrite(candidate, before, after);
-                            upgrade(file, before, after);
-                        }
-                    }
-                }
-            }
-            rewrite(candidate, before, after);
-
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "Done compacting {} files to {} files in {}ms. "
-                                + "Rewrite input size = {}, output size = {}, rewrite file num = {}, upgrade file num = {}",
-                        before.size(),
-                        after.size(),
-                        System.currentTimeMillis() - startMillis,
-                        rewriteInputSize,
-                        rewriteOutputSize,
-                        rewriteFilesNum,
-                        upgradeFilesNum);
-            }
-
-            return result(before, after);
-        }
-
-        private void upgrade(
-                DataFileMeta file, List<DataFileMeta> before, List<DataFileMeta> after) {
-            if (file.level() != outputLevel) {
-                before.add(file);
-                after.add(file.upgrade(outputLevel));
-                upgradeFilesNum++;
-            }
-        }
-
-        private void rewrite(
-                List<List<SortedRun>> candidate,
-                List<DataFileMeta> before,
-                List<DataFileMeta> after)
-                throws Exception {
-            if (candidate.isEmpty()) {
-                return;
-            }
-            if (candidate.size() == 1) {
-                List<SortedRun> section = candidate.get(0);
-                if (section.size() == 0) {
-                    return;
-                } else if (section.size() == 1) {
-                    for (DataFileMeta file : section.get(0).files()) {
-                        upgrade(file, before, after);
-                    }
-                    candidate.clear();
-                    return;
-                }
-            }
-            candidate.forEach(
-                    runs ->
-                            runs.forEach(
-                                    run -> {
-                                        before.addAll(run.files());
-                                        rewriteInputSize +=
-                                                run.files().stream()
-                                                        .mapToLong(DataFileMeta::fileSize)
-                                                        .sum();
-                                        rewriteFilesNum += run.files().size();
-                                    }));
-            List<DataFileMeta> result = rewriter.rewrite(outputLevel, dropDelete, candidate);
-            after.addAll(result);
-            rewriteOutputSize += result.stream().mapToLong(DataFileMeta::fileSize).sum();
-            candidate.clear();
-        }
-
-        private CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
-            return new CompactResult() {
-                @Override
-                public List<DataFileMeta> before() {
-                    return before;
-                }
-
-                @Override
-                public List<DataFileMeta> after() {
-                    return after;
-                }
-            };
-        }
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java
new file mode 100644
index 00000000..e5b6e826
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.List;
+
+/** Result of compaction. */
+public interface CompactResult {
+
+    List<DataFileMeta> before();
+
+    List<DataFileMeta> after();
+}
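
Callers treat before() as the files a compaction consumed and after() as the files it produced; TableCompact later in this commit turns the pair directly into Increment.forCompact(result.before(), result.after()). An illustrative sketch of that bookkeeping follows, assuming a simple in-memory file list and placement in the same package as CompactResult.

import java.util.List;

import org.apache.flink.table.store.file.data.DataFileMeta;

public class CompactResultSketch {

    // Apply a finished compaction to an in-memory view of a bucket's files:
    // drop the inputs, keep the outputs. Illustrative only; MergeTreeWriter's
    // updateCompactResult above does the real bookkeeping, including upgraded
    // files that appear in both lists.
    static void apply(CompactResult result, List<DataFileMeta> liveFiles) {
        liveFiles.removeAll(result.before());
        liveFiles.addAll(result.after());
    }
}
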
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
new file mode 100644
index 00000000..dcb267a6
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+
+import java.util.List;
+
+/** Rewrite sections to the files. */
+@FunctionalInterface
+public interface CompactRewriter {
+
+    List<DataFileMeta> rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
+            throws Exception;
+}
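
Both KeyValueFileStoreWrite and the tests in this commit implement CompactRewriter as a lambda. A trimmed-down sketch of a conforming implementation follows; the empty body is only there to show the shape of the contract, while a real rewriter merge-reads the sections and writes new files.

CompactRewriter rewriter =
        (outputLevel, dropDelete, sections) -> {
            // A real implementation reads 'sections' in key order, drops delete
            // records when 'dropDelete' is true, and returns the metadata of the
            // files it wrote at 'outputLevel'. Returning an empty list is legal
            // but means the compaction produced no output files.
            return java.util.Collections.emptyList();
        };
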
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java
new file mode 100644
index 00000000..cac1000b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static java.util.Collections.singletonList;
+
+/** Compaction task. */
+public class CompactTask implements Callable<CompactResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+
+    private final long minFileSize;
+    private final CompactRewriter rewriter;
+    private final int outputLevel;
+
+    private final List<List<SortedRun>> partitioned;
+
+    private final boolean dropDelete;
+
+    // metrics
+    private long rewriteInputSize;
+    private long rewriteOutputSize;
+    private int rewriteFilesNum;
+    private int upgradeFilesNum;
+
+    public CompactTask(
+            Comparator<RowData> keyComparator,
+            long minFileSize,
+            CompactRewriter rewriter,
+            CompactUnit unit,
+            boolean dropDelete) {
+        this.minFileSize = minFileSize;
+        this.rewriter = rewriter;
+        this.outputLevel = unit.outputLevel();
+        this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
+        this.dropDelete = dropDelete;
+
+        this.rewriteInputSize = 0;
+        this.rewriteOutputSize = 0;
+        this.rewriteFilesNum = 0;
+        this.upgradeFilesNum = 0;
+    }
+
+    @Override
+    public CompactResult call() throws Exception {
+        return compact();
+    }
+
+    private CompactResult compact() throws Exception {
+        long startMillis = System.currentTimeMillis();
+
+        List<List<SortedRun>> candidate = new ArrayList<>();
+        List<DataFileMeta> before = new ArrayList<>();
+        List<DataFileMeta> after = new ArrayList<>();
+
+        // Checking the order and compacting adjacent and contiguous files
+        // Note: can't skip an intermediate file to compact, this will destroy the overall
+        // orderliness
+        for (List<SortedRun> section : partitioned) {
+            if (section.size() > 1) {
+                candidate.add(section);
+            } else {
+                SortedRun run = section.get(0);
+                // No overlapping:
+                // We can just upgrade the large file and just change the level instead of
+                // rewriting it
+                // But for small files, we will try to compact it
+                for (DataFileMeta file : run.files()) {
+                    if (file.fileSize() < minFileSize) {
+                        // Smaller files are rewritten along with the previous files
+                        candidate.add(singletonList(SortedRun.fromSingle(file)));
+                    } else {
+                        // Large file appears, rewrite previous files and upgrade it
+                        rewrite(candidate, before, after);
+                        upgrade(file, before, after);
+                    }
+                }
+            }
+        }
+        rewrite(candidate, before, after);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Done compacting {} files to {} files in {}ms. "
+                            + "Rewrite input size = {}, output size = {}, rewrite file num = {}, upgrade file num = {}",
+                    before.size(),
+                    after.size(),
+                    System.currentTimeMillis() - startMillis,
+                    rewriteInputSize,
+                    rewriteOutputSize,
+                    rewriteFilesNum,
+                    upgradeFilesNum);
+        }
+
+        return result(before, after);
+    }
+
+    private void upgrade(DataFileMeta file, List<DataFileMeta> before, List<DataFileMeta> after) {
+        if (file.level() != outputLevel) {
+            before.add(file);
+            after.add(file.upgrade(outputLevel));
+            upgradeFilesNum++;
+        }
+    }
+
+    private void rewrite(
+            List<List<SortedRun>> candidate, List<DataFileMeta> before, List<DataFileMeta> after)
+            throws Exception {
+        if (candidate.isEmpty()) {
+            return;
+        }
+        if (candidate.size() == 1) {
+            List<SortedRun> section = candidate.get(0);
+            if (section.size() == 0) {
+                return;
+            } else if (section.size() == 1) {
+                for (DataFileMeta file : section.get(0).files()) {
+                    upgrade(file, before, after);
+                }
+                candidate.clear();
+                return;
+            }
+        }
+        candidate.forEach(
+                runs ->
+                        runs.forEach(
+                                run -> {
+                                    before.addAll(run.files());
+                                    rewriteInputSize +=
+                                            run.files().stream()
+                                                    .mapToLong(DataFileMeta::fileSize)
+                                                    .sum();
+                                    rewriteFilesNum += run.files().size();
+                                }));
+        List<DataFileMeta> result = rewriter.rewrite(outputLevel, dropDelete, candidate);
+        after.addAll(result);
+        rewriteOutputSize += result.stream().mapToLong(DataFileMeta::fileSize).sum();
+        candidate.clear();
+    }
+
+    private CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
+        return new CompactResult() {
+            @Override
+            public List<DataFileMeta> before() {
+                return before;
+            }
+
+            @Override
+            public List<DataFileMeta> after() {
+                return after;
+            }
+        };
+    }
+}
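
The upgrade-versus-rewrite split in CompactTask hinges on minFileSize: within a non-overlapping run, files below the threshold are queued to be rewritten with their neighbors, while files at or above it are only promoted to the output level. A tiny self-contained sketch of that decision, with made-up sizes:

public class ThresholdSketch {
    public static void main(String[] args) {
        long minFileSize = 128L << 20; // assumed target file size of 128 MB
        for (long size : new long[] {8L << 20, 16L << 20, 256L << 20}) {
            System.out.println(
                    size < minFileSize
                            ? size + " bytes -> rewrite together with neighbors"
                            : size + " bytes -> upgrade (level change only)");
        }
    }
}
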
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 76a780f9..29cf3177 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -24,12 +24,16 @@ import org.apache.flink.table.store.file.data.AppendOnlyWriter;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
 import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 /** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
@@ -71,11 +75,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
     }
 
     @Override
-    public RecordWriter<RowData> createCompactWriter(
-            BinaryRowData partition,
-            int bucket,
-            ExecutorService compactExecutor,
-            List<DataFileMeta> restoredFiles) {
+    public Callable<CompactResult> createCompactWriter(
+            BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles) {
         throw new UnsupportedOperationException(
                 "Currently append only write mode does not support compaction.");
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 930bce95..862e7596 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -20,9 +20,13 @@ package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -40,10 +44,12 @@ public interface FileStoreWrite<T> {
     RecordWriter<T> createEmptyWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor);
 
-    /** Create a compact {@link RecordWriter} from partition, bucket and restore files. */
-    RecordWriter<T> createCompactWriter(
-            BinaryRowData partition,
-            int bucket,
-            ExecutorService compactExecutor,
-            List<DataFileMeta> restoredFiles);
+    /**
+     * Create a {@link Callable} compactor from a partition and bucket.
+     *
+     * @param compactFiles input files of the compaction. When null, all files of the current
+     *     bucket are read automatically.
+     */
+    Callable<CompactResult> createCompactWriter(
+            BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles);
 }
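
A hypothetical caller sketch of the new contract: passing null for compactFiles delegates file discovery to the implementation, which scans the bucket itself (as KeyValueFileStoreWrite does below). The class and method names here are illustrative.

import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.operation.FileStoreWrite;

public class CompactCallSketch {

    // Compact a whole bucket without knowing its files up front: the null
    // argument asks the write implementation to read all files of the bucket.
    static CompactResult compactWholeBucket(
            FileStoreWrite<?> write, BinaryRowData partition, int bucket) throws Exception {
        return write.createCompactWriter(partition, bucket, null).call();
    }
}
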
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 86d68ef5..174fe255 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -31,7 +31,10 @@ import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
 import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
+import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.CompactTask;
 import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
@@ -39,14 +42,15 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.CompactWriter;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Optional;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
@@ -102,20 +106,17 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
     }
 
     @Override
-    public RecordWriter<KeyValue> createCompactWriter(
-            BinaryRowData partition,
-            int bucket,
-            ExecutorService compactExecutor,
-            List<DataFileMeta> restoredFiles) {
-        Levels levels = new Levels(keyComparatorSupplier.get(), restoredFiles, options.numLevels);
-        return new CompactWriter(
-                CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns()),
-                createCompactManager(
-                        partition,
-                        bucket,
-                        (numLevels, runs) ->
-                                Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)),
-                        compactExecutor));
+    public Callable<CompactResult> createCompactWriter(
+            BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles) {
+        if (compactFiles == null) {
+            compactFiles = scanExistingFileMetas(partition, bucket);
+        }
+        Comparator<RowData> keyComparator = keyComparatorSupplier.get();
+        CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
+        Levels levels = new Levels(keyComparator, compactFiles, options.numLevels);
+        CompactUnit unit =
+                CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns());
+        return new CompactTask(keyComparator, options.targetFileSize, rewriter, unit, true);
     }
 
     private RecordWriter<KeyValue> createMergeTreeWriter(
@@ -153,20 +154,24 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
             int bucket,
             CompactStrategy compactStrategy,
             ExecutorService compactExecutor) {
-        DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
-        CompactManager.Rewriter rewriter =
-                (outputLevel, dropDelete, sections) ->
-                        dataFileWriter.write(
-                                new RecordReaderIterator<>(
-                                        new MergeTreeReader(
-                                                sections,
-                                                dropDelete,
-                                                dataFileReaderFactory.create(partition, bucket),
-                                                keyComparator,
-                                                mergeFunction.copy())),
-                                outputLevel);
+        CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
         return new CompactManager(
                 compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter);
     }
+
+    private CompactRewriter compactRewriter(
+            BinaryRowData partition, int bucket, Comparator<RowData> 
keyComparator) {
+        DataFileWriter dataFileWriter = 
dataFileWriterFactory.create(partition, bucket);
+        return (outputLevel, dropDelete, sections) ->
+                dataFileWriter.write(
+                        new RecordReaderIterator<>(
+                                new MergeTreeReader(
+                                        sections,
+                                        dropDelete,
+                                        dataFileReaderFactory.create(partition, bucket),
+                                        keyComparator,
+                                        mergeFunction.copy())),
+                        outputLevel);
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
deleted file mode 100644
index 884af4e8..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.writer;
-
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
-import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A {@link RecordWriter} implementation that only perform compaction on existing records and does
- * not generate new records.
- */
-public class CompactWriter implements RecordWriter<KeyValue> {
-
-    private final CompactUnit unit;
-    private final CompactManager compactManager;
-
-    public CompactWriter(CompactUnit unit, CompactManager compactManager) {
-        this.unit = unit;
-        this.compactManager = compactManager;
-    }
-
-    @Override
-    public Increment prepareCommit() throws IOException, InterruptedException {
-        List<DataFileMeta> compactBefore = new ArrayList<>();
-        List<DataFileMeta> compactAfter = new ArrayList<>();
-        if (compactManager.isCompactionFinished()) {
-            compactManager.submitCompaction(unit, true);
-            try {
-                compactManager
-                        .finishCompaction(true)
-                        .ifPresent(
-                                result -> {
-                                    compactBefore.addAll(result.before());
-                                    compactAfter.addAll(result.after());
-                                });
-                return Increment.forCompact(compactBefore, compactAfter);
-            } catch (ExecutionException e) {
-                throw new IOException(e.getCause());
-            }
-        }
-        throw new IllegalStateException("Compact manager should have finished previous task.");
-    }
-
-    @Override
-    public List<DataFileMeta> close() throws Exception {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public void write(KeyValue kv) throws Exception {
-        // nothing to write
-    }
-
-    @Override
-    public void sync() throws Exception {}
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index c7ea9765..a993ee2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableCompact;
 
 /** Abstract {@link FileStoreTable}. */
 public abstract class AbstractFileStoreTable implements FileStoreTable {
@@ -35,6 +37,8 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
         this.schema = schema;
     }
 
+    protected abstract FileStore<?> store();
+
     @Override
     public String name() {
         return name;
@@ -54,4 +58,9 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
     public TableCommit newCommit() {
         return new TableCommit(store().newCommit(), store().newExpire());
     }
+
+    @Override
+    public TableCompact newCompact() {
+        return new TableCompact(store().newScan(), store().newWrite(), store().partitionType());
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index f16398be..82271f2c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.table.store.table;
 
-import org.apache.flink.table.store.file.FileStore;
 import org.apache.flink.table.store.file.schema.Schema;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableCompact;
 import org.apache.flink.table.store.table.sink.TableWrite;
 import org.apache.flink.table.store.table.source.TableRead;
 import org.apache.flink.table.store.table.source.TableScan;
@@ -48,5 +48,5 @@ public interface FileStoreTable extends Serializable {
 
     TableCommit newCommit();
 
-    FileStore store();
+    TableCompact newCompact();
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
new file mode 100644
index 00000000..9632d6dd
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+
+/** An abstraction layer above {@link FileStoreWrite#createCompactWriter} to provide compaction. */
+public class TableCompact {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TableCompact.class);
+
+    private final FileStoreScan scan;
+    private final FileStoreWrite<?> write;
+    private final RowType partitionType;
+
+    private BiPredicate<BinaryRowData, Integer> partBucketFilter;
+
+    public TableCompact(FileStoreScan scan, FileStoreWrite<?> write, RowType partitionType) {
+        this.scan = scan;
+        this.write = write;
+        this.partitionType = partitionType;
+    }
+
+    public TableCompact withPartitions(Map<String, String> partitionSpec) {
+        scan.withPartitionFilter(
+                PredicateConverter.CONVERTER.fromMap(partitionSpec, partitionType));
+        return this;
+    }
+
+    public TableCompact withFilter(BiPredicate<BinaryRowData, Integer> partBucketFilter) {
+        this.partBucketFilter = partBucketFilter;
+        return this;
+    }
+
+    public List<FileCommittable> compact() {
+        List<FileCommittable> committables = new ArrayList<>();
+        scan.plan()
+                .groupByPartFiles()
+                .forEach(
+                        (partition, buckets) ->
+                                buckets.forEach(
+                                        (bucket, files) ->
+                                                doCompact(partition, bucket, files)
+                                                        .ifPresent(committables::add)));
+        return committables;
+    }
+
+    private Optional<FileCommittable> doCompact(
+            BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+        if (!partBucketFilter.test(partition, bucket)) {
+            return Optional.empty();
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Do compaction for partition {}, bucket {}",
+                    FileStorePathFactory.getPartitionComputer(
+                                    partitionType,
+                                    FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
+                                            .defaultValue())
+                            .generatePartValues(partition),
+                    bucket);
+        }
+        try {
+            CompactResult result =
+                    write.createCompactWriter(partition.copy(), bucket, files).call();
+            FileCommittable committable =
+                    new FileCommittable(
+                            partition,
+                            bucket,
+                            Increment.forCompact(result.before(), result.after()));
+            return Optional.of(committable);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
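
End to end, the expected call pattern (mirroring StoreSink.createCompactWriter earlier in this commit) is roughly the following sketch; the wrapper class and method names are illustrative assumptions.

import java.util.Collections;
import java.util.List;
import java.util.Objects;

import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableCompact;

public class TableCompactUsageSketch {

    // Compact every partition, with (partition, bucket) pairs spread across
    // 'numTasks' parallel workers by the same hash rule StoreSink uses.
    static List<FileCommittable> compactAll(FileStoreTable table, int task, int numTasks) {
        TableCompact compact = table.newCompact();
        compact.withPartitions(Collections.emptyMap()); // no partition filter
        compact.withFilter(
                (partition, bucket) ->
                        task == Math.abs(Objects.hash(partition, bucket) % numTasks));
        return compact.compact();
    }
}
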
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 16137dd3..e7607a12 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.format.FileFormat;
 import org.apache.flink.table.store.file.format.FlushingFileFormat;
 import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
@@ -286,7 +287,7 @@ public class MergeTreeTest {
                         options.maxSizeAmplificationPercent,
                         options.sizeRatio,
                         options.numSortedRunCompactionTrigger);
-        CompactManager.Rewriter rewriter =
+        CompactRewriter rewriter =
                 (outputLevel, dropDelete, sections) ->
                         dataFileWriter.write(
                                 new RecordReaderIterator<KeyValue>(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index bffac73e..51a56305 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -230,7 +230,7 @@ public class CompactManagerTest {
         return (numLevels, runs) -> Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
     }
 
-    private CompactManager.Rewriter testRewriter(boolean expectedDropDelete) {
+    private CompactRewriter testRewriter(boolean expectedDropDelete) {
         return (outputLevel, dropDelete, sections) -> {
             assertThat(dropDelete).isEqualTo(expectedDropDelete);
             int minKey = Integer.MAX_VALUE;
