This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push: new 4a7c5d75f [flink] Fix that compact actions haven't handled scan parallelism (#3077) 4a7c5d75f is described below commit 4a7c5d75fc33888a9bc8aebfc889103dd8be6352 Author: yuzelin <33053040+yuze...@users.noreply.github.com> AuthorDate: Fri Mar 22 18:52:52 2024 +0800 [flink] Fix that compact actions haven't handled scan parallelism (#3077) --- .../paimon/flink/source/CompactorSourceBuilder.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java index 70b8a772a..e963c92fc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java @@ -118,11 +118,18 @@ public class CompactorSourceBuilder { BucketsTable bucketsTable = new BucketsTable(table, isContinuous); RowType produceType = bucketsTable.rowType(); - return env.fromSource( - buildSource(bucketsTable), - WatermarkStrategy.noWatermarks(), - tableIdentifier + "-compact-source", - InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType))); + DataStreamSource<RowData> dataStream = + env.fromSource( + buildSource(bucketsTable), + WatermarkStrategy.noWatermarks(), + tableIdentifier + "-compact-source", + InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType))); + Integer parallelism = + Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM); + if (parallelism != null) { + dataStream.setParallelism(parallelism); + } + return dataStream; } private Map<String, String> streamingCompactOptions() {