This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8ac121aa6 [flink] Rename to setParallelismIfAdaptiveConflict in FlinkSinkBuilder
8ac121aa6 is described below
commit 8ac121aa6146fdf6393a52c6a1ab2a5333257ba5
Author: Jingsong <[email protected]>
AuthorDate: Tue Sep 24 12:02:38 2024 +0800
[flink] Rename to setParallelismIfAdaptiveConflict in FlinkSinkBuilder
---
.../java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java | 11 +++++------
1 file changed, 5 insertions(+), 6 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index fb9cb1959..64c8c3963 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -55,6 +55,7 @@ import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
import static org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
+import static org.apache.paimon.flink.sink.FlinkSink.isStreaming;
import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.HILBERT;
import static org.apache.paimon.flink.sorter.TableSorter.OrderType.ORDER;
@@ -144,7 +145,7 @@ public class FlinkSinkBuilder {
return this;
}
checkState(input != null, "The input stream should be specified earlier.");
- if (FlinkSink.isStreaming(input) || !table.bucketMode().equals(BUCKET_UNAWARE)) {
+ if (isStreaming(input) || !table.bucketMode().equals(BUCKET_UNAWARE)) {
LOG.warn(
"Clustering is enabled; however, it has been skipped as "
+ "it only supports the bucket unaware table without primary keys and "
@@ -210,7 +211,7 @@ public class FlinkSinkBuilder {
/** Build {@link DataStreamSink}. */
public DataStreamSink<?> build() {
- parallelism = checkAndUpdateParallelism(input, parallelism);
+ setParallelismIfAdaptiveConflict();
input = trySortInput(input);
DataStream<InternalRow> input = mapToInternalRow(this.input, table.rowType());
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
@@ -286,10 +287,10 @@ public class FlinkSinkBuilder {
return input;
}
- private Integer checkAndUpdateParallelism(DataStream<?> input, Integer parallelism) {
+ private void setParallelismIfAdaptiveConflict() {
try {
boolean parallelismUndefined = parallelism == null || parallelism == -1;
- boolean isStreaming = FlinkSink.isStreaming(input);
+ boolean isStreaming = isStreaming(input);
boolean isAdaptiveParallelismEnabled =
AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
boolean writeMCacheEnabled =
table.coreOptions().writeManifestCache().getBytes() > 0;
@@ -325,10 +326,8 @@ public class FlinkSinkBuilder {
messages, parallelismSource);
LOG.warn(msg);
}
- return parallelism;
} catch (NoClassDefFoundError ignored) {
// before 1.17, there is no adaptive parallelism
- return parallelism;
}
}
}