This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 85334fafc [flink] Refactor codes for AppendOnlyTableCompactionWorkerOperator
85334fafc is described below
commit 85334fafc242feced284453bb5b31ecd42a76747
Author: Jingsong <[email protected]>
AuthorDate: Wed Aug 2 11:37:33 2023 +0800
[flink] Refactor codes for AppendOnlyTableCompactionWorkerOperator
---
.../org/apache/paimon/append/AppendOnlyCompactionTask.java | 6 +++---
.../flink/sink/AppendOnlyTableCompactionWorkerOperator.java | 12 +++++++++++-
.../sink/AppendOnlyTableCompactionWorkerOperatorTest.java | 4 ++--
3 files changed, 16 insertions(+), 6 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
index 3ad9a647f..3ef3da40e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java
@@ -35,9 +35,9 @@ import java.util.Objects;
/** Compaction task generated by {@link AppendOnlyTableCompactionCoordinator}. */
public class AppendOnlyCompactionTask {
- private BinaryRow partition;
- private transient List<DataFileMeta> compactBefore;
- private transient List<DataFileMeta> compactAfter;
+ private final BinaryRow partition;
+ private final List<DataFileMeta> compactBefore;
+ private final List<DataFileMeta> compactAfter;
public AppendOnlyCompactionTask(BinaryRow partition, List<DataFileMeta> files) {
Preconditions.checkArgument(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
index 9d2e76a30..f2fe92234 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
@@ -54,9 +54,10 @@ public class AppendOnlyTableCompactionWorkerOperator
private final AppendOnlyFileStoreTable table;
private final String commitUser;
+
private transient AppendOnlyFileStoreWrite write;
private transient ExecutorService lazyCompactExecutor;
- transient Queue<Future<CommitMessage>> result;
+ private transient Queue<Future<CommitMessage>> result;
public AppendOnlyTableCompactionWorkerOperator(
AppendOnlyFileStoreTable table, String commitUser) {
@@ -65,6 +66,11 @@ public class AppendOnlyTableCompactionWorkerOperator
this.commitUser = commitUser;
}
+ @VisibleForTesting
+ Iterable<Future<CommitMessage>> result() {
+ return result;
+ }
+
@Override
public void open() throws Exception {
LOG.debug("Opened a append-only table compaction worker.");
@@ -135,6 +141,10 @@ public class AppendOnlyTableCompactionWorkerOperator
// exception should already be handled
}
}
+ if (messages.isEmpty()) {
+ return;
+ }
+
try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
tableCommit.abort(messages);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
index cbf6d37ef..75f99dbf1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
@@ -129,7 +129,7 @@ public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase {
.pathFactory()
.createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
int i = 0;
- for (Future<CommitMessage> f : workerOperator.result) {
+ for (Future<CommitMessage> f : workerOperator.result()) {
if (!f.isDone()) {
break;
}
@@ -149,7 +149,7 @@ public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase {
// shut down worker operator
workerOperator.shutdown();
- for (Future<CommitMessage> f : workerOperator.result) {
+ for (Future<CommitMessage> f : workerOperator.result()) {
try {
CommitMessage commitMessage = f.get();
List<DataFileMeta> fileMetas =