This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 85334fafc [flink] Refactor codes for AppendOnlyTableCompactionWorkerOperator
85334fafc is described below

commit 85334fafc242feced284453bb5b31ecd42a76747
Author: Jingsong <[email protected]>
AuthorDate: Wed Aug 2 11:37:33 2023 +0800

    [flink] Refactor codes for AppendOnlyTableCompactionWorkerOperator
---
 .../org/apache/paimon/append/AppendOnlyCompactionTask.java   |  6 +++---
 .../flink/sink/AppendOnlyTableCompactionWorkerOperator.java  | 12 +++++++++++-
 .../sink/AppendOnlyTableCompactionWorkerOperatorTest.java    |  4 ++--
 3 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
index 3ad9a647f..3ef3da40e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
@@ -35,9 +35,9 @@ import java.util.Objects;
 /** Compaction task generated by {@link AppendOnlyTableCompactionCoordinator}. */
 public class AppendOnlyCompactionTask {
 
-    private BinaryRow partition;
-    private transient List<DataFileMeta> compactBefore;
-    private transient List<DataFileMeta> compactAfter;
+    private final BinaryRow partition;
+    private final List<DataFileMeta> compactBefore;
+    private final List<DataFileMeta> compactAfter;
 
     public AppendOnlyCompactionTask(BinaryRow partition, List<DataFileMeta> files) {
         Preconditions.checkArgument(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
index 9d2e76a30..f2fe92234 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
@@ -54,9 +54,10 @@ public class AppendOnlyTableCompactionWorkerOperator
 
     private final AppendOnlyFileStoreTable table;
     private final String commitUser;
+
     private transient AppendOnlyFileStoreWrite write;
     private transient ExecutorService lazyCompactExecutor;
-    transient Queue<Future<CommitMessage>> result;
+    private transient Queue<Future<CommitMessage>> result;
 
     public AppendOnlyTableCompactionWorkerOperator(
             AppendOnlyFileStoreTable table, String commitUser) {
@@ -65,6 +66,11 @@ public class AppendOnlyTableCompactionWorkerOperator
         this.commitUser = commitUser;
     }
 
+    @VisibleForTesting
+    Iterable<Future<CommitMessage>> result() {
+        return result;
+    }
+
     @Override
     public void open() throws Exception {
         LOG.debug("Opened a append-only table compaction worker.");
@@ -135,6 +141,10 @@ public class AppendOnlyTableCompactionWorkerOperator
                     // exception should already be handled
                 }
             }
+            if (messages.isEmpty()) {
+                return;
+            }
+
             try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
                 tableCommit.abort(messages);
             }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
index cbf6d37ef..75f99dbf1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
@@ -129,7 +129,7 @@ public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase {
                         .pathFactory()
                         .createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
         int i = 0;
-        for (Future<CommitMessage> f : workerOperator.result) {
+        for (Future<CommitMessage> f : workerOperator.result()) {
             if (!f.isDone()) {
                 break;
             }
@@ -149,7 +149,7 @@ public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase {
         // shut down worker operator
         workerOperator.shutdown();
 
-        for (Future<CommitMessage> f : workerOperator.result) {
+        for (Future<CommitMessage> f : workerOperator.result()) {
             try {
                 CommitMessage commitMessage = f.get();
                 List<DataFileMeta> fileMetas =

Reply via email to