This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ac121aa6 [flink] Rename to setParallelismIfAdaptiveConflict in FlinkSinkBuilder
8ac121aa6 is described below

commit 8ac121aa6146fdf6393a52c6a1ab2a5333257ba5
Author: Jingsong <[email protected]>
AuthorDate: Tue Sep 24 12:02:38 2024 +0800

    [flink] Rename to setParallelismIfAdaptiveConflict in FlinkSinkBuilder
---
 .../java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java   | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index fb9cb1959..64c8c3963 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -55,6 +55,7 @@ import static 
org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
+import static org.apache.paimon.flink.sink.FlinkSink.isStreaming;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
 import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
@@ -144,7 +145,7 @@ public class FlinkSinkBuilder {
             return this;
         }
         checkState(input != null, "The input stream should be specified 
earlier.");
-        if (FlinkSink.isStreaming(input) || 
!table.bucketMode().equals(BUCKET_UNAWARE)) {
+        if (isStreaming(input) || !table.bucketMode().equals(BUCKET_UNAWARE)) {
             LOG.warn(
                     "Clustering is enabled; however, it has been skipped as "
                             + "it only supports the bucket unaware table 
without primary keys and "
@@ -210,7 +211,7 @@ public class FlinkSinkBuilder {
 
     /** Build {@link DataStreamSink}. */
     public DataStreamSink<?> build() {
-        parallelism = checkAndUpdateParallelism(input, parallelism);
+        setParallelismIfAdaptiveConflict();
         input = trySortInput(input);
         DataStream<InternalRow> input = mapToInternalRow(this.input, 
table.rowType());
         if (table.coreOptions().localMergeEnabled() && 
table.schema().primaryKeys().size() > 0) {
@@ -286,10 +287,10 @@ public class FlinkSinkBuilder {
         return input;
     }
 
-    private Integer checkAndUpdateParallelism(DataStream<?> input, Integer 
parallelism) {
+    private void setParallelismIfAdaptiveConflict() {
         try {
             boolean parallelismUndefined = parallelism == null || parallelism 
== -1;
-            boolean isStreaming = FlinkSink.isStreaming(input);
+            boolean isStreaming = isStreaming(input);
             boolean isAdaptiveParallelismEnabled =
                     
AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
             boolean writeMCacheEnabled = 
table.coreOptions().writeManifestCache().getBytes() > 0;
@@ -325,10 +326,8 @@ public class FlinkSinkBuilder {
                                 messages, parallelismSource);
                 LOG.warn(msg);
             }
-            return parallelism;
         } catch (NoClassDefFoundError ignored) {
             // before 1.17, there is no adaptive parallelism
-            return parallelism;
         }
     }
 }

Reply via email to