This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 037dc54bb [core] Remove bounded check for append compact topology (#3948)
037dc54bb is described below

commit 037dc54bbbc3f9553c26cddae5b4e00231fa8198
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 17:39:53 2024 +0800

    [core] Remove bounded check for append compact topology (#3948)
---
 .../paimon/flink/sink/cdc/CdcUnawareBucketSink.java |  2 +-
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java  | 21 ++-------------------
 .../paimon/flink/sink/FlinkTableSinkBase.java       |  1 -
 .../paimon/flink/sink/RowUnawareBucketSink.java     |  5 ++---
 .../apache/paimon/flink/sink/UnawareBucketSink.java |  7 ++-----
 5 files changed, 7 insertions(+), 29 deletions(-)

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
index 5cd3d6085..c60772d10 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketSink.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 public class CdcUnawareBucketSink extends UnawareBucketSink<CdcRecord> {
 
     public CdcUnawareBucketSink(FileStoreTable table, Integer parallelism) {
-        super(table, null, null, parallelism, false);
+        super(table, null, null, parallelism);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index dd73bd559..546f82ec1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -76,7 +76,6 @@ public class FlinkSinkBuilder {
     private DataStream<RowData> input;
     @Nullable protected Map<String, String> overwritePartition;
     @Nullable protected Integer parallelism;
-    private Boolean boundedInput = null;
     @Nullable private TableSortInfo tableSortInfo;
 
     // ============== for extension ==============
@@ -131,15 +130,6 @@ public class FlinkSinkBuilder {
         return this;
     }
 
-    /**
-     * Set input bounded, if it is bounded, append table sink does not generate a topology for
-     * merging small files.
-     */
-    public FlinkSinkBuilder inputBounded(boolean bounded) {
-        this.boundedInput = bounded;
-        return this;
-    }
-
     /** Clustering the input data if possible. */
     public FlinkSinkBuilder clusteringIfPossible(
             String clusteringColumns,
@@ -152,10 +142,7 @@ public class FlinkSinkBuilder {
             return this;
         }
         checkState(input != null, "The input stream should be specified earlier.");
-        if (boundedInput == null) {
-            boundedInput = !FlinkSink.isStreaming(input);
-        }
-        if (!boundedInput || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+        if (FlinkSink.isStreaming(input) || !table.bucketMode().equals(BUCKET_UNAWARE)) {
             LOG.warn(
                     "Clustering is enabled; however, it has been skipped as "
                             + "it only supports the bucket unaware table without primary keys and "
@@ -282,11 +269,7 @@ public class FlinkSinkBuilder {
         checkArgument(
                 table.primaryKeys().isEmpty(),
                 "Unaware bucket mode only works with append-only table for now.");
-        if (boundedInput == null) {
-            boundedInput = !FlinkSink.isStreaming(input);
-        }
-        return new RowUnawareBucketSink(
-                        table, overwritePartition, logSinkFunction, parallelism, boundedInput)
+        return new RowUnawareBucketSink(table, overwritePartition, logSinkFunction, parallelism)
                 .sinkFrom(input);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index ebe1c4c51..e719b9a77 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -135,7 +135,6 @@ public abstract class FlinkTableSinkBase
                                     new DataStream<>(
                                             dataStream.getExecutionEnvironment(),
                                             dataStream.getTransformation()))
-                            .inputBounded(context.isBounded())
                             .clusteringIfPossible(
                                     conf.get(CLUSTERING_COLUMNS),
                                     conf.get(CLUSTERING_STRATEGY),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
index d7b8c76cf..b670b905d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowUnawareBucketSink.java
@@ -32,9 +32,8 @@ public class RowUnawareBucketSink extends UnawareBucketSink<InternalRow> {
             FileStoreTable table,
             Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            Integer parallelism,
-            boolean boundedInput) {
-        super(table, overwritePartitions, logSinkFunction, parallelism, boundedInput);
+            Integer parallelism) {
+        super(table, overwritePartitions, logSinkFunction, parallelism);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
index c4ac99906..7c79d53b6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketSink.java
@@ -42,19 +42,16 @@ public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
     protected final LogSinkFunction logSinkFunction;
 
     @Nullable protected final Integer parallelism;
-    protected final boolean boundedInput;
 
     public UnawareBucketSink(
             FileStoreTable table,
             @Nullable Map<String, String> overwritePartitions,
             LogSinkFunction logSinkFunction,
-            @Nullable Integer parallelism,
-            boolean boundedInput) {
+            @Nullable Integer parallelism) {
         super(table, overwritePartitions);
         this.table = table;
         this.logSinkFunction = logSinkFunction;
         this.parallelism = parallelism;
-        this.boundedInput = boundedInput;
     }
 
     @Override
@@ -69,7 +66,7 @@ public abstract class UnawareBucketSink<T> extends FlinkWriteSink<T> {
                                 .get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
         // if enable compaction, we need to add compaction topology to this job
-        if (enableCompaction && isStreamingMode && !boundedInput) {
+        if (enableCompaction && isStreamingMode) {
             written =
                     written.transform(
                                     "Compact Coordinator: " + table.name(),

Reply via email to