This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a7c5d75f [flink] Fix that compact actions haven't handled scan parallelism (#3077)
4a7c5d75f is described below

commit 4a7c5d75fc33888a9bc8aebfc889103dd8be6352
Author: yuzelin <33053040+yuze...@users.noreply.github.com>
AuthorDate: Fri Mar 22 18:52:52 2024 +0800

    [flink] Fix that compact actions haven't handled scan parallelism (#3077)
---
 .../paimon/flink/source/CompactorSourceBuilder.java     | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 70b8a772a..e963c92fc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -118,11 +118,18 @@ public class CompactorSourceBuilder {
 
         BucketsTable bucketsTable = new BucketsTable(table, isContinuous);
         RowType produceType = bucketsTable.rowType();
-        return env.fromSource(
-                buildSource(bucketsTable),
-                WatermarkStrategy.noWatermarks(),
-                tableIdentifier + "-compact-source",
-                InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)));
+        DataStreamSource<RowData> dataStream =
+                env.fromSource(
+                        buildSource(bucketsTable),
+                        WatermarkStrategy.noWatermarks(),
+                        tableIdentifier + "-compact-source",
+                        InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)));
+        Integer parallelism =
+                Options.fromMap(table.options()).get(FlinkConnectorOptions.SCAN_PARALLELISM);
+        if (parallelism != null) {
+            dataStream.setParallelism(parallelism);
+        }
+        return dataStream;
     }
 
     private Map<String, String> streamingCompactOptions() {

Reply via email to