This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 9c9f5f82 [FLINK-29714] Merge TableWrite and TableCompact into one interface
9c9f5f82 is described below
commit 9c9f5f823ce52a20eb417444b59baac3e96d6a75
Author: tsreaper <[email protected]>
AuthorDate: Tue Oct 25 16:03:22 2022 +0800
[FLINK-29714] Merge TableWrite and TableCompact into one interface
This closes #328
---
.../store/connector/sink/StoreCompactOperator.java | 71 ++++++++++---
.../file/append/AppendOnlyCompactManager.java | 43 ++++++--
.../table/store/file/append/AppendOnlyWriter.java | 14 ++-
...mpactManager.java => CompactFutureManager.java} | 29 +-----
.../table/store/file/compact/CompactManager.java | 60 +++--------
.../store/file/compact/NoopCompactManager.java | 27 +++--
.../store/file/mergetree/MergeTreeWriter.java | 13 ++-
.../file/mergetree/compact/CompactStrategy.java | 6 ++
.../mergetree/compact/MergeTreeCompactManager.java | 96 ++++++++++--------
.../file/operation/AbstractFileStoreWrite.java | 17 ++++
.../file/operation/AppendOnlyFileStoreWrite.java | 43 ++------
.../table/store/file/operation/FileStoreWrite.java | 32 ++----
.../file/operation/KeyValueFileStoreWrite.java | 74 +++++---------
.../flink/table/store/file/utils/RecordWriter.java | 3 +
.../table/store/table/AbstractFileStoreTable.java | 6 --
.../flink/table/store/table/FileStoreTable.java | 3 -
.../flink/table/store/table/sink/TableCompact.java | 112 ---------------------
.../flink/table/store/table/sink/TableWrite.java | 3 +
.../table/store/table/sink/TableWriteImpl.java | 6 ++
.../flink/table/store/file/TestFileStore.java | 4 +-
.../file/append/AppendOnlyCompactManagerTest.java | 1 +
.../store/file/append/AppendOnlyWriterTest.java | 3 +-
.../store/file/format/FileFormatSuffixTest.java | 3 +-
.../compact/MergeTreeCompactManagerTest.java | 2 +-
.../store/file/operation/TestCommitThread.java | 2 +-
25 files changed, 289 insertions(+), 384 deletions(-)
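
With TableCompact removed, manual compaction now goes through the merged TableWrite interface. The following is a minimal, illustrative sketch of the new call sequence, following the rewritten StoreCompactOperator below; the helper class and method names are hypothetical and not part of this commit:

import java.util.List;

import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableScan;

/** Illustrative sketch only; mirrors the new StoreCompactOperator#prepareCommit flow. */
public class ManualCompactionSketch {

    public static List<FileCommittable> compactAll(FileStoreTable table) throws Exception {
        TableScan scan = table.newScan();    // plan splits to locate partitions and buckets
        TableWrite write = table.newWrite(); // the merged interface now also exposes compact()
        for (Split split : scan.plan().splits) {
            // triggers a guaranteed full compaction for this partition/bucket
            write.compact(split.partition(), split.bucket());
        }
        // endOfInput = true lets the writers finish pending compactions and report results
        return write.prepareCommit(true);
    }
}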
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
index 8b4ded82..a703f3fd 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreCompactOperator.java
@@ -18,13 +18,21 @@
package org.apache.flink.table.store.connector.sink;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.table.FileStoreTable;
-import org.apache.flink.table.store.table.sink.TableCompact;
+import org.apache.flink.table.store.table.sink.TableWrite;
+import org.apache.flink.table.store.table.source.Split;
+import org.apache.flink.table.store.table.source.TableScan;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -33,11 +41,13 @@ import java.util.stream.Collectors;
/** A dedicated operator for manual triggered compaction. */
public class StoreCompactOperator extends PrepareCommitOperator {
- private final FileStoreTable table;
+ private static final Logger LOG = LoggerFactory.getLogger(StoreCompactOperator.class);
+ private final FileStoreTable table;
@Nullable private final Map<String, String> compactPartitionSpec;
- private TableCompact compact;
+ private TableScan scan;
+ private TableWrite write;
public StoreCompactOperator(
FileStoreTable table, @Nullable Map<String, String> compactPartitionSpec) {
@@ -48,20 +58,53 @@ public class StoreCompactOperator extends PrepareCommitOperator {
@Override
public void open() throws Exception {
super.open();
- int task = getRuntimeContext().getIndexOfThisSubtask();
- int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
- compact = table.newCompact();
- compact.withPartitions(
- compactPartitionSpec == null ? Collections.emptyMap() : compactPartitionSpec);
- compact.withFilter(
- (partition, bucket) -> task == Math.abs(Objects.hash(partition, bucket) % numTask));
+
+ scan = table.newScan();
+ if (compactPartitionSpec != null) {
+ scan.withFilter(
+ PredicateConverter.fromMap(
+ compactPartitionSpec, table.schema().logicalPartitionType()));
+ }
+
+ write = table.newWrite();
}
@Override
protected List<Committable> prepareCommit(boolean endOfInput, long checkpointId)
throws IOException {
- return compact.compact().stream()
- .map(c -> new Committable(checkpointId, Committable.Kind.FILE, c))
- .collect(Collectors.toList());
+ int task = getRuntimeContext().getIndexOfThisSubtask();
+ int numTask = getRuntimeContext().getNumberOfParallelSubtasks();
+
+ for (Split split : scan.plan().splits) {
+ BinaryRowData partition = split.partition();
+ int bucket = split.bucket();
+ if (Math.abs(Objects.hash(partition, bucket)) % numTask != task) {
+ continue;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ RowType partitionType = table.schema().logicalPartitionType();
+ LOG.debug(
+ "Do compaction for partition {}, bucket {}",
+ FileStorePathFactory.getPartitionComputer(
+ partitionType,
+ FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue())
+ .generatePartValues(partition),
+ bucket);
+ }
+ try {
+ write.compact(partition, bucket);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ try {
+ return write.prepareCommit(true).stream()
+ .map(c -> new Committable(checkpointId, Committable.Kind.FILE, c))
+ .collect(Collectors.toList());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
index c80c92df..f2af051b 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
@@ -19,12 +19,13 @@
package org.apache.flink.table.store.file.append;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactFutureManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.CompactTask;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.file.utils.FileUtils;
+import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
@@ -37,13 +38,15 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
-public class AppendOnlyCompactManager extends CompactManager {
+public class AppendOnlyCompactManager extends CompactFutureManager {
+ private final ExecutorService executor;
+ private final LinkedList<DataFileMeta> toCompact;
private final int minFileNum;
private final int maxFileNum;
private final long targetFileSize;
private final CompactRewriter rewriter;
- private final LinkedList<DataFileMeta> toCompact;
+ private final DataFilePathFactory pathFactory;
public AppendOnlyCompactManager(
ExecutorService executor,
@@ -51,17 +54,43 @@ public class AppendOnlyCompactManager extends CompactManager {
int minFileNum,
int maxFileNum,
long targetFileSize,
- CompactRewriter rewriter) {
- super(executor);
+ CompactRewriter rewriter,
+ DataFilePathFactory pathFactory) {
+ this.executor = executor;
this.toCompact = toCompact;
- this.maxFileNum = maxFileNum;
this.minFileNum = minFileNum;
+ this.maxFileNum = maxFileNum;
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
+ this.pathFactory = pathFactory;
}
@Override
- public void triggerCompaction() {
+ public void triggerCompaction(boolean fullCompaction) {
+ if (fullCompaction) {
+ triggerFullCompaction();
+ } else {
+ triggerCompactionWithBestEffort();
+ }
+ }
+
+ private void triggerFullCompaction() {
+ Preconditions.checkState(
+ taskFuture == null,
+ "A compaction task is still running while the user "
+ + "forces a new compaction. This is unexpected.");
+ taskFuture =
+ executor.submit(
+ new AppendOnlyCompactManager.IterativeCompactTask(
+ toCompact,
+ targetFileSize,
+ minFileNum,
+ maxFileNum,
+ rewriter,
+ pathFactory));
+ }
+
+ private void triggerCompactionWithBestEffort() {
if (taskFuture != null) {
return;
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
index ca73c71b..8c6c9635 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyWriter.java
@@ -90,6 +90,11 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
writer.write(rowData);
}
+ @Override
+ public void fullCompaction() throws Exception {
+ submitCompaction(true);
+ }
+
@Override
public CommitIncrement prepareCommit(boolean endOnfInput) throws Exception {
List<DataFileMeta> newFiles = new ArrayList<>();
@@ -104,7 +109,7 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
}
// add new generated files
newFiles.forEach(compactManager::addNewFile);
- submitCompaction();
+ submitCompaction(false);
boolean blocking = endOnfInput || forceCompact;
trySyncLatestCompaction(blocking);
@@ -134,9 +139,10 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
schemaId, fileFormat, targetFileSize, writeSchema, pathFactory, seqNumCounter);
}
- private void submitCompaction() throws ExecutionException, InterruptedException {
- trySyncLatestCompaction(false);
- compactManager.triggerCompaction();
+ private void submitCompaction(boolean forcedFullCompaction)
+ throws ExecutionException, InterruptedException {
+ trySyncLatestCompaction(forcedFullCompaction);
+ compactManager.triggerCompaction(forcedFullCompaction);
}
private void trySyncLatestCompaction(boolean blocking)
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactFutureManager.java
similarity index 72%
copy from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
copy to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactFutureManager.java
index ef578720..e07e0a97 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactFutureManager.java
@@ -18,43 +18,22 @@
package org.apache.flink.table.store.file.compact;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-/** Manager to submit compaction task. */
-public abstract class CompactManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(CompactManager.class);
+/** Base implementation of {@link CompactManager} which runs compaction in a separate thread. */
+public abstract class CompactFutureManager implements CompactManager {
- protected final ExecutorService executor;
+ private static final Logger LOG = LoggerFactory.getLogger(CompactFutureManager.class);
protected Future<CompactResult> taskFuture;
- public CompactManager(ExecutorService executor) {
- this.executor = executor;
- }
-
- /** Should wait compaction finish. */
- public abstract boolean shouldWaitCompaction();
-
- /** Add a new file. */
- public abstract void addNewFile(DataFileMeta file);
-
- /** Trigger a new compaction task. */
- public abstract void triggerCompaction();
-
- /** Get compaction result. Wait finish if {@code blocking} is true. */
- public abstract Optional<CompactResult> getCompactionResult(boolean blocking)
- throws ExecutionException, InterruptedException;
-
+ @Override
public void cancelCompaction() {
// TODO this method may leave behind orphan files if compaction is actually finished
// but some CPU work still needs to be done
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index ef578720..e82e1748 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -20,65 +20,29 @@ package org.apache.flink.table.store.file.compact;
import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.Optional;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
/** Manager to submit compaction task. */
-public abstract class CompactManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(CompactManager.class);
-
- protected final ExecutorService executor;
-
- protected Future<CompactResult> taskFuture;
-
- public CompactManager(ExecutorService executor) {
- this.executor = executor;
- }
+public interface CompactManager {
/** Should wait compaction finish. */
- public abstract boolean shouldWaitCompaction();
+ boolean shouldWaitCompaction();
/** Add a new file. */
- public abstract void addNewFile(DataFileMeta file);
+ void addNewFile(DataFileMeta file);
- /** Trigger a new compaction task. */
- public abstract void triggerCompaction();
+ /**
+ * Trigger a new compaction task.
+ *
+ * @param fullCompaction if caller needs a guaranteed full compaction
+ */
+ void triggerCompaction(boolean fullCompaction);
/** Get compaction result. Wait finish if {@code blocking} is true. */
- public abstract Optional<CompactResult> getCompactionResult(boolean blocking)
+ Optional<CompactResult> getCompactionResult(boolean blocking)
throws ExecutionException, InterruptedException;
- public void cancelCompaction() {
- // TODO this method may leave behind orphan files if compaction is actually finished
- // but some CPU work still needs to be done
- if (taskFuture != null && !taskFuture.isCancelled()) {
- taskFuture.cancel(true);
- }
- }
-
- protected final Optional<CompactResult> innerGetCompactionResult(boolean blocking)
- throws ExecutionException, InterruptedException {
- if (taskFuture != null) {
- if (blocking || taskFuture.isDone()) {
- CompactResult result;
- try {
- result = taskFuture.get();
- } catch (CancellationException e) {
- LOG.info("Compaction future is cancelled", e);
- taskFuture = null;
- return Optional.empty();
- }
- taskFuture = null;
- return Optional.of(result);
- }
- }
- return Optional.empty();
- }
+ /** Cancel currently running compaction task. */
+ void cancelCompaction();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
index 5ca4b14b..db014c45 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -18,17 +18,17 @@
package org.apache.flink.table.store.file.compact;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.util.Preconditions;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ExecutionException;
-/** A {@link CompactManager} which doesn't do things. */
-public class NoopCompactManager extends CompactManager {
+/** A {@link CompactManager} which never compacts. */
+public class NoopCompactManager implements CompactManager {
- public NoopCompactManager(ExecutorService executor) {
- super(executor);
- }
+ public NoopCompactManager() {}
@Override
public boolean shouldWaitCompaction() {
@@ -39,10 +39,21 @@ public class NoopCompactManager extends CompactManager {
public void addNewFile(DataFileMeta file) {}
@Override
- public void triggerCompaction() {}
+ public void triggerCompaction(boolean fullCompaction) {
+ Preconditions.checkArgument(
+ !fullCompaction,
+ "NoopCompactManager does not support user triggered compaction.\n"
+ + "If you really need a guaranteed compaction, please set "
+ + CoreOptions.WRITE_COMPACTION_SKIP.key()
+ + " property of this table to false.");
+ }
@Override
- public Optional<CompactResult> getCompactionResult(boolean blocking) {
+ public Optional<CompactResult> getCompactionResult(boolean blocking)
+ throws ExecutionException, InterruptedException {
return Optional.empty();
}
+
+ @Override
+ public void cancelCompaction() {}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 287ac0d1..7f0adfcc 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -137,6 +137,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
}
}
+ @Override
+ public void fullCompaction() throws Exception {
+ submitCompaction(true);
+ }
+
@Override
public long memoryOccupancy() {
return writeBuffer.memoryOccupancy();
@@ -187,7 +192,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
}
writeBuffer.clear();
- submitCompaction();
+ submitCompaction(false);
}
}
@@ -252,9 +257,9 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
compactAfter.addAll(result.after());
}
- private void submitCompaction() throws Exception {
- trySyncLatestCompaction(false);
- compactManager.triggerCompaction();
+ private void submitCompaction(boolean forcedFullCompaction) throws Exception {
+ trySyncLatestCompaction(forcedFullCompaction);
+ compactManager.triggerCompaction(forcedFullCompaction);
}
private void trySyncLatestCompaction(boolean blocking) throws Exception {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
index 50addb42..58e5902a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
@@ -37,4 +37,10 @@ public interface CompactStrategy {
* </ul>
*/
Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
+
+ /** Pick a compaction unit consisting of all existing files. */
+ static Optional<CompactUnit> pickFullCompaction(int numLevels, List<LevelSortedRun> runs) {
+ int maxLevel = numLevels - 1;
+ return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 15f02594..815edc86 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -20,11 +20,12 @@ package org.apache.flink.table.store.file.mergetree.compact;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactFutureManager;
import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.CompactUnit;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,20 +37,16 @@ import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
/** Compact manager for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
-public class MergeTreeCompactManager extends CompactManager {
+public class MergeTreeCompactManager extends CompactFutureManager {
private static final Logger LOG = LoggerFactory.getLogger(MergeTreeCompactManager.class);
+ private final ExecutorService executor;
private final Levels levels;
-
private final CompactStrategy strategy;
-
private final Comparator<RowData> keyComparator;
-
private final long minFileSize;
-
private final int numSortedRunStopTrigger;
-
private final CompactRewriter rewriter;
public MergeTreeCompactManager(
@@ -60,7 +57,7 @@ public class MergeTreeCompactManager extends CompactManager {
long minFileSize,
int numSortedRunStopTrigger,
CompactRewriter rewriter) {
- super(executor);
+ this.executor = executor;
this.levels = levels;
this.strategy = strategy;
this.minFileSize = minFileSize;
@@ -80,43 +77,54 @@ public class MergeTreeCompactManager extends CompactManager {
}
@Override
- public void triggerCompaction() {
- if (taskFuture != null) {
- return;
+ public void triggerCompaction(boolean fullCompaction) {
+ Optional<CompactUnit> optionalUnit;
+ if (fullCompaction) {
+ Preconditions.checkState(
+ taskFuture == null,
+ "A compaction task is still running while the user "
+ + "forces a new compaction. This is unexpected.");
+ optionalUnit =
+ CompactStrategy.pickFullCompaction(
+ levels.numberOfLevels(), levels.levelSortedRuns());
+ } else {
+ if (taskFuture != null) {
+ return;
+ }
+ optionalUnit =
+ strategy.pick(levels.numberOfLevels(), levels.levelSortedRuns())
+ .map(unit -> unit.files().size() < 2 ? null : unit);
}
- strategy.pick(levels.numberOfLevels(), levels.levelSortedRuns())
- .ifPresent(
- unit -> {
- if (unit.files().size() < 2) {
- return;
- }
- /*
- * As long as there is no older data, We can drop the deletion.
- * If the output level is 0, there may be older data not involved in compaction.
- * If the output level is bigger than 0, as long as there is no older data in
- * the current levels, the output is the oldest, so we can drop the deletion.
- * See CompactStrategy.pick.
- */
- boolean dropDelete =
- unit.outputLevel() != 0
- && unit.outputLevel() >= levels.nonEmptyHighestLevel();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Submit compaction with files (name, level, size): "
- + levels.levelSortedRuns().stream()
- .flatMap(lsr -> lsr.run().files().stream())
- .map(
- file ->
- String.format(
- "(%s, %d, %d)",
- file.fileName(),
- file.level(),
- file.fileSize()))
- .collect(Collectors.joining(", ")));
- }
- submitCompaction(unit, dropDelete);
- });
+
+ optionalUnit.ifPresent(
+ unit -> {
+ /*
+ * As long as there is no older data, We can drop the deletion.
+ * If the output level is 0, there may be older data not involved in compaction.
+ * If the output level is bigger than 0, as long as there is no older data in
+ * the current levels, the output is the oldest, so we can drop the deletion.
+ * See CompactStrategy.pick.
+ */
+ boolean dropDelete =
+ unit.outputLevel() != 0
+ && unit.outputLevel() >= levels.nonEmptyHighestLevel();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Submit compaction with files (name, level, size): "
+ + levels.levelSortedRuns().stream()
+ .flatMap(lsr -> lsr.run().files().stream())
+ .map(
+ file ->
+ String.format(
+ "(%s, %d, %d)",
+ file.fileName(),
+ file.level(),
+ file.fileSize()))
+ .collect(Collectors.joining(", ")));
+ }
+ submitCompaction(unit, dropDelete);
+ });
}
@VisibleForTesting
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
index 50334778..0e9ed821 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreWrite.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.file.operation;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.io.DataFileMeta;
@@ -97,6 +98,12 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
writer.write(data);
}
+ @Override
+ public void compact(BinaryRowData partition, int bucket) throws Exception {
+ getWriter(partition, bucket).fullCompaction();
+ }
+
+ @Override
public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
List<FileCommittable> result = new ArrayList<>();
@@ -167,4 +174,14 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> {
}
protected void notifyNewWriter(RecordWriter<T> writer) {}
+
+ /** Create a {@link RecordWriter} from partition and bucket. */
+ @VisibleForTesting
+ public abstract RecordWriter<T> createWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor);
+
+ /** Create an empty {@link RecordWriter} from partition and bucket. */
+ @VisibleForTesting
+ public abstract RecordWriter<T> createEmptyWriter(
+ BinaryRowData partition, int bucket, ExecutorService compactExecutor);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index d2b482be..627746c2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.append.AppendOnlyCompactManager;
import org.apache.flink.table.store.file.append.AppendOnlyWriter;
import org.apache.flink.table.store.file.compact.CompactManager;
-import org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.compact.NoopCompactManager;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.DataFilePathFactory;
@@ -38,12 +37,9 @@ import org.apache.flink.table.store.format.FileFormat;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.types.logical.RowType;
-import javax.annotation.Nullable;
-
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import static org.apache.flink.table.store.file.io.DataFileMeta.getMaxSequenceNumber;
@@ -96,21 +92,6 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
return createWriter(partition, bucket, Collections.emptyList(), compactExecutor);
}
- @Override
- public Callable<CompactResult> createCompactWriter(
- BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles) {
- if (compactFiles == null) {
- compactFiles = scanExistingFileMetas(partition, bucket);
- }
- return new AppendOnlyCompactManager.IterativeCompactTask(
- compactFiles,
- targetFileSize,
- compactionMinFileNum,
- compactionMaxFileNum,
- compactRewriter(partition, bucket),
- pathFactory.createDataFilePathFactory(partition, bucket));
- }
-
private RecordWriter<RowData> createWriter(
BinaryRowData partition,
int bucket,
@@ -120,19 +101,17 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
// and make restore files mutable to update
LinkedList<DataFileMeta> restored = new LinkedList<>(restoredFiles);
DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket);
- CompactManager compactManager;
- if (skipCompaction) {
- compactManager = new NoopCompactManager(compactExecutor);
- } else {
- compactManager =
- new AppendOnlyCompactManager(
- compactExecutor,
- restored,
- compactionMinFileNum,
- compactionMaxFileNum,
- targetFileSize,
- compactRewriter(partition, bucket));
- }
+ CompactManager compactManager =
+ skipCompaction
+ ? new NoopCompactManager()
+ : new AppendOnlyCompactManager(
+ compactExecutor,
+ restored,
+ compactionMinFileNum,
+ compactionMaxFileNum,
+ targetFileSize,
+ compactRewriter(partition, bucket),
+ factory);
return new AppendOnlyWriter(
schemaId,
fileFormat,
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 85ecdcb7..7f79abef 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -21,17 +21,11 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.table.sink.FileCommittable;
import org.apache.flink.table.store.table.sink.SinkRecord;
-import javax.annotation.Nullable;
-
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
/**
* Write operation which provides {@link RecordWriter} creation and writes {@link SinkRecord} to
@@ -43,23 +37,6 @@ public interface FileStoreWrite<T> {
FileStoreWrite<T> withIOManager(IOManager ioManager);
- /** Create a {@link RecordWriter} from partition and bucket. */
- RecordWriter<T> createWriter(
- BinaryRowData partition, int bucket, ExecutorService compactExecutor);
-
- /** Create an empty {@link RecordWriter} from partition and bucket. */
- RecordWriter<T> createEmptyWriter(
- BinaryRowData partition, int bucket, ExecutorService compactExecutor);
-
- /**
- * Create a {@link Callable} compactor from partition, bucket.
- *
- * @param compactFiles input files of compaction. When it is null, will automatically read all
- * files of the current bucket.
- */
- Callable<CompactResult> createCompactWriter(
- BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles);
-
/**
* If overwrite is true, the writer will overwrite the store, otherwise it won't.
*
@@ -77,6 +54,15 @@ public interface FileStoreWrite<T> {
*/
void write(BinaryRowData partition, int bucket, T data) throws Exception;
+ /**
+ * Compact data stored in given partition and bucket.
+ *
+ * @param partition the partition to compact
+ * @param bucket the bucket to compact
+ * @throws Exception the thrown exception when compacting the records
+ */
+ void compact(BinaryRowData partition, int bucket) throws Exception;
+
/**
* Prepare commit in the write.
*
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 5bfd1f19..89b038e3 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -23,8 +23,6 @@ import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.compact.CompactUnit;
import org.apache.flink.table.store.file.compact.NoopCompactManager;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
@@ -37,7 +35,6 @@ import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
-import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactTask;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -46,12 +43,9 @@ import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.types.logical.RowType;
-import javax.annotation.Nullable;
-
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
@@ -112,21 +106,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
return createMergeTreeWriter(partition, bucket, Collections.emptyList(), compactExecutor);
}
- @Override
- public Callable<CompactResult> createCompactWriter(
- BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles) {
- if (compactFiles == null) {
- compactFiles = scanExistingFileMetas(partition, bucket);
- }
- Comparator<RowData> keyComparator = keyComparatorSupplier.get();
- CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
- Levels levels = new Levels(keyComparator, compactFiles, options.numLevels());
- CompactUnit unit =
- CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns());
- return new MergeTreeCompactTask(
- keyComparator, options.targetFileSize(), rewriter, unit, true);
- }
-
private MergeTreeWriter createMergeTreeWriter(
BinaryRowData partition,
int bucket,
@@ -134,23 +113,18 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
ExecutorService compactExecutor) {
KeyValueFileWriterFactory writerFactory = writerFactoryBuilder.build(partition, bucket);
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
- CompactManager compactManager;
- if (options.writeCompactionSkip()) {
- compactManager = new NoopCompactManager(compactExecutor);
- } else {
- Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
- compactManager =
- createCompactManager(
- partition,
- bucket,
- new UniversalCompaction(
- options.maxSizeAmplificationPercent(),
- options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- options.maxSortedRunNum()),
- compactExecutor,
- levels);
- }
+ Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
+ CompactManager compactManager =
+ createCompactManager(
+ partition,
+ bucket,
+ new UniversalCompaction(
+ options.maxSizeAmplificationPercent(),
+ options.sortedRunSizeRatio(),
+ options.numSortedRunCompactionTrigger(),
+ options.maxSortedRunNum()),
+ compactExecutor,
+ levels);
return new MergeTreeWriter(
options.writeBufferSpillable(),
options.localSortMaxNumFileHandles(),
@@ -170,16 +144,20 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
CompactStrategy compactStrategy,
ExecutorService compactExecutor,
Levels levels) {
- Comparator<RowData> keyComparator = keyComparatorSupplier.get();
- CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
- return new MergeTreeCompactManager(
- compactExecutor,
- levels,
- compactStrategy,
- keyComparator,
- options.targetFileSize(),
- options.numSortedRunStopTrigger(),
- rewriter);
+ if (options.writeCompactionSkip()) {
+ return new NoopCompactManager();
+ } else {
+ Comparator<RowData> keyComparator = keyComparatorSupplier.get();
+ CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
+ return new MergeTreeCompactManager(
+ compactExecutor,
+ levels,
+ compactStrategy,
+ keyComparator,
+ options.targetFileSize(),
+ options.numSortedRunStopTrigger(),
+ rewriter);
+ }
}
private CompactRewriter compactRewriter(
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
index a071f30d..4acc1dd0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/utils/RecordWriter.java
@@ -33,6 +33,9 @@ public interface RecordWriter<T> {
/** Add a key-value element to the writer. */
void write(T record) throws Exception;
+ /** Compact all files related to the writer. */
+ void fullCompaction() throws Exception;
+
/**
* Prepare for a commit.
*
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index 16f2e826..36d954d5 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -24,7 +24,6 @@ import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableCompact;
/** Abstract {@link FileStoreTable}. */
public abstract class AbstractFileStoreTable implements FileStoreTable {
@@ -65,9 +64,4 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
public TableCommit newCommit(String user) {
return new TableCommit(store().newCommit(user), store().newExpire());
}
-
- @Override
- public TableCompact newCompact() {
- return new TableCompact(store().newScan(), store().newWrite(), store().partitionType());
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index 84188fa7..595a8524 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -23,7 +23,6 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.schema.TableSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.TableCommit;
-import org.apache.flink.table.store.table.sink.TableCompact;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
@@ -51,6 +50,4 @@ public interface FileStoreTable extends Serializable {
TableWrite newWrite();
TableCommit newCommit(String user);
-
- TableCompact newCompact();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
deleted file mode 100644
index fc57c0aa..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.table.sink;
-
-import org.apache.flink.table.data.binary.BinaryRowData;
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.io.CompactIncrement;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.io.NewFilesIncrement;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
-import org.apache.flink.table.store.file.predicate.PredicateConverter;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.types.logical.RowType;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.BiPredicate;
-
-/** An abstraction layer above {@link FileStoreWrite#createCompactWriter} to provide compaction. */
-public class TableCompact {
-
- private static final Logger LOG = LoggerFactory.getLogger(TableCompact.class);
-
- private final FileStoreScan scan;
- private final FileStoreWrite<?> write;
- private final RowType partitionType;
-
- private BiPredicate<BinaryRowData, Integer> partBucketFilter;
-
- public TableCompact(FileStoreScan scan, FileStoreWrite<?> write, RowType partitionType) {
- this.scan = scan;
- this.write = write;
- this.partitionType = partitionType;
- }
-
- public TableCompact withPartitions(Map<String, String> partitionSpec) {
- scan.withPartitionFilter(PredicateConverter.fromMap(partitionSpec, partitionType));
- return this;
- }
-
- public TableCompact withFilter(BiPredicate<BinaryRowData, Integer> partBucketFilter) {
- this.partBucketFilter = partBucketFilter;
- return this;
- }
-
- public List<FileCommittable> compact() {
- List<FileCommittable> committables = new ArrayList<>();
- scan.plan()
- .groupByPartFiles()
- .forEach(
- (partition, buckets) ->
- buckets.forEach(
- (bucket, files) ->
- doCompact(partition, bucket, files)
- .ifPresent(committables::add)));
- return committables;
- }
-
- private Optional<FileCommittable> doCompact(
- BinaryRowData partition, int bucket, List<DataFileMeta> files) {
- if (!partBucketFilter.test(partition, bucket)) {
- return Optional.empty();
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Do compaction for partition {}, bucket {}",
- FileStorePathFactory.getPartitionComputer(
- partitionType,
- FileStorePathFactory.PARTITION_DEFAULT_NAME.defaultValue())
- .generatePartValues(partition),
- bucket);
- }
- try {
- CompactResult result =
- write.createCompactWriter(partition.copy(), bucket, files).call();
- FileCommittable committable =
- new FileCommittable(
- partition,
- bucket,
- NewFilesIncrement.emptyIncrement(),
- new CompactIncrement(
- result.before(), result.after(), Collections.emptyList()));
- return Optional.of(committable);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
index 0c026fbf..647d74e8 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWrite.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table.sink;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
import java.util.List;
@@ -37,6 +38,8 @@ public interface TableWrite {
SinkRecord write(RowData rowData) throws Exception;
+ void compact(BinaryRowData partition, int bucket) throws Exception;
+
List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception;
void close() throws Exception;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
index 7a938162..83abf141 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableWriteImpl.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.table.sink;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.operation.FileStoreWrite;
import java.util.List;
@@ -68,6 +69,11 @@ public class TableWriteImpl<T> implements TableWrite {
return record;
}
+ @Override
+ public void compact(BinaryRowData partition, int bucket) throws Exception {
+ write.compact(partition, bucket);
+ }
+
@Override
public List<FileCommittable> prepareCommit(boolean endOfInput) throws Exception {
return write.prepareCommit(endOfInput);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 42d952e3..e55d225a 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -32,12 +32,12 @@ import org.apache.flink.table.store.file.manifest.ManifestList;
import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.operation.AbstractFileStoreWrite;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.FileStoreCommitImpl;
import org.apache.flink.table.store.file.operation.FileStoreExpireImpl;
import org.apache.flink.table.store.file.operation.FileStoreRead;
import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
@@ -173,7 +173,7 @@ public class TestFileStore extends KeyValueFileStore {
String identifier,
BiConsumer<FileStoreCommit, ManifestCommittable> commitFunction)
throws Exception {
- FileStoreWrite<KeyValue> write = newWrite();
+ AbstractFileStoreWrite<KeyValue> write = newWrite();
Map<BinaryRowData, Map<Integer, RecordWriter<KeyValue>>> writers = new HashMap<>();
for (KeyValue kv : kvs) {
BinaryRowData partition = partitionCalculator.apply(kv);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManagerTest.java
index 6449ac87..b8c6e99b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManagerTest.java
@@ -207,6 +207,7 @@ public class AppendOnlyCompactManagerTest {
minFileNum,
maxFileNum,
targetFileSize,
+ null, // not used
null // not used
);
Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
index 31856d35..7accc95c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/append/AppendOnlyWriterTest.java
@@ -318,7 +318,8 @@ public class AppendOnlyWriterTest {
compactBefore.isEmpty()
? Collections.emptyList()
: Collections.singletonList(
- generateCompactAfter(compactBefore))),
+ generateCompactAfter(compactBefore)),
+ pathFactory),
forceCompact,
pathFactory),
toCompact);
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 7e7f8482..54ddf245 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -67,7 +67,8 @@ public class FileFormatSuffixTest extends KeyValueFileReadWriteTest {
10,
SCHEMA,
0,
- new AppendOnlyCompactManager(null, toCompact, 4, 10, 10, null), // not used
+ new AppendOnlyCompactManager(
+ null, toCompact, 4, 10, 10, null, dataFilePathFactory), // not used
false,
dataFilePathFactory);
appendOnlyWriter.write(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
index e964fe01..ace9965b 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -204,7 +204,7 @@ public class MergeTreeCompactManagerTest {
2,
Integer.MAX_VALUE,
testRewriter(expectedDropDelete));
- manager.triggerCompaction();
+ manager.triggerCompaction(false);
manager.getCompactionResult(true);
List<LevelMinMax> outputs =
levels.allFiles().stream().map(LevelMinMax::new).collect(Collectors.toList());
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
index 5483ca16..a08bda5e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/operation/TestCommitThread.java
@@ -61,7 +61,7 @@ public class TestCommitThread extends Thread {
private final Map<BinaryRowData, List<KeyValue>> result;
private final Map<BinaryRowData, MergeTreeWriter> writers;
- private final FileStoreWrite<KeyValue> write;
+ private final AbstractFileStoreWrite<KeyValue> write;
private final FileStoreCommit commit;
public TestCommitThread(