This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b66d30abe [flink] Unaware-bucket table do compaction async, uncoupled 
with CheckpointBarrier. (#1690)
b66d30abe is described below

commit b66d30abe89da68c9ccde206e87faa17b8149649
Author: YeJunHao <[email protected]>
AuthorDate: Wed Aug 2 11:29:40 2023 +0800

    [flink] Unaware-bucket table do compaction async, uncoupled with 
CheckpointBarrier. (#1690)
---
 .../generated/flink_connector_configuration.html   |  12 +-
 .../generated/spark_connector_configuration.html   |  38 ++--
 .../paimon/append/AppendOnlyCompactionTask.java    |  11 +-
 .../append/AppendOnlyTableCompactionWorker.java    |  66 -------
 .../append/AppendOnlyTableCompactionITTest.java    |  23 ++-
 .../AppendOnlyTableCompactionWorkerTest.java       | 174 -----------------
 .../org/apache/paimon/table/TableTestBase.java     |  81 +++++++-
 .../AppendOnlyTableCompactionWorkerOperator.java   |  93 +++++++--
 ...ppendOnlyTableCompactionWorkerOperatorTest.java | 207 +++++++++++++++++++++
 9 files changed, 411 insertions(+), 294 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index f8f15a34c..8ce743cbb 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -116,12 +116,6 @@ under the License.
             <td>Boolean</td>
             <td>If true, flink sink will use managed memory for merge tree; 
otherwise, it will create an independent memory allocator.</td>
         </tr>
-        <tr>
-            <td><h5>unaware-bucket.compaction.parallelism</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>Integer</td>
-            <td>Defines a custom parallelism for the unaware-bucket table 
compaction job. By default, if this option is not defined, the planner will 
derive the parallelism for each statement individually by also considering the 
global configuration.</td>
-        </tr>
         <tr>
             <td><h5>source.checkpoint-align.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -134,5 +128,11 @@ under the License.
             <td>Duration</td>
             <td>If the new snapshot has not been generated when the checkpoint 
starts to trigger, the enumerator will block the checkpoint and wait for the 
new snapshot. Set the maximum waiting time to avoid infinite waiting, if 
timeout, the checkpoint will fail. Note that it should be set smaller than the 
checkpoint timeout.</td>
         </tr>
+        <tr>
+            <td><h5>unaware-bucket.compaction.parallelism</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>Defines a custom parallelism for the unaware-bucket table 
compaction job. By default, if this option is not defined, the planner will 
derive the parallelism for each statement individually by also considering the 
global configuration.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index c38a5cc85..db9c1d889 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -7,7 +7,7 @@ to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at
 
-http://www.apache.org/licenses/LICENSE-2.0
+  http://www.apache.org/licenses/LICENSE-2.0
 
 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
@@ -18,25 +18,25 @@ under the License.
 */}}
 <table class="configuration table table-bordered">
     <thead>
-    <tr>
-        <th class="text-left" style="width: 20%">Key</th>
-        <th class="text-left" style="width: 15%">Default</th>
-        <th class="text-left" style="width: 10%">Type</th>
-        <th class="text-left" style="width: 55%">Description</th>
-    </tr>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
     </thead>
     <tbody>
-    <tr>
-        <td><h5>write.merge-schema</h5></td>
-        <td style="word-wrap: break-word;">false</td>
-        <td>Boolean</td>
-        <td>If true, merge the data schema and the table schema automatically 
before write data.</td>
-    </tr>
-    <tr>
-        <td><h5>write.merge-schema.explicit-cast</h5></td>
-        <td style="word-wrap: break-word;">false</td>
-        <td>Boolean</td>
-        <td>If true, allow to merge data types if the two types meet the rules 
for explicit casting.</td>
-    </tr>
+        <tr>
+            <td><h5>write.merge-schema</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, merge the data schema and the table schema 
automatically before write data.</td>
+        </tr>
+        <tr>
+            <td><h5>write.merge-schema.explicit-cast</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, allow to merge data types if the two types meet the 
rules for explicit casting.</td>
+        </tr>
     </tbody>
 </table>
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
index 80b9331f1..3ad9a647f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.NewFilesIncrement;
+import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.Preconditions;
@@ -31,10 +32,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
-/**
- * Compaction task generated by {@link AppendOnlyTableCompactionCoordinator} 
and executed be {@link
- * AppendOnlyTableCompactionWorker}.
- */
+/** Compaction task generated by {@link AppendOnlyTableCompactionCoordinator}. 
*/
 public class AppendOnlyCompactionTask {
 
     private BinaryRow partition;
@@ -62,9 +60,8 @@ public class AppendOnlyCompactionTask {
         return compactAfter;
     }
 
-    public CommitMessage doCompact(AppendOnlyCompactManager.CompactRewriter 
rewriter)
-            throws Exception {
-        compactAfter.addAll(rewriter.rewrite(compactBefore));
+    public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws 
Exception {
+        compactAfter.addAll(write.compactRewriter(partition, 
0).rewrite(compactBefore));
         CompactIncrement compactIncrement =
                 new CompactIncrement(compactBefore, compactAfter, 
Collections.emptyList());
         return new CommitMessageImpl(
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionWorker.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionWorker.java
deleted file mode 100644
index d1a66cb4d..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyTableCompactionWorker.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.append;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
-import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.sink.CommitMessage;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/** {@link AppendOnlyFileStoreTable} compact worker. */
-public class AppendOnlyTableCompactionWorker {
-
-    private final List<AppendOnlyCompactionTask> tasks = new ArrayList<>();
-    private final AppendOnlyFileStoreWrite write;
-
-    public AppendOnlyTableCompactionWorker(AppendOnlyFileStoreTable table, 
String commitUser) {
-        this.write = table.store().newWrite(commitUser);
-    }
-
-    public AppendOnlyTableCompactionWorker 
accept(List<AppendOnlyCompactionTask> tasks) {
-        this.tasks.addAll(tasks);
-        return this;
-    }
-
-    public AppendOnlyTableCompactionWorker accept(AppendOnlyCompactionTask 
task) {
-        this.tasks.add(task);
-        return this;
-    }
-
-    public List<CommitMessage> doCompact() throws Exception {
-        List<CommitMessage> result = new ArrayList<>();
-        Map<BinaryRow, AppendOnlyCompactManager.CompactRewriter> rewriters = 
new HashMap<>();
-        for (AppendOnlyCompactionTask task : tasks) {
-            AppendOnlyCompactManager.CompactRewriter rewriter =
-                    rewriters.computeIfAbsent(task.partition(), 
this::buildRewriter);
-            result.add(task.doCompact(rewriter));
-        }
-        tasks.clear();
-        return result;
-    }
-
-    private AppendOnlyCompactManager.CompactRewriter buildRewriter(BinaryRow 
partition) {
-        return write.compactRewriter(partition, 0);
-    }
-}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
index 6e2df6572..d6983f104 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -53,7 +54,7 @@ public class AppendOnlyTableCompactionITTest {
     private AppendOnlyFileStoreTable appendOnlyFileStoreTable;
     private SnapshotManager snapshotManager;
     private AppendOnlyTableCompactionCoordinator compactionCoordinator;
-    private AppendOnlyTableCompactionWorker compactionWorker;
+    private AppendOnlyFileStoreWrite write;
     private final String commitUser = UUID.randomUUID().toString();
 
     @Test
@@ -78,8 +79,7 @@ public class AppendOnlyTableCompactionITTest {
         Assertions.assertEquals(tasks.size(), 1);
         AppendOnlyCompactionTask task = tasks.get(0);
         Assertions.assertEquals(task.compactBefore().size(), 6);
-        compactionWorker.accept(tasks);
-        List<CommitMessage> result = compactionWorker.doCompact();
+        List<CommitMessage> result = doCompact(tasks);
         Assertions.assertEquals(1, result.size());
         commit(result);
         compactionCoordinator.scan();
@@ -90,7 +90,7 @@ public class AppendOnlyTableCompactionITTest {
         Assertions.assertEquals(tasks.size(), 1);
         // before update, zero file left
         
Assertions.assertEquals(compactionCoordinator.listRestoredFiles().size(), 0);
-        commit(compactionWorker.accept(tasks).doCompact());
+        commit(doCompact(tasks));
         compactionCoordinator.scan();
         // one file is loaded from delta
         List<DataFileMeta> last = new 
ArrayList<>(compactionCoordinator.listRestoredFiles());
@@ -107,7 +107,7 @@ public class AppendOnlyTableCompactionITTest {
         for (int i = 90; i < 100; i++) {
             count += i;
             commit(writeCommit(i));
-            
commit(compactionWorker.accept(compactionCoordinator.run()).doCompact());
+            commit(doCompact(compactionCoordinator.run()));
             // scan the file generated itself
             Assertions.assertTrue(compactionCoordinator.scan());
             Assertions.assertEquals(
@@ -124,7 +124,7 @@ public class AppendOnlyTableCompactionITTest {
 
         List<AppendOnlyCompactionTask> tasks = 
compactionCoordinator.compactPlan();
         while (tasks.size() != 0) {
-            commit(compactionWorker.accept(tasks).doCompact());
+            commit(doCompact(tasks));
             tasks = compactionCoordinator.run();
         }
 
@@ -159,6 +159,14 @@ public class AppendOnlyTableCompactionITTest {
         return messages;
     }
 
+    private List<CommitMessage> doCompact(List<AppendOnlyCompactionTask> 
tasks) throws Exception {
+        List<CommitMessage> result = new ArrayList<>();
+        for (AppendOnlyCompactionTask task : tasks) {
+            result.add(task.doCompact(write));
+        }
+        return result;
+    }
+
     private InternalRow randomRow() {
         return GenericRow.of(
                 random.nextInt(100),
@@ -183,7 +191,6 @@ public class AppendOnlyTableCompactionITTest {
                                 new 
org.apache.paimon.fs.Path(tempDir.toString()),
                                 tableSchema);
         compactionCoordinator = new 
AppendOnlyTableCompactionCoordinator(appendOnlyFileStoreTable);
-        compactionWorker =
-                new AppendOnlyTableCompactionWorker(appendOnlyFileStoreTable, 
commitUser);
+        write = appendOnlyFileStoreTable.store().newWrite(commitUser);
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionWorkerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionWorkerTest.java
deleted file mode 100644
index 167ac351f..000000000
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionWorkerTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.append;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.BinaryRowWriter;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.types.DataTypes;
-
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-
-/** Tests for {@link AppendOnlyTableCompactionWorker}. */
-public class AppendOnlyTableCompactionWorkerTest {
-
-    private static final Random RANDOM = new Random();
-
-    @TempDir Path tempDir;
-    private AppendOnlyFileStoreTable appendOnlyFileStoreTable;
-    private AppendOnlyTableCompactionCoordinator compactionCoordinator;
-    private AppendOnlyTableCompactionWorker compactionWorker;
-    private BinaryRow partition;
-    private final String commitUser = UUID.randomUUID().toString();
-
-    @Test
-    public void compactinoWorkerExecuteSinglePartitionTest() throws Exception {
-        // single partition worker execute test
-        int fileNumer = 100;
-        int taskSize = fileNumer / 6;
-        List<DataFileMeta> writtenFiles = writeCommit(fileNumer);
-        compactionCoordinator.notifyNewFiles(partition, writtenFiles);
-        List<AppendOnlyCompactionTask> tasks = 
compactionCoordinator.compactPlan();
-        Assertions.assertEquals(tasks.size(), taskSize);
-
-        compactionWorker.accept(tasks);
-        List<CommitMessage> lists = compactionWorker.doCompact();
-
-        Assertions.assertEquals(lists.size(), taskSize);
-
-        for (int i = 0; i < taskSize; i++) {
-            AppendOnlyCompactionTask task = tasks.get(i);
-            CommitMessageImpl commitMessage = (CommitMessageImpl) lists.get(i);
-            Assertions.assertIterableEquals(
-                    task.compactBefore(), 
commitMessage.compactIncrement().compactBefore());
-            
Assertions.assertEquals(commitMessage.compactIncrement().compactAfter().size(), 
1);
-        }
-    }
-
-    @Test
-    public void compactinoWorkerExecuteMultiPartitionTest() throws Exception {
-        // test muti-partition compaction task execute situation
-        BinaryRow binaryRow = new BinaryRow(1);
-        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRow);
-        binaryRowWriter.writeInt(0, 0);
-        binaryRowWriter.complete();
-        BinaryRow partition0 = binaryRow.copy();
-
-        binaryRowWriter.reset();
-        binaryRowWriter.writeInt(0, 1);
-        binaryRowWriter.complete();
-        BinaryRow partition1 = binaryRow.copy();
-
-        int fileNumer = 100;
-        int taskSize = fileNumer / 6;
-        List<DataFileMeta> writtenFiles0 = writeCommit(fileNumer);
-        compactionCoordinator.notifyNewFiles(partition0, writtenFiles0);
-        List<DataFileMeta> writtenFiles1 = writeCommit(fileNumer);
-        compactionCoordinator.notifyNewFiles(partition1, writtenFiles1);
-        List<AppendOnlyCompactionTask> tasks = 
compactionCoordinator.compactPlan();
-        Assertions.assertEquals(tasks.size(), taskSize * 2);
-
-        compactionWorker.accept(tasks);
-        List<CommitMessage> lists = compactionWorker.doCompact();
-
-        Assertions.assertEquals(lists.size(), taskSize * 2);
-
-        for (int i = 0; i < taskSize; i++) {
-            AppendOnlyCompactionTask task = tasks.get(i);
-            CommitMessageImpl commitMessage = (CommitMessageImpl) lists.get(i);
-            Assertions.assertEquals(task.partition(), 
commitMessage.partition());
-            Assertions.assertIterableEquals(
-                    task.compactBefore(), 
commitMessage.compactIncrement().compactBefore());
-        }
-    }
-
-    private static Schema schema() {
-        Schema.Builder schemaBuilder = Schema.newBuilder();
-        schemaBuilder.column("f0", DataTypes.INT());
-        schemaBuilder.column("f1", DataTypes.STRING());
-        schemaBuilder.column("f2", DataTypes.STRING());
-        schemaBuilder.column("f3", DataTypes.STRING());
-        schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "3");
-        schemaBuilder.option(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "6");
-        return schemaBuilder.build();
-    }
-
-    private List<DataFileMeta> writeCommit(int number) throws Exception {
-        List<DataFileMeta> fileMetas = new ArrayList<>();
-        StreamTableWrite writer = 
appendOnlyFileStoreTable.newStreamWriteBuilder().newWrite();
-        for (int i = 0; i < number; i++) {
-            writer.write(randomRow());
-            for (CommitMessage message : writer.prepareCommit(true, i)) {
-                CommitMessageImpl commitMessage = (CommitMessageImpl) message;
-                fileMetas.addAll(commitMessage.newFilesIncrement().newFiles());
-            }
-        }
-        return fileMetas;
-    }
-
-    private InternalRow randomRow() {
-        return GenericRow.of(
-                RANDOM.nextInt(100),
-                BinaryString.fromString("A" + RANDOM.nextInt(100)),
-                BinaryString.fromString("B" + RANDOM.nextInt(100)),
-                BinaryString.fromString("C" + RANDOM.nextInt(100)));
-    }
-
-    @BeforeEach
-    public void createCoordinator() throws Exception {
-        FileIO fileIO = new LocalFileIO();
-        org.apache.paimon.fs.Path path = new 
org.apache.paimon.fs.Path(tempDir.toString());
-        SchemaManager schemaManager = new SchemaManager(fileIO, path);
-        TableSchema tableSchema = schemaManager.createTable(schema());
-
-        appendOnlyFileStoreTable =
-                (AppendOnlyFileStoreTable)
-                        FileStoreTableFactory.create(
-                                fileIO,
-                                new 
org.apache.paimon.fs.Path(tempDir.toString()),
-                                tableSchema);
-        compactionCoordinator = new 
AppendOnlyTableCompactionCoordinator(appendOnlyFileStoreTable);
-        compactionWorker =
-                new AppendOnlyTableCompactionWorker(appendOnlyFileStoreTable, 
commitUser);
-        partition = BinaryRow.EMPTY_ROW;
-    }
-}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 9f5b32514..9f305182f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -23,15 +23,22 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -44,6 +51,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
 import java.util.function.Predicate;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -51,11 +60,15 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Test base for table. */
 public abstract class TableTestBase {
 
-    @TempDir java.nio.file.Path tempPath;
+    protected static final Random RANDOM = new Random();
+    protected static final String DEFAULT_TABLE_NAME = "MyTable";
+
+    protected final String commitUser = UUID.randomUUID().toString();
 
     protected Path warehouse;
     protected Catalog catalog;
     protected String database;
+    @TempDir java.nio.file.Path tempPath;
 
     @BeforeEach
     public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
@@ -77,6 +90,10 @@ public abstract class TableTestBase {
         return new Identifier(database, tableName);
     }
 
+    protected Identifier identifier() {
+        return identifier(DEFAULT_TABLE_NAME);
+    }
+
     protected void write(Table table, InternalRow... rows) throws Exception {
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         try (BatchTableWrite write = writeBuilder.newWrite();
@@ -112,4 +129,66 @@ public abstract class TableTestBase {
         reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
         return rows;
     }
+
+    public void createTableDefault() throws Exception {
+        catalog.createTable(identifier(), schemaDefault(), true);
+    }
+
+    protected void commitDefault(List<CommitMessage> messages) throws 
Exception {
+        getTableDefault().newBatchWriteBuilder().newCommit().commit(messages);
+    }
+
+    protected List<CommitMessage> writeDataDefault(int size, int times) throws 
Exception {
+        List<CommitMessage> messages = new ArrayList<>();
+        for (int i = 0; i < times; i++) {
+            messages.addAll(writeOnce(getTableDefault(), i, size));
+        }
+
+        return messages;
+    }
+
+    public Table getTableDefault() throws Exception {
+        return catalog.getTable(identifier());
+    }
+
+    private List<CommitMessage> writeOnce(Table table, int time, int size) 
throws Exception {
+        StreamWriteBuilder builder = table.newStreamWriteBuilder();
+        builder.withCommitUser(commitUser);
+        try (StreamTableWrite streamTableWrite = builder.newWrite()) {
+            for (int j = 0; j < size; j++) {
+                streamTableWrite.write(dataDefault(time, j));
+            }
+            return streamTableWrite.prepareCommit(false, Long.MAX_VALUE);
+        }
+    }
+
+    // schema with all the basic types.
+    protected Schema schemaDefault() {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.STRING());
+        schemaBuilder.column("f2", DataTypes.BYTES());
+        return schemaBuilder.build();
+    }
+
+    protected InternalRow dataDefault(int time, int size) {
+        return GenericRow.of(RANDOM.nextInt(), randomString(), randomBytes());
+    }
+
+    protected BinaryString randomString() {
+        int length = RANDOM.nextInt(50);
+        byte[] buffer = new byte[length];
+
+        for (int i = 0; i < length; i += 1) {
+            buffer[i] = (byte) ('a' + RANDOM.nextInt(26));
+        }
+
+        return BinaryString.fromBytes(buffer);
+    }
+
+    protected byte[] randomBytes() {
+        byte[] binary = new byte[RANDOM.nextInt(10)];
+        RANDOM.nextBytes(binary);
+        return binary;
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
index abcc8d618..9d2e76a30 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
@@ -18,18 +18,28 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.append.AppendOnlyCompactionTask;
-import org.apache.paimon.append.AppendOnlyTableCompactionWorker;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
+import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.utils.ExecutorThreadFactory;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
 /**
@@ -38,10 +48,15 @@ import java.util.stream.Collectors;
  */
 public class AppendOnlyTableCompactionWorkerOperator
         extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(AppendOnlyTableCompactionWorkerOperator.class);
+
     private final AppendOnlyFileStoreTable table;
-    private transient AppendOnlyTableCompactionWorker worker;
     private final String commitUser;
-    private transient List<CommitMessage> result;
+    private transient AppendOnlyFileStoreWrite write;
+    private transient ExecutorService lazyCompactExecutor;
+    transient Queue<Future<CommitMessage>> result;
 
     public AppendOnlyTableCompactionWorkerOperator(
             AppendOnlyFileStoreTable table, String commitUser) {
@@ -52,25 +67,77 @@ public class AppendOnlyTableCompactionWorkerOperator
 
     @Override
     public void open() throws Exception {
-        this.worker = new AppendOnlyTableCompactionWorker(table, commitUser);
-        this.result = new ArrayList<>();
+        LOG.debug("Opened a append-only table compaction worker.");
+        // Writer that processElement passes to every compaction task.
+        this.write = table.store().newWrite(commitUser);
+        // Futures of the submitted compaction tasks, kept in submission order (FIFO).
+        this.result = new LinkedList<>();
     }
 
     @Override
     protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException {
-        // ignore waitCompaction tag
-        ArrayList<CommitMessage> tempList = new ArrayList<>(result);
-        result.clear();
-        return tempList.stream()
-                .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s))
-                .collect(Collectors.toList());
+        // Takes compaction results from the head of the queue and wraps each one in a FILE
+        // committable tagged with checkpointId. If waitCompaction is false the drain stops at
+        // the first unfinished task; if it is true every queued task is waited for.
+        List<CommitMessage> tempList = new ArrayList<>();
+        try {
+            while (!result.isEmpty()) {
+                Future<CommitMessage> future = result.peek();
+                if (!future.isDone() && !waitCompaction) {
+                    // Do not block; this task and the ones behind it stay queued.
+                    break;
+                }
+
+                result.poll();
+                tempList.add(future.get());
+            }
+            return tempList.stream()
+                    .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s))
+                    .collect(Collectors.toList());
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up the stack can still see it.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted while waiting tasks done.", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Encountered an error while do compaction", e);
+        }
     }
 
     @Override
     public void processElement(StreamRecord<AppendOnlyCompactionTask> element) 
throws Exception {
         AppendOnlyCompactionTask task = element.getValue();
-        worker.accept(task);
-        result.addAll(worker.doCompact());
+        // Submit the compaction to the worker executor and queue its future; the result is
+        // collected later by prepareCommit.
+        result.add(workerExecutor().submit(() -> task.doCompact(write)));
+    }
+
+    private ExecutorService workerExecutor() {
+        // Created on first use and reused by every later call.
+        if (lazyCompactExecutor != null) {
+            return lazyCompactExecutor;
+        }
+        // Name handed to the thread factory: the calling thread's name plus a fixed suffix.
+        String threadName = Thread.currentThread().getName() + "-append-only-compact-worker";
+        lazyCompactExecutor =
+                Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory(threadName));
+        return lazyCompactExecutor;
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Delegates to shutdown(), which stops the compaction executor (if one was created)
+        // and aborts the commit messages of tasks that had already finished.
+        shutdown();
+    }
+
+    @VisibleForTesting
+    void shutdown() throws Exception {
+        // The executor only exists once a task has been submitted; otherwise nothing to do.
+        if (lazyCompactExecutor == null) {
+            return;
+        }
+
+        // ignore runnable tasks in queue
+        lazyCompactExecutor.shutdownNow();
+
+        // Collect the commit messages of tasks that were already done, in queue order.
+        List<CommitMessage> finished = new ArrayList<>();
+        for (Future<CommitMessage> pending : result) {
+            if (!pending.isDone()) {
+                // the later tasks should be stopped running
+                break;
+            }
+            try {
+                finished.add(pending.get());
+            } catch (Exception exception) {
+                // exception should already be handled
+            }
+        }
+
+        // Abort the collected messages through a short-lived table commit.
+        try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
+            tableCommit.abort(finished);
+        }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
new file mode 100644
index 000000000..cbf6d37ef
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/** Tests for {@link AppendOnlyTableCompactionWorkerOperator}. */
+public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase 
{
+
+    @Test
+    public void testAsyncCompactionWorks() throws Exception {
+        // Submits 20 compaction tasks and polls prepareCommit (non-blocking) until all 20
+        // results have been handed out, failing if that takes longer than the timeout.
+        createTableDefault();
+        AppendOnlyTableCompactionWorkerOperator workerOperator =
+                new AppendOnlyTableCompactionWorkerOperator(
+                        (AppendOnlyFileStoreTable) getTableDefault(), "user");
+
+        // write the input files; they are expected to pack into 20 tasks of up to 5 files
+        List<CommitMessage> commitMessages = writeDataDefault(200, 100);
+
+        List<AppendOnlyCompactionTask> tasks = packTask(commitMessages, 5);
+        List<StreamRecord<AppendOnlyCompactionTask>> records =
+                tasks.stream().map(StreamRecord::new).collect(Collectors.toList());
+        Assertions.assertThat(tasks.size()).isEqualTo(20);
+
+        workerOperator.open();
+
+        for (StreamRecord<AppendOnlyCompactionTask> record : records) {
+            workerOperator.processElement(record);
+        }
+
+        List<Committable> committables = new ArrayList<>();
+        long timeStart = System.currentTimeMillis();
+        long timeout = 60_000L;
+
+        Assertions.assertThatCode(
+                        () -> {
+                            while (committables.size() != 20) {
+                                committables.addAll(
+                                        workerOperator.prepareCommit(false, Long.MAX_VALUE));
+
+                                long now = System.currentTimeMillis();
+                                // Elapsed time is now - timeStart; only this order lets the
+                                // timeout fire instead of looping forever.
+                                if (now - timeStart > timeout && committables.size() != 20) {
+                                    throw new RuntimeException(
+                                            "Timeout waiting for compaction, maybe some error happens in "
+                                                    + AppendOnlyTableCompactionWorkerOperator.class
+                                                            .getName());
+                                }
+                                Thread.sleep(1_000L);
+                            }
+                        })
+                .doesNotThrowAnyException();
+        // Every task must have produced exactly one compacted file.
+        committables.forEach(
+                a ->
+                        Assertions.assertThat(
+                                        ((CommitMessageImpl) a.wrappedCommittable())
+                                                .compactIncrement()
+                                                .compactAfter())
+                                .hasSize(1));
+    }
+
+    @Test
+    public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception {
+        // Checks that compacted files written by finished tasks exist before shutdown() and
+        // are gone after it.
+        createTableDefault();
+        AppendOnlyTableCompactionWorkerOperator workerOperator =
+                new AppendOnlyTableCompactionWorkerOperator(
+                        (AppendOnlyFileStoreTable) getTableDefault(), "user");
+
+        // write 200 files
+        List<CommitMessage> commitMessages = writeDataDefault(200, 200);
+
+        List<AppendOnlyCompactionTask> tasks = packTask(commitMessages, 5);
+        List<StreamRecord<AppendOnlyCompactionTask>> records =
+                tasks.stream().map(StreamRecord::new).collect(Collectors.toList());
+        Assertions.assertThat(tasks.size()).isEqualTo(40);
+
+        workerOperator.open();
+
+        for (StreamRecord<AppendOnlyCompactionTask> record : records) {
+            workerOperator.processElement(record);
+        }
+
+        // wait compaction
+        Thread.sleep(2000);
+
+        LocalFileIO localFileIO = LocalFileIO.create();
+        DataFilePathFactory dataFilePathFactory =
+                ((AppendOnlyFileStoreTable) getTableDefault())
+                        .store()
+                        .pathFactory()
+                        .createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
+        // Before shutdown: the output files of the first finished tasks must exist.
+        int i = 0;
+        for (Future<CommitMessage> f : workerOperator.result) {
+            if (!f.isDone()) {
+                break;
+            }
+            CommitMessage commitMessage = f.get();
+            List<DataFileMeta> fileMetas =
+                    ((CommitMessageImpl) commitMessage).compactIncrement().compactAfter();
+            for (DataFileMeta fileMeta : fileMetas) {
+                Assertions.assertThat(
+                                localFileIO.exists(dataFilePathFactory.toPath(fileMeta.fileName())))
+                        .isTrue();
+            }
+            if (i++ > 8) {
+                break;
+            }
+        }
+
+        // shut down worker operator
+        workerOperator.shutdown();
+
+        // After shutdown: the output files of finished tasks must have been removed.
+        for (Future<CommitMessage> f : workerOperator.result) {
+            if (!f.isDone()) {
+                // Unfinished tasks have no commit message to verify, and waiting on a task
+                // that never started would block forever: shutdownNow() drops it from the
+                // queue without completing its future.
+                continue;
+            }
+            try {
+                CommitMessage commitMessage = f.get();
+                List<DataFileMeta> fileMetas =
+                        ((CommitMessageImpl) commitMessage).compactIncrement().compactAfter();
+                for (DataFileMeta fileMeta : fileMetas) {
+                    Assertions.assertThat(
+                                    localFileIO.exists(
+                                            dataFilePathFactory.toPath(fileMeta.fileName())))
+                            .isFalse();
+                }
+            } catch (Exception ignored) {
+                // The task itself failed, so it has no commit message whose files could be
+                // checked. Assertion failures are Errors and still propagate.
+            }
+        }
+    }
+
+    @Override
+    protected Schema schemaDefault() {
+        // Three columns: f0 INT, f1 BIGINT, f2 STRING.
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("f0", DataTypes.INT());
+        schemaBuilder.column("f1", DataTypes.BIGINT());
+        schemaBuilder.column("f2", DataTypes.STRING());
+        // NOTE(review): bucket = -1 presumably selects the unaware-bucket mode that the
+        // operator under test targets - confirm against CoreOptions.
+        schemaBuilder.option(CoreOptions.BUCKET.key(), "-1");
+        // Same number as the per-task file count the tests pass to packTask.
+        schemaBuilder.option(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "5");
+        return schemaBuilder.build();
+    }
+
+    @Override
+    protected InternalRow dataDefault(int time, int size) {
+        // One row of a random int, a random long and randomString(), in that order; the
+        // time and size arguments are not used.
+        return GenericRow.of(RANDOM.nextInt(), RANDOM.nextLong(), 
randomString());
+    }
+
+    private List<AppendOnlyCompactionTask> packTask(List<CommitMessage> messages, int fileSize) {
+        // Flatten the newly written files of all commit messages into one list.
+        List<DataFileMeta> allFiles = new ArrayList<>();
+        for (CommitMessage message : messages) {
+            allFiles.addAll(((CommitMessageImpl) message).newFilesIncrement().newFiles());
+        }
+
+        // Cut the list into consecutive groups of fileSize files; the last may be shorter.
+        List<AppendOnlyCompactionTask> tasks = new ArrayList<>();
+        for (int start = 0; start < allFiles.size(); start += fileSize) {
+            int end = Math.min(start + fileSize, allFiles.size());
+            tasks.add(
+                    new AppendOnlyCompactionTask(
+                            BinaryRow.EMPTY_ROW, allFiles.subList(start, end)));
+        }
+        return tasks;
+    }
+}


Reply via email to