This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bf87a2ab9 [flink] Support setting upper limit for inferring scan
parallelism for source (#2206)
bf87a2ab9 is described below
commit bf87a2ab91f6235380a755532883c23dcad51491
Author: Yongming Zhang <[email protected]>
AuthorDate: Thu Nov 2 13:53:17 2023 +0800
[flink] Support setting upper limit for inferring scan parallelism for
source (#2206)
---
docs/content/how-to/querying-tables.md | 8 +++++++-
.../generated/flink_connector_configuration.html | 6 ++++++
.../org/apache/paimon/flink/FlinkConnectorOptions.java | 7 +++++++
.../org/apache/paimon/flink/source/DataTableSource.java | 4 ++++
.../org/apache/paimon/flink/ReadWriteTableITCase.java | 17 +++++++++++++++++
5 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index f45dfb46e..5064302fe 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -350,7 +350,7 @@ commits of `OVERWRITE`, you can configure
`streaming-read-overwrite`.
{{< tab "Flink" >}}
By default, the parallelism of batch reads is the same as the number of
splits, while the parallelism of stream
-reads is the same as the number of buckets.
+reads is the same as the number of buckets, but not greater than
`scan.infer-parallelism.max`.
If `scan.infer-parallelism` is disabled, the global parallelism will be used for reads.
@@ -371,6 +371,12 @@ You can also manually specify the parallelism from
`scan.parallelism`.
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>If it is false, parallelism of source are set by global
parallelism. Otherwise, source parallelism is inferred from splits number
(batch mode) or bucket number(streaming mode).</td>
+ </tr>
+ <tr>
+ <td><h5>scan.infer-parallelism.max</h5></td>
+ <td style="word-wrap: break-word;">1024</td>
+ <td>Integer</td>
+ <td>If scan.infer-parallelism is true, limit the parallelism of
source through this option.</td>
</tr>
<tr>
<td><h5>scan.parallelism</h5></td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index f5bdf73ed..b57fa13ab 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -68,6 +68,12 @@ under the License.
<td>Boolean</td>
<td>If it is false, parallelism of source are set by global
parallelism. Otherwise, source parallelism is inferred from splits number
(batch mode) or bucket number(streaming mode).</td>
</tr>
+ <tr>
+ <td><h5>scan.infer-parallelism.max</h5></td>
+ <td style="word-wrap: break-word;">1024</td>
+ <td>Integer</td>
+ <td>If scan.infer-parallelism is true, limit the parallelism of
source through this option.</td>
+ </tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 551f541fd..33d8cecdb 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -120,6 +120,13 @@ public class FlinkConnectorOptions {
"If it is false, parallelism of source are set by
global parallelism."
+ " Otherwise, source parallelism is
inferred from splits number (batch mode) or bucket number(streaming mode).");
+ public static final ConfigOption<Integer> INFER_SCAN_MAX_PARALLELISM =
+ ConfigOptions.key("scan.infer-parallelism.max")
+ .intType()
+ .defaultValue(1024)
+ .withDescription(
+ "If scan.infer-parallelism is true, limit the
parallelism of source through this option.");
+
@Deprecated
@ExcludeFromDocumentation("Deprecated")
public static final ConfigOption<Duration>
CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index db5b10cae..b714dfe2f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -243,6 +243,10 @@ public class DataTableSource extends FlinkTableSource {
parallelism = Math.max(1, parallelism);
}
+ parallelism =
+ Math.min(
+ parallelism,
+
options.get(FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM));
}
return sourceBuilder.withParallelism(parallelism).withEnv(env).build();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 23246dcc9..5a634fa2e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -78,6 +78,7 @@ import static
org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static
org.apache.paimon.flink.AbstractFlinkTableFactory.buildPaimonTable;
+import static
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_MAX_PARALLELISM;
import static
org.apache.paimon.flink.FlinkConnectorOptions.INFER_SCAN_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_PARALLELISM;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
@@ -1137,6 +1138,22 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
})))
.isEqualTo(3);
+ // when scan.infer-parallelism.max is less than the inferred parallelism, the parallelism is
+ // scan.infer-parallelism.max
+ assertThat(
+ sourceParallelism(
+ buildQueryWithTableOptions(
+ table,
+ "*",
+ "",
+ new HashMap<String, String>() {
+ {
+
put(INFER_SCAN_PARALLELISM.key(), "true");
+
put(INFER_SCAN_MAX_PARALLELISM.key(), "1");
+ }
+ })))
+ .isEqualTo(1);
+
// for streaming mode
assertThat(
sourceParallelismStreaming(