wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896687126


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we 
introduced adaptive batch scheduler in Flink 1.15. In this post, we'll take a 
close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy work for many users. 
For batch jobs, a small parallelism may result in long execution time and big 
failover regression. While an unnecessary large parallelism may result in 
resource waste and more overhead cost in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator 
needs to process. However, It can be hard to predict data volume to be 
processed by a job because it can be different everyday. And it can be harder 
or even impossible (due to complex operators or UDFs) to predict data volume to 
be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 
1.15. The adaptive batch scheduler can automatically decide parallelism for an 
operator according to the size of its consumed datasets. Here are the benefits 
the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine grained considering different operators. This is 
particularly beneficial for SQL jobs which can only be set with a global 
parallelism previously.
+3. Parallelism tuning can better fit consumed datasets which have a varying 
volume size every day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the 
[execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode)
 unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). 
Currently, the adaptive batch scheduler only supports batch jobs whose shuffle 
mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the 
upper bounds and lower bounds of tuned parallelisms, to specify expected data 
volume to process by each operator, and to specify the default parallelism of 
sources. More details can be found in the [feature documentation 
page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for 
operators whose parallelism is not set (which means the parallelism is -1).To 
leave parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on 
`StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate the details of the implementation. To 
automatically decide parallelism of operators, we introduced the following 
changes:
+
+1. Enables the scheduler to collect sizes of finished datasets.
+2. Introduces a new component(VertexParallelismDecider) to compute proper 
parallelisms for job vertices according to the sizes of their consumed results.
+3. Enables to dynamically build up ExecutionGraph to allow the parallelisms of 
job vertices to be decided lazily. The execution graph starts with an empty 
execution topology and then gradually attaches the vertices during job 
execution.
+4. Introduces the adaptive batch scheduler to update and schedule the dynamic 
execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img 
src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png"
 width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size 
of input results, so the scheduler needs to know the sizes of result partitions 
produced by tasks. We introduced a numBytesProduced counter to record the size 
of each produced result partition, the accumulated result of the counter will 
be sent to the scheduler when tasks finish. 
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component(VertexParallelismDecider) to compute proper 
parallelisms for job vertices according to the sizes of their consumed results. 
The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast 
result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result 
sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the 
parallelism calculation.

Review Comment:
   I changed it to `maximum ratio`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to