This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new c44ed5bb [FLINK-28098] Refactor table store compactor
c44ed5bb is described below
commit c44ed5bb50f06eb94f18e8ae5d8c09ee2b909d10
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 17 14:36:01 2022 +0800
[FLINK-28098] Refactor table store compactor
This closes #160
---
.../table/store/connector/sink/StoreSink.java | 23 ++-
.../store/connector/sink/StoreSinkCompactor.java | 114 ++-----------
.../table/store/connector/sink/StoreSinkTest.java | 8 -
.../table/store/connector/sink/TestFileStore.java | 10 +-
.../store/connector/sink/TestFileStoreTable.java | 5 +-
.../store/file/mergetree/MergeTreeWriter.java | 6 +-
.../file/mergetree/compact/CompactManager.java | 179 +-------------------
.../file/mergetree/compact/CompactResult.java | 31 ++++
.../file/mergetree/compact/CompactRewriter.java | 32 ++++
.../store/file/mergetree/compact/CompactTask.java | 182 +++++++++++++++++++++
.../file/operation/AppendOnlyFileStoreWrite.java | 11 +-
.../table/store/file/operation/FileStoreWrite.java | 18 +-
.../file/operation/KeyValueFileStoreWrite.java | 61 +++----
.../table/store/file/writer/CompactWriter.java | 81 ---------
.../table/store/table/AbstractFileStoreTable.java | 9 +
.../flink/table/store/table/FileStoreTable.java | 4 +-
.../flink/table/store/table/sink/TableCompact.java | 111 +++++++++++++
.../table/store/file/mergetree/MergeTreeTest.java | 3 +-
.../file/mergetree/compact/CompactManagerTest.java | 2 +-
19 files changed, 469 insertions(+), 421 deletions(-)
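
For readers skimming the patch, the refactor replaces the scan/plan/threading logic that
lived inside StoreSinkCompactor with a reusable call chain. A rough sketch (simplified,
not part of the patch):

    // StoreSinkCompactor.prepareCommit()
    //     -> TableCompact.compact()                    // scan, group by partition/bucket, filter
    //         -> FileStoreWrite.createCompactWriter()  // yields a Callable<CompactResult>
    //             -> CompactTask.call()                // upgrades or rewrites files
    //     -> one FileCommittable per compacted bucket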
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
index e24a81c5..eb60b409 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSink.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogWriteCallback;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.SinkRecord;
+import org.apache.flink.table.store.table.sink.TableCompact;
import javax.annotation.Nullable;
@@ -45,6 +46,7 @@ import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
@@ -92,19 +94,11 @@ public class StoreSink<WriterStateT, LogCommT>
return restoreWriter(initContext, null);
}
- @SuppressWarnings("unchecked")
@Override
public StatefulPrecommittingSinkWriter<WriterStateT> restoreWriter(
InitContext initContext, Collection<WriterStateT> states) throws IOException {
if (compactionTask) {
- return (StatefulPrecommittingSinkWriter<WriterStateT>)
- new StoreSinkCompactor(
- initContext.getSubtaskId(),
- initContext.getNumberOfParallelSubtasks(),
- table.store(),
- compactPartitionSpec == null
- ? Collections.emptyMap()
- : compactPartitionSpec);
+ return createCompactWriter(initContext);
}
SinkWriter<SinkRecord> logWriter = null;
LogWriteCallback logCallback = null;
@@ -123,6 +117,17 @@ public class StoreSink<WriterStateT, LogCommT>
table.newWrite().withOverwrite(overwritePartition != null),
logWriter, logCallback);
}
+ private StoreSinkCompactor<WriterStateT> createCompactWriter(InitContext initContext) {
+ int task = initContext.getSubtaskId();
+ int numTask = initContext.getNumberOfParallelSubtasks();
+ TableCompact tableCompact = table.newCompact();
+ tableCompact.withPartitions(
+ compactPartitionSpec == null ? Collections.emptyMap() : compactPartitionSpec);
+ tableCompact.withFilter(
+ (partition, bucket) -> task == Math.abs(Objects.hash(partition, bucket) % numTask));
+ return new StoreSinkCompactor<>(tableCompact);
+ }
+
@Override
public SimpleVersionedSerializer<WriterStateT> getWriterStateSerializer() {
return logSinkProvider == null
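
Aside (not part of the patch): the filter above assigns each (partition, bucket) pair to
exactly one subtask by hashing, so no coordination between subtasks is needed. A minimal
sketch of the rule:

    import java.util.Objects;

    // Which subtask owns a given (partition, bucket)? Since |hash % numTask| < numTask,
    // the result is always a valid subtask id in [0, numTask).
    static boolean ownedBy(int task, int numTask, Object partition, int bucket) {
        return task == Math.abs(Objects.hash(partition, bucket) % numTask);
    }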
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
index a0f7e1a3..4a79a8c6 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/sink/StoreSinkCompactor.java
@@ -19,61 +19,30 @@
package org.apache.flink.table.store.connector.sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
-import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.connector.StatefulPrecommittingSinkWriter;
-import org.apache.flink.table.store.file.FileStore;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.predicate.PredicateConverter;
-import org.apache.flink.table.store.file.utils.FileStorePathFactory;
-import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.table.sink.FileCommittable;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.flink.table.store.table.sink.TableCompact;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
/** A dedicated {@link SinkWriter} for manual triggered compaction. */
-public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void> {
-
- private static final Logger LOG = LoggerFactory.getLogger(StoreSinkCompactor.class);
+public class StoreSinkCompactor<WriterStateT>
+ implements StatefulPrecommittingSinkWriter<WriterStateT> {
- private final int subTaskId;
- private final int numOfParallelInstances;
+ private final TableCompact tableCompact;
- private final FileStore<?> fileStore;
- private final Map<String, String> partitionSpec;
- private final ExecutorService compactExecutor;
-
- public StoreSinkCompactor(
- int subTaskId,
- int numOfParallelInstances,
- FileStore<?> fileStore,
- Map<String, String> partitionSpec) {
- this.subTaskId = subTaskId;
- this.numOfParallelInstances = numOfParallelInstances;
- this.fileStore = fileStore;
- this.partitionSpec = partitionSpec;
- this.compactExecutor =
- Executors.newSingleThreadScheduledExecutor(
- new ExecutorThreadFactory(
- String.format("compaction-subtask-%d", subTaskId)));
+ public StoreSinkCompactor(TableCompact tableCompact) {
+ this.tableCompact = tableCompact;
}
@Override
- public void flush(boolean endOfInput) {}
+ public void flush(boolean endOfInput) {
+ // nothing to flush
+ }
@Override
public void write(RowData element, Context context) throws IOException, InterruptedException {
@@ -82,68 +51,19 @@ public class StoreSinkCompactor implements StatefulPrecommittingSinkWriter<Void>
@Override
public void close() throws Exception {
- compactExecutor.shutdownNow();
+ // nothing to close
}
@Override
- public List<Void> snapshotState(long checkpointId) {
+ public List<WriterStateT> snapshotState(long checkpointId) {
+ // nothing to snapshot
return Collections.emptyList();
}
@Override
- public Collection<Committable> prepareCommit() throws IOException {
- List<Committable> committables = new ArrayList<>();
-
- FileStoreScan.Plan plan =
- fileStore
- .newScan()
- .withPartitionFilter(
- PredicateConverter.CONVERTER.fromMap(
- partitionSpec, fileStore.partitionType()))
- .plan();
- for (Map.Entry<BinaryRowData, Map<Integer, List<DataFileMeta>>> partEntry :
- plan.groupByPartFiles().entrySet()) {
- BinaryRowData partition = partEntry.getKey();
- for (Map.Entry<Integer, List<DataFileMeta>> bucketEntry :
- partEntry.getValue().entrySet()) {
- int bucket = bucketEntry.getKey();
- List<DataFileMeta> restoredFiles = bucketEntry.getValue();
- if (select(partition, bucket)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Assign partition {}, bucket {} to subtask {}",
- FileStorePathFactory.getPartitionComputer(
- fileStore.partitionType(),
- FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
- .defaultValue())
- .generatePartValues(partition),
- bucket,
- subTaskId);
- }
- RecordWriter<?> writer =
- fileStore
- .newWrite()
- .createCompactWriter(
- partition.copy(),
- bucket,
- compactExecutor,
- restoredFiles);
- FileCommittable committable;
- try {
- committable =
- new FileCommittable(
- partition, bucketEntry.getKey(), writer.prepareCommit());
- committables.add(new Committable(Committable.Kind.FILE, committable));
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
- }
- }
- return committables;
- }
-
- private boolean select(BinaryRowData partition, int bucket) {
- return subTaskId == Math.abs(Objects.hash(partition, bucket) % numOfParallelInstances);
+ public Collection<Committable> prepareCommit() {
+ return tableCompact.compact().stream()
+ .map(c -> new Committable(Committable.Kind.FILE, c))
+ .collect(Collectors.toList());
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
index edd19a5a..4efcc09f 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/StoreSinkTest.java
@@ -200,14 +200,6 @@ public class StoreSinkTest {
.isEqualTo(Collections.singletonList("ADD-key-8-value-0/8/9"));
}
- @Test
- public void testCreateCompactor() throws Exception {
- StoreSink<?, ?> sink =
- new StoreSink<>(identifier, table, true, null, () -> lock, null, null);
- StatefulPrecommittingSinkWriter<?> writer = sink.createWriter(initContext());
- assertThat(writer).isInstanceOf(StoreSinkCompactor.class);
- }
-
private void writeAndAssert(StoreSink<?, ?> sink) throws Exception {
writeAndCommit(
sink,
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
index 109c692c..d048eff6 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStore.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.manifest.ManifestCommittable;
import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.operation.FileStoreCommit;
import org.apache.flink.table.store.file.operation.FileStoreExpire;
import org.apache.flink.table.store.file.operation.FileStoreRead;
@@ -33,10 +34,11 @@ import org.apache.flink.table.store.file.operation.FileStoreWrite;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.stats.StatsTestUtils;
import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.CompactWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -44,6 +46,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@@ -89,11 +92,10 @@ public class TestFileStore implements FileStore<KeyValue> {
}
@Override
- public CompactWriter createCompactWriter(
+ public Callable<CompactResult> createCompactWriter(
BinaryRowData partition,
int bucket,
- ExecutorService compactExecutor,
- List<DataFileMeta> restoreFiles) {
+ @Nullable List<DataFileMeta> compactFiles) {
throw new UnsupportedOperationException();
}
};
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
index 49a387ac..6003d55d 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/TestFileStoreTable.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.store.table.sink.AbstractTableWrite;
import org.apache.flink.table.store.table.sink.SinkRecord;
import org.apache.flink.table.store.table.sink.SinkRecordConverter;
import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableCompact;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
@@ -100,7 +101,7 @@ public class TestFileStoreTable implements FileStoreTable {
}
@Override
- public TestFileStore store() {
- return store;
+ public TableCompact newCompact() {
+ throw new UnsupportedOperationException();
}
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 677b02ff..6dd1d04a 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.util.CloseableIterator;
@@ -153,7 +154,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
return increment;
}
- private void updateCompactResult(CompactManager.CompactResult result) {
+ private void updateCompactResult(CompactResult result) {
Set<String> afterFiles =
result.after().stream().map(DataFileMeta::fileName).collect(Collectors.toSet());
for (DataFileMeta file : result.before()) {
@@ -182,8 +183,7 @@ public class MergeTreeWriter implements RecordWriter<KeyValue> {
}
private void finishCompaction(boolean blocking) throws Exception {
- Optional<CompactManager.CompactResult> result =
- compactManager.finishCompaction(levels, blocking);
+ Optional<CompactResult> result = compactManager.finishCompaction(levels, blocking);
result.ifPresent(this::updateCompactResult);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
index 133e2a12..294e54fd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
@@ -19,25 +19,18 @@
package org.apache.flink.table.store.file.mergetree.compact;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.Levels;
-import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Comparator;
-import java.util.List;
import java.util.Optional;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
-import static java.util.Collections.singletonList;
-
/** Manager to submit compaction task. */
public class CompactManager {
@@ -51,7 +44,7 @@ public class CompactManager {
private final long minFileSize;
- private final Rewriter rewriter;
+ private final CompactRewriter rewriter;
private Future<CompactResult> taskFuture;
@@ -60,7 +53,7 @@ public class CompactManager {
CompactStrategy strategy,
Comparator<RowData> keyComparator,
long minFileSize,
- Rewriter rewriter) {
+ CompactRewriter rewriter) {
this.executor = executor;
this.minFileSize = minFileSize;
this.keyComparator = keyComparator;
@@ -113,8 +106,8 @@ public class CompactManager {
});
}
- public void submitCompaction(CompactUnit unit, boolean dropDelete) {
- CompactTask task = new CompactTask(unit, dropDelete);
+ private void submitCompaction(CompactUnit unit, boolean dropDelete) {
+ CompactTask task = new CompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete);
if (LOG.isDebugEnabled()) {
LOG.debug(
"Pick these files (name, level, size) for compaction: {}",
@@ -137,7 +130,7 @@ public class CompactManager {
return result;
}
- public Optional<CompactResult> finishCompaction(boolean blocking)
+ private Optional<CompactResult> finishCompaction(boolean blocking)
throws ExecutionException, InterruptedException {
if (taskFuture != null) {
if (blocking || taskFuture.isDone()) {
@@ -148,166 +141,4 @@ public class CompactManager {
}
return Optional.empty();
}
-
- /** Rewrite sections to the files. */
- @FunctionalInterface
- public interface Rewriter {
-
- List<DataFileMeta> rewrite(
- int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
- throws Exception;
- }
-
- /** Result of compaction. */
- public interface CompactResult {
-
- List<DataFileMeta> before();
-
- List<DataFileMeta> after();
- }
-
- // --------------------------------------------------------------------------------------------
- // Internal classes
- // --------------------------------------------------------------------------------------------
-
- /** Compaction task. */
- private class CompactTask implements Callable<CompactResult> {
-
- private final int outputLevel;
-
- private final List<List<SortedRun>> partitioned;
-
- private final boolean dropDelete;
-
- // metrics
- private long rewriteInputSize;
- private long rewriteOutputSize;
- private int rewriteFilesNum;
- private int upgradeFilesNum;
-
- private CompactTask(CompactUnit unit, boolean dropDelete) {
- this.outputLevel = unit.outputLevel();
- this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
- this.dropDelete = dropDelete;
-
- this.rewriteInputSize = 0;
- this.rewriteOutputSize = 0;
- this.rewriteFilesNum = 0;
- this.upgradeFilesNum = 0;
- }
-
- @Override
- public CompactResult call() throws Exception {
- return compact();
- }
-
- private CompactResult compact() throws Exception {
- long startMillis = System.currentTimeMillis();
-
- List<List<SortedRun>> candidate = new ArrayList<>();
- List<DataFileMeta> before = new ArrayList<>();
- List<DataFileMeta> after = new ArrayList<>();
-
- // Checking the order and compacting adjacent and contiguous files
- // Note: can't skip an intermediate file to compact, this will destroy the overall
- // orderliness
- for (List<SortedRun> section : partitioned) {
- if (section.size() > 1) {
- candidate.add(section);
- } else {
- SortedRun run = section.get(0);
- // No overlapping:
- // We can just upgrade the large file and just change the level instead of
- // rewriting it
- // But for small files, we will try to compact it
- for (DataFileMeta file : run.files()) {
- if (file.fileSize() < minFileSize) {
- // Smaller files are rewritten along with the previous files
- candidate.add(singletonList(SortedRun.fromSingle(file)));
- } else {
- // Large file appear, rewrite previous and upgrade it
- rewrite(candidate, before, after);
- upgrade(file, before, after);
- }
- }
- }
- }
- rewrite(candidate, before, after);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Done compacting {} files to {} files in {}ms. "
- + "Rewrite input size = {}, output size = {},
rewrite file num = {}, upgrade file num = {}",
- before.size(),
- after.size(),
- System.currentTimeMillis() - startMillis,
- rewriteInputSize,
- rewriteOutputSize,
- rewriteFilesNum,
- upgradeFilesNum);
- }
-
- return result(before, after);
- }
-
- private void upgrade(
- DataFileMeta file, List<DataFileMeta> before, List<DataFileMeta> after) {
- if (file.level() != outputLevel) {
- before.add(file);
- after.add(file.upgrade(outputLevel));
- upgradeFilesNum++;
- }
- }
-
- private void rewrite(
- List<List<SortedRun>> candidate,
- List<DataFileMeta> before,
- List<DataFileMeta> after)
- throws Exception {
- if (candidate.isEmpty()) {
- return;
- }
- if (candidate.size() == 1) {
- List<SortedRun> section = candidate.get(0);
- if (section.size() == 0) {
- return;
- } else if (section.size() == 1) {
- for (DataFileMeta file : section.get(0).files()) {
- upgrade(file, before, after);
- }
- candidate.clear();
- return;
- }
- }
- candidate.forEach(
- runs ->
- runs.forEach(
- run -> {
- before.addAll(run.files());
- rewriteInputSize +=
- run.files().stream()
- .mapToLong(DataFileMeta::fileSize)
- .sum();
- rewriteFilesNum += run.files().size();
- }));
- List<DataFileMeta> result = rewriter.rewrite(outputLevel, dropDelete, candidate);
- after.addAll(result);
- rewriteOutputSize += result.stream().mapToLong(DataFileMeta::fileSize).sum();
- candidate.clear();
- }
-
- private CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
- return new CompactResult() {
- @Override
- public List<DataFileMeta> before() {
- return before;
- }
-
- @Override
- public List<DataFileMeta> after() {
- return after;
- }
- };
- }
- }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java
new file mode 100644
index 00000000..e5b6e826
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import java.util.List;
+
+/** Result of compaction. */
+public interface CompactResult {
+
+ List<DataFileMeta> before();
+
+ List<DataFileMeta> after();
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
new file mode 100644
index 00000000..dcb267a6
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+
+import java.util.List;
+
+/** Rewrite sections to the files. */
+@FunctionalInterface
+public interface CompactRewriter {
+
+ List<DataFileMeta> rewrite(int outputLevel, boolean dropDelete, List<List<SortedRun>> sections)
+ throws Exception;
+}
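
Aside (not part of the patch): since CompactRewriter is a @FunctionalInterface, callers
supply it as a lambda; the production rewriter is wired up in KeyValueFileStoreWrite
further down. A trivial no-op sketch, as a test might use:

    // A rewriter that "merges" every section into no output files at all.
    // A real implementation merge-reads the runs and writes them back at outputLevel.
    CompactRewriter noopRewriter =
            (outputLevel, dropDelete, sections) -> java.util.Collections.emptyList();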
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java
new file mode 100644
index 00000000..cac1000b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import static java.util.Collections.singletonList;
+
+/** Compaction task. */
+public class CompactTask implements Callable<CompactResult> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+
+ private final long minFileSize;
+ private final CompactRewriter rewriter;
+ private final int outputLevel;
+
+ private final List<List<SortedRun>> partitioned;
+
+ private final boolean dropDelete;
+
+ // metrics
+ private long rewriteInputSize;
+ private long rewriteOutputSize;
+ private int rewriteFilesNum;
+ private int upgradeFilesNum;
+
+ public CompactTask(
+ Comparator<RowData> keyComparator,
+ long minFileSize,
+ CompactRewriter rewriter,
+ CompactUnit unit,
+ boolean dropDelete) {
+ this.minFileSize = minFileSize;
+ this.rewriter = rewriter;
+ this.outputLevel = unit.outputLevel();
+ this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
+ this.dropDelete = dropDelete;
+
+ this.rewriteInputSize = 0;
+ this.rewriteOutputSize = 0;
+ this.rewriteFilesNum = 0;
+ this.upgradeFilesNum = 0;
+ }
+
+ @Override
+ public CompactResult call() throws Exception {
+ return compact();
+ }
+
+ private CompactResult compact() throws Exception {
+ long startMillis = System.currentTimeMillis();
+
+ List<List<SortedRun>> candidate = new ArrayList<>();
+ List<DataFileMeta> before = new ArrayList<>();
+ List<DataFileMeta> after = new ArrayList<>();
+
+ // Checking the order and compacting adjacent and contiguous files
+ // Note: can't skip an intermediate file to compact, this will destroy the overall
+ // orderliness
+ for (List<SortedRun> section : partitioned) {
+ if (section.size() > 1) {
+ candidate.add(section);
+ } else {
+ SortedRun run = section.get(0);
+ // No overlapping:
+ // We can just upgrade the large file and just change the level instead of
+ // rewriting it
+ // But for small files, we will try to compact it
+ for (DataFileMeta file : run.files()) {
+ if (file.fileSize() < minFileSize) {
+ // Smaller files are rewritten along with the previous files
+ candidate.add(singletonList(SortedRun.fromSingle(file)));
+ } else {
+ // Large file appear, rewrite previous and upgrade it
+ rewrite(candidate, before, after);
+ upgrade(file, before, after);
+ }
+ }
+ }
+ }
+ rewrite(candidate, before, after);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Done compacting {} files to {} files in {}ms. "
+ + "Rewrite input size = {}, output size = {},
rewrite file num = {}, upgrade file num = {}",
+ before.size(),
+ after.size(),
+ System.currentTimeMillis() - startMillis,
+ rewriteInputSize,
+ rewriteOutputSize,
+ rewriteFilesNum,
+ upgradeFilesNum);
+ }
+
+ return result(before, after);
+ }
+
+ private void upgrade(DataFileMeta file, List<DataFileMeta> before, List<DataFileMeta> after) {
+ if (file.level() != outputLevel) {
+ before.add(file);
+ after.add(file.upgrade(outputLevel));
+ upgradeFilesNum++;
+ }
+ }
+
+ private void rewrite(
+ List<List<SortedRun>> candidate, List<DataFileMeta> before, List<DataFileMeta> after)
+ throws Exception {
+ if (candidate.isEmpty()) {
+ return;
+ }
+ if (candidate.size() == 1) {
+ List<SortedRun> section = candidate.get(0);
+ if (section.size() == 0) {
+ return;
+ } else if (section.size() == 1) {
+ for (DataFileMeta file : section.get(0).files()) {
+ upgrade(file, before, after);
+ }
+ candidate.clear();
+ return;
+ }
+ }
+ candidate.forEach(
+ runs ->
+ runs.forEach(
+ run -> {
+ before.addAll(run.files());
+ rewriteInputSize +=
+ run.files().stream()
+ .mapToLong(DataFileMeta::fileSize)
+ .sum();
+ rewriteFilesNum += run.files().size();
+ }));
+ List<DataFileMeta> result = rewriter.rewrite(outputLevel, dropDelete, candidate);
+ after.addAll(result);
+ rewriteOutputSize += result.stream().mapToLong(DataFileMeta::fileSize).sum();
+ candidate.clear();
+ }
+
+ private CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
+ return new CompactResult() {
+ @Override
+ public List<DataFileMeta> before() {
+ return before;
+ }
+
+ @Override
+ public List<DataFileMeta> after() {
+ return after;
+ }
+ };
+ }
+}
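
Aside (not part of the patch): CompactTask is now a plain Callable, so it can run inline
or on an executor. A minimal sketch, assuming keyComparator, targetFileSize, rewriter and
unit are in scope:

    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<CompactResult> future =
            executor.submit(new CompactTask(keyComparator, targetFileSize, rewriter, unit, true));
    CompactResult result = future.get(); // before(): replaced files; after(): their replacements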
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 76a780f9..29cf3177 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -24,12 +24,16 @@ import org.apache.flink.table.store.file.data.AppendOnlyWriter;
import org.apache.flink.table.store.file.data.DataFileMeta;
import org.apache.flink.table.store.file.data.DataFilePathFactory;
import org.apache.flink.table.store.file.format.FileFormat;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
/** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
@@ -71,11 +75,8 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
}
@Override
- public RecordWriter<RowData> createCompactWriter(
- BinaryRowData partition,
- int bucket,
- ExecutorService compactExecutor,
- List<DataFileMeta> restoredFiles) {
+ public Callable<CompactResult> createCompactWriter(
+ BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles) {
throw new UnsupportedOperationException(
"Currently append only write mode does not support
compaction.");
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 930bce95..862e7596 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -20,9 +20,13 @@ package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
import org.apache.flink.table.store.file.writer.RecordWriter;
+import javax.annotation.Nullable;
+
import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
/**
@@ -40,10 +44,12 @@ public interface FileStoreWrite<T> {
RecordWriter<T> createEmptyWriter(
BinaryRowData partition, int bucket, ExecutorService compactExecutor);
- /** Create a compact {@link RecordWriter} from partition, bucket and restore files. */
- RecordWriter<T> createCompactWriter(
- BinaryRowData partition,
- int bucket,
- ExecutorService compactExecutor,
- List<DataFileMeta> restoredFiles);
+ /**
+ * Create a {@link Callable} compactor from partition, bucket.
+ *
+ * @param compactFiles input files of compaction. When it is null, will automatically read all
+ * files of the current bucket.
+ */
+ Callable<CompactResult> createCompactWriter(
+ BinaryRowData partition, int bucket, @Nullable List<DataFileMeta>
compactFiles);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 86d68ef5..174fe255 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -31,7 +31,10 @@ import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.SortBufferMemTable;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
+import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
+import org.apache.flink.table.store.file.mergetree.compact.CompactTask;
import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
@@ -39,14 +42,15 @@ import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.utils.FileStorePathFactory;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
-import org.apache.flink.table.store.file.writer.CompactWriter;
import org.apache.flink.table.store.file.writer.RecordWriter;
import org.apache.flink.table.types.logical.RowType;
+import javax.annotation.Nullable;
+
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
@@ -102,20 +106,17 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
}
@Override
- public RecordWriter<KeyValue> createCompactWriter(
- BinaryRowData partition,
- int bucket,
- ExecutorService compactExecutor,
- List<DataFileMeta> restoredFiles) {
- Levels levels = new Levels(keyComparatorSupplier.get(), restoredFiles, options.numLevels);
- return new CompactWriter(
- CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns()),
- createCompactManager(
- partition,
- bucket,
- (numLevels, runs) ->
- Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs)),
- compactExecutor));
+ public Callable<CompactResult> createCompactWriter(
+ BinaryRowData partition, int bucket, @Nullable List<DataFileMeta> compactFiles) {
+ if (compactFiles == null) {
+ compactFiles = scanExistingFileMetas(partition, bucket);
+ }
+ Comparator<RowData> keyComparator = keyComparatorSupplier.get();
+ CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
+ Levels levels = new Levels(keyComparator, compactFiles, options.numLevels);
+ CompactUnit unit =
+ CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns());
+ return new CompactTask(keyComparator, options.targetFileSize, rewriter, unit, true);
}
private RecordWriter<KeyValue> createMergeTreeWriter(
@@ -153,20 +154,24 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
int bucket,
CompactStrategy compactStrategy,
ExecutorService compactExecutor) {
- DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
Comparator<RowData> keyComparator = keyComparatorSupplier.get();
- CompactManager.Rewriter rewriter =
- (outputLevel, dropDelete, sections) ->
- dataFileWriter.write(
- new RecordReaderIterator<>(
- new MergeTreeReader(
- sections,
- dropDelete,
- dataFileReaderFactory.create(partition, bucket),
- keyComparator,
- mergeFunction.copy())),
- outputLevel);
+ CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
return new CompactManager(
compactExecutor, compactStrategy, keyComparator, options.targetFileSize, rewriter);
}
+
+ private CompactRewriter compactRewriter(
+ BinaryRowData partition, int bucket, Comparator<RowData> keyComparator) {
+ DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
+ return (outputLevel, dropDelete, sections) ->
+ dataFileWriter.write(
+ new RecordReaderIterator<>(
+ new MergeTreeReader(
+ sections,
+ dropDelete,
+ dataFileReaderFactory.create(partition, bucket),
+ keyComparator,
+ mergeFunction.copy())),
+ outputLevel);
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
deleted file mode 100644
index 884af4e8..00000000
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/writer/CompactWriter.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.file.writer;
-
-import org.apache.flink.table.store.file.KeyValue;
-import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
-import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-/**
- * A {@link RecordWriter} implementation that only perform compaction on existing records and does
- * not generate new records.
- */
-public class CompactWriter implements RecordWriter<KeyValue> {
-
- private final CompactUnit unit;
- private final CompactManager compactManager;
-
- public CompactWriter(CompactUnit unit, CompactManager compactManager) {
- this.unit = unit;
- this.compactManager = compactManager;
- }
-
- @Override
- public Increment prepareCommit() throws IOException, InterruptedException {
- List<DataFileMeta> compactBefore = new ArrayList<>();
- List<DataFileMeta> compactAfter = new ArrayList<>();
- if (compactManager.isCompactionFinished()) {
- compactManager.submitCompaction(unit, true);
- try {
- compactManager
- .finishCompaction(true)
- .ifPresent(
- result -> {
- compactBefore.addAll(result.before());
- compactAfter.addAll(result.after());
- });
- return Increment.forCompact(compactBefore, compactAfter);
- } catch (ExecutionException e) {
- throw new IOException(e.getCause());
- }
- }
- throw new IllegalStateException("Compact manager should have finished previous task.");
- }
-
- @Override
- public List<DataFileMeta> close() throws Exception {
- return Collections.emptyList();
- }
-
- @Override
- public void write(KeyValue kv) throws Exception {
- // nothing to write
- }
-
- @Override
- public void sync() throws Exception {}
-}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
index c7ea9765..a993ee2d 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AbstractFileStoreTable.java
@@ -18,9 +18,11 @@
package org.apache.flink.table.store.table;
+import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableCompact;
/** Abstract {@link FileStoreTable}. */
public abstract class AbstractFileStoreTable implements FileStoreTable {
@@ -35,6 +37,8 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
this.schema = schema;
}
+ protected abstract FileStore<?> store();
+
@Override
public String name() {
return name;
@@ -54,4 +58,9 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
public TableCommit newCommit() {
return new TableCommit(store().newCommit(), store().newExpire());
}
+
+ @Override
+ public TableCompact newCompact() {
+ return new TableCompact(store().newScan(), store().newWrite(), store().partitionType());
+ }
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
index f16398be..82271f2c 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/FileStoreTable.java
@@ -18,10 +18,10 @@
package org.apache.flink.table.store.table;
-import org.apache.flink.table.store.file.FileStore;
import org.apache.flink.table.store.file.schema.Schema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.sink.TableCommit;
+import org.apache.flink.table.store.table.sink.TableCompact;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.TableScan;
@@ -48,5 +48,5 @@ public interface FileStoreTable extends Serializable {
TableCommit newCommit();
- FileStore store();
+ TableCompact newCompact();
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
new file mode 100644
index 00000000..9632d6dd
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.table.sink;
+
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.data.DataFileMeta;
+import org.apache.flink.table.store.file.mergetree.Increment;
+import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
+import org.apache.flink.table.store.file.operation.FileStoreScan;
+import org.apache.flink.table.store.file.operation.FileStoreWrite;
+import org.apache.flink.table.store.file.predicate.PredicateConverter;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiPredicate;
+
+/** An abstraction layer above {@link FileStoreWrite#createCompactWriter} to provide compaction. */
+public class TableCompact {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableCompact.class);
+
+ private final FileStoreScan scan;
+ private final FileStoreWrite<?> write;
+ private final RowType partitionType;
+
+ private BiPredicate<BinaryRowData, Integer> partBucketFilter;
+
+ public TableCompact(FileStoreScan scan, FileStoreWrite<?> write, RowType partitionType) {
+ this.scan = scan;
+ this.write = write;
+ this.partitionType = partitionType;
+ }
+
+ public TableCompact withPartitions(Map<String, String> partitionSpec) {
+ scan.withPartitionFilter(
+ PredicateConverter.CONVERTER.fromMap(partitionSpec, partitionType));
+ return this;
+ }
+
+ public TableCompact withFilter(BiPredicate<BinaryRowData, Integer> partBucketFilter) {
+ this.partBucketFilter = partBucketFilter;
+ return this;
+ }
+
+ public List<FileCommittable> compact() {
+ List<FileCommittable> committables = new ArrayList<>();
+ scan.plan()
+ .groupByPartFiles()
+ .forEach(
+ (partition, buckets) ->
+ buckets.forEach(
+ (bucket, files) ->
+ doCompact(partition, bucket, files)
+ .ifPresent(committables::add)));
+ return committables;
+ }
+
+ private Optional<FileCommittable> doCompact(
+ BinaryRowData partition, int bucket, List<DataFileMeta> files) {
+ if (!partBucketFilter.test(partition, bucket)) {
+ return Optional.empty();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Do compaction for partition {}, bucket {}",
+ FileStorePathFactory.getPartitionComputer(
+ partitionType,
+ FileSystemConnectorOptions.PARTITION_DEFAULT_NAME
+ .defaultValue())
+ .generatePartValues(partition),
+ bucket);
+ }
+ try {
+ CompactResult result =
+ write.createCompactWriter(partition.copy(), bucket, files).call();
+ FileCommittable committable =
+ new FileCommittable(
+ partition,
+ bucket,
+ Increment.forCompact(result.before(), result.after()));
+ return Optional.of(committable);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
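
Aside (not part of the patch): TableCompact gives non-sink callers a direct entry point
for manual compaction. A minimal sketch; the partition spec values are hypothetical:

    // Compact every bucket of the partitions matching the spec.
    static List<FileCommittable> compactAll(FileStoreTable table) {
        return table.newCompact()
                .withPartitions(Collections.singletonMap("dt", "2022-06-17"))
                .withFilter((partition, bucket) -> true)
                .compact();
    }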
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index 16137dd3..e7607a12 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.store.file.data.DataFileWriter;
import org.apache.flink.table.store.file.format.FileFormat;
import org.apache.flink.table.store.file.format.FlushingFileFormat;
import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
@@ -286,7 +287,7 @@ public class MergeTreeTest {
options.maxSizeAmplificationPercent,
options.sizeRatio,
options.numSortedRunCompactionTrigger);
- CompactManager.Rewriter rewriter =
+ CompactRewriter rewriter =
(outputLevel, dropDelete, sections) ->
dataFileWriter.write(
new RecordReaderIterator<KeyValue>(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
index bffac73e..51a56305 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
@@ -230,7 +230,7 @@ public class CompactManagerTest {
return (numLevels, runs) ->
Optional.of(CompactUnit.fromLevelRuns(numLevels - 1, runs));
}
- private CompactManager.Rewriter testRewriter(boolean expectedDropDelete) {
+ private CompactRewriter testRewriter(boolean expectedDropDelete) {
return (outputLevel, dropDelete, sections) -> {
assertThat(dropDelete).isEqualTo(expectedDropDelete);
int minKey = Integer.MAX_VALUE;