This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b3260801 [flink] add flink compaction job of unaware-bucket compaction (#1347)
1b3260801 is described below

commit 1b326080193edd7fe7f633a97f42715ade61b215
Author: YeJunHao <[email protected]>
AuthorDate: Tue Jun 13 09:54:55 2023 +0800

    [flink] add flink compaction job of unaware-bucket compaction (#1347)
---
 .../generated/flink_connector_configuration.html   |   6 +
 .../AppendOnlyTableCompactionCoordinator.java      |  35 +++++-
 .../apache/paimon/flink/FlinkConnectorOptions.java |   9 ++
 .../apache/paimon/flink/action/CompactAction.java  |  43 ++++++-
 .../UnawareBucketCompactionTopoBuilder.java        | 123 +++++++++++++++++++
 .../AppendOnlyTableCompactionWorkerOperator.java   |  76 ++++++++++++
 .../flink/sink/CompactionTaskSimpleSerializer.java |  65 ++++++++++
 .../paimon/flink/sink/CompactionTaskTypeInfo.java  |  99 ++++++++++++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  46 ++++---
 .../flink/sink/GlobalFullCompactionSinkWrite.java  |   4 +-
 .../flink/sink/UnawareBucketCompactionSink.java    |  61 ++++++++++
 .../flink/source/UnawareBucketSourceFunction.java  | 123 +++++++++++++++++++
 .../paimon/flink/action/CompactActionITCase.java   | 132 +++++++++++++++++++++
 .../sink/CompactionTaskSimpleSerializerTest.java   |  79 ++++++++++++
 14 files changed, 874 insertions(+), 27 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index d1776e9f8..dc4c4e0b5 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -116,5 +116,11 @@ under the License.
             <td>Boolean</td>
             <td>The option to enable return per iterator instead of per record 
in streaming read.This can ensure that there will be no checkpoint segmentation 
in iterator consumption.<br />By default, streaming source checkpoint will be 
performed in any time, this means 'UPDATE_BEFORE' and 'UPDATE_AFTER' can be 
split into two checkpoint. Downstream can see intermediate state.</td>
         </tr>
+        <tr>
+            <td><h5>unaware-bucket.compaction.parallelism</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>Defines a custom parallelism for the unaware-bucket table 
compaction job. By default, if this option is not defined, the planner will 
derive the parallelism for each statement individually by also considering the 
global configuration.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
index 4f125ddd9..e89cff30e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinator.java
@@ -22,10 +22,14 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamTableScan;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -56,16 +60,37 @@ public class AppendOnlyTableCompactionCoordinator {
     protected static final int REMOVE_AGE = 10;
     protected static final int COMPACT_AGE = 5;
 
-    private final StreamTableScan scan;
+    private final InnerTableScan scan;
     private final long targetFileSize;
     private final int minFileNum;
     private final int maxFileNum;
+    private final boolean streamingMode;
 
     final Map<BinaryRow, PartitionCompactCoordinator> 
partitionCompactCoordinators =
             new HashMap<>();
 
     public AppendOnlyTableCompactionCoordinator(AppendOnlyFileStoreTable 
table) {
-        scan = table.copy(compactScanType()).newStreamScan();
+        this(table, true);
+    }
+
+    public AppendOnlyTableCompactionCoordinator(
+            AppendOnlyFileStoreTable table, boolean isStreaming) {
+        this(table, isStreaming, null);
+    }
+
+    public AppendOnlyTableCompactionCoordinator(
+            AppendOnlyFileStoreTable table, boolean isStreaming, @Nullable 
Predicate filter) {
+
+        FileStoreTable tableCopy = table.copy(compactScanType());
+        if (isStreaming) {
+            scan = tableCopy.newStreamScan();
+        } else {
+            scan = tableCopy.newScan();
+        }
+        if (filter != null) {
+            scan.withFilter(filter);
+        }
+        this.streamingMode = isStreaming;
         CoreOptions coreOptions = table.coreOptions();
         this.targetFileSize = coreOptions.targetFileSize();
         this.minFileNum = coreOptions.compactionMinFileNum();
@@ -93,6 +118,10 @@ public class AppendOnlyTableCompactionCoordinator {
                         DataSplit dataSplit = (DataSplit) split;
                         notifyNewFiles(dataSplit.partition(), 
dataSplit.files());
                     });
+            // batch mode, we don't do continuous scanning
+            if (!streamingMode) {
+                break;
+            }
         }
         return hasResult;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 04a9c509c..71c0cd003 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -87,6 +87,15 @@ public class FlinkConnectorOptions {
                                     + "for each statement individually by also 
considering the global configuration. "
                                     + "If user enable the 
scan.infer-parallelism, the planner will derive the parallelism by inferred 
parallelism.");
 
+    public static final ConfigOption<Integer> 
UNAWARE_BUCKET_COMPACTION_PARALLELISM =
+            ConfigOptions.key("unaware-bucket.compaction.parallelism")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines a custom parallelism for the 
unaware-bucket table compaction job. "
+                                    + "By default, if this option is not 
defined, the planner will derive the parallelism "
+                                    + "for each statement individually by also 
considering the global configuration.");
+
     public static final ConfigOption<Boolean> INFER_SCAN_PARALLELISM =
             ConfigOptions.key("scan.infer-parallelism")
                     .booleanType()
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 5665bcf20..f2064bc29 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -19,9 +19,11 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
 import org.apache.paimon.flink.sink.CompactorSinkBuilder;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -49,8 +51,7 @@ public class CompactAction extends TableActionBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CompactAction.class);
 
-    private final CompactorSourceBuilder sourceBuilder;
-    private final CompactorSinkBuilder sinkBuilder;
+    private List<Map<String, String>> partitions;
 
     public CompactAction(String warehouse, String database, String tableName) {
         this(warehouse, database, tableName, Collections.emptyMap());
@@ -69,9 +70,6 @@ public class CompactAction extends TableActionBase {
                             table.getClass().getName()));
         }
         table = 
table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
-        sourceBuilder =
-                new CompactorSourceBuilder(identifier.getFullName(), 
(FileStoreTable) table);
-        sinkBuilder = new CompactorSinkBuilder((FileStoreTable) table);
     }
 
     // ------------------------------------------------------------------------
@@ -79,7 +77,7 @@ public class CompactAction extends TableActionBase {
     // ------------------------------------------------------------------------
 
     public CompactAction withPartitions(List<Map<String, String>> partitions) {
-        sourceBuilder.withPartitions(partitions);
+        this.partitions = partitions;
         return this;
     }
 
@@ -87,12 +85,45 @@ public class CompactAction extends TableActionBase {
         ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        switch (fileStoreTable.bucketMode()) {
+            case UNAWARE:
+                {
+                    buildForUnawareBucketCompaction(
+                            env, (AppendOnlyFileStoreTable) table, 
isStreaming);
+                    break;
+                }
+            case FIXED:
+            case DYNAMIC:
+            default:
+                {
+                    buildForTraditionalCompaction(env, fileStoreTable, 
isStreaming);
+                }
+        }
+    }
+
+    private void buildForTraditionalCompaction(
+            StreamExecutionEnvironment env, FileStoreTable table, boolean 
isStreaming) {
+        CompactorSourceBuilder sourceBuilder =
+                new CompactorSourceBuilder(identifier.getFullName(), table);
+        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
 
+        sourceBuilder.withPartitions(partitions);
         DataStreamSource<RowData> source =
                 
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
         sinkBuilder.withInput(source).build();
     }
 
+    private void buildForUnawareBucketCompaction(
+            StreamExecutionEnvironment env, AppendOnlyFileStoreTable table, 
boolean isStreaming) {
+        UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
+                new UnawareBucketCompactionTopoBuilder(env, 
identifier.getFullName(), table);
+
+        unawareBucketCompactionTopoBuilder.withPartitions(partitions);
+        unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
+        unawareBucketCompactionTopoBuilder.build();
+    }
+
     // ------------------------------------------------------------------------
     //  Flink run methods
     // ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
new file mode 100644
index 000000000..8d1b70271
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactionTopoBuilder.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.UnawareBucketCompactionSink;
+import org.apache.paimon.flink.source.UnawareBucketSourceFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Build for unaware-bucket table flink compaction job.
+ *
+ * <p>Note: This compaction job class is only used for unaware-bucket 
compaction, at start-up, it
+ * scans all the files from the latest snapshot, filter large file, and add 
small files into memory,
+ * generates compaction task for them. At continuous, it scans the delta files 
from the follow-up
+ * snapshot. We need to enable checkpoint for this compaction job, checkpoint 
will trigger committer
+ * stage to commit all the compacted files.
+ */
+public class UnawareBucketCompactionTopoBuilder {
+
+    private final transient StreamExecutionEnvironment env;
+    private final String tableIdentifier;
+    private final AppendOnlyFileStoreTable table;
+    @Nullable private List<Map<String, String>> specifiedPartitions = null;
+    private boolean isContinuous = false;
+
+    public UnawareBucketCompactionTopoBuilder(
+            StreamExecutionEnvironment env,
+            String tableIdentifier,
+            AppendOnlyFileStoreTable table) {
+        this.env = env;
+        this.tableIdentifier = tableIdentifier;
+        this.table = table;
+    }
+
+    public void withContinuousMode(boolean isContinuous) {
+        this.isContinuous = isContinuous;
+    }
+
+    public void withPartitions(List<Map<String, String>> partitions) {
+        this.specifiedPartitions = partitions;
+    }
+
+    public void build() {
+
+        // build source from UnawareSourceFunction
+        DataStreamSource<AppendOnlyCompactionTask> source = buildSource();
+
+        // from source, construct the full flink job
+        sinkFromSource(source);
+    }
+
+    private DataStreamSource<AppendOnlyCompactionTask> buildSource() {
+        long scanInterval = 
table.coreOptions().continuousDiscoveryInterval().toMillis();
+        UnawareBucketSourceFunction source =
+                new UnawareBucketSourceFunction(
+                        table, isContinuous, scanInterval, 
getPartitionFilter());
+
+        return UnawareBucketSourceFunction.buildSource(env, source, 
isContinuous, tableIdentifier);
+    }
+
+    private void sinkFromSource(DataStreamSource<AppendOnlyCompactionTask> 
input) {
+        Options conf = Options.fromMap(table.options());
+        Integer compactionWorkerParallelism =
+                
conf.get(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM);
+        PartitionTransformation<AppendOnlyCompactionTask> transformation =
+                new PartitionTransformation<>(
+                        input.getTransformation(), new 
RebalancePartitioner<>());
+        if (compactionWorkerParallelism != null) {
+            transformation.setParallelism(compactionWorkerParallelism);
+        } else {
+            // cause source function for unaware-bucket table compaction has 
only one parallelism,
+            // we need to set to default parallelism by hand.
+            transformation.setParallelism(env.getParallelism());
+        }
+        DataStream<AppendOnlyCompactionTask> rebalanced = new 
DataStream<>(env, transformation);
+
+        UnawareBucketCompactionSink.sink(table, rebalanced);
+    }
+
+    private Predicate getPartitionFilter() {
+        Predicate partitionPredicate = null;
+        if (specifiedPartitions != null) {
+            partitionPredicate =
+                    PredicateBuilder.or(
+                            specifiedPartitions.stream()
+                                    .map(p -> PredicateBuilder.partition(p, 
table.rowType()))
+                                    .toArray(Predicate[]::new));
+        }
+        return partitionPredicate;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
new file mode 100644
index 000000000..6114b1d3f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.AppendOnlyTableCompactionWorker;
+import org.apache.paimon.flink.source.UnawareBucketSourceFunction;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
+ * UnawareBucketSourceFunction}.
+ */
+public class AppendOnlyTableCompactionWorkerOperator
+        extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
+    private final AppendOnlyFileStoreTable table;
+    private transient AppendOnlyTableCompactionWorker worker;
+    private final String commitUser;
+    private transient List<CommitMessage> result;
+
+    public AppendOnlyTableCompactionWorkerOperator(
+            AppendOnlyFileStoreTable table, String commitUser) {
+        super(Options.fromMap(table.options()));
+        this.table = table;
+        this.commitUser = commitUser;
+    }
+
+    @Override
+    public void open() throws Exception {
+        this.worker = new AppendOnlyTableCompactionWorker(table, commitUser);
+        this.result = new ArrayList<>();
+    }
+
+    @Override
+    protected List<Committable> prepareCommit(boolean doCompaction, long 
checkpointId)
+            throws IOException {
+        // ignore doCompaction tag
+        ArrayList<CommitMessage> tempList = new ArrayList<>(result);
+        result.clear();
+        return tempList.stream()
+                .map(s -> new Committable(checkpointId, Committable.Kind.FILE, 
s))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public void processElement(StreamRecord<AppendOnlyCompactionTask> element) 
throws Exception {
+        AppendOnlyCompactionTask task = element.getValue();
+        worker.accept(task);
+        result.addAll(worker.doCompact());
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializer.java
new file mode 100644
index 000000000..06d968fae
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.table.sink.CompactionTaskSerializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** {@link SimpleVersionedSerializer} for {@link AppendOnlyCompactionTask}. */
+public class CompactionTaskSimpleSerializer
+        implements SimpleVersionedSerializer<AppendOnlyCompactionTask> {
+
+    private final CompactionTaskSerializer compactionTaskSerializer;
+
+    public CompactionTaskSimpleSerializer(CompactionTaskSerializer 
compactionTaskSerializer) {
+        this.compactionTaskSerializer = compactionTaskSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+        return 2;
+    }
+
+    @Override
+    public byte[] serialize(AppendOnlyCompactionTask compactionTask) throws 
IOException {
+        byte[] wrapped = compactionTaskSerializer.serialize(compactionTask);
+        int version = compactionTaskSerializer.getVersion();
+
+        return ByteBuffer.allocate(wrapped.length + 
4).put(wrapped).putInt(version).array();
+    }
+
+    @Override
+    public AppendOnlyCompactionTask deserialize(int compactionTaskVersion, 
byte[] bytes)
+            throws IOException {
+        if (compactionTaskVersion != getVersion()) {
+            throw new RuntimeException("Can not deserialize version: " + 
compactionTaskVersion);
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        byte[] wrapped = new byte[bytes.length - 4];
+        buffer.get(wrapped);
+        int version = buffer.getInt();
+        return compactionTaskSerializer.deserialize(version, wrapped);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
new file mode 100644
index 000000000..c33ab95e1
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactionTaskTypeInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.table.sink.CompactionTaskSerializer;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+
+/** Type information of {@link AppendOnlyCompactionTask}. */
+public class CompactionTaskTypeInfo extends 
TypeInformation<AppendOnlyCompactionTask> {
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<AppendOnlyCompactionTask> getTypeClass() {
+        return AppendOnlyCompactionTask.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<AppendOnlyCompactionTask> 
createSerializer(ExecutionConfig config) {
+        // we don't need copy for task
+        return new 
SimpleVersionedSerializerTypeSerializerProxy<AppendOnlyCompactionTask>(
+                () -> new CompactionTaskSimpleSerializer(new 
CompactionTaskSerializer())) {
+            @Override
+            public AppendOnlyCompactionTask copy(AppendOnlyCompactionTask 
from) {
+                return from;
+            }
+
+            @Override
+            public AppendOnlyCompactionTask copy(
+                    AppendOnlyCompactionTask from, AppendOnlyCompactionTask 
reuse) {
+                return from;
+            }
+        };
+    }
+
+    @Override
+    public int hashCode() {
+        return 0;
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof CompactionTaskTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof CompactionTaskTypeInfo;
+    }
+
+    @Override
+    public String toString() {
+        return "CompactionTask";
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 52fff82d4..6ab2ddd4c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -59,11 +59,11 @@ public abstract class FlinkSink<T> implements Serializable {
     private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
 
     protected final FileStoreTable table;
-    private final boolean isOverwrite;
+    private final boolean emptyWriter;
 
-    public FlinkSink(FileStoreTable table, boolean isOverwrite) {
+    public FlinkSink(FileStoreTable table, boolean emptyWriter) {
         this.table = table;
-        this.isOverwrite = isOverwrite;
+        this.emptyWriter = emptyWriter;
     }
 
     private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig 
checkpointConfig) {
@@ -97,7 +97,7 @@ public abstract class FlinkSink<T> implements Serializable {
                                 commitUser,
                                 state,
                                 ioManager,
-                                isOverwrite,
+                                emptyWriter,
                                 waitCompaction,
                                 finalDeltaCommits,
                                 memoryPool);
@@ -110,7 +110,7 @@ public abstract class FlinkSink<T> implements Serializable {
                         commitUser,
                         state,
                         ioManager,
-                        isOverwrite,
+                        emptyWriter,
                         waitCompaction,
                         memoryPool);
     }
@@ -122,14 +122,10 @@ public abstract class FlinkSink<T> implements Serializable {
         // When the job restarts, commitUser will be recovered from states and 
this value is
         // ignored.
         String initialCommitUser = UUID.randomUUID().toString();
-        return sinkFrom(
-                input,
-                initialCommitUser,
-                
createWriteProvider(input.getExecutionEnvironment().getCheckpointConfig()));
+        return sinkFrom(input, initialCommitUser);
     }
 
-    public DataStreamSink<?> sinkFrom(
-            DataStream<T> input, String commitUser, StoreSinkWrite.Provider 
sinkProvider) {
+    public DataStreamSink<?> sinkFrom(DataStream<T> input, String commitUser) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
@@ -142,12 +138,24 @@ public abstract class FlinkSink<T> implements Serializable {
             assertCheckpointConfiguration(env);
         }
 
-        CommittableTypeInfo typeInfo = new CommittableTypeInfo();
+        SingleOutputStreamOperator<Committable> written =
+                handleInput(input, isStreaming, commitUser);
+
+        return commit(written, streamingCheckpointEnabled, commitUser);
+    }
+
+    protected SingleOutputStreamOperator<Committable> handleInput(
+            DataStream<T> input, boolean isStreaming, String commitUser) {
         SingleOutputStreamOperator<Committable> written =
                 input.transform(
                                 WRITER_NAME + " -> " + table.name(),
-                                typeInfo,
-                                createWriteOperator(sinkProvider, isStreaming, 
commitUser))
+                                new CommittableTypeInfo(),
+                                createWriteOperator(
+                                        createWriteProvider(
+                                                input.getExecutionEnvironment()
+                                                        
.getCheckpointConfig()),
+                                        isStreaming,
+                                        commitUser))
                         .setParallelism(input.getParallelism());
         Options options = Options.fromMap(table.options());
         if (options.get(SINK_USE_MANAGED_MEMORY)) {
@@ -156,11 +164,17 @@ public abstract class FlinkSink<T> implements 
Serializable {
                     .declareManagedMemoryUseCaseAtOperatorScope(
                             ManagedMemoryUseCase.OPERATOR, 
memorySize.getMebiBytes());
         }
+        return written;
+    }
 
+    protected DataStreamSink<?> commit(
+            SingleOutputStreamOperator<Committable> written,
+            boolean streamingCheckpointEnabled,
+            String commitUser) {
         SingleOutputStreamOperator<?> committed =
                 written.transform(
                                 GLOBAL_COMMITTER_NAME + " -> " + table.name(),
-                                typeInfo,
+                                new CommittableTypeInfo(),
                                 new CommitterOperator<>(
                                         streamingCheckpointEnabled,
                                         commitUser,
@@ -171,7 +185,7 @@ public abstract class FlinkSink<T> implements Serializable {
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
-    private void assertCheckpointConfiguration(StreamExecutionEnvironment env) 
{
+    public static void 
assertCheckpointConfiguration(StreamExecutionEnvironment env) {
         Preconditions.checkArgument(
                 !env.getCheckpointConfig().isUnalignedCheckpointsEnabled(),
                 "Paimon sink currently does not support unaligned checkpoints. 
Please set "
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 8e6e1ba14..8ed101b8f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -69,11 +69,11 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
             String commitUser,
             StoreSinkWriteState state,
             IOManager ioManager,
-            boolean isOverwrite,
+            boolean emptyWriter,
             boolean waitCompaction,
             int deltaCommits,
             @Nullable MemorySegmentPool memoryPool) {
-        super(table, commitUser, state, ioManager, isOverwrite, 
waitCompaction, memoryPool);
+        super(table, commitUser, state, ioManager, emptyWriter, 
waitCompaction, memoryPool);
 
         this.deltaCommits = deltaCommits;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
new file mode 100644
index 000000000..1aec4e711
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.utils.SerializableFunction;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+/** Compaction Sink for unaware-bucket table. */
+public class UnawareBucketCompactionSink extends 
FlinkSink<AppendOnlyCompactionTask> {
+
+    private final AppendOnlyFileStoreTable table;
+
+    public UnawareBucketCompactionSink(AppendOnlyFileStoreTable table) {
+        super(table, true);
+        this.table = table;
+    }
+
+    public static DataStreamSink<?> sink(
+            AppendOnlyFileStoreTable table, 
DataStream<AppendOnlyCompactionTask> input) {
+        return new UnawareBucketCompactionSink(table).sinkFrom(input);
+    }
+
+    @Override
+    protected OneInputStreamOperator<AppendOnlyCompactionTask, Committable> 
createWriteOperator(
+            StoreSinkWrite.Provider writeProvider, boolean isStreaming, String 
commitUser) {
+        return new AppendOnlyTableCompactionWorkerOperator(table, commitUser);
+    }
+
+    @Override
+    protected SerializableFunction<String, Committer<Committable, 
ManifestCommittable>>
+            createCommitterFactory(boolean streamingCheckpointEnabled) {
+        return s -> new StoreCommitter(table.newCommit(s));
+    }
+
+    @Override
+    protected CommittableStateManager<ManifestCommittable> 
createCommittableStateManager() {
+        return new NoopCommittableStateManager();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/UnawareBucketSourceFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/UnawareBucketSourceFunction.java
new file mode 100644
index 000000000..aa57439cc
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/UnawareBucketSourceFunction.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
+import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.source.EndOfScanException;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Source Function for unaware-bucket Compaction. */
+public class UnawareBucketSourceFunction extends 
RichSourceFunction<AppendOnlyCompactionTask> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(UnawareBucketSourceFunction.class);
+    private static final String COMPACTION_COORDINATOR_NAME = "Compaction 
Coordinator";
+
+    private final AppendOnlyFileStoreTable table;
+    private final boolean streaming;
+    private final long scanInterval;
+    private final Predicate filter;
+    private transient AppendOnlyTableCompactionCoordinator 
compactionCoordinator;
+    private transient SourceContext<AppendOnlyCompactionTask> ctx;
+    private volatile boolean isRunning = true;
+
+    public UnawareBucketSourceFunction(
+            AppendOnlyFileStoreTable table,
+            boolean isStreaming,
+            long scanInterval,
+            @Nullable Predicate filter) {
+        this.table = table;
+        this.streaming = isStreaming;
+        this.scanInterval = scanInterval;
+        this.filter = filter;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        compactionCoordinator = new 
AppendOnlyTableCompactionCoordinator(table, streaming, filter);
+    }
+
+    @Override
+    public void run(SourceContext<AppendOnlyCompactionTask> sourceContext) 
throws Exception {
+        this.ctx = sourceContext;
+        while (isRunning) {
+            boolean isEmpty;
+            synchronized (ctx.getCheckpointLock()) {
+                if (!isRunning) {
+                    return;
+                }
+                try {
+                    // do scan and plan action, emit append-only compaction 
tasks.
+                    List<AppendOnlyCompactionTask> tasks = 
compactionCoordinator.run();
+                    isEmpty = tasks.isEmpty();
+                    tasks.forEach(ctx::collect);
+                } catch (EndOfScanException esf) {
+                    LOG.info("Catching EndOfStreamException, the stream is 
finished.");
+                    return;
+                }
+            }
+
+            if (isEmpty) {
+                Thread.sleep(scanInterval);
+            }
+        }
+    }
+
+    @Override
+    public void cancel() {
+        if (ctx != null) {
+            synchronized (ctx.getCheckpointLock()) {
+                isRunning = false;
+            }
+        } else {
+            isRunning = false;
+        }
+    }
+
+    public static DataStreamSource<AppendOnlyCompactionTask> buildSource(
+            StreamExecutionEnvironment env,
+            UnawareBucketSourceFunction source,
+            boolean streaming,
+            String tableIdentifier) {
+        final StreamSource<AppendOnlyCompactionTask, 
UnawareBucketSourceFunction> sourceOperator =
+                new StreamSource<>(source);
+        return new DataStreamSource<>(
+                env,
+                new CompactionTaskTypeInfo(),
+                sourceOperator,
+                false,
+                COMPACTION_COORDINATOR_NAME + " -> " + tableIdentifier,
+                streaming ? Boundedness.CONTINUOUS_UNBOUNDED : 
Boundedness.BOUNDED);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index cd363210c..ad83c50bc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -22,6 +22,10 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.DataSplit;
@@ -43,9 +47,11 @@ import org.junit.jupiter.api.Timeout;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 
@@ -201,6 +207,110 @@ public class CompactActionITCase extends ActionITCaseBase 
{
         client.cancel();
     }
 
+    @Test
+    public void testUnawareBucketStreamingCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        // test that dedicated compact job will expire snapshots
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE, Arrays.asList("k"), Collections.emptyList(), 
options);
+        snapshotManager = table.snapshotManager();
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        // base records
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assertions.assertEquals(2, snapshot.id());
+        Assertions.assertEquals(Snapshot.CommitKind.APPEND, 
snapshot.commitKind());
+
+        FileStoreScan storeScan = ((AbstractFileStoreTable) 
table).store().newScan();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointInterval(500);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactAction(warehouse, database, tableName).build(env);
+        JobClient client = env.executeAsync();
+
+        // first compaction, snapshot will be 3
+        checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6);
+
+        writeData(
+                rowData(1, 101, 15, BinaryString.fromString("20221208")),
+                rowData(1, 101, 16, BinaryString.fromString("20221208")),
+                rowData(1, 101, 15, BinaryString.fromString("20221209")));
+
+        // second compaction, snapshot will be 5
+        checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9);
+
+        client.cancel().get();
+    }
+
+    @Test
+    public void testUnawareBucketBatchCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        // test that dedicated compact job will expire snapshots
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        ROW_TYPE, Arrays.asList("k"), Collections.emptyList(), 
options);
+        snapshotManager = table.snapshotManager();
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = streamWriteBuilder.newWrite();
+        commit = streamWriteBuilder.newCommit();
+
+        // base records
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        Snapshot snapshot = 
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+        Assertions.assertEquals(2, snapshot.id());
+        Assertions.assertEquals(Snapshot.CommitKind.APPEND, 
snapshot.commitKind());
+
+        FileStoreScan storeScan = ((AbstractFileStoreTable) 
table).store().newScan();
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactAction(warehouse, database, tableName).build(env);
+        env.execute();
+
+        // first compaction, snapshot will be 3.
+        checkFileAndRowSize(storeScan, 3L, 0L, 1, 6);
+
+        writeData(
+                rowData(1, 101, 15, BinaryString.fromString("20221208")),
+                rowData(1, 101, 16, BinaryString.fromString("20221208")),
+                rowData(1, 101, 15, BinaryString.fromString("20221209")));
+    }
+
     private List<Map<String, String>> getSpecifiedPartitions() {
         Map<String, String> partition1 = new HashMap<>();
         partition1.put("dt", "20221208");
@@ -235,4 +345,26 @@ public class CompactActionITCase extends ActionITCaseBase {
         actual.sort(String::compareTo);
         Assertions.assertEquals(expected, actual);
     }
+
+    private void checkFileAndRowSize(
+            FileStoreScan scan, Long expectedSnapshotId, Long timeout, int 
fileNum, long rowCount)
+            throws Exception {
+
+        long start = System.currentTimeMillis();
+        while (!Objects.equals(snapshotManager.latestSnapshotId(), 
expectedSnapshotId)) {
+            Thread.sleep(500);
+            if (System.currentTimeMillis() - start > timeout) {
+                throw new RuntimeException("can't wait for a compaction.");
+            }
+        }
+
+        List<ManifestEntry> files =
+                
scan.withSnapshot(snapshotManager.latestSnapshotId()).plan().files(FileKind.ADD);
+        long count = 0;
+        for (ManifestEntry file : files) {
+            count += file.file().rowCount();
+        }
+        Assertions.assertEquals(fileNum, files.size());
+        Assertions.assertEquals(rowCount, count);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
new file mode 100644
index 000000000..70f127291
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.table.sink.CompactionTaskSerializer;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static 
org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
+import static org.apache.paimon.stats.StatsTestUtils.newTableStats;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompactionTaskSimpleSerializer}. */
+public class CompactionTaskSimpleSerializerTest {
+
+    private final CompactionTaskSerializer compactionTaskSerializer =
+            new CompactionTaskSerializer();
+
+    private final CompactionTaskSimpleSerializer serializer =
+            new CompactionTaskSimpleSerializer(compactionTaskSerializer);
+
+    private final BinaryRow partition = BinaryRow.EMPTY_ROW;
+
+    @Test
+    public void testSerializer() throws IOException {
+
+        AppendOnlyCompactionTask task1 = new 
AppendOnlyCompactionTask(partition, newFiles(20));
+        AppendOnlyCompactionTask task2 = serializer.deserialize(2, 
serializer.serialize(task1));
+
+        assertThat(task1).isEqualTo(task2);
+    }
+
+    private List<DataFileMeta> newFiles(int num) {
+        List<DataFileMeta> list = new ArrayList<>();
+        for (int i = 0; i < num; i++) {
+            list.add(newFile());
+        }
+        return list;
+    }
+
+    private DataFileMeta newFile() {
+        return new DataFileMeta(
+                UUID.randomUUID().toString(),
+                0,
+                1,
+                row(0),
+                row(0),
+                newTableStats(0, 1),
+                newTableStats(0, 1),
+                0,
+                1,
+                0,
+                0);
+    }
+}

Reply via email to