This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c907544c4 [flink] Refactor compactorSink to support extended compact type. (#4569)
c907544c4 is described below

commit c907544c4218e1ce28259915a9a6fbd18f0fb5a4
Author: HunterXHunter <[email protected]>
AuthorDate: Fri Nov 22 14:53:57 2024 +0800

    [flink] Refactor compactorSink to support extended compact type. (#4569)
---
 .../java/org/apache/paimon/flink/action/CompactAction.java     |  3 ++-
 .../org/apache/paimon/flink/action/CompactDatabaseAction.java  |  3 ++-
 .../main/java/org/apache/paimon/flink/sink/CompactorSink.java  |  7 +++++--
 .../org/apache/paimon/flink/sink/CompactorSinkBuilder.java     |  9 ++++++++-
 .../org/apache/paimon/flink/sink/StoreCompactOperator.java     | 10 +++++-----
 .../java/org/apache/paimon/flink/sink/CompactorSinkITCase.java |  6 ++++--
 .../org/apache/paimon/flink/sink/StoreCompactOperatorTest.java |  3 ++-
 7 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 8ea120015..ce88857f1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -138,7 +138,8 @@ public class CompactAction extends TableActionBase {
         }
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(identifier.getFullName(), table);
-        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
+        CompactorSinkBuilder sinkBuilder =
+                new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
 
         sourceBuilder.withPartitionPredicate(getPredicate());
         DataStreamSource<RowData> source =
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index fda9ff695..471c6fdd4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -259,7 +259,8 @@ public class CompactDatabaseAction extends ActionBase {
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(fullName, table)
                         .withPartitionIdleTime(partitionIdleTime);
-        CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
+        CompactorSinkBuilder sinkBuilder =
+                new CompactorSinkBuilder(table).withFullCompaction(!isStreaming);
 
         DataStreamSource<RowData> source =
                 sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index 7dc3ab115..a0c830d73 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -29,14 +29,17 @@ public class CompactorSink extends FlinkSink<RowData> {
 
     private static final long serialVersionUID = 1L;
 
-    public CompactorSink(FileStoreTable table) {
+    private final boolean fullCompaction;
+
+    public CompactorSink(FileStoreTable table, boolean fullCompaction) {
         super(table, false);
+        this.fullCompaction = fullCompaction;
     }
 
     @Override
     protected OneInputStreamOperator<RowData, Committable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new StoreCompactOperator(table, writeProvider, commitUser);
+        return new StoreCompactOperator(table, writeProvider, commitUser, fullCompaction);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
index 926155cab..2173b1d34 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
@@ -37,6 +37,8 @@ public class CompactorSinkBuilder {
 
     private DataStream<RowData> input;
 
+    private boolean fullCompaction;
+
     public CompactorSinkBuilder(FileStoreTable table) {
         this.table = table;
     }
@@ -46,6 +48,11 @@ public class CompactorSinkBuilder {
         return this;
     }
 
+    public CompactorSinkBuilder withFullCompaction(boolean fullCompaction) {
+        this.fullCompaction = fullCompaction;
+        return this;
+    }
+
     public DataStreamSink<?> build() {
         BucketMode bucketMode = table.bucketMode();
         switch (bucketMode) {
@@ -66,6 +73,6 @@ public class CompactorSinkBuilder {
                         .orElse(null);
         DataStream<RowData> partitioned =
                 partition(input, new BucketsRowChannelComputer(), parallelism);
-        return new CompactorSink(table).sinkFrom(partitioned);
+        return new CompactorSink(table, fullCompaction).sinkFrom(partitioned);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index bc7bb350d..9b152a81c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -52,6 +52,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
     private final FileStoreTable table;
     private final StoreSinkWrite.Provider storeSinkWriteProvider;
     private final String initialCommitUser;
+    private final boolean fullCompaction;
 
     private transient StoreSinkWriteState state;
     private transient StoreSinkWrite write;
@@ -61,7 +62,8 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
     public StoreCompactOperator(
             FileStoreTable table,
             StoreSinkWrite.Provider storeSinkWriteProvider,
-            String initialCommitUser) {
+            String initialCommitUser,
+            boolean fullCompaction) {
         super(Options.fromMap(table.options()));
         Preconditions.checkArgument(
                 !table.coreOptions().writeOnly(),
@@ -69,6 +71,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
         this.table = table;
         this.storeSinkWriteProvider = storeSinkWriteProvider;
         this.initialCommitUser = initialCommitUser;
+        this.fullCompaction = fullCompaction;
     }
 
     @Override
@@ -136,10 +139,7 @@ public class StoreCompactOperator extends PrepareCommitOperator<RowData, Committ
 
         try {
             for (Pair<BinaryRow, Integer> partitionBucket : waitToCompact) {
-                write.compact(
-                        partitionBucket.getKey(),
-                        partitionBucket.getRight(),
-                        !write.streamingMode());
+                write.compact(partitionBucket.getKey(), partitionBucket.getRight(), fullCompaction);
             }
         } catch (Exception e) {
             throw new RuntimeException("Exception happens while executing compaction.", e);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index a5f260fb2..c38ac4b3d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -132,7 +132,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
                         .withContinuousMode(false)
                         .withPartitionPredicate(predicate)
                         .build();
-        new CompactorSinkBuilder(table).withInput(source).build();
+        new CompactorSinkBuilder(table).withFullCompaction(true).withInput(source).build();
         env.execute();
 
         snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -182,6 +182,7 @@ public class CompactorSinkITCase extends AbstractTestBase {
                                                 String.valueOf(sinkParalellism));
                                     }
                                 }))
+                .withFullCompaction(false)
                 .withInput(source)
                 .build();
 
@@ -267,7 +268,8 @@ public class CompactorSinkITCase extends AbstractTestBase {
                                 false,
                                 memoryPool,
                                 metricGroup),
-                "test");
+                "test",
+                true);
     }
 
     protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
index 3f2daedff..f8387e1fc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreCompactOperatorTest.java
@@ -53,7 +53,8 @@ public class StoreCompactOperatorTest extends TableTestBase {
                         getTableDefault(),
                         (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
                                 compactRememberStoreWrite,
-                        "10086");
+                        "10086",
+                        !streamingMode);
 
         TypeSerializer<Committable> serializer =
                 new CommittableTypeInfo().createSerializer(new ExecutionConfig());

Reply via email to