zhuzhurk commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r899708483


##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,204 @@
+---
+layout: post
+title: "Adaptive Batch Scheduler: Automatically Decide Parallelism for Flink 
Batch Jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+  name: "Lijie Wang"
+- Zhu Zhu:
+  name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we 
introduced the adaptive batch scheduler in Flink 1.15. In this post, we'll take a 
close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy for many users. 
For batch jobs, a small parallelism may result in long execution times and 
significant failover regression, while an unnecessarily large parallelism may 
result in resource waste and more overhead in task deployment and network shuffling. 
+
+To decide a proper parallelism, one needs to know how much data each operator 
needs to process. However, it can be hard to predict the data volume to be 
processed by a job because it can differ every day. It can be even harder, 
or outright impossible (due to complex operators or UDFs), to predict the data 
volume to be processed by each operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink 
1.15. The adaptive batch scheduler can automatically decide parallelism for an 
operator according to the size of its consumed datasets. Here are the benefits 
the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine-grained, taking individual operators into 
account. This is particularly beneficial for SQL jobs, which previously could 
only be given a global parallelism.
+3. Parallelism tuning can better fit consumed datasets whose volume varies 
from day to day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use the adaptive batch scheduler, set the following configuration options:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave the 
[execution.batch-shuffle-mode]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode)
 unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value). 
Currently, the adaptive batch scheduler only supports batch jobs whose shuffle 
mode is `ALL-EXCHANGES-BLOCKING`.
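+
+As a minimal sketch of the settings above, assuming the cluster is configured through `flink-conf.yaml`, the scheduler entry could look like this. The shuffle mode is deliberately left unset so that its default value applies:
+
+```yaml
+# flink-conf.yaml (assumed location of the cluster configuration)
+# Use the adaptive batch scheduler.
+jobmanager.scheduler: AdaptiveBatch
+# execution.batch-shuffle-mode is intentionally not set here,
+# so the default shuffle mode is used.
+```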
+
+In addition, there are several related configuration options to control the 
upper and lower bounds of tuned parallelisms, to specify the expected data 
volume to be processed by each operator, and to specify the default parallelism of 
sources. More details can be found in the [feature documentation 
page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler only automatically decides parallelism for 
operators whose parallelism is not set (which means the parallelism is -1). To 
leave parallelism unset, configure your jobs as follows:
+
+- Set `parallelism.default: -1` for all jobs.
+- Set `table.exec.resource.default-parallelism: -1` for SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on 
`StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
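+
+A minimal sketch of the two configuration entries above, assuming both are placed in `flink-conf.yaml` (the SQL option may instead be set on the table configuration of the job):
+
+```yaml
+# flink-conf.yaml (assumed location of the cluster configuration)
+# Leave the default parallelism unset (-1) for all jobs.
+parallelism.default: -1
+# Leave the default parallelism unset (-1) for SQL jobs.
+table.exec.resource.default-parallelism: -1
+```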
+
+
+# Implementation Details
+
+In this section, we will elaborate on the details of the implementation. Before 
that, we need to briefly introduce some of the concepts involved:
+
+- 
[JobVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java)
 and 
[JobGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java):
 A job vertex is an operator chain formed by chaining several operators 
together for better performance. The job graph is a data flow consisting of job 
vertices.
+- 
[ExecutionVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java)
 and 
[ExecutionGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java):
 An execution vertex represents a parallel subtask of a job vertex, which will 
eventually be instantiated as a physical task. For example, a job vertex with a 
parallelism of 100 will generate 100 execution vertices. The execution graph is 
the physical execution topology consisting of all execution vertices.
+
+More details about the above concepts can be found in the [Flink 
documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/job_scheduling/#jobmanager-data-structures).
 Note that the adaptive batch scheduler decides the parallelism of operators by 
deciding the parallelism of the job vertices to which they belong. To 
automatically decide parallelism of job vertices, we introduced the following 
changes:

Review Comment:
   "to which they belong" -> "which consist of these operators"


