This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bf87a2ab9 [flink] Support setting upper limit for inferring scan 
parallelism for source (#2206)
bf87a2ab9 is described below

commit bf87a2ab91f6235380a755532883c23dcad51491
Author: Yongming Zhang <[email protected]>
AuthorDate: Thu Nov 2 13:53:17 2023 +0800

    [flink] Support setting upper limit for inferring scan parallelism for 
source (#2206)
---
 docs/content/how-to/querying-tables.md                  |  8 +++++++-
 .../generated/flink_connector_configuration.html        |  6 ++++++
 .../org/apache/paimon/flink/FlinkConnectorOptions.java  |  7 +++++++
 .../org/apache/paimon/flink/source/DataTableSource.java |  4 ++++
 .../org/apache/paimon/flink/ReadWriteTableITCase.java   | 17 +++++++++++++++++
 5 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index f45dfb46e..5064302fe 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -350,7 +350,7 @@ commits of `OVERWRITE`, you can configure 
`streaming-read-overwrite`.
 {{< tab "Flink" >}}
 
 By default, the parallelism of batch reads is the same as the number of 
splits, while the parallelism of stream 
-reads is the same as the number of buckets.
+reads is the same as the number of buckets. In both cases, the inferred parallelism is capped at `scan.infer-parallelism.max`.
 
 Disable `scan.infer-parallelism`, global parallelism will be used for reads.
 
@@ -371,6 +371,12 @@ You can also manually specify the parallelism from 
`scan.parallelism`.
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
             <td>If it is false, parallelism of source are set by global 
parallelism. Otherwise, source parallelism is inferred from splits number 
(batch mode) or bucket number(streaming mode).</td>
+        </tr>
+         <tr>
+            <td><h5>scan.infer-parallelism.max</h5></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>Integer</td>
+            <td>If scan.infer-parallelism is true, the inferred source parallelism is limited to at most this value.</td>
         </tr>
         <tr>
             <td><h5>scan.parallelism</h5></td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index f5bdf73ed..b57fa13ab 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -68,6 +68,12 @@ under the License.
             <td>Boolean</td>
             <td>If it is false, parallelism of source are set by global 
parallelism. Otherwise, source parallelism is inferred from splits number 
(batch mode) or bucket number(streaming mode).</td>
         </tr>
+        <tr>
+            <td><h5>scan.infer-parallelism.max</h5></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>Integer</td>
+            <td>If scan.infer-parallelism is true, limit the parallelism of 
source through this option.</td>
+        </tr>
         <tr>
             <td><h5>scan.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 551f541fd..33d8cecdb 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -120,6 +120,13 @@ public class FlinkConnectorOptions {
                             "If it is false, parallelism of source are set by 
global parallelism."
                                     + " Otherwise, source parallelism is 
inferred from splits number (batch mode) or bucket number(streaming mode).");
 
+    public static final ConfigOption<Integer> INFER_SCAN_MAX_PARALLELISM =
+            ConfigOptions.key("scan.infer-parallelism.max")
+                    .intType()
+                    .defaultValue(1024)
+                    .withDescription(
+                            "If scan.infer-parallelism is true, limit the 
parallelism of source through this option.");
+
     @Deprecated
     @ExcludeFromDocumentation("Deprecated")
     public static final ConfigOption<Duration> 
CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index db5b10cae..b714dfe2f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -243,6 +243,10 @@ public class DataTableSource extends FlinkTableSource {
 
                 parallelism = Math.max(1, parallelism);
             }
+            parallelism =
+                    Math.min(
+                            parallelism,
+                            
options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
         }
 
         return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 23246dcc9..5a634fa2e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -78,6 +78,7 @@ import static 
org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static 
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
@@ -1137,6 +1138,22 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                         })))
                 .isEqualTo(3);
 
+        // when scan.infer-parallelism.max is less than the inferred parallelism, the
+        // parallelism is capped at scan.infer-parallelism.max
+        assertThat(
+                        sourceParallelism(
+                                buildQueryWithTableOptions(
+                                        table,
+                                        "*",
+                                        "",
+                                        new HashMap<String, String>() {
+                                            {
+                                                
put(INFER_SCAN_PARALLELISM.key(), "true");
+                                                
put(INFER_SCAN_MAX_PARALLELISM.key(), "1");
+                                            }
+                                        })))
+                .isEqualTo(1);
+
         // for streaming mode
         assertThat(
                         sourceParallelismStreaming(

Reply via email to