This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 624a553fb60d5d35640bc89c876f66fc288bcd4a
Author: YeJunHao <[email protected]>
AuthorDate: Thu Dec 7 11:16:51 2023 +0800

    [flink] Disable compaction topology when streaming with bounded input 
stream (#2463)
---
 .../main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 9 ++++++++-
 .../java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java    | 1 +
 .../org/apache/paimon/flink/sink/UnawareBucketWriteSink.java     | 8 ++++++--
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 56ead8a85..46a0c09fa 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -44,6 +44,7 @@ public class FlinkSinkBuilder {
     @Nullable private Map<String, String> overwritePartition;
     @Nullable private LogSinkFunction logSinkFunction;
     @Nullable private Integer parallelism;
+    private boolean boundedInput = false;
     private boolean compactSink = false;
 
     public FlinkSinkBuilder(FileStoreTable table) {
@@ -79,6 +80,11 @@ public class FlinkSinkBuilder {
         return this;
     }
 
+    public FlinkSinkBuilder withBoundedInputStream(boolean bounded) {
+        this.boundedInput = bounded;
+        return this;
+    }
+
     public FlinkSinkBuilder forCompact(boolean compactSink) {
         this.compactSink = compactSink;
         return this;
@@ -142,7 +148,8 @@ public class FlinkSinkBuilder {
                         (AppendOnlyFileStoreTable) table,
                         overwritePartition,
                         logSinkFunction,
-                        parallelism)
+                        parallelism,
+                        boundedInput)
                 .sinkFrom(input);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index be6628bde..3874e2ca7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -133,6 +133,7 @@ public abstract class FlinkTableSinkBase
                                 .withLogSinkFunction(logSinkFunction)
                                 .withOverwritePartition(overwrite ? 
staticPartitions : null)
                                 
.withParallelism(conf.get(FlinkConnectorOptions.SINK_PARALLELISM))
+                                .withBoundedInputStream(context.isBounded())
                                 .build());
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
index 856432582..802a98dcf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketWriteSink.java
@@ -40,16 +40,19 @@ public class UnawareBucketWriteSink extends FileStoreSink {
     private final boolean enableCompaction;
     private final AppendOnlyFileStoreTable table;
     private final Integer parallelism;
+    private final boolean boundedInput;
 
     public UnawareBucketWriteSink(
             AppendOnlyFileStoreTable table,
             Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            Integer parallelism) {
+            Integer parallelism,
+            boolean boundedInput) {
         super(table, overwritePartitions, logSinkFunction);
         this.table = table;
         this.enableCompaction = !table.coreOptions().writeOnly();
         this.parallelism = parallelism;
+        this.boundedInput = boundedInput;
     }
 
     @Override
@@ -63,7 +66,8 @@ public class UnawareBucketWriteSink extends FileStoreSink {
                                 .get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
         // if enable compaction, we need to add compaction topology to this job
-        if (enableCompaction && isStreamingMode) {
+        if (enableCompaction && isStreamingMode && !boundedInput) {
+            // if streaming mode with bounded input, we disable compaction 
topology
             UnawareBucketCompactionTopoBuilder builder =
                     new UnawareBucketCompactionTopoBuilder(
                             input.getExecutionEnvironment(), table.name(), 
table);

Reply via email to