wanglijie95 commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r896699847
##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,198 @@
+---
+layout: post
+title: "Automatically decide parallelism for Flink batch jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+ name: "Lijie Wang"
+- Zhu Zhu:
+ name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we
introduced the adaptive batch scheduler in Flink 1.15. In this post, we'll take
a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy for many users. For
batch jobs, a small parallelism may result in long execution time and a large
failover regression, while an unnecessarily large parallelism may waste
resources and add overhead in task deployment and network shuffling.
+
+To decide a proper parallelism, one needs to know how much data each operator
needs to process. However, it can be hard to predict the data volume to be
processed by a job because it can be different every day. It can be even
harder, or outright impossible (due to complex operators or UDFs), to predict
the data volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink
1.15. The adaptive batch scheduler can automatically decide parallelism for an
operator according to the size of its consumed datasets. Here are the benefits
the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine-grained, considering different operators. This
is particularly beneficial for SQL jobs, which previously could only be
configured with a single global parallelism.
+3. Parallelism tuning can better fit consumed datasets whose volume varies
from day to day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use the adaptive batch scheduler, set the following configuration options:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the
[execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode)
unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value).
Currently, the adaptive batch scheduler only supports batch jobs whose shuffle
mode is `ALL-EXCHANGES-BLOCKING`.
+
+In addition, there are several related configuration options to control the
upper and lower bounds of tuned parallelisms, to specify the expected data
volume to be processed by each operator, and to specify the default parallelism
of sources. More details can be found in the [feature documentation
page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler will only automatically decide parallelism for
operators whose parallelism is not set (which means the parallelism is -1). To
leave the parallelism unset, configure your job as follows:
+
+- Set `parallelism.default: -1`
+- Set `table.exec.resource.default-parallelism: -1` in SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on
`StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
+
+
+# Implementation Details
+
+In this section, we will elaborate on the details of the implementation. To
automatically decide the parallelism of operators, we introduced the following
changes:
+
+1. Enabled the scheduler to collect the sizes of finished datasets.
+2. Introduced a new component (VertexParallelismDecider) to compute proper
parallelisms for job vertices according to the sizes of their consumed results.
+3. Enabled the ExecutionGraph to be built up dynamically, so that the
parallelisms of job vertices can be decided lazily. The execution graph starts
with an empty execution topology and then gradually attaches the vertices
during job execution.
+4. Introduced the adaptive batch scheduler to update and schedule the dynamic
execution graph.
+
+The details will be introduced in the following sections.
+
+<center>
+<br/>
+<img
src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/1-overall-structure.png"
width="60%"/>
+<br/>
+Fig. 1 - The overall structure of automatically deciding parallelism
+</center>
+
+<br/>
+
+## Collect sizes of consumed datasets
+
+The adaptive batch scheduler decides the parallelism of vertices by the size
of their input results, so the scheduler needs to know the sizes of the result
partitions produced by tasks. We introduced a `numBytesProduced` counter to
record the size of each produced result partition. The accumulated result of
the counter is sent to the scheduler when tasks finish.
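+
+As a rough illustration of this bookkeeping, the sketch below shows how a scheduler could accumulate the reported byte counts per result. The class `ProducedBytesTracker`, its methods, and the result ids are hypothetical names chosen for this example; they are not Flink's actual classes.
+
```python
class ProducedBytesTracker:
    """Hypothetical scheduler-side bookkeeping; not Flink's actual class."""

    def __init__(self):
        # Maps a result id to the total bytes reported for it so far.
        self._bytes_by_result = {}

    def on_task_finished(self, num_bytes_produced):
        """Accumulate the counters reported by one finished task.

        num_bytes_produced maps a result id to the bytes this task wrote to it.
        """
        for result_id, num_bytes in num_bytes_produced.items():
            current = self._bytes_by_result.get(result_id, 0)
            self._bytes_by_result[result_id] = current + num_bytes

    def total_bytes(self, consumed_result_ids):
        """Sum the known sizes of the results a downstream vertex consumes."""
        return sum(self._bytes_by_result.get(r, 0) for r in consumed_result_ids)


tracker = ProducedBytesTracker()
tracker.on_task_finished({"A-out": 100})  # report from task A1
tracker.on_task_finished({"A-out": 50})   # report from task A2
```
+
+The total returned by `total_bytes` is the kind of input a parallelism decider needs for the downstream job vertex.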
+
+## Decide proper parallelisms for job vertices
+
+We introduced a new component (VertexParallelismDecider) to compute proper
parallelisms for job vertices according to the sizes of their consumed results.
The computation algorithm is as follows:
+Suppose:
+
+- ***V*** is the bytes of data the user expects to be processed by each task.
+- ***totalBytes<sub>non-broadcast</sub>*** is the sum of the non-broadcast
result sizes consumed by this job vertex.
+- ***totalBytes<sub>broadcast</sub>*** is the sum of the broadcast result
sizes consumed by this job vertex.
+- ***broadcastCapRatio*** is the cap ratio of broadcast bytes that affects the
parallelism calculation.
+- ***normalize(***x***)*** is a function that rounds ***x*** to the closest
power of 2.
+
+then the parallelism of this job vertex ***P*** will be:
+<center>
+<img
src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/parallelism-formula.png"
width="60%"/>
+</center>
+
+Note that we introduced two special treatments in the above formula:
+
+- [Limit the cap ratio of broadcast
bytes](#limit-the-cap-ratio-of-broadcast-bytes)
+- [Normalize the parallelism to the closest power of
2](#normalize-the-parallelism-to-the-closest-power-of-2)
+
+However, the above formula cannot be used to decide the parallelism of the
source vertices, because the source vertices have no input. To solve it, we
introduced the configuration option
`jobmanager.adaptive-batch-scheduler.default-source-parallelism` to allow users
to manually configure the parallelism of source vertices. Note that not all
data sources need this option, because some data sources can automatically
infer parallelism (for example, HiveTableSource; see
[HiveParallelismInference](https://github.com/apache/flink/blob/release-1.15/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveParallelismInference.java)
for more details). For these sources, it is recommended to let the source
decide its own parallelism.
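+
+The computation for non-source vertices can be sketched as follows. This is a minimal sketch that follows the definitions listed above; the exact formula is the one in the figure, and the function names, the tie-breaking rule in `normalize`, and the omission of the configured upper and lower bounds are assumptions made for this example.
+
```python
import math

BROADCAST_CAP_RATIO = 0.5  # default cap ratio mentioned in this post


def normalize(x):
    """Round x (>= 1) to the closest power of 2.

    Ties (e.g. 3, which is equally close to 2 and 4) round up here;
    that tie-breaking rule is an assumption of this sketch.
    """
    if x < 1:
        raise ValueError("parallelism must be at least 1")
    down = 1 << (x.bit_length() - 1)      # largest power of 2 <= x
    up = down if down == x else down * 2  # smallest power of 2 >= x
    return down if (x - down) < (up - x) else up


def decide_parallelism(non_broadcast_bytes, broadcast_bytes, v,
                       cap_ratio=BROADCAST_CAP_RATIO):
    """Sketch of the parallelism decision for one non-source job vertex.

    v is the number of bytes each task is expected to process.
    """
    # Only a capped share of the broadcast bytes counts against v.
    capped_broadcast = min(broadcast_bytes, cap_ratio * v)
    raw = math.ceil(non_broadcast_bytes / (v - capped_broadcast))
    return normalize(max(raw, 1))
```
+
+For example, with `v = 100`, 1000 non-broadcast bytes, and no broadcast input, the raw value is 10, which is normalized to 8.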
+
+### Limit the cap ratio of broadcast bytes
+As you can see, we limit the share of broadcast bytes that affects the
parallelism calculation to ***broadcastCapRatio***. That is, the non-broadcast
bytes processed by each task are at least ***(1-broadcastCapRatio) * V***.
Without this cap, when the total broadcast bytes are close to ***V***, even a
very small amount of non-broadcast bytes may lead to a large parallelism, which
is unnecessary and may lead to resource waste and large task deployment
overhead.
+
+Generally, the broadcast dataset is relatively small compared to the other
co-processed datasets. Therefore, we set the cap ratio to 0.5 by default
because we usually expect the broadcast bytes to be smaller than the
non-broadcast bytes. The value is hard-coded in the first version, and we may
make it configurable later.
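+
+To make the effect of the cap concrete, here is a small worked example. It is a sketch under the assumption that the raw (pre-normalization) parallelism is the non-broadcast bytes divided by the per-task budget left after subtracting the capped broadcast bytes; the function name and the byte values are made up for illustration.
+
```python
import math


def raw_parallelism(non_broadcast_bytes, broadcast_bytes, v, cap_ratio):
    """Parallelism before normalization, with broadcast bytes capped at cap_ratio * v."""
    capped = min(broadcast_bytes, cap_ratio * v)
    return max(1, math.ceil(non_broadcast_bytes / (v - capped)))


V = 1000             # bytes each task is expected to process
NON_BROADCAST = 100  # a very small non-broadcast input
BROADCAST = 990      # broadcast input close to V

# A cap ratio of 1.0 effectively disables the cap in this example.
without_cap = raw_parallelism(NON_BROADCAST, BROADCAST, V, cap_ratio=1.0)
with_cap = raw_parallelism(NON_BROADCAST, BROADCAST, V, cap_ratio=0.5)
```
+
+Without the cap, only 10 bytes of each task's budget remain for non-broadcast data, so 100 bytes are spread over 10 tasks. With the cap at 0.5, a single task is enough.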
+
+
+### Normalize the parallelism to the closest power of 2
+The normalization avoids introducing data skew. To better understand this
section, we suggest you read the [Flexible subpartition
mapping](#flexible-subpartition-mapping) section first.
+
+Taking Fig. 4 (b) as an example, A1/A2 produce 4 subpartitions, and the
decided parallelism of B is 3. In this case, B1 will consume 1 subpartition, B2
will consume 1 subpartition, and B3 will consume 2 subpartitions. If we assume
that all subpartitions hold the same amount of data, B3 will consume twice as
much data as the other tasks. In other words, the subpartition mapping
introduces data skew.
+
+To solve this problem, we need to make the subpartitions evenly consumed by
downstream tasks, which means the number of subpartitions should be a multiple
of the number of downstream tasks. For simplicity, we require the
user-specified max parallelism to be 2<sup>N</sup>, and then adjust the
calculated parallelism to the closest 2<sup>M</sup> (M <= N), so that we can
guarantee that subpartitions will be evenly consumed by downstream tasks.
+
+Note that normalizing to a power of 2 is a temporary solution. The ultimate
solution would be the [Auto-rebalancing of
workloads](#auto-rebalancing-of-workloads), which may come soon.
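+
+The skew argument can be checked with a short script. This is a sketch that assumes a floor-based split of subpartitions across consumers, which reproduces the 1/1/2 split described for Fig. 4 (b); the function names are hypothetical.
+
```python
def consumed_counts(num_subpartitions, num_consumers):
    """Number of subpartitions each consumer gets under a floor-based split."""
    counts = []
    for k in range(num_consumers):
        start = k * num_subpartitions // num_consumers
        end = (k + 1) * num_subpartitions // num_consumers  # exclusive
        counts.append(end - start)
    return counts


def is_evenly_consumed(num_subpartitions, num_consumers):
    """True if every consumer gets the same number of subpartitions."""
    return len(set(consumed_counts(num_subpartitions, num_consumers))) == 1


# With 8 subpartitions, list the parallelisms that avoid skew.
even_parallelisms = [n for n in range(1, 9) if is_evenly_consumed(8, n)]
```
+
+With a power-of-2 number of subpartitions, only power-of-2 parallelisms split the subpartitions evenly, which is why the calculated parallelism is normalized.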
+
+## Build up execution graph dynamically
+Before Flink 1.15, the execution graph was fully built in a static way before
starting scheduling. To allow parallelisms of job vertices to be decided
lazily, the execution graph must be able to be built up dynamically.
+
+### Create execution vertices and execution edges lazily
+A dynamic execution graph means that a Flink job starts with an empty
execution topology, and then gradually attaches vertices during job execution,
as shown in Fig. 2.
+
+The execution topology consists of execution vertices and execution edges. The
execution vertices will be created and attached to the execution topology only
when:
+
+- The parallelism of the corresponding job vertex is decided.
+- All upstream execution vertices are already attached.
+
+A decided parallelism of the job vertex is needed so that Flink knows how many
execution vertices should be created. Upstream execution vertices need to be
attached first so that Flink can connect the newly created execution vertices
to the upstream vertices with execution edges.
+
+<center>
+<br/>
+<img
src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/2-dynamic-graph.png"
width="90%"/>
+<br/>
+Fig. 2 - Build up execution graph dynamically
+</center>
+
+<br/>
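+
+The two attachment conditions can be illustrated with a toy model. This is only a sketch: the class `DynamicExecutionGraph`, its methods, and the vertex naming are hypothetical and greatly simplified compared to Flink's ExecutionGraph.
+
```python
class DynamicExecutionGraph:
    """Toy model of lazy vertex attachment; not Flink's actual ExecutionGraph."""

    def __init__(self, upstreams):
        # upstreams maps each job vertex to the job vertices it consumes from.
        self.upstreams = upstreams
        self.parallelism = {}         # decided parallelism per job vertex
        self.execution_vertices = {}  # attached execution vertices per job vertex

    def decide(self, job_vertex, parallelism):
        self.parallelism[job_vertex] = parallelism

    def try_attach(self):
        """Attach every job vertex whose two conditions are met; return them."""
        attached_now = []
        progress = True
        while progress:
            progress = False
            for job_vertex, ups in self.upstreams.items():
                if job_vertex in self.execution_vertices:
                    continue  # already attached
                if job_vertex not in self.parallelism:
                    continue  # condition 1: parallelism must be decided
                if all(u in self.execution_vertices for u in ups):
                    # condition 2: all upstream vertices are attached
                    count = self.parallelism[job_vertex]
                    self.execution_vertices[job_vertex] = [
                        f"{job_vertex}{i + 1}" for i in range(count)
                    ]
                    attached_now.append(job_vertex)
                    progress = True
        return attached_now


graph = DynamicExecutionGraph({"A": [], "B": ["A"]})
graph.decide("A", 2)
first = graph.try_attach()   # only A can be attached so far
graph.decide("B", 3)
second = graph.try_attach()  # B follows once its parallelism is known
```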
+
+### Flexible subpartition mapping
+Before Flink 1.15, when deploying a task, Flink needed to know the parallelism
of its consumer job vertex. This is because the consumer vertex parallelism was
used to decide the number of subpartitions produced by each upstream task. The
reason behind that is, for one result partition, different subpartitions serve
different consumer execution vertices. More specifically, one consumer
execution vertex only consumes data from the subpartition with the same index.
+
+Taking Fig. 3 as an example, the parallelism of the consumer B is 2, so the
result partition produced by A1/A2 should contain 2 subpartitions. The
subpartition with index 0 serves B1, and the subpartition with index 1 serves
B2.
+
+<center>
+<br/>
+<img
src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/3-static-graph-subpartition-mapping.png"
width="30%"/>
+<br/>
+Fig. 3 - How subpartitions serve consumer execution vertices with static
execution graph
+</center>
+
+<br/>
+
+However, this doesn't work for dynamic graphs, because when a job vertex is
deployed, the parallelism of its consumer job vertices may not have been
decided yet. To enable Flink to work in this case, we need a way to allow a job
vertex to run without knowing the parallelism of its consumer job vertices (or
rather, we need a way to allow execution vertices to run without knowing the
number of their consumer execution vertices).
+
+To achieve this goal, we can set the number of subpartitions to be the max
parallelism of the consumer job vertex. Then, when the consumer execution
vertices are deployed, each of them is assigned a subpartition range to
consume. Suppose N is the number of consumer execution vertices and P is the
number of subpartitions. For the k-th consumer execution vertex, the consumed
subpartition range should be:
+
+<center>
+<img
src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/range-formula.png"
width="55%"/>
+</center>
+
+Taking Fig. 4 as an example, the max parallelism of B is 4, so A1/A2 have 4
subpartitions. If the decided parallelism of B is 2, the subpartition mapping
will be as in Fig. 4 (a). If the decided parallelism of B is 3, the
subpartition mapping will be as in Fig. 4 (b).
+
+<center>
+<br/>
+<img
src="{{site.baseurl}}/img/blog/2022-06-06-adaptive-batch-scheduler/4-dynamic-graph-subpartition-mapping.png"
width="75%"/>
+<br/>
+Fig. 4 - How subpartitions serve consumer execution vertices with dynamic graph
+</center>
+
+<br/>
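+
+The range assignment can be sketched in a few lines. The exact formula is the one shown in the figure above; the version below is an assumption that reproduces both mappings of Fig. 4 as described in the text, using 0-based indices and inclusive ranges. The function name is hypothetical.
+
```python
def subpartition_range(k, n, p):
    """Inclusive (start, end) subpartition range for the k-th of n consumers.

    k is 0-based, n is the number of consumer execution vertices and
    p is the number of subpartitions (the consumer's max parallelism).
    """
    if not 0 <= k < n:
        raise ValueError("k must be in [0, n)")
    if n > p:
        raise ValueError("cannot have more consumers than subpartitions")
    start = k * p // n
    end = (k + 1) * p // n - 1
    return start, end


# Fig. 4 (a): 4 subpartitions, decided parallelism 2.
mapping_a = [subpartition_range(k, 2, 4) for k in range(2)]
# Fig. 4 (b): 4 subpartitions, decided parallelism 3.
mapping_b = [subpartition_range(k, 3, 4) for k in range(3)]
```
+
+In the second mapping, the last consumer receives two subpartitions while the others receive one.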
+
+## Update and schedule the dynamic execution graph
+Scheduling with the adaptive batch scheduler is similar to scheduling with the
default scheduler. The only difference is that an empty dynamic execution graph
is generated initially and vertices are attached later. Before handling any
scheduling event, the scheduler will try to decide the parallelisms for job
vertices, and then initialize them to generate execution vertices, connect
execution edges, and update the execution graph.
+
+The scheduler will try to decide the parallelism for all job vertices before
each scheduling, and the parallelism decision will be made for each job vertex
in topological order:
Review Comment:
I changed it to `before handling any scheduling event`