This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 1d17dc71cf9 [FLINK-32671] Document Externalized Declarative Resource
Management + minor Elastic Scaling page restructuring.
1d17dc71cf9 is described below
commit 1d17dc71cf98b6540a506c3c9670bbd0b47052a5
Author: David Moravek <[email protected]>
AuthorDate: Mon Oct 23 12:22:12 2023 +0200
[FLINK-32671] Document Externalized Declarative Resource Management +
minor Elastic Scaling page restructuring.
---
docs/content.zh/docs/deployment/elastic_scaling.md | 113 ++++++++++++++------
docs/content/docs/deployment/elastic_scaling.md | 118 ++++++++++++++-------
docs/static/fig/adaptive_scheduler.png | Bin 0 -> 538791 bytes
docs/static/fig/adaptive_scheduler_rescale.png | Bin 0 -> 859268 bytes
4 files changed, 161 insertions(+), 70 deletions(-)
diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md
b/docs/content.zh/docs/deployment/elastic_scaling.md
index 2b7475a1af0..d9b09ba7e8c 100644
--- a/docs/content.zh/docs/deployment/elastic_scaling.md
+++ b/docs/content.zh/docs/deployment/elastic_scaling.md
@@ -25,17 +25,91 @@ under the License.
# 弹性扩缩容
-在 Apache Flink 中,可以通过手动停止 Job,然后从停止时创建的 Savepoint 恢复,最后重新指定并行度的方式来重新扩缩容 Job。
+Historically, the parallelism of a job has been static throughout its
lifecycle and defined once during its submission. Batch jobs couldn't be
rescaled at all, while Streaming jobs could have been stopped with a savepoint
and restarted with a different parallelism.
-这个文档描述了那些可以使 Flink 自动调整并行度的选项。
+This page describes a new class of schedulers that allow Flink to adjust a job's parallelism at runtime, which pushes Flink one step closer to being a truly cloud-native stream processor. The new schedulers are [Adaptive Scheduler](#adaptive-scheduler) (streaming) and [Adaptive Batch Scheduler](#adaptive-batch-scheduler) (batch).
-## Reactive 模式
+## Adaptive 调度器
+
+The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It will automatically reduce the parallelism if not enough slots are available to run the job with the originally configured parallelism, whether because not enough resources are available at submission time or because of TaskManager outages during job execution. If new slots become available, the job will be scaled up again, up to the configured parallelism.
+
+In Reactive Mode (see below) the configured parallelism is ignored and treated as if it were set to infinity, letting the job always use as many resources as possible.
+
+One benefit of the Adaptive Scheduler over the default scheduler is that it
can handle TaskManager losses gracefully, since it would just scale down in
these cases.
+
+{{< img src="/fig/adaptive_scheduler.png" >}}
+
+Adaptive Scheduler builds on top of a feature called [Declarative Resource Management](https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management). As the figure shows, instead of asking for an exact number of slots, the JobMaster declares its desired resources (for Reactive Mode the maximum is set to infinity) to the ResourceManager, which then tries to fulfill those resources.
+
+{{< img src="/fig/adaptive_scheduler_rescale.png" >}}
+
+When the JobMaster gets more resources at runtime, it will automatically rescale the job using the latest available savepoint, eliminating the need for external orchestration.
+
+Starting from **Flink 1.18.x**, you can re-declare the resource requirements of a running job using [Externalized Declarative Resource Management](#externalized-declarative-resource-management); without it, the Adaptive Scheduler cannot handle cases where the job needs to be rescaled due to a change in the input rate or in the performance of the workload.
+
+### Externalized Declarative Resource Management
+
+{{< hint warning >}}
+Externalized Declarative Resource Management is an MVP ("minimum viable product") feature. The Flink community is actively looking for feedback from users through our mailing lists. Please check the limitations listed on this page.
+{{< /hint >}}
+
+{{< hint info >}}
+You can use Externalized Declarative Resource Management with the [Apache
Flink Kubernetes
operator](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/custom-resource/autoscaler/#flink-118-and-in-place-scaling-support)
for a fully-fledged auto-scaling experience.
+{{< /hint >}}
+
+Externalized Declarative Resource Management aims to address two deployment scenarios:
+1. Adaptive Scheduler on a Session Cluster, where multiple jobs can compete for resources, and you need finer-grained control over the distribution of resources between jobs.
+2. Adaptive Scheduler on an Application Cluster in combination with an Active Resource Manager (e.g. [Native Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}})), where you rely on Flink to "greedily" spawn new TaskManagers, but you still want to leverage rescaling capabilities as with [Reactive Mode](#reactive-mode).
+
+It addresses them by introducing a new [REST API endpoint]({{< ref "docs/ops/rest_api" >}}#jobs-jobid-resource-requirements-1) that allows you to re-declare the resource requirements of a running job by setting per-vertex parallelism boundaries.
+
+```
+PUT /jobs/<job-id>/resource-requirements
+
+REQUEST BODY:
+{
+ "<first-vertex-id>": {
+ "parallelism": {
+ "lowerBound": 3,
+ "upperBound": 5
+ }
+ },
+ "<second-vertex-id>": {
+ "parallelism": {
+ "lowerBound": 2,
+ "upperBound": 3
+ }
+ }
+}
+```
+
+To a certain extent, the above endpoint can be thought of as a "re-scaling endpoint", and it introduces an important building block for building an auto-scaling experience for Flink.
+
+You can try this feature out manually by navigating to the Job overview in the Flink UI and using the up-scale/down-scale buttons in the task list.
+
+### Usage
{{< hint info >}}
-Reactive 模式是一个 MVP (minimum viable product,最小可行产品)特性。目前 Flink
社区正在积极地从邮件列表中获取用户的使用反馈。请注意文中列举的一些局限性。
+If you are using Adaptive Scheduler on a [session cluster]({{< ref "docs/deployment/overview" >}}/#session-mode), there are no guarantees regarding the distribution of slots between multiple running jobs in the same session, in case the cluster doesn't have enough resources. [Externalized Declarative Resource Management](#externalized-declarative-resource-management) can partially mitigate this issue, but it is still recommended to use Adaptive Scheduler on an [application cluster]({{< re [...]
{{< /hint >}}
-在 Reactive 模式下,Job 会使用集群中所有的资源。当增加 TaskManager 时,Job 会自动扩容。当删除时,就会自动缩容。Flink
会管理 Job 的并行度,始终会尽可能地使用最大值。
+`jobmanager.scheduler` needs to be set to `adaptive` on the cluster level for the Adaptive Scheduler to be used instead of the default scheduler.
+
+```yaml
+jobmanager.scheduler: adaptive
+```
+
+The behavior of Adaptive Scheduler is configured by [all configuration options prefixed with `jobmanager.adaptive-scheduler`]({{< ref "docs/deployment/config">}}#advanced-scheduling-options).
+
+### Limitations
+
+- **Streaming jobs only**: The Adaptive Scheduler runs with streaming jobs only. When submitting a batch job, Flink will use the default scheduler for batch jobs, i.e. the [Adaptive Batch Scheduler](#adaptive-batch-scheduler).
+- **No support for partial failover**: Partial failover means that the
scheduler is able to restart parts ("regions" in Flink's internals) of a failed
job, instead of the entire job. This limitation impacts only recovery time of
embarrassingly parallel jobs: Flink's default scheduler can restart failed
parts, while Adaptive Scheduler will restart the entire job.
+- Scaling events trigger job and task restarts, which will increase the number of Task attempts.
+
+## Reactive 模式
+
+Reactive Mode is a special mode of the Adaptive Scheduler that assumes a single job per cluster (enforced by the [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). Reactive Mode configures a job so that it always uses all resources available in the cluster. Adding a TaskManager will scale your job up; removing resources will scale it down. Flink will manage the parallelism of the job, always setting it to the highest possible value.
当发生扩缩容时,Job 会被重启,并且会从最新的 Checkpoint 中恢复。这就意味着不需要花费额外的开销去创建
Savepoint。当然,所需要重新处理的数据量取决于 Checkpoint 的间隔时长,而恢复的时间取决于状态的大小。
@@ -116,35 +190,6 @@ cp ./examples/streaming/TopSpeedWindowing.jar lib/
[Adaptive 调度器的局限性](#limitations-1) 同样也适用于 Reactive 模式.
-## Adaptive 调度器
-
-{{< hint warning >}}
-只推荐高级用户直接使用 Adaptive 调度器(而不是通过 Reactive 模式使用),因为在一个 Session 集群中对于多个 Job 的 Slot
的分配行为是不确定的。
-{{< /hint >}}
-
-Adaptive 调度器可以基于现有的 Slot 调整 Job 的并行度。它会在 Slot 数目不足时,自动减少并行度。这种情况包括在提交时资源不够,或者在
Job 运行时 TaskManager 不可用。当有新的 Slot 加入时,Job 将会自动扩容至配置的并行度。
-在 Reactive 模式下(详见上文),并行度配置会被忽略,即无限大,使得 Job 尽可能地使用资源。
-你也可以不使用 Reactive 模式而仅使用 Adaptive 调度器,但这种情况会有如下的局限性:
-- 如果你在 Session 集群上使用 Adaptive 调度器,在这个集群中运行的多个 Job,他们间 Slot 的分布是无法保证的。
-
-相比默认的调度器,Adaptive 调度器其中一个优势在于,它能够优雅地处理 TaskManager 丢失所造成的问题,因为对它来说就仅仅是缩容。
-
-### 用法
-
-需要设置如下的配置参数:
-
-- `jobmanager.scheduler: adaptive`:将默认的调度器换成 Adaptive。
-
-Adaptive 调度器可以通过[所有在名字包含 `adaptive-scheduler` 的配置]({{< ref
"docs/deployment/config">}}#advanced-scheduling-options)修改其行为。
-
-<a name="limitations-1"></a>
-
-### 局限性
-
-- **只支持流式 Job**:Adaptive 调度器仅支持流式 Job。当提交的是一个批处理 Job 时,Flink 会自动使用批处理 Job
的默认调度器,即 Adaptive Batch Scheduler。
-- **不支持部分故障恢复**: 部分故障恢复意味着调度器可以只重启失败 Job 其中某一部分(在 Flink 的内部结构中被称之为
Region)而不是重启整个 Job。这个限制只会影响那些独立并行(Embarrassingly
Parallel)Job的恢复时长,默认的调度器可以重启失败的部分,然而 Adaptive 将需要重启整个 Job。
-- 扩缩容事件会触发 Job 和 Task 重启,Task 重试的次数也会增加。
-
## Adaptive Batch Scheduler
Adaptive Batch Scheduler
是一种可以自动调整执行计划的批作业调度器。它目前支持自动推导算子并行度,如果算子未设置并行度,调度器将根据其消费的数据量的大小来推导其并行度。这可以带来诸多好处:
diff --git a/docs/content/docs/deployment/elastic_scaling.md
b/docs/content/docs/deployment/elastic_scaling.md
index 5259c1066fa..bb38d7549bb 100644
--- a/docs/content/docs/deployment/elastic_scaling.md
+++ b/docs/content/docs/deployment/elastic_scaling.md
@@ -25,22 +25,96 @@ under the License.
# Elastic Scaling
-Apache Flink allows you to rescale your jobs. You can do this manually by
stopping the job and restarting from the savepoint created during shutdown with
a different parallelism.
+Historically, the parallelism of a job has been static throughout its
lifecycle and defined once during its submission. Batch jobs couldn't be
rescaled at all, while Streaming jobs could have been stopped with a savepoint
and restarted with a different parallelism.
-This page describes options where Flink automatically adjusts the parallelism
instead.
+This page describes a new class of schedulers that allow Flink to adjust a job's parallelism at runtime, which pushes Flink one step closer to being a truly cloud-native stream processor. The new schedulers are [Adaptive Scheduler](#adaptive-scheduler) (streaming) and [Adaptive Batch Scheduler](#adaptive-batch-scheduler) (batch).
-## Reactive Mode
+## Adaptive Scheduler
+
+The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It will automatically reduce the parallelism if not enough slots are available to run the job with the originally configured parallelism, whether because not enough resources are available at submission time or because of TaskManager outages during job execution. If new slots become available, the job will be scaled up again, up to the configured parallelism.
+
+In Reactive Mode (see below) the configured parallelism is ignored and treated as if it were set to infinity, letting the job always use as many resources as possible.
+
+One benefit of the Adaptive Scheduler over the default scheduler is that it
can handle TaskManager losses gracefully, since it would just scale down in
these cases.
+
+{{< img src="/fig/adaptive_scheduler.png" >}}
+
+Adaptive Scheduler builds on top of a feature called [Declarative Resource Management](https://cwiki.apache.org/confluence/display/FLINK/FLIP-138%3A+Declarative+Resource+management). As the figure shows, instead of asking for an exact number of slots, the JobMaster declares its desired resources (for Reactive Mode the maximum is set to infinity) to the ResourceManager, which then tries to fulfill those resources.
+
+{{< img src="/fig/adaptive_scheduler_rescale.png" >}}
+
+When the JobMaster gets more resources at runtime, it will automatically rescale the job using the latest available savepoint, eliminating the need for external orchestration.
+
+Starting from **Flink 1.18.x**, you can re-declare the resource requirements of a running job using [Externalized Declarative Resource Management](#externalized-declarative-resource-management); without it, the Adaptive Scheduler cannot handle cases where the job needs to be rescaled due to a change in the input rate or in the performance of the workload.
+
+### Externalized Declarative Resource Management
+
+{{< hint warning >}}
+Externalized Declarative Resource Management is an MVP ("minimum viable product") feature. The Flink community is actively looking for feedback from users through our mailing lists. Please check the limitations listed on this page.
+{{< /hint >}}
{{< hint info >}}
-Reactive mode is an MVP ("minimum viable product") feature. The Flink
community is actively looking for feedback by users through our mailing lists.
Please check the limitations listed on this page.
+You can use Externalized Declarative Resource Management with the [Apache
Flink Kubernetes
operator](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/custom-resource/autoscaler/#flink-118-and-in-place-scaling-support)
for a fully-fledged auto-scaling experience.
{{< /hint >}}
-Reactive Mode configures a job so that it always uses all resources available
in the cluster. Adding a TaskManager will scale up your job, removing resources
will scale it down. Flink will manage the parallelism of the job, always
setting it to the highest possible values.
+Externalized Declarative Resource Management aims to address two deployment scenarios:
+1. Adaptive Scheduler on a Session Cluster, where multiple jobs can compete for resources, and you need finer-grained control over the distribution of resources between jobs.
+2. Adaptive Scheduler on an Application Cluster in combination with an Active Resource Manager (e.g. [Native Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}})), where you rely on Flink to "greedily" spawn new TaskManagers, but you still want to leverage rescaling capabilities as with [Reactive Mode](#reactive-mode).
-Reactive Mode restarts a job on a rescaling event, restoring it from the
latest completed checkpoint. This means that there is no overhead of creating a
savepoint (which is needed for manually rescaling a job). Also, the amount of
data that is reprocessed after rescaling depends on the checkpointing interval,
and the restore time depends on the state size.
+It addresses them by introducing a new [REST API endpoint]({{< ref "docs/ops/rest_api" >}}#jobs-jobid-resource-requirements-1) that allows you to re-declare the resource requirements of a running job by setting per-vertex parallelism boundaries.
-The Reactive Mode allows Flink users to implement a powerful autoscaling
mechanism, by having an external service monitor certain metrics, such as
consumer lag, aggregate CPU utilization, throughput or latency. As soon as
these metrics are above or below a certain threshold, additional TaskManagers
can be added or removed from the Flink cluster. This could be implemented
through changing the [replica
factor](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#replicas)
[...]
+```
+PUT /jobs/<job-id>/resource-requirements
+REQUEST BODY:
+{
+ "<first-vertex-id>": {
+ "parallelism": {
+ "lowerBound": 3,
+ "upperBound": 5
+ }
+ },
+ "<second-vertex-id>": {
+ "parallelism": {
+ "lowerBound": 2,
+ "upperBound": 3
+ }
+ }
+}
+```
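As a minimal sketch, the request body above can be built and sent with the Python standard library alone. The vertex IDs and the `localhost:8081` REST address below are illustrative assumptions, not values from this change; the real vertex IDs of a job can be listed via `GET /jobs/<job-id>`.

```python
import json

def resource_requirements_body(bounds):
    """Build the request body for PUT /jobs/<job-id>/resource-requirements.

    bounds maps a vertex id to a (lowerBound, upperBound) parallelism pair.
    """
    return {
        vertex_id: {"parallelism": {"lowerBound": lo, "upperBound": hi}}
        for vertex_id, (lo, hi) in bounds.items()
    }

# Hypothetical vertex ids, for illustration only.
body = resource_requirements_body({
    "bc764cd8ddf7a0cff126f51c16239658": (3, 5),
    "0a448493b4782967b150582570326227": (2, 3),
})
payload = json.dumps(body)

# Sending the declaration (commented out, since it needs a running cluster):
# import urllib.request
# req = urllib.request.Request(
#     "http://localhost:8081/jobs/<job-id>/resource-requirements",
#     data=payload.encode("utf-8"),
#     headers={"Content-Type": "application/json"},
#     method="PUT",
# )
# urllib.request.urlopen(req)
```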
+
+To a certain extent, the above endpoint can be thought of as a "re-scaling endpoint", and it introduces an important building block for building an auto-scaling experience for Flink.
+
+You can try this feature out manually by navigating to the Job overview in the Flink UI and using the up-scale/down-scale buttons in the task list.
+
+### Usage
+
+{{< hint info >}}
+If you are using Adaptive Scheduler on a [session cluster]({{< ref "docs/deployment/overview" >}}/#session-mode), there are no guarantees regarding the distribution of slots between multiple running jobs in the same session, in case the cluster doesn't have enough resources. [Externalized Declarative Resource Management](#externalized-declarative-resource-management) can partially mitigate this issue, but it is still recommended to use Adaptive Scheduler on an [application cluster]({{< re [...]
+{{< /hint >}}
+
+`jobmanager.scheduler` needs to be set to `adaptive` on the cluster level for the Adaptive Scheduler to be used instead of the default scheduler.
+
+```yaml
+jobmanager.scheduler: adaptive
+```
+
+The behavior of Adaptive Scheduler is configured by [all configuration options prefixed with `jobmanager.adaptive-scheduler`]({{< ref "docs/deployment/config">}}#advanced-scheduling-options).
+
+### Limitations
+
+- **Streaming jobs only**: The Adaptive Scheduler runs with streaming jobs only. When submitting a batch job, Flink will use the default scheduler for batch jobs, i.e. the [Adaptive Batch Scheduler](#adaptive-batch-scheduler).
+- **No support for partial failover**: Partial failover means that the
scheduler is able to restart parts ("regions" in Flink's internals) of a failed
job, instead of the entire job. This limitation impacts only recovery time of
embarrassingly parallel jobs: Flink's default scheduler can restart failed
parts, while Adaptive Scheduler will restart the entire job.
+- Scaling events trigger job and task restarts, which will increase the number
of Task attempts.
+
+## Reactive Mode
+
+Reactive Mode is a special mode of the Adaptive Scheduler that assumes a single job per cluster (enforced by the [Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). Reactive Mode configures a job so that it always uses all resources available in the cluster. Adding a TaskManager will scale your job up; removing resources will scale it down. Flink will manage the parallelism of the job, always setting it to the highest possible value.
+
+Reactive Mode restarts a job on a rescaling event, restoring it from the
latest completed checkpoint. This means that there is no overhead of creating a
savepoint (which is needed for manually rescaling a job). Also, the amount of
data that is reprocessed after rescaling depends on the checkpointing interval,
and the restore time depends on the state size.
+
+The Reactive Mode allows Flink users to implement a powerful autoscaling
mechanism, by having an external service monitor certain metrics, such as
consumer lag, aggregate CPU utilization, throughput or latency. As soon as
these metrics are above or below a certain threshold, additional TaskManagers
can be added or removed from the Flink cluster. This could be implemented
through changing the [replica
factor](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#replicas)
[...]
+
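The decision logic of such an external autoscaler can be sketched in a few lines. The metric (consumer lag), thresholds, and replica bounds below are assumptions chosen for illustration; a real implementation would read the metric from Flink's metrics and then change the TaskManager replica count accordingly.

```python
def decide_replicas(consumer_lag, current_replicas,
                    min_replicas=1, max_replicas=10,
                    scale_up_lag=10_000, scale_down_lag=1_000):
    """Return the desired TaskManager replica count for the observed lag.

    Scale up one step when lag exceeds scale_up_lag, down one step when it
    falls below scale_down_lag, otherwise keep the current count. All
    thresholds and bounds are illustrative; tune them to your workload.
    """
    if consumer_lag > scale_up_lag:
        return min(current_replicas + 1, max_replicas)
    if consumer_lag < scale_down_lag:
        return max(current_replicas - 1, min_replicas)
    return current_replicas

# e.g. a lag of 25000 records with 3 TaskManagers -> scale up to 4
```

Stepping one replica at a time, with a polling interval longer than the job's restart time, is one simple way to avoid oscillation; more elaborate policies are possible on the same interface.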
### Getting started
If you just want to try out Reactive Mode, follow these instructions. They
assume that you are deploying Flink on a single machine.
@@ -113,40 +187,12 @@ Additionally, one can configure
[`jobmanager.adaptive-scheduler.min-parallelism-
Since Reactive Mode is a new, experimental feature, not all features supported
by the default scheduler are also available with Reactive Mode (and its
adaptive scheduler). The Flink community is working on addressing these
limitations.
-- **Deployment is only supported as a standalone application deployment**.
Active resource providers (such as native Kubernetes, YARN) are explicitly not
supported. Standalone session clusters are not supported either. The
application deployment is limited to single job applications.
+- **Deployment is only supported as a standalone application deployment**.
Active resource providers (such as native Kubernetes, YARN) are explicitly not
supported. Standalone session clusters are not supported either. The
application deployment is limited to single job applications.
The only supported deployment options are [Standalone in Application
Mode]({{< ref "docs/deployment/resource-providers/standalone/overview"
>}}#application-mode) ([described](#getting-started) on this page), [Docker in
Application Mode]({{< ref
"docs/deployment/resource-providers/standalone/docker"
>}}#application-mode-on-docker) and [Standalone Kubernetes Application
Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes"
>}}#deploy-application-cluster).
The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive
Mode.
-
-## Adaptive Scheduler
-
-{{< hint warning >}}
-Using Adaptive Scheduler directly (not through Reactive Mode) is only advised
for advanced users because slot allocation on a session cluster with multiple
jobs is not defined.
-{{< /hint >}}
-
-The Adaptive Scheduler can adjust the parallelism of a job based on available
slots. It will automatically reduce the parallelism if not enough slots are
available to run the job with the originally configured parallelism; be it due
to not enough resources being available at the time of submission, or
TaskManager outages during the job execution. If new slots become available the
job will be scaled up again, up to the configured parallelism.
-In Reactive Mode (see above) the configured parallelism is ignored and treated
as if it was set to infinity, letting the job always use as many resources as
possible.
-You can also use Adaptive Scheduler without Reactive Mode, but there are some
practical limitations:
-- If you are using Adaptive Scheduler on a session cluster, there are no
guarantees regarding the distribution of slots between multiple running jobs in
the same session.
-
-One benefit of the Adaptive Scheduler over the default scheduler is that it
can handle TaskManager losses gracefully, since it would just scale down in
these cases.
-
-### Usage
-
-The following configuration parameter need to be set:
-
-- `jobmanager.scheduler: adaptive`: Change from the default scheduler to
adaptive scheduler
-
-The behavior of Adaptive Scheduler is configured by [all configuration options
containing `adaptive-scheduler`]({{< ref
"docs/deployment/config">}}#advanced-scheduling-options) in their name.
-
-### Limitations
-
-- **Streaming jobs only**: The Adaptive Scheduler runs with streaming jobs
only. When submitting a batch job, Flink will use the default scheduler of
batch jobs, i.e. Adaptive Batch Scheduler.
-- **No support for partial failover**: Partial failover means that the
scheduler is able to restart parts ("regions" in Flink's internals) of a failed
job, instead of the entire job. This limitation impacts only recovery time of
embarrassingly parallel jobs: Flink's default scheduler can restart failed
parts, while Adaptive Scheduler will restart the entire job.
-- Scaling events trigger job and task restarts, which will increase the number
of Task attempts.
-
## Adaptive Batch Scheduler
The Adaptive Batch Scheduler is a batch job scheduler that can automatically
adjust the execution plan. It currently supports automatically deciding
parallelisms of operators for batch jobs. If an operator is not set with a
parallelism, the scheduler will decide parallelism for it according to the size
of its consumed datasets. This can bring many benefits:
diff --git a/docs/static/fig/adaptive_scheduler.png
b/docs/static/fig/adaptive_scheduler.png
new file mode 100644
index 00000000000..4962392ae58
Binary files /dev/null and b/docs/static/fig/adaptive_scheduler.png differ
diff --git a/docs/static/fig/adaptive_scheduler_rescale.png
b/docs/static/fig/adaptive_scheduler_rescale.png
new file mode 100644
index 00000000000..4515a9a2be8
Binary files /dev/null and b/docs/static/fig/adaptive_scheduler_rescale.png
differ