zhuzhurk commented on code in PR #24184:
URL: https://github.com/apache/flink/pull/24184#discussion_r1464845836


##########
docs/content/docs/deployment/elastic_scaling.md:
##########
@@ -230,6 +230,22 @@ In addition, there are several related configuration 
options that may need adjus
   - Set `parallelism.default: -1`.
   - Don't call `setParallelism()` on `ExecutionEnvironment`.
 
+#### Enable dynamic parallelism inference support for Sources
+If your job uses a custom {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java"
 name="Source" >}},
+you need to implement the interface {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java"
 name="DynamicParallelismInference" >}} in your Source class。

Review Comment:
   。 -> .



##########
docs/content.zh/docs/deployment/elastic_scaling.md:
##########
@@ -220,6 +220,20 @@ Adaptive Batch Scheduler 是一种可以自动调整执行计划的批作业调
   - 配置 `parallelism.default: -1`
   - 不要通过 `ExecutionEnvironment` 的 `setParallelism()` 方法来指定并行度
 
+#### 让 Source 支持动态并行度推导
+如果你的作业有用到自定义 {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java"
 name="Source" >}},
+你需要让 Source 实现接口 {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java"
 name="DynamicParallelismInference" >}}。
+```java
+public interface DynamicParallelismInference {
+    int inferParallelism(Context context);
+}
+```
+其中 Context 会提供可推导并行度上界、期望每个任务平均处理的数据量大小、动态过滤信息来协助并行度推导。
+Adaptive Batch Scheduler 将会在调度 Source 节点之前调用上述接口,需注意实现中应尽量避免高耗时的操作。
+
+若 Source 
未实现上述接口,[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]({{<
 ref "docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-default-source-paralle) 将会作为 
Source 节点的并行度。
+
+需注意,Source 动态并行度推导也只会为用户未指定并行度的 Source 算子推导并行度。

Review Comment:
   A new empty line should be added below.



##########
docs/content/docs/deployment/elastic_scaling.md:
##########
@@ -230,6 +230,22 @@ In addition, there are several related configuration 
options that may need adjus
   - Set `parallelism.default: -1`.
   - Don't call `setParallelism()` on `ExecutionEnvironment`.
 
+#### Enable dynamic parallelism inference support for Sources
+If your job uses a custom {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java"
 name="Source" >}},
+you need to implement the interface {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java"
 name="DynamicParallelismInference" >}} in your Source class。
+```java
+public interface DynamicParallelismInference {
+    int inferParallelism(Context context);
+}
+```
+The Context will provide the upper bound for the inferred parallelism, the 
expected average data size to be processed by each task, and dynamic filtering 
information to assist with parallelism inference.
+
+The Adaptive Batch Scheduler will invoke the interface before scheduling the 
source vertices, and it should be noted that implementations should avoid 
time-consuming operations as much as possible.
+
+If the Source does not implement the interface, the configuration setting 
[`execution.batch.adaptive.auto-parallelism.default-source-parallelism`]({{< 
ref "docs/deployment/config" 
>}}#execution-batch-adaptive-auto-parallelism-default-source-paralle) will be 
used as the parallelism of the source vertices.
+
+Note that the dynamic parallelism inference for Sources only decides the 
parallelism for source operators which do not already have a specified 
parallelism.

Review Comment:
   `for Sources` is a bit redundant and can be removed.



##########
docs/content/docs/deployment/elastic_scaling.md:
##########
@@ -230,6 +230,22 @@ In addition, there are several related configuration 
options that may need adjus
   - Set `parallelism.default: -1`.
   - Don't call `setParallelism()` on `ExecutionEnvironment`.
 
+#### Enable dynamic parallelism inference support for Sources
+If your job uses a custom {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java"
 name="Source" >}},

Review Comment:
   If your job uses a custom `Source`, you need to implement the interface 
`DynamicParallelismInference`...
   ->
   New `Source` can implement `DynamicParallelismInference` to enable dynamic 
parallelism inference.
   
   I think it's better to state it as below. Because dynamic parallelism 
inference is not a requirement but just an option for users.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to