This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 74408a5e8a [flink] Introduce precommit compact for newly created files in unaware bucket tables (#4902)
74408a5e8a is described below

commit 74408a5e8a57ee85304ac2a6bd1120137abf4a52
Author: tsreaper <[email protected]>
AuthorDate: Tue Jan 14 16:34:02 2025 +0800

    [flink] Introduce precommit compact for newly created files in unaware bucket tables (#4902)
---
 .../generated/flink_connector_configuration.html   |  12 +-
 ...UnawareBucketNewFilesCompactionCoordinator.java |  74 ++++++
 .../paimon/table/source/DataTableStreamScan.java   |   7 +
 .../apache/paimon/flink/FlinkConnectorOptions.java |  11 +-
 ...ucketNewFilesCompactionCoordinatorOperator.java | 179 ++++++++++++++
 ...wareBucketNewFilesCompactionWorkerOperator.java | 118 +++++++++
 .../ChangelogCompactCoordinateOperator.java        |  14 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   4 +-
 .../paimon/flink/sink/UnawareBucketSink.java       |  28 +++
 .../flink/PrimaryKeyFileStoreTableITCase.java      |   6 +
 ...tNewFilesCompactionCoordinatorOperatorTest.java | 265 +++++++++++++++++++++
 .../UnawareBucketNewFilesCompactionITCase.java     | 129 ++++++++++
 12 files changed, 828 insertions(+), 19 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index de204ee452..23bb827aef 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,12 +26,6 @@ under the License.
         </tr>
     </thead>
     <tbody>
-        <tr>
-            <td><h5>changelog.precommit-compact</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>If true, it will add a changelog compact coordinator and worker operator after the writer operator,in order to compact several changelog files from the same partition into large ones, which can decrease the number of small files. </td>
-        </tr>
         <tr>
             <td><h5>end-input.watermark</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -110,6 +104,12 @@ under the License.
             <td>Duration</td>
             <td>You can specify time interval for partition, for example, daily partition is '1 d', hourly partition is '1 h'.</td>
         </tr>
+        <tr>
+            <td><h5>precommit-compact</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, it will add a compact coordinator and worker operator after the writer operator, in order to compact several changelog files (for primary key tables) or newly created data files (for unaware bucket tables) from the same partition into large ones, which can decrease the number of small files. </td>
+        </tr>
         <tr>
             <td><h5>scan.infer-parallelism</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/UnawareBucketNewFilesCompactionCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/UnawareBucketNewFilesCompactionCoordinator.java
new file mode 100644
index 0000000000..4425d12a25
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/UnawareBucketNewFilesCompactionCoordinator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.utils.Pair;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/** Buffer files from the same partition, until their total size reaches {@code targetFileSize}. */
+public class UnawareBucketNewFilesCompactionCoordinator {
+
+    private final long targetFileSize;
+    private final Map<BinaryRow, PartitionFiles> partitions;
+
+    public UnawareBucketNewFilesCompactionCoordinator(long targetFileSize) {
+        this.targetFileSize = targetFileSize;
+        this.partitions = new LinkedHashMap<>();
+    }
+
+    public Optional<Pair<BinaryRow, List<DataFileMeta>>> addFile(
+            BinaryRow partition, DataFileMeta file) {
+        PartitionFiles files =
+                partitions.computeIfAbsent(partition, ignore -> new PartitionFiles());
+        files.addFile(file);
+        if (files.totalSize >= targetFileSize) {
+            partitions.remove(partition);
+            return Optional.of(Pair.of(partition, files.files));
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    public List<Pair<BinaryRow, List<DataFileMeta>>> emitAll() {
+        List<Pair<BinaryRow, List<DataFileMeta>>> result =
+                partitions.entrySet().stream()
+                        .map(e -> Pair.of(e.getKey(), e.getValue().files))
+                        .collect(Collectors.toList());
+        partitions.clear();
+        return result;
+    }
+
+    private static class PartitionFiles {
+        private final List<DataFileMeta> files = new ArrayList<>();
+        private long totalSize = 0;
+
+        private void addFile(DataFileMeta file) {
+            files.add(file);
+            totalSize += file.fileSize();
+        }
+    }
+}
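To make the buffering contract above concrete, here is a minimal sketch (illustrative only, not part of the patch itself) that drives the coordinator directly; the 8 MB target and the DataFileMeta.forAppend arguments mirror the operator test added later in this commit:

import org.apache.paimon.append.UnawareBucketNewFilesCompactionCoordinator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.utils.Pair;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

public class CoordinatorBufferingSketch {

    public static void main(String[] args) {
        // Buffer small files per partition until 8 MB is accumulated.
        UnawareBucketNewFilesCompactionCoordinator coordinator =
                new UnawareBucketNewFilesCompactionCoordinator(MemorySize.ofMebiBytes(8).getBytes());

        // 3 MB + 3 MB is still below the target, so nothing is emitted yet.
        coordinator.addFile(BinaryRow.EMPTY_ROW, fileOfMb(3));
        coordinator.addFile(BinaryRow.EMPTY_ROW, fileOfMb(3));

        // The third file pushes the partition to 9 MB, so all three buffered files come back
        // as one compaction candidate and the partition's buffer is reset.
        Optional<Pair<BinaryRow, List<DataFileMeta>>> batch =
                coordinator.addFile(BinaryRow.EMPTY_ROW, fileOfMb(3));
        System.out.println(batch.isPresent() + ", files: " + batch.get().getValue().size());

        // emitAll() flushes whatever is still buffered; the coordinator operator calls it
        // before each checkpoint barrier and at end of input.
        List<Pair<BinaryRow, List<DataFileMeta>>> remaining = coordinator.emitAll();
        System.out.println("remaining partitions: " + remaining.size());
    }

    // Same DataFileMeta.forAppend arguments as the operator test added later in this commit.
    private static DataFileMeta fileOfMb(int mb) {
        return DataFileMeta.forAppend(
                UUID.randomUUID().toString(),
                MemorySize.ofMebiBytes(mb).getBytes(),
                0,
                SimpleStats.EMPTY_STATS,
                0,
                0,
                1,
                Collections.emptyList(),
                null,
                null,
                null,
                null);
    }
}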
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index e8c4ddfa1c..9dab573471 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -164,6 +164,10 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
             }
             isFullPhaseEnd =
                     boundedChecker.shouldEndInput(snapshotManager.snapshot(currentSnapshotId));
+            LOG.debug(
+                    "Starting snapshot is {}, next snapshot will be {}.",
+                    scannedResult.plan().snapshotId(),
+                    nextSnapshotId);
             return scannedResult.plan();
         } else if (result instanceof StartingScanner.NextSnapshot) {
             nextSnapshotId = ((StartingScanner.NextSnapshot) result).nextSnapshotId();
@@ -171,6 +175,9 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
                     snapshotManager.snapshotExists(nextSnapshotId - 1)
                             && boundedChecker.shouldEndInput(
                                     snapshotManager.snapshot(nextSnapshotId - 1));
+            LOG.debug("There is no starting snapshot. Next snapshot will be 
{}.", nextSnapshotId);
+        } else if (result instanceof StartingScanner.NoSnapshot) {
+            LOG.debug("There is no starting snapshot and currently there is no 
next snapshot.");
         }
         return SnapshotNotExistPlan.INSTANCE;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 8f7757453c..0105202b9d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -414,13 +414,16 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Optional endInput watermark used in case of batch 
mode or bounded stream.");
 
-    public static final ConfigOption<Boolean> CHANGELOG_PRECOMMIT_COMPACT =
-            key("changelog.precommit-compact")
+    public static final ConfigOption<Boolean> PRECOMMIT_COMPACT =
+            key("precommit-compact")
                     .booleanType()
                     .defaultValue(false)
+                    .withFallbackKeys("changelog.precommit-compact")
                     .withDescription(
-                            "If true, it will add a changelog compact 
coordinator and worker operator after the writer operator,"
-                                    + "in order to compact several changelog 
files from the same partition into large ones, "
+                            "If true, it will add a compact coordinator and 
worker operator after the writer operator,"
+                                    + "in order to compact several changelog 
files (for primary key tables) "
+                                    + "or newly created data files (for 
unaware bucket tables) "
+                                    + "from the same partition into large 
ones, "
                                     + "which can decrease the number of small 
files. ");
 
     public static final ConfigOption<String> SOURCE_OPERATOR_UID_SUFFIX =
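Since the key is renamed, jobs that still set changelog.precommit-compact keep their behavior through the fallback key registered above. A minimal sketch of that resolution (illustrative only, not part of the patch; it assumes Paimon's Options resolves fallback keys the way Flink-style configuration does):

import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.options.Options;

import java.util.Collections;

public class FallbackKeySketch {

    public static void main(String[] args) {
        // A table that was configured with the old key before this change...
        Options legacy =
                new Options(Collections.singletonMap("changelog.precommit-compact", "true"));
        // ...still turns the pre-commit compact topology on after the rename.
        System.out.println(legacy.get(FlinkConnectorOptions.PRECOMMIT_COMPACT));
    }
}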
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
new file mode 100644
index 0000000000..c52c0b2d85
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperator.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.UnawareAppendCompactionTask;
+import org.apache.paimon.append.UnawareBucketNewFilesCompactionCoordinator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Coordinator operator for compacting newly created files for unaware bucket tables.
+ *
+ * <p>{@link UnawareBucketNewFilesCompactionCoordinatorOperator} calculates the total size of newly
+ * created files contained in all buckets within each partition from the {@link Committable}
+ * messages emitted by the writer operator, and emits {@link UnawareAppendCompactionTask} to {@link
+ * UnawareBucketNewFilesCompactionWorkerOperator}.
+ */
+public class UnawareBucketNewFilesCompactionCoordinatorOperator
+        extends AbstractStreamOperator<
+                Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>
+        implements OneInputStreamOperator<
+                        Committable,
+                        Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>,
+                BoundedOneInput {
+
+    private final long targetFileSize;
+    private final long compactionFileSize;
+
+    private transient UnawareBucketNewFilesCompactionCoordinator coordinator;
+    private transient long checkpointId;
+
+    public UnawareBucketNewFilesCompactionCoordinatorOperator(CoreOptions options) {
+        this.targetFileSize = options.targetFileSize(false);
+        this.compactionFileSize = options.compactionFileSize(false);
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+
+        coordinator = new UnawareBucketNewFilesCompactionCoordinator(targetFileSize);
+        checkpointId = Long.MIN_VALUE;
+    }
+
+    @Override
+    public void processElement(StreamRecord<Committable> record) throws Exception {
+        Committable committable = record.getValue();
+        checkpointId = Math.max(checkpointId, committable.checkpointId());
+        if (committable.kind() != Committable.Kind.FILE) {
+            output.collect(new StreamRecord<>(Either.Left(committable)));
+            return;
+        }
+
+        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+        if (message.newFilesIncrement().newFiles().isEmpty()) {
+            output.collect(new StreamRecord<>(Either.Left(committable)));
+            return;
+        }
+
+        BinaryRow partition = message.partition();
+        List<DataFileMeta> skippedFiles = new ArrayList<>();
+        for (DataFileMeta meta : message.newFilesIncrement().newFiles()) {
+            if (meta.fileSize() >= compactionFileSize) {
+                skippedFiles.add(meta);
+                continue;
+            }
+
+            Optional<Pair<BinaryRow, List<DataFileMeta>>> optionalPair =
+                    coordinator.addFile(partition, meta);
+            if (optionalPair.isPresent()) {
+                Pair<BinaryRow, List<DataFileMeta>> p = optionalPair.get();
+                Preconditions.checkArgument(!p.getValue().isEmpty());
+                if (p.getValue().size() > 1) {
+                    output.collect(
+                            new StreamRecord<>(
+                                    Either.Right(
+                                            Tuple2.of(
+                                                    checkpointId,
+                                                    new UnawareAppendCompactionTask(
+                                                            p.getKey(), p.getValue())))));
+                } else {
+                    skippedFiles.add(p.getValue().get(0));
+                }
+            }
+        }
+
+        CommitMessageImpl newMessage =
+                new CommitMessageImpl(
+                        message.partition(),
+                        message.bucket(),
+                        new DataIncrement(
+                                skippedFiles,
+                                message.newFilesIncrement().deletedFiles(),
+                                message.newFilesIncrement().changelogFiles()),
+                        message.compactIncrement(),
+                        message.indexIncrement());
+        if (!newMessage.isEmpty()) {
+            Committable newCommittable =
+                    new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
+            output.collect(new StreamRecord<>(Either.Left(newCommittable)));
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
+        emitAll();
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        emitAll();
+    }
+
+    private void emitAll() {
+        for (Pair<BinaryRow, List<DataFileMeta>> p : coordinator.emitAll()) {
+            Preconditions.checkArgument(!p.getValue().isEmpty());
+            if (p.getValue().size() > 1) {
+                output.collect(
+                        new StreamRecord<>(
+                                Either.Right(
+                                        Tuple2.of(
+                                                checkpointId,
+                                                new UnawareAppendCompactionTask(
+                                                        p.getKey(), p.getValue())))));
+            } else {
+                CommitMessageImpl message =
+                        new CommitMessageImpl(
+                                p.getKey(),
+                                BucketMode.UNAWARE_BUCKET,
+                                new DataIncrement(
+                                        Collections.singletonList(p.getValue().get(0)),
+                                        Collections.emptyList(),
+                                        Collections.emptyList()),
+                                CompactIncrement.emptyIncrement());
+                output.collect(
+                        new StreamRecord<>(
+                                Either.Left(
+                                        new Committable(
+                                                checkpointId, Committable.Kind.FILE, message))));
+            }
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
new file mode 100644
index 0000000000..d634d22ec6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionWorkerOperator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.append.UnawareAppendCompactionTask;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.operation.AppendOnlyUnawareBucketFileStoreWrite;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.utils.FileStorePathFactory;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.types.Either;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Receive and process the {@link UnawareAppendCompactionTask}s emitted by {@link
+ * UnawareBucketNewFilesCompactionCoordinatorOperator}.
+ */
+public class UnawareBucketNewFilesCompactionWorkerOperator
+        extends AbstractStreamOperator<Committable>
+        implements OneInputStreamOperator<
+                Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>, Committable> {
+
+    private final FileStoreTable table;
+
+    private transient AppendOnlyUnawareBucketFileStoreWrite write;
+    private transient FileStorePathFactory pathFactory;
+    private transient FileIO fileIO;
+
+    public UnawareBucketNewFilesCompactionWorkerOperator(FileStoreTable table) {
+        this.table = table;
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.write = (AppendOnlyUnawareBucketFileStoreWrite) table.store().newWrite(null);
+        this.pathFactory = table.store().pathFactory();
+        this.fileIO = table.fileIO();
+    }
+
+    @Override
+    public void processElement(
+            StreamRecord<Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>> record)
+            throws Exception {
+        if (record.getValue().isLeft()) {
+            output.collect(new StreamRecord<>(record.getValue().left()));
+        } else {
+            long checkpointId = record.getValue().right().f0;
+            CommitMessage message = doCompact(record.getValue().right().f1);
+            output.collect(
+                    new StreamRecord<>(
+                            new Committable(checkpointId, Committable.Kind.FILE, message)));
+        }
+    }
+
+    private CommitMessage doCompact(UnawareAppendCompactionTask task) throws Exception {
+        CommitMessageImpl message = (CommitMessageImpl) task.doCompact(table, write);
+
+        Map<String, DataFileMeta> toDelete = new HashMap<>();
+        for (DataFileMeta meta : message.compactIncrement().compactBefore()) {
+            toDelete.put(meta.fileName(), meta);
+        }
+        for (DataFileMeta meta : message.compactIncrement().compactAfter()) {
+            toDelete.remove(meta.fileName());
+        }
+        DataFilePathFactory dataFilePathFactory =
+                pathFactory.createDataFilePathFactory(task.partition(), message.bucket());
+        for (DataFileMeta meta : toDelete.values()) {
+            fileIO.deleteQuietly(dataFilePathFactory.toPath(meta));
+        }
+
+        return new CommitMessageImpl(
+                message.partition(),
+                message.bucket(),
+                new DataIncrement(
+                        message.compactIncrement().compactAfter(),
+                        Collections.emptyList(),
+                        Collections.emptyList()),
+                CompactIncrement.emptyIncrement());
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (write != null) {
+            write.close();
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index cd0b8716a7..d1f6a5abd2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -90,7 +90,7 @@ public class ChangelogCompactCoordinateOperator
                     .addNewChangelogFile(bucket, meta);
             PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
             if (partitionChangelog.totalFileSize >= targetFileSize) {
-                emitPartitionChanglogCompactTask(partition);
+                emitPartitionChangelogCompactTask(partition);
             }
         }
         for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
@@ -99,7 +99,7 @@ public class ChangelogCompactCoordinateOperator
                     .addCompactChangelogFile(bucket, meta);
             PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
             if (partitionChangelog.totalFileSize >= targetFileSize) {
-                emitPartitionChanglogCompactTask(partition);
+                emitPartitionChangelogCompactTask(partition);
             }
         }
 
@@ -122,14 +122,14 @@ public class ChangelogCompactCoordinateOperator
     }
 
     public void prepareSnapshotPreBarrier(long checkpointId) {
-        emitAllPartitionsChanglogCompactTask();
+        emitAllPartitionsChangelogCompactTask();
     }
 
     public void endInput() {
-        emitAllPartitionsChanglogCompactTask();
+        emitAllPartitionsChangelogCompactTask();
     }
 
-    private void emitPartitionChanglogCompactTask(BinaryRow partition) {
+    private void emitPartitionChangelogCompactTask(BinaryRow partition) {
         PartitionChangelog partitionChangelog = partitionChangelogs.get(partition);
         output.collect(
                 new StreamRecord<>(
@@ -142,10 +142,10 @@ public class ChangelogCompactCoordinateOperator
         partitionChangelogs.remove(partition);
     }
 
-    private void emitAllPartitionsChanglogCompactTask() {
+    private void emitAllPartitionsChangelogCompactTask() {
         List<BinaryRow> partitions = new ArrayList<>(partitionChangelogs.keySet());
         for (BinaryRow partition : partitions) {
-            emitPartitionChanglogCompactTask(partition);
+            emitPartitionChangelogCompactTask(partition);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 002f5887b5..4cd085883d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -57,9 +57,9 @@ import java.util.Set;
 
 import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
 import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.END_INPUT_WATERMARK;
+import static org.apache.paimon.flink.FlinkConnectorOptions.PRECOMMIT_COMPACT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_CPU;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_COMMITTER_MEMORY;
@@ -239,7 +239,7 @@ public abstract class FlinkSink<T> implements Serializable {
             declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY));
         }
 
-        if (options.get(CHANGELOG_PRECOMMIT_COMPACT)) {
+        if (options.get(PRECOMMIT_COMPACT)) {
             written =
                     written.transform(
                                     "Changelog Compact Coordinator",
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index 7bc40d4c20..487d0f2689 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -18,11 +18,17 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionCoordinatorOperator;
+import org.apache.paimon.flink.compact.UnawareBucketNewFilesCompactionWorkerOperator;
 import org.apache.paimon.flink.source.AppendBypassCoordinateOperatorFactory;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
@@ -59,6 +65,28 @@ public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
             DataStream<T> input, String initialCommitUser, @Nullable Integer parallelism) {
         DataStream<Committable> written = super.doWrite(input, initialCommitUser, this.parallelism);
 
+        Options options = new Options(table.options());
+        if (options.get(FlinkConnectorOptions.PRECOMMIT_COMPACT)) {
+            written =
+                    written.transform(
+                                    "New Files Compact Coordinator: " + 
table.name(),
+                                    new EitherTypeInfo<>(
+                                            new CommittableTypeInfo(),
+                                            new TupleTypeInfo<>(
+                                                    BasicTypeInfo.LONG_TYPE_INFO,
+                                                    new CompactionTaskTypeInfo())),
+                                    new UnawareBucketNewFilesCompactionCoordinatorOperator(
+                                            table.coreOptions()))
+                            .startNewChain()
+                            .forceNonParallel()
+                            .transform(
+                                    "New Files Compact Worker: " + 
table.name(),
+                                    new CommittableTypeInfo(),
+                                    new UnawareBucketNewFilesCompactionWorkerOperator(table))
+                            .startNewChain()
+                            .setParallelism(written.getParallelism());
+        }
+
         boolean enableCompaction = !table.coreOptions().writeOnly();
         boolean isStreamingMode =
                 input.getExecutionEnvironment()
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index b551d63d7b..839a45a6e4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -42,6 +42,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -63,6 +65,7 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
     private static final int TIMEOUT = 180;
+    private static final Logger LOG = LoggerFactory.getLogger(PrimaryKeyFileStoreTableITCase.class);
 
     // ------------------------------------------------------------------------
     //  Test Utilities
@@ -1031,6 +1034,9 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         try (CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"))) {
             while (it.hasNext()) {
                 Row row = it.next();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Changelog get {}", row);
+                }
                 checker.addChangelog(row);
                 if (((long) row.getField(2)) >= LIMIT) {
                     endCnt++;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
new file mode 100644
index 0000000000..4f734ff6da
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionCoordinatorOperatorTest.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.UnawareAppendCompactionTask;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.Preconditions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link UnawareBucketNewFilesCompactionCoordinatorOperator}. */
+public class UnawareBucketNewFilesCompactionCoordinatorOperatorTest {
+
+    @Test
+    public void testPrepareSnapshotWithMultipleFiles() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+        UnawareBucketNewFilesCompactionCoordinatorOperator operator =
+                new UnawareBucketNewFilesCompactionCoordinatorOperator(new CoreOptions(options));
+        OneInputStreamOperatorTestHarness<
+                        Committable, Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(createCommittable(1, BinaryRow.EMPTY_ROW, 3, 5, 1, 2, 3)));
+        testHarness.prepareSnapshotPreBarrier(1);
+        testHarness.processElement(
+                new StreamRecord<>(createCommittable(2, BinaryRow.EMPTY_ROW, 3, 2)));
+        testHarness.prepareSnapshotPreBarrier(2);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(3);
+        assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, 3, 5);
+        assertCompactionTask(output.get(1), 1, BinaryRow.EMPTY_ROW, 1, 2, 3);
+        assertCompactionTask(output.get(2), 2, BinaryRow.EMPTY_ROW, 3, 2);
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testPrepareSnapshotWithSingleFile() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+        UnawareBucketNewFilesCompactionCoordinatorOperator operator =
+                new UnawareBucketNewFilesCompactionCoordinatorOperator(new CoreOptions(options));
+        OneInputStreamOperatorTestHarness<
+                        Committable, Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(createCommittable(1, BinaryRow.EMPTY_ROW, 3, 5, 1)));
+        testHarness.prepareSnapshotPreBarrier(1);
+        testHarness.processElement(
+                new StreamRecord<>(createCommittable(2, BinaryRow.EMPTY_ROW, 4)));
+        testHarness.prepareSnapshotPreBarrier(2);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(3);
+        assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, 3, 5);
+        assertCommittable(output.get(1), 1, BinaryRow.EMPTY_ROW, 1);
+        assertCommittable(output.get(2), 2, BinaryRow.EMPTY_ROW, 4);
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testPrepareSnapshotWithMultiplePartitions() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+        UnawareBucketNewFilesCompactionCoordinatorOperator operator =
+                new UnawareBucketNewFilesCompactionCoordinatorOperator(new CoreOptions(options));
+        OneInputStreamOperatorTestHarness<
+                        Committable, Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>
+                testHarness = createTestHarness(operator);
+
+        Function<Integer, BinaryRow> binaryRow =
+                i -> {
+                    BinaryRow row = new BinaryRow(1);
+                    BinaryRowWriter writer = new BinaryRowWriter(row);
+                    writer.writeInt(0, i);
+                    writer.complete();
+                    return row;
+                };
+
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(createCommittable(1, binaryRow.apply(1), 3, 5, 1, 2, 3)));
+        testHarness.processElement(
+                new StreamRecord<>(createCommittable(1, binaryRow.apply(2), 3, 2, 4, 3)));
+        testHarness.prepareSnapshotPreBarrier(1);
+        testHarness.processElement(
+                new StreamRecord<>(createCommittable(2, binaryRow.apply(2), 3, 2)));
+        testHarness.prepareSnapshotPreBarrier(2);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(5);
+
+        assertCompactionTask(output.get(0), 1, binaryRow.apply(1), 3, 5);
+        assertCompactionTask(output.get(1), 1, binaryRow.apply(2), 3, 2, 4);
+        assertCompactionTask(output.get(2), 1, binaryRow.apply(1), 1, 2, 3);
+        assertCommittable(output.get(3), 1, binaryRow.apply(2), 3);
+        assertCompactionTask(output.get(4), 2, binaryRow.apply(2), 3, 2);
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testSkipLargeFiles() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+        UnawareBucketNewFilesCompactionCoordinatorOperator operator =
+                new UnawareBucketNewFilesCompactionCoordinatorOperator(new CoreOptions(options));
+        OneInputStreamOperatorTestHarness<
+                        Committable, Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(createCommittable(1, BinaryRow.EMPTY_ROW, 8, 3, 5, 9)));
+        testHarness.prepareSnapshotPreBarrier(1);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(2);
+        assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, 3, 5);
+        assertCommittable(output.get(1), 1, BinaryRow.EMPTY_ROW, 8, 9);
+
+        testHarness.close();
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertCommittable(Object o, long checkpointId, BinaryRow partition, int... mbs) {
+        StreamRecord<Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>> record =
+                (StreamRecord<Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>) o;
+        assertThat(record.getValue().isLeft()).isTrue();
+        Committable committable = record.getValue().left();
+        assertThat(committable.checkpointId()).isEqualTo(checkpointId);
+        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+        assertThat(message.partition()).isEqualTo(partition);
+        assertThat(message.newFilesIncrement().deletedFiles()).isEmpty();
+        assertThat(message.newFilesIncrement().changelogFiles()).isEmpty();
+        assertThat(message.compactIncrement().isEmpty()).isTrue();
+        assertThat(message.indexIncrement().isEmpty()).isTrue();
+        assertThat(message.newFilesIncrement().newFiles().stream().map(DataFileMeta::fileSize))
+                .hasSameElementsAs(
+                        Arrays.stream(mbs)
+                                .mapToObj(i -> MemorySize.ofMebiBytes(i).getBytes())
+                                .collect(Collectors.toList()));
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertCompactionTask(
+            Object o, long checkpointId, BinaryRow partition, int... mbs) {
+        StreamRecord<Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>> record =
+                (StreamRecord<Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>) o;
+        assertThat(record.getValue().isRight()).isTrue();
+        assertThat(record.getValue().right().f0).isEqualTo(checkpointId);
+        UnawareAppendCompactionTask task = record.getValue().right().f1;
+        assertThat(task.partition()).isEqualTo(partition);
+        assertThat(task.compactBefore().stream().map(DataFileMeta::fileSize))
+                .hasSameElementsAs(
+                        Arrays.stream(mbs)
+                                .mapToObj(i -> MemorySize.ofMebiBytes(i).getBytes())
+                                .collect(Collectors.toList()));
+    }
+
+    private Committable createCommittable(long checkpointId, BinaryRow partition, int... mbs) {
+        CommitMessageImpl message =
+                new CommitMessageImpl(
+                        partition,
+                        BucketMode.UNAWARE_BUCKET,
+                        new DataIncrement(
+                                Arrays.stream(mbs)
+                                        .mapToObj(this::createDataFileMetaOfSize)
+                                        .collect(Collectors.toList()),
+                                Collections.emptyList(),
+                                Collections.emptyList()),
+                        CompactIncrement.emptyIncrement());
+        return new Committable(checkpointId, Committable.Kind.FILE, message);
+    }
+
+    private DataFileMeta createDataFileMetaOfSize(int mb) {
+        return DataFileMeta.forAppend(
+                UUID.randomUUID().toString(),
+                MemorySize.ofMebiBytes(mb).getBytes(),
+                0,
+                SimpleStats.EMPTY_STATS,
+                0,
+                0,
+                1,
+                Collections.emptyList(),
+                null,
+                null,
+                null,
+                null);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private OneInputStreamOperatorTestHarness<
+                    Committable, Either<Committable, Tuple2<Long, UnawareAppendCompactionTask>>>
+            createTestHarness(UnawareBucketNewFilesCompactionCoordinatorOperator operator)
+                    throws Exception {
+        TypeSerializer serializer =
+                new EitherSerializer<>(
+                        new CommittableTypeInfo().createSerializer(new ExecutionConfig()),
+                        new TupleTypeInfo<>(
+                                        BasicTypeInfo.LONG_TYPE_INFO, new CompactionTaskTypeInfo())
+                                .createSerializer(new ExecutionConfig()));
+        OneInputStreamOperatorTestHarness harness =
+                new OneInputStreamOperatorTestHarness(operator, 1, 1, 0);
+        harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer));
+        harness.getStreamConfig().serializeAllConfigs();
+        harness.setup(serializer);
+        return harness;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java
new file mode 100644
index 0000000000..90444287d4
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/UnawareBucketNewFilesCompactionITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * IT cases for {@link UnawareBucketNewFilesCompactionCoordinatorOperator} and {@link
+ * UnawareBucketNewFilesCompactionWorkerOperator}.
+ */
+public class UnawareBucketNewFilesCompactionITCase extends AbstractTestBase {
+
+    @Test
+    public void testCompactNewFiles() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment tEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .parallelism(2)
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  pt INT,\n"
+                        + "  a INT,\n"
+                        + "  b STRING\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'write-only' = 'true',\n"
+                        + "  'compaction.min.file-num' = '3',\n"
+                        + "  'compaction.max.file-num' = '3',\n"
+                        + "  'precommit-compact' = 'true',\n"
+                        + "  'sink.parallelism' = '2'\n"
+                        + ")");
+
+        List<String> values = new ArrayList<>();
+        for (int pt = 0; pt < 2; pt++) {
+            for (int a = 0; a < 50; a++) {
+                values.add(String.format("(%d, %d, '%d')", pt, a, a * 1000));
+            }
+        }
+
+        Supplier<Map<String, Integer>> getActual =
+                () -> {
+                    Map<String, Integer> result = new HashMap<>();
+                    try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM T").collect()) {
+                        while (it.hasNext()) {
+                            Row row = it.next();
+                            assertThat(row.getArity()).isEqualTo(3);
+                            result.compute(
+                                    String.format(
+                                            "(%s, %s, '%s')",
+                                            row.getField(0), row.getField(1), row.getField(2)),
+                                    (k, v) -> v == null ? 1 : v + 1);
+                        }
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    return result;
+                };
+
+        LocalFileIO fileIO = LocalFileIO.create();
+        for (int r = 1; r <= 3; r++) {
+            tEnv.executeSql("INSERT INTO T VALUES " + String.join(", ", values)).await();
+            assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0")))
+                    .hasSize(r);
+            assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0")))
+                    .hasSize(r);
+            Map<String, Integer> actual = getActual.get();
+            assertThat(actual.keySet()).hasSameElementsAs(values);
+            final int e = r;
+            assertThat(actual.values()).allMatch(i -> i == e);
+        }
+
+        tEnv.executeSql("CALL sys.compact('default.T')").await();
+        assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=0/bucket-0"))).hasSize(4);
+        assertThat(fileIO.listStatus(new Path(warehouse, "default.db/T/pt=1/bucket-0"))).hasSize(4);
+        Map<String, Integer> actual = getActual.get();
+        assertThat(actual.keySet()).hasSameElementsAs(values);
+        assertThat(actual.values()).allMatch(i -> i == 3);
+
+        tEnv.executeSql("CALL sys.expire_snapshots(`table` => 'default.T', 
retain_max => 1)")
+                .await();
+        assertThat(fileIO.listStatus(new Path(warehouse, 
"default.db/T/pt=0/bucket-0"))).hasSize(1);
+        assertThat(fileIO.listStatus(new Path(warehouse, 
"default.db/T/pt=1/bucket-0"))).hasSize(1);
+        actual = getActual.get();
+        assertThat(actual.keySet()).hasSameElementsAs(values);
+        assertThat(actual.values()).allMatch(i -> i == 3);
+    }
+}
