This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1b3260801 [flink] add flink compaction job of unaware-bucket
compaction (#1347)
1b3260801 is described below
commit 1b326080193edd7fe7f633a97f42715ade61b215
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 13 09:54:55 2023 +0800
[flink] add flink compaction job of unaware-bucket compaction (#1347)
---
.../generated/flink_connector_configuration.html | 6 +
.../AppendOnlyTableCompactionCoordinator.java | 35 +++++-
.../apache/paimon/flink/FlinkConnectorOptions.java | 9 ++
.../apache/paimon/flink/action/CompactAction.java | 43 ++++++-
.../UnawareBucketCompactionTopoBuilder.java | 123 +++++++++++++++++++
.../AppendOnlyTableCompactionWorkerOperator.java | 76 ++++++++++++
.../flink/sink/CompactionTaskSimpleSerializer.java | 65 ++++++++++
.../paimon/flink/sink/CompactionTaskTypeInfo.java | 99 ++++++++++++++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 46 ++++---
.../flink/sink/GlobalFullCompactionSinkWrite.java | 4 +-
.../flink/sink/UnawareBucketCompactionSink.java | 61 ++++++++++
.../flink/source/UnawareBucketSourceFunction.java | 123 +++++++++++++++++++
.../paimon/flink/action/CompactActionITCase.java | 132 +++++++++++++++++++++
.../sink/CompactionTaskSimpleSerializerTest.java | 79 ++++++++++++
14 files changed, 874 insertions(+), 27 deletions(-)
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index d1776e9f8..dc4c4e0b5 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -116,5 +116,11 @@ under the License.
<td>Boolean</td>
<td>The option to enable return per iterator instead of per record
in streaming read.This can ensure that there will be no checkpoint segmentation
in iterator consumption.<br />By default, streaming source checkpoint will be
performed in any time, this means 'UPDATE_BEFORE' and 'UPDATE_AFTER' can be
split into two checkpoint. Downstream can see intermediate state.</td>
</tr>
+ <tr>
+ <td><h5>unaware-bucket.compaction.parallelism</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Integer</td>
+ <td>Defines a custom parallelism for the unaware-bucket table
compaction job. By default, if this option is not defined, the planner will
derive the parallelism for each statement individually by also considering the
global configuration.</td>
+ </tr>
</tbody>
</table>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
index 4f125ddd9..e89cff30e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
@@ -22,10 +22,14 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamTableScan;
+
+import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
@@ -56,16 +60,37 @@ public class AppendOnlyTableCompactionCoordinator {
protected static final int REMOVE_AGE = 10;
protected static final int COMPACT_AGE = 5;
- private final StreamTableScan scan;
+ private final InnerTableScan scan;
private final long targetFileSize;
private final int minFileNum;
private final int maxFileNum;
+ private final boolean streamingMode;
final Map<BinaryRow, PartitionCompactCoordinator>
partitionCompactCoordinators =
new HashMap<>();
public AppendOnlyTableCompactionCoordinator(AppendOnlyFileStoreTable
table) {
- scan = table.copy(compactScanType()).newStreamScan();
+ this(table, true);
+ }
+
+ public AppendOnlyTableCompactionCoordinator(
+ AppendOnlyFileStoreTable table, boolean isStreaming) {
+ this(table, isStreaming, null);
+ }
+
+ public AppendOnlyTableCompactionCoordinator(
+ AppendOnlyFileStoreTable table, boolean isStreaming, @Nullable
Predicate filter) {
+
+ FileStoreTable tableCopy = table.copy(compactScanType());
+ if (isStreaming) {
+ scan = tableCopy.newStreamScan();
+ } else {
+ scan = tableCopy.newScan();
+ }
+ if (filter != null) {
+ scan.withFilter(filter);
+ }
+ this.streamingMode = isStreaming;
CoreOptions coreOptions = table.coreOptions();
this.targetFileSize = coreOptions.targetFileSize();
this.minFileNum = coreOptions.compactionMinFileNum();
@@ -93,6 +118,10 @@ public class AppendOnlyTableCompactionCoordinator {
DataSplit dataSplit = (DataSplit) split;
notifyNewFiles(dataSplit.partition(),
dataSplit.files());
});
+ // batch mode, we don't do continuous scanning
+ if (!streamingMode) {
+ break;
+ }
}
return hasResult;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 04a9c509c..71c0cd003 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -87,6 +87,15 @@ public class FlinkConnectorOptions {
+ "for each statement individually by also
considering the global configuration. "
+ "If user enable the
scan.infer-parallelism, the planner will derive the parallelism by inferred
parallelism.");
+ public static final ConfigOption<Integer>
UNAWARE_BUCKET_COMPACTION_PARALLELISM =
+ ConfigOptions.key("unaware-bucket.compaction.parallelism")
+ .intType()
+ .noDefaultValue()
+ .withDescription(
+ "Defines a custom parallelism for the
unaware-bucket table compaction job. "
+ + "By default, if this option is not
defined, the planner will derive the parallelism "
+ + "for each statement individually by also
considering the global configuration.");
+
public static final ConfigOption<Boolean> INFER_SCAN_PARALLELISM =
ConfigOptions.key("scan.infer-parallelism")
.booleanType()
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 5665bcf20..f2064bc29 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -19,9 +19,11 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -49,8 +51,7 @@ public class CompactAction extends TableActionBase {
private static final Logger LOG =
LoggerFactory.getLogger(CompactAction.class);
- private final CompactorSourceBuilder sourceBuilder;
- private final CompactorSinkBuilder sinkBuilder;
+ private List<Map<String, String>> partitions;
public CompactAction(String warehouse, String database, String tableName) {
this(warehouse, database, tableName, Collections.emptyMap());
@@ -69,9 +70,6 @@ public class CompactAction extends TableActionBase {
table.getClass().getName()));
}
table =
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
- sourceBuilder =
- new CompactorSourceBuilder(identifier.getFullName(),
(FileStoreTable) table);
- sinkBuilder = new CompactorSinkBuilder((FileStoreTable) table);
}
// ------------------------------------------------------------------------
@@ -79,7 +77,7 @@ public class CompactAction extends TableActionBase {
// ------------------------------------------------------------------------
public CompactAction withPartitions(List<Map<String, String>> partitions) {
- sourceBuilder.withPartitions(partitions);
+ this.partitions = partitions;
return this;
}
@@ -87,12 +85,45 @@ public class CompactAction extends TableActionBase {
ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ switch (fileStoreTable.bucketMode()) {
+ case UNAWARE:
+ {
+ buildForUnawareBucketCompaction(
+ env, (AppendOnlyFileStoreTable) table,
isStreaming);
+ break;
+ }
+ case FIXED:
+ case DYNAMIC:
+ default:
+ {
+ buildForTraditionalCompaction(env, fileStoreTable,
isStreaming);
+ }
+ }
+ }
+
+ private void buildForTraditionalCompaction(
+ StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming) {
+ CompactorSourceBuilder sourceBuilder =
+ new CompactorSourceBuilder(identifier.getFullName(), table);
+ CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
+ sourceBuilder.withPartitions(partitions);
DataStreamSource<RowData> source =
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
sinkBuilder.withInput(source).build();
}
+ private void buildForUnawareBucketCompaction(
+ StreamExecutionEnvironment env, AppendOnlyFileStoreTable table,
boolean isStreaming) {
+ UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
+ new UnawareBucketCompactionTopoBuilder(env,
identifier.getFullName(), table);
+
+ unawareBucketCompactionTopoBuilder.withPartitions(partitions);
+ unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
+ unawareBucketCompactionTopoBuilder.build();
+ }
+
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
new file mode 100644
index 000000000..8d1b70271
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
+import org.apache.paimon.flink.source.UnawareBucketSourceFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Builder for the Flink compaction job of an unaware-bucket table.
+ *
+ * <p>Note: This compaction job class is only used for unaware-bucket
compaction. At start-up, it
+ * scans all the files from the latest snapshot, filters out large files, adds
small files into memory,
+ * and generates compaction tasks for them. In continuous mode, it scans the delta files
from the follow-up
+ * snapshots. We need to enable checkpointing for this compaction job; a checkpoint
will trigger the committer
+ * stage to commit all the compacted files.
+ */
+public class UnawareBucketCompactionTopoBuilder {
+
+ private final transient StreamExecutionEnvironment env;
+ private final String tableIdentifier;
+ private final AppendOnlyFileStoreTable table;
+ @Nullable private List<Map<String, String>> specifiedPartitions = null;
+ private boolean isContinuous = false;
+
+ public UnawareBucketCompactionTopoBuilder(
+ StreamExecutionEnvironment env,
+ String tableIdentifier,
+ AppendOnlyFileStoreTable table) {
+ this.env = env;
+ this.tableIdentifier = tableIdentifier;
+ this.table = table;
+ }
+
+ public void withContinuousMode(boolean isContinuous) {
+ this.isContinuous = isContinuous;
+ }
+
+ public void withPartitions(List<Map<String, String>> partitions) {
+ this.specifiedPartitions = partitions;
+ }
+
+ public void build() {
+
+ // build source from UnawareBucketSourceFunction
+ DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+
+ // from source, construct the full flink job
+ sinkFromSource(source);
+ }
+
+ private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
+ long scanInterval =
table.coreOptions().continuousDiscoveryInterval().toMillis();
+ UnawareBucketSourceFunction source =
+ new UnawareBucketSourceFunction(
+ table, isContinuous, scanInterval,
getPartitionFilter());
+
+ return UnawareBucketSourceFunction.buildSource(env, source,
isContinuous, tableIdentifier);
+ }
+
+ private void sinkFromSource(DataStreamSource<AppendOnlyCompactionTask>
input) {
+ Options conf = Options.fromMap(table.options());
+ Integer compactionWorkerParallelism =
+
conf.get(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM);
+ PartitionTransformation<AppendOnlyCompactionTask> transformation =
+ new PartitionTransformation<>(
+ input.getTransformation(), new
RebalancePartitioner<>());
+ if (compactionWorkerParallelism != null) {
+ transformation.setParallelism(compactionWorkerParallelism);
+ } else {
+ // because the source function for unaware-bucket table compaction has
a parallelism of one,
+ // we need to set the default parallelism by hand.
+ transformation.setParallelism(env.getParallelism());
+ }
+ DataStream<AppendOnlyCompactionTask> rebalanced = new
DataStream<>(env, transformation);
+
+ UnawareBucketCompactionSink.sink(table, rebalanced);
+ }
+
+ private Predicate getPartitionFilter() {
+ Predicate partitionPredicate = null;
+ if (specifiedPartitions != null) {
+ partitionPredicate =
+ PredicateBuilder.or(
+ specifiedPartitions.stream()
+ .map(p -> PredicateBuilder.partition(p,
table.rowType()))
+ .toArray(Predicate[]::new));
+ }
+ return partitionPredicate;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
new file mode 100644
index 000000000..6114b1d3f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.AppendOnlyTableCompactionWorker;
+import org.apache.paimon.flink.source.UnawareBucketSourceFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
+ * UnawareBucketSourceFunction}.
+ */
+public class AppendOnlyTableCompactionWorkerOperator
+ extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
+ private final AppendOnlyFileStoreTable table;
+ private transient AppendOnlyTableCompactionWorker worker;
+ private final String commitUser;
+ private transient List<CommitMessage> result;
+
+ public AppendOnlyTableCompactionWorkerOperator(
+ AppendOnlyFileStoreTable table, String commitUser) {
+ super(Options.fromMap(table.options()));
+ this.table = table;
+ this.commitUser = commitUser;
+ }
+
+ @Override
+ public void open() throws Exception {
+ this.worker = new AppendOnlyTableCompactionWorker(table, commitUser);
+ this.result = new ArrayList<>();
+ }
+
+ @Override
+ protected List<Committable> prepareCommit(boolean doCompaction, long
checkpointId)
+ throws IOException {
+ // ignore doCompaction tag
+ ArrayList<CommitMessage> tempList = new ArrayList<>(result);
+ result.clear();
+ return tempList.stream()
+ .map(s -> new Committable(checkpointId, Committable.Kind.FILE,
s))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public void processElement(StreamRecord<AppendOnlyCompactionTask> element)
throws Exception {
+ AppendOnlyCompactionTask task = element.getValue();
+ worker.accept(task);
+ result.addAll(worker.doCompact());
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializer.java
new file mode 100644
index 000000000..06d968fae
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.table.sink.CompactionTaskSerializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** {@link SimpleVersionedSerializer} for {@link AppendOnlyCompactionTask}. */
+public class CompactionTaskSimpleSerializer
+ implements SimpleVersionedSerializer<AppendOnlyCompactionTask> {
+
+ private final CompactionTaskSerializer compactionTaskSerializer;
+
+ public CompactionTaskSimpleSerializer(CompactionTaskSerializer
compactionTaskSerializer) {
+ this.compactionTaskSerializer = compactionTaskSerializer;
+ }
+
+ @Override
+ public int getVersion() {
+ return 2;
+ }
+
+ @Override
+ public byte[] serialize(AppendOnlyCompactionTask compactionTask) throws
IOException {
+ byte[] wrapped = compactionTaskSerializer.serialize(compactionTask);
+ int version = compactionTaskSerializer.getVersion();
+
+ return ByteBuffer.allocate(wrapped.length +
4).put(wrapped).putInt(version).array();
+ }
+
+ @Override
+ public AppendOnlyCompactionTask deserialize(int compactionTaskVersion,
byte[] bytes)
+ throws IOException {
+ if (compactionTaskVersion != getVersion()) {
+ throw new RuntimeException("Can not deserialize version: " +
compactionTaskVersion);
+ }
+
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ byte[] wrapped = new byte[bytes.length - 4];
+ buffer.get(wrapped);
+ int version = buffer.getInt();
+ return compactionTaskSerializer.deserialize(version, wrapped);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
new file mode 100644
index 000000000..c33ab95e1
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.table.sink.CompactionTaskSerializer;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+
+/** Type information of {@link AppendOnlyCompactionTask}. */
+public class CompactionTaskTypeInfo extends
TypeInformation<AppendOnlyCompactionTask> {
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ public Class<AppendOnlyCompactionTask> getTypeClass() {
+ return AppendOnlyCompactionTask.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<AppendOnlyCompactionTask>
createSerializer(ExecutionConfig config) {
+ // we don't need to copy the task
+ return new
SimpleVersionedSerializerTypeSerializerProxy<AppendOnlyCompactionTask>(
+ () -> new CompactionTaskSimpleSerializer(new
CompactionTaskSerializer())) {
+ @Override
+ public AppendOnlyCompactionTask copy(AppendOnlyCompactionTask
from) {
+ return from;
+ }
+
+ @Override
+ public AppendOnlyCompactionTask copy(
+ AppendOnlyCompactionTask from, AppendOnlyCompactionTask
reuse) {
+ return from;
+ }
+ };
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof CompactionTaskTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof CompactionTaskTypeInfo;
+ }
+
+ @Override
+ public String toString() {
+ return "CompactionTask";
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 52fff82d4..6ab2ddd4c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -59,11 +59,11 @@ public abstract class FlinkSink<T> implements Serializable {
private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
protected final FileStoreTable table;
- private final boolean isOverwrite;
+ private final boolean emptyWriter;
- public FlinkSink(FileStoreTable table, boolean isOverwrite) {
+ public FlinkSink(FileStoreTable table, boolean emptyWriter) {
this.table = table;
- this.isOverwrite = isOverwrite;
+ this.emptyWriter = emptyWriter;
}
private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig
checkpointConfig) {
@@ -97,7 +97,7 @@ public abstract class FlinkSink<T> implements Serializable {
commitUser,
state,
ioManager,
- isOverwrite,
+ emptyWriter,
waitCompaction,
finalDeltaCommits,
memoryPool);
@@ -110,7 +110,7 @@ public abstract class FlinkSink<T> implements Serializable {
commitUser,
state,
ioManager,
- isOverwrite,
+ emptyWriter,
waitCompaction,
memoryPool);
}
@@ -122,14 +122,10 @@ public abstract class FlinkSink<T> implements
Serializable {
// When the job restarts, commitUser will be recovered from states and
this value is
// ignored.
String initialCommitUser = UUID.randomUUID().toString();
- return sinkFrom(
- input,
- initialCommitUser,
-
createWriteProvider(input.getExecutionEnvironment().getCheckpointConfig()));
+ return sinkFrom(input, initialCommitUser);
}
- public DataStreamSink<?> sinkFrom(
- DataStream<T> input, String commitUser, StoreSinkWrite.Provider
sinkProvider) {
+ public DataStreamSink<?> sinkFrom(DataStream<T> input, String commitUser) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
@@ -142,12 +138,24 @@ public abstract class FlinkSink<T> implements
Serializable {
assertCheckpointConfiguration(env);
}
- CommittableTypeInfo typeInfo = new CommittableTypeInfo();
+ SingleOutputStreamOperator<Committable> written =
+ handleInput(input, isStreaming, commitUser);
+
+ return commit(written, streamingCheckpointEnabled, commitUser);
+ }
+
+ protected SingleOutputStreamOperator<Committable> handleInput(
+ DataStream<T> input, boolean isStreaming, String commitUser) {
SingleOutputStreamOperator<Committable> written =
input.transform(
WRITER_NAME + " -> " + table.name(),
- typeInfo,
- createWriteOperator(sinkProvider, isStreaming,
commitUser))
+ new CommittableTypeInfo(),
+ createWriteOperator(
+ createWriteProvider(
+ input.getExecutionEnvironment()
+
.getCheckpointConfig()),
+ isStreaming,
+ commitUser))
.setParallelism(input.getParallelism());
Options options = Options.fromMap(table.options());
if (options.get(SINK_USE_MANAGED_MEMORY)) {
@@ -156,11 +164,17 @@ public abstract class FlinkSink<T> implements
Serializable {
.declareManagedMemoryUseCaseAtOperatorScope(
ManagedMemoryUseCase.OPERATOR,
memorySize.getMebiBytes());
}
+ return written;
+ }
+ protected DataStreamSink<?> commit(
+ SingleOutputStreamOperator<Committable> written,
+ boolean streamingCheckpointEnabled,
+ String commitUser) {
SingleOutputStreamOperator<?> committed =
written.transform(
GLOBAL_COMMITTER_NAME + " -> " + table.name(),
- typeInfo,
+ new CommittableTypeInfo(),
new CommitterOperator<>(
streamingCheckpointEnabled,
commitUser,
@@ -171,7 +185,7 @@ public abstract class FlinkSink<T> implements Serializable {
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
}
- private void assertCheckpointConfiguration(StreamExecutionEnvironment env)
{
+ public static void
assertCheckpointConfiguration(StreamExecutionEnvironment env) {
Preconditions.checkArgument(
!env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
"Paimon sink currently does not support unaligned checkpoints.
Please set "
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 8e6e1ba14..8ed101b8f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -69,11 +69,11 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
String commitUser,
StoreSinkWriteState state,
IOManager ioManager,
- boolean isOverwrite,
+ boolean emptyWriter,
boolean waitCompaction,
int deltaCommits,
@Nullable MemorySegmentPool memoryPool) {
- super(table, commitUser, state, ioManager, isOverwrite,
waitCompaction, memoryPool);
+ super(table, commitUser, state, ioManager, emptyWriter,
waitCompaction, memoryPool);
this.deltaCommits = deltaCommits;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
new file mode 100644
index 000000000..1aec4e711
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+/**
+ * {@link FlinkSink} variant whose input records are {@link AppendOnlyCompactionTask}s of an
+ * unaware-bucket table; the write operator it installs is an {@link
+ * AppendOnlyTableCompactionWorkerOperator}.
+ */
+public class UnawareBucketCompactionSink extends FlinkSink<AppendOnlyCompactionTask> {
+
+    private final AppendOnlyFileStoreTable table;
+
+    public UnawareBucketCompactionSink(AppendOnlyFileStoreTable table) {
+        super(table, true);
+        this.table = table;
+    }
+
+    /** Convenience entry point: wraps {@code table} in a new sink and attaches it to {@code input}. */
+    public static DataStreamSink<?> sink(
+            AppendOnlyFileStoreTable table, DataStream<AppendOnlyCompactionTask> input) {
+        UnawareBucketCompactionSink compactionSink = new UnawareBucketCompactionSink(table);
+        return compactionSink.sinkFrom(input);
+    }
+
+    /** The write provider and the streaming flag are not used by the compaction worker. */
+    @Override
+    protected OneInputStreamOperator<AppendOnlyCompactionTask, Committable> createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String commitUser) {
+        return new AppendOnlyTableCompactionWorkerOperator(table, commitUser);
+    }
+
+    /** Builds committers around {@code table.newCommit(..)}; the checkpoint flag is ignored. */
+    @Override
+    protected SerializableFunction<String, Committer<Committable, ManifestCommittable>>
+            createCommitterFactory(boolean streamingCheckpointEnabled) {
+        return user -> new StoreCommitter(table.newCommit(user));
+    }
+
+    /** Always hands out a {@link NoopCommittableStateManager}. */
+    @Override
+    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
+        return new NoopCommittableStateManager();
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/UnawareBucketSourceFunction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/UnawareBucketSourceFunction.java
new file mode 100644
index 000000000..aa57439cc
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/UnawareBucketSourceFunction.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
+import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.source.EndOfScanException;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Source Function for unaware-bucket Compaction.
+ *
+ * <p>Every loop iteration asks an {@link AppendOnlyTableCompactionCoordinator} for a batch of
+ * {@link AppendOnlyCompactionTask}s and emits them while holding the checkpoint lock. When a round
+ * produces no task, the function sleeps for {@code scanInterval} milliseconds before asking again.
+ * The loop ends when the function is cancelled or when the coordinator throws an {@link
+ * EndOfScanException}.
+ */
+public class UnawareBucketSourceFunction extends RichSourceFunction<AppendOnlyCompactionTask> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(UnawareBucketSourceFunction.class);
+    private static final String COMPACTION_COORDINATOR_NAME = "Compaction Coordinator";
+
+    private final AppendOnlyFileStoreTable table;
+    private final boolean streaming;
+    /** Pause, in milliseconds, applied after a round that produced no compaction task. */
+    private final long scanInterval;
+    /** Optional predicate handed to the coordinator; may be {@code null}. */
+    @Nullable private final Predicate filter;
+    /** Transient: created in {@link #open(Configuration)} instead of being shipped with the function. */
+    private transient AppendOnlyTableCompactionCoordinator compactionCoordinator;
+    private transient SourceContext<AppendOnlyCompactionTask> ctx;
+    private volatile boolean isRunning = true;
+
+    /**
+     * @param table the append-only table to plan compaction for
+     * @param isStreaming forwarded to the coordinator as its streaming flag
+     * @param scanInterval idle pause between two empty planning rounds, in milliseconds
+     * @param filter optional predicate forwarded to the coordinator, may be {@code null}
+     */
+    public UnawareBucketSourceFunction(
+            AppendOnlyFileStoreTable table,
+            boolean isStreaming,
+            long scanInterval,
+            @Nullable Predicate filter) {
+        this.table = table;
+        this.streaming = isStreaming;
+        this.scanInterval = scanInterval;
+        this.filter = filter;
+    }
+
+    /** Creates the compaction coordinator for this function instance. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        compactionCoordinator = new AppendOnlyTableCompactionCoordinator(table, streaming, filter);
+    }
+
+    /**
+     * Plans and emits compaction tasks until cancelled or until the coordinator reports the end of
+     * the scan.
+     */
+    @Override
+    public void run(SourceContext<AppendOnlyCompactionTask> sourceContext) throws Exception {
+        this.ctx = sourceContext;
+        while (isRunning) {
+            boolean isEmpty;
+            synchronized (ctx.getCheckpointLock()) {
+                // cancel() may have flipped the flag while we were waiting for the lock
+                if (!isRunning) {
+                    return;
+                }
+                try {
+                    // do scan and plan action, emit append-only compaction tasks.
+                    List<AppendOnlyCompactionTask> tasks = compactionCoordinator.run();
+                    isEmpty = tasks.isEmpty();
+                    tasks.forEach(ctx::collect);
+                } catch (EndOfScanException e) {
+                    // expected termination signal, not an error: log with the real exception name
+                    LOG.info("Catching EndOfScanException, the stream is finished.");
+                    return;
+                }
+            }
+
+            // sleep outside the lock so an idle source does not hold the checkpoint lock
+            if (isEmpty) {
+                Thread.sleep(scanInterval);
+            }
+        }
+    }
+
+    /**
+     * Requests the run loop to stop. Once a source context exists the flag is flipped under the
+     * checkpoint lock, so it never changes in the middle of an emit section of {@code run}.
+     */
+    @Override
+    public void cancel() {
+        if (ctx != null) {
+            synchronized (ctx.getCheckpointLock()) {
+                isRunning = false;
+            }
+        } else {
+            isRunning = false;
+        }
+    }
+
+    /**
+     * Wraps {@code source} in a {@link StreamSource} operator and registers it on {@code env}.
+     *
+     * @param env the execution environment the source is attached to
+     * @param source the source function to run
+     * @param streaming selects {@code CONTINUOUS_UNBOUNDED} when true, {@code BOUNDED} otherwise
+     * @param tableIdentifier appended to the operator name after {@code "Compaction Coordinator -> "}
+     * @return the resulting data stream source
+     */
+    public static DataStreamSource<AppendOnlyCompactionTask> buildSource(
+            StreamExecutionEnvironment env,
+            UnawareBucketSourceFunction source,
+            boolean streaming,
+            String tableIdentifier) {
+        final StreamSource<AppendOnlyCompactionTask, UnawareBucketSourceFunction> sourceOperator =
+                new StreamSource<>(source);
+        return new DataStreamSource<>(
+                env,
+                new CompactionTaskTypeInfo(),
+                sourceOperator,
+                // isParallel = false: the coordinator source is declared non-parallel
+                false,
+                COMPACTION_COORDINATOR_NAME + " -> " + tableIdentifier,
+                streaming ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index cd363210c..ad83c50bc 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -22,6 +22,10 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
@@ -43,9 +47,11 @@ import org.junit.jupiter.api.Timeout;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
@@ -201,6 +207,110 @@ public class CompactActionITCase extends ActionITCaseBase
{
client.cancel();
}
+    /**
+     * Writes two commits into a table created with bucket = -1, starts a streaming {@link
+     * CompactAction} job and checks that each compaction leaves a single file holding all rows.
+     */
+    @Test
+    public void testUnawareBucketStreamingCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        // bucket = -1: presumably what puts the table into the unaware-bucket mode under test
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE, Arrays.asList("k"), Collections.emptyList(), options);
+        snapshotManager = table.snapshotManager();
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        // base records
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assertions.assertEquals(2, snapshot.id());
+        Assertions.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+
+        FileStoreScan storeScan = ((AbstractFileStoreTable) table).store().newScan();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointInterval(500);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactAction(warehouse, database, tableName).build(env);
+        JobClient client = env.executeAsync();
+
+        try {
+            // first compaction, snapshot will be 3
+            checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6);
+
+            writeData(
+                    rowData(1, 101, 15, BinaryString.fromString("20221208")),
+                    rowData(1, 101, 16, BinaryString.fromString("20221208")),
+                    rowData(1, 101, 15, BinaryString.fromString("20221209")));
+
+            // second compaction, snapshot will be 5
+            checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9);
+        } finally {
+            // stop the streaming job even when a check above fails, so it cannot leak into
+            // the tests that run afterwards
+            client.cancel().get();
+        }
+    }
+
+    /**
+     * Writes two commits into a table created with bucket = -1, runs {@link CompactAction} as a
+     * batch job and checks that the compaction leaves a single file holding all rows.
+     */
+    @Test
+    public void testUnawareBucketBatchCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        // bucket = -1: presumably what puts the table into the unaware-bucket mode under test
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE, Arrays.asList("k"), Collections.emptyList(), options);
+        snapshotManager = table.snapshotManager();
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        // base records
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assertions.assertEquals(2, snapshot.id());
+        Assertions.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
+
+        FileStoreScan storeScan = ((AbstractFileStoreTable) table).store().newScan();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactAction(warehouse, database, tableName).build(env);
+        env.execute();
+
+        // first compaction, snapshot will be 3. execute() has already returned at this point,
+        // so the check is made without a waiting budget (timeout 0).
+        checkFileAndRowSize(storeScan, 3L, 0L, 1, 6);
+    }
+
private List<Map<String, String>> getSpecifiedPartitions() {
Map<String, String> partition1 = new HashMap<>();
partition1.put("dt", "20221208");
@@ -235,4 +345,26 @@ public class CompactActionITCase extends ActionITCaseBase {
actual.sort(String::compareTo);
Assertions.assertEquals(expected, actual);
}
+
+    /**
+     * Polls every 500 ms until the latest snapshot id equals {@code expectedSnapshotId}, failing
+     * with a {@link RuntimeException} once more than {@code timeout} ms have elapsed after a
+     * poll, then asserts the number of ADD manifest entries of that snapshot and the sum of
+     * their row counts.
+     */
+    private void checkFileAndRowSize(
+            FileStoreScan scan, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount)
+            throws Exception {
+
+        long begin = System.currentTimeMillis();
+        while (true) {
+            if (Objects.equals(snapshotManager.latestSnapshotId(), expectedSnapshotId)) {
+                break;
+            }
+            Thread.sleep(500);
+            long elapsed = System.currentTimeMillis() - begin;
+            if (elapsed > timeout) {
+                throw new RuntimeException("can't wait for a compaction.");
+            }
+        }
+
+        List<ManifestEntry> added =
+                scan.withSnapshot(snapshotManager.latestSnapshotId()).plan().files(FileKind.ADD);
+        long totalRows = added.stream().mapToLong(entry -> entry.file().rowCount()).sum();
+
+        Assertions.assertEquals(fileNum, added.size());
+        Assertions.assertEquals(rowCount, totalRows);
+    }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
new file mode 100644
index 000000000..70f127291
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.sink.CompactionTaskSerializer;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
+import static org.apache.paimon.stats.StatsTestUtils.newTableStats;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Round-trip test for {@link CompactionTaskSimpleSerializer}. */
+public class CompactionTaskSimpleSerializerTest {
+
+    private final CompactionTaskSerializer compactionTaskSerializer =
+            new CompactionTaskSerializer();
+
+    private final CompactionTaskSimpleSerializer serializer =
+            new CompactionTaskSimpleSerializer(compactionTaskSerializer);
+
+    private final BinaryRow partition = BinaryRow.EMPTY_ROW;
+
+    /** A task that is serialized and then deserialized must equal the task it started from. */
+    @Test
+    public void testSerializer() throws IOException {
+        AppendOnlyCompactionTask original =
+                new AppendOnlyCompactionTask(partition, newFiles(20));
+        AppendOnlyCompactionTask restored =
+                serializer.deserialize(2, serializer.serialize(original));
+
+        assertThat(original).isEqualTo(restored);
+    }
+
+    /** Produces {@code num} file metas, each one carrying a fresh random file name. */
+    private List<DataFileMeta> newFiles(int num) {
+        List<DataFileMeta> files = new ArrayList<>();
+        while (files.size() < num) {
+            files.add(newFile());
+        }
+        return files;
+    }
+
+    /** Creates one file meta with a random UUID as its name and fixed dummy values elsewhere. */
+    private DataFileMeta newFile() {
+        String fileName = UUID.randomUUID().toString();
+        return new DataFileMeta(
+                fileName,
+                0,
+                1,
+                row(0),
+                row(0),
+                newTableStats(0, 1),
+                newTableStats(0, 1),
+                0,
+                1,
+                0,
+                0);
+    }
+}