zhuzhurk commented on code in PR #546:
URL: https://github.com/apache/flink-web/pull/546#discussion_r899708483
##########
_posts/2022-06-06-adaptive-batch-scheduler.md:
##########
@@ -0,0 +1,204 @@
+---
+layout: post
+title: "Adaptive Batch Scheduler: Automatically Decide Parallelism for Flink
Batch Jobs"
+date: 2022-06-06T08:00:00.000Z
+authors:
+- Lijie Wang:
+ name: "Lijie Wang"
+- Zhu Zhu:
+ name: "Zhu Zhu"
+excerpt: To automatically decide parallelism for Flink batch jobs, we introduced the adaptive batch scheduler in Flink 1.15. In this post, we'll take a close look at the design & implementation details.
+
+---
+
+{% toc %}
+
+# Introduction
+
+Deciding proper parallelisms for operators is not easy for many users. For batch jobs, a parallelism that is too small may result in long execution time and significant failover cost, while an unnecessarily large parallelism may result in resource waste and higher overhead in task deployment and network shuffling.
+
+To decide a proper parallelism, one needs to know how much data each operator needs to process. However, it can be hard to predict the data volume to be processed by a job, because it can differ from day to day. And it can be even harder, or impossible (due to complex operators or UDFs), to predict the data volume to be processed by each individual operator.
+
+To solve this problem, we introduced the adaptive batch scheduler in Flink
1.15. The adaptive batch scheduler can automatically decide parallelism for an
operator according to the size of its consumed datasets. Here are the benefits
the adaptive batch scheduler can bring:
+
+1. Batch job users can be relieved from parallelism tuning.
+2. Parallelism tuning is fine-grained, considering each operator separately. This is particularly beneficial for SQL jobs, which previously could only be assigned a single global parallelism.
+3. Parallelism tuning can better adapt to consumed datasets whose volume varies from day to day.
+
+# Get Started
+
+To automatically decide parallelism for operators, you need to:
+
+1. Configure to use adaptive batch scheduler.
+2. Set the parallelism of operators to -1.
+
+
+## Configure to use adaptive batch scheduler
+
+To use adaptive batch scheduler, you need to set configurations as below:
+
+- Set `jobmanager.scheduler: AdaptiveBatch`.
+- Leave [`execution.batch-shuffle-mode`]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/config/#execution-batch-shuffle-mode) unset, or explicitly set it to `ALL-EXCHANGES-BLOCKING` (the default value). Currently, the adaptive batch scheduler only supports batch jobs whose shuffle mode is `ALL-EXCHANGES-BLOCKING`.
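+Putting the two options together, a minimal `flink-conf.yaml` fragment might look like this (the shuffle-mode line is optional, since `ALL-EXCHANGES-BLOCKING` is already the default):
+
+```yaml
+jobmanager.scheduler: AdaptiveBatch
+# Optional: ALL-EXCHANGES-BLOCKING is the default shuffle mode anyway.
+execution.batch-shuffle-mode: ALL-EXCHANGES-BLOCKING
+```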
+
+In addition, there are several related configuration options to control the upper and lower bounds of the tuned parallelisms, to specify the expected data volume to be processed by each operator, and to specify the default parallelism of sources. More details can be found in the [feature documentation page]({{site.DOCS_BASE_URL}}flink-docs-release-1.15/docs/deployment/elastic_scaling/#configure-to-use-adaptive-batch-scheduler).
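+As a rough illustration of how these options interact (this is not Flink's actual code; the class and method names below are hypothetical, and the real scheduler applies additional logic), a vertex's parallelism can be derived from its consumed data volume and then clamped to the configured bounds:
+
+```java
+// Illustrative sketch only: derive a parallelism from consumed bytes,
+// a target data volume per task, and configured lower/upper bounds.
+public class ParallelismSketch {
+
+    static int decideParallelism(long consumedBytes, long bytesPerTask,
+                                 int minParallelism, int maxParallelism) {
+        // One subtask per "bytesPerTask" of input, rounded up.
+        int parallelism = (int) Math.ceil((double) consumedBytes / bytesPerTask);
+        // Clamp into the configured [min, max] range.
+        return Math.max(minParallelism, Math.min(maxParallelism, parallelism));
+    }
+
+    public static void main(String[] args) {
+        long GB = 1024L * 1024 * 1024;
+        // 300 GB consumed with 8 GB per task => 38 subtasks, within [1, 128].
+        System.out.println(decideParallelism(300 * GB, 8 * GB, 1, 128));
+    }
+}
+```
+
+With this scheme, a vertex that consumes little data gets the lower bound, and a vertex that consumes a huge dataset is capped at the upper bound.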
+
+## Set the parallelism of operators to -1
+
+The adaptive batch scheduler only automatically decides parallelism for operators whose parallelism is not set (i.e. the parallelism is -1). To leave the parallelism unset, you should configure as follows:
+
+- Set `parallelism.default: -1` for all jobs.
+- Set `table.exec.resource.default-parallelism: -1` for SQL jobs.
+- Don't call `setParallelism()` for operators in DataStream/DataSet jobs.
+- Don't call `setParallelism()` on
`StreamExecutionEnvironment/ExecutionEnvironment` in DataStream/DataSet jobs.
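+For example, the configuration-side settings from the two sections above can be combined in `flink-conf.yaml`:
+
+```yaml
+jobmanager.scheduler: AdaptiveBatch
+parallelism.default: -1
+table.exec.resource.default-parallelism: -1
+```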
+
+
+# Implementation Details
+
+In this section, we will elaborate on the details of the implementation. Before that, we need to briefly introduce some concepts involved:
+
+-
[JobVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java)
and
[JobGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java):
A job vertex is an operator chain formed by chaining several operators
together for better performance. The job graph is a data flow consisting of job
vertices.
+-
[ExecutionVertex](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java)
and
[ExecutionGraph](https://github.com/apache/flink/blob/release-1.15/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java):
An execution vertex represents a parallel subtask of a job vertex, which will
eventually be instantiated as a physical task. For example, a job vertex with a
parallelism of 100 will generate 100 execution vertices. The execution graph is
the physical execution topology consisting of all execution vertices.
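+To make the job-vertex/execution-vertex relationship concrete, here is a toy model (the classes below are illustrative only, not Flink's real `JobVertex`/`ExecutionVertex`): a job vertex with parallelism N expands into N execution vertices, one per parallel subtask.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+
+// Toy model of the JobGraph -> ExecutionGraph expansion.
+public class GraphSketch {
+
+    record JobVertexSketch(String name, int parallelism) {}
+
+    // Each parallel subtask of a job vertex becomes one execution vertex.
+    record ExecutionVertexSketch(String jobVertexName, int subtaskIndex) {}
+
+    static List<ExecutionVertexSketch> expand(JobVertexSketch vertex) {
+        List<ExecutionVertexSketch> subtasks = new ArrayList<>();
+        for (int i = 0; i < vertex.parallelism(); i++) {
+            subtasks.add(new ExecutionVertexSketch(vertex.name(), i));
+        }
+        return subtasks;
+    }
+
+    public static void main(String[] args) {
+        // A job vertex with parallelism 100 yields 100 execution vertices.
+        System.out.println(expand(new JobVertexSketch("map-filter-chain", 100)).size());
+    }
+}
+```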
+
+More details about the above concepts can be found in the [Flink
documentation](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/internals/job_scheduling/#jobmanager-data-structures).
Note that the adaptive batch scheduler decides the parallelism of operators by
deciding the parallelism of the job vertices to which they belong. To
automatically decide parallelism of job vertices, we introduced the following
changes:
Review Comment:
to which they belong -> which consist of these operators
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]