This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new ccf3639c [docs][autoscaler] Autoscaler docs and default config improvement
ccf3639c is described below
commit ccf3639c87400e3bbd94f64ad13dc5d40fd55faf
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Aug 17 09:55:59 2023 +0200
[docs][autoscaler] Autoscaler docs and default config improvement
---
docs/content/docs/custom-resource/autoscaler.md | 93 +++++++++++++++++----
.../generated/auto_scaler_configuration.html | 2 +-
.../static/img/custom-resource/autoscaler_fig1.png | Bin 0 -> 168727 bytes
.../static/img/custom-resource/autoscaler_fig2.png | Bin 0 -> 181157 bytes
.../static/img/custom-resource/autoscaler_fig3.png | Bin 0 -> 178610 bytes
.../autoscaler/config/AutoScalerOptions.java | 2 +-
6 files changed, 78 insertions(+), 19 deletions(-)
diff --git a/docs/content/docs/custom-resource/autoscaler.md b/docs/content/docs/custom-resource/autoscaler.md
index 201ab3d8..4140122a 100644
--- a/docs/content/docs/custom-resource/autoscaler.md
+++ b/docs/content/docs/custom-resource/autoscaler.md
@@ -26,7 +26,7 @@ under the License.
# Autoscaler
-The operator provides a job autoscaler functionality that collects various metrics from running Flink jobs and automatically scales individual job vertexes (chained operator groups) to eliminate backpressure and satisfy the utilization and catch-up duration target set by the user.
+The operator provides a job autoscaler functionality that collects various metrics from running Flink jobs and automatically scales individual job vertexes (chained operator groups) to eliminate backpressure and satisfy the utilization target set by the user.
By adjusting parallelism on a job vertex level (in contrast to job parallelism) we can efficiently autoscale complex and heterogeneous streaming applications.
Key benefits to the user:
@@ -35,26 +35,78 @@ Key benefits to the user:
- Automatic adaptation to changing load patterns
- Detailed utilization metrics for performance debugging
-Job requirements:
- - The autoscaler currently only works with the latest [Flink 1.17](https://hub.docker.com/_/flink) or after backporting the following fixes to your 1.15/1.16 Flink image
- - Job vertex parallelism overrides (must have)
- - [Add option to override job vertex parallelisms during job submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
- - [Change ForwardPartitioner to RebalancePartitioner on parallelism changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
- - [Fix logic for determining downstream subtasks for partitioner replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
- - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have)
- - Source scaling only supports modern sources which
- - use the new [Source API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) that exposes the busy time metric (must have, most common connectors already do)
- - expose the [standardized connector metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) for accessing backlog information (good to have, extra capacity will be added for catching up with backlog)
+## Overview
-In the current state the autoscaler works best with Kafka sources, as they expose all the standardized metrics. It also comes with some additional benefits when using Kafka such as automatically detecting and limiting source max parallelism to the number of Kafka partitions.
+The autoscaler relies on the metrics exposed by the Flink metric system for the individual tasks. The metrics are queried directly from the Flink job.
-{{< hint info >}}
-The autoscaler also supports a passive/metrics-only mode where it only collects and evaluates scaling related performance metrics but does not trigger any job upgrades.
-This can be used to gain confidence in the module without any impact on the running applications.
+Collected metrics:
+ - Backlog information at each source
+ - Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
+ - Number of records processed per second in each job vertex
+ - Busy time per second of each job vertex (current utilization)
-To disable scaling actions, set: `kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< hint info >}}
+Please note that we are not using any container memory / CPU utilization metrics directly here. High utilization will be reflected in the processing rate and busy time metrics of the individual job vertexes.
{{< /hint >}}
+The algorithm starts from the sources and recursively computes the required processing capacity (target data rate) for each operator in the pipeline. At the source vertices, target data rate is equal to incoming data rate (from the Kafka topic).
+
+For downstream operators, we compute the target data rate as the sum of the output data rates of their input (upstream) operators along the corresponding edges in the processing graph.
+
+{{< img src="/img/custom-resource/autoscaler_fig1.png" alt="Computing Target Data Rates" >}}
+
+Users configure the target utilization percentage of the operators in the pipeline, e.g. keep all operators between 60% and 80% busy. The autoscaler then finds a parallelism configuration such that the output rates of all operators match the input rates of all their downstream operators at the targeted utilization.
+
+In this example we see an upscale operation:
+
+{{< img src="/img/custom-resource/autoscaler_fig2.png" alt="Scaling Up" >}}
+
+Similarly, as load decreases, the autoscaler adjusts individual operator parallelism levels to match the current rate over time.
+
+{{< img src="/img/custom-resource/autoscaler_fig3.png" alt="Scaling Down" >}}
+
+The autoscaler approach is based on [Three steps is all you need: fast, accurate, automatic scaling decisions for distributed streaming dataflows](https://www.usenix.org/system/files/osdi18-kalavri.pdf) by Kalavri et al.
+
+## Executing rescaling operations
+
+By default, the autoscaler uses the operator's built-in job upgrade mechanism to perform the rescaling, as detailed in [Job Management and Stateful upgrades]({{< ref "docs/custom-resource/job-management" >}}).
+
+### Flink 1.18 and in-place scaling support
+
+The upcoming Flink 1.18 release significantly improves the speed of scaling operations through the new resource requirements REST endpoint.
+This allows the autoscaler to scale vertices in-place without performing a full job upgrade cycle.
+
+To try this experimental feature, please use the currently available Flink 1.18 snapshot base image to build your application Docker image.
+Furthermore, make sure you set the Flink version to `v1_18` in your FlinkDeployment YAML and enable the adaptive scheduler, which is required for this feature:
+
+```yaml
+jobmanager.scheduler: adaptive
+```
+
+## Job Requirements and Limitations
+
+### Requirements
+
+The autoscaler currently only works with [Flink 1.17](https://hub.docker.com/_/flink) and later Flink images, or after backporting the following fixes to your 1.15/1.16 Flink images:
+
+ - Job vertex parallelism overrides (must have)
+ - [Add option to override job vertex parallelisms during job submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
+ - [Change ForwardPartitioner to RebalancePartitioner on parallelism changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
+ - [Fix logic for determining downstream subtasks for partitioner replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
+ - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have)
+
+### Limitations
+
+By default the autoscaler can work for all job vertices in the processing graph.
+
+However, source scaling requires that the sources:
+
+ - Use the new [Source API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) that exposes the busy time metric (must have, most common connectors already do)
+ - Expose the [standardized connector metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) for accessing backlog information (good to have, extra capacity will be added for catching up with backlog)
+
+In the current state the autoscaler works best with Kafka sources, as they expose all the standardized metrics. It also comes with some additional benefits when using Kafka such as automatically detecting and limiting source max parallelism to the number of Kafka partitions.
+
+
## Configuration guide
Depending on your environment and job characteristics there are a few very important configurations that will affect how well the autoscaler works.
@@ -67,6 +119,13 @@ Key configuration areas:
The defaults might work reasonably well for many applications, but some tuning may be required in this early stage of the autoscaler module.
+{{< hint info >}}
+The autoscaler also supports a passive/metrics-only mode where it only collects and evaluates scaling related performance metrics but does not trigger any job upgrades.
+This can be used to gain confidence in the module without any impact on the running applications.
+
+To disable scaling actions, set: `kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< /hint >}}
+
### Job and per operator max parallelism
When computing the scaled parallelism, the autoscaler always considers the max parallelism settings for each job vertex to ensure that it doesn't introduce unnecessary data skew.
@@ -109,7 +168,7 @@ The amount of extra capacity is determined automatically by the following 2 conf
In the future the autoscaler may be able to automatically determine the restart time, but the target catch-up duration depends on the users SLO.
By lowering the catch-up duration the autoscaler will have to reserve more extra capacity for the scaling actions.
-We suggest setting this based on your actual objective, such us 1, 5, 10 minutes etc.
+We suggest setting this based on your actual objective, such as 10, 30, 60 minutes etc.
### Basic configuration example
```yaml
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index a3e7b069..95af5541 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -16,7 +16,7 @@
</tr>
<tr>
<td><h5>kubernetes.operator.job.autoscaler.catch-up.duration</h5></td>
- <td style="word-wrap: break-word;">5 min</td>
+ <td style="word-wrap: break-word;">15 min</td>
<td>Duration</td>
<td>The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.</td>
</tr>
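The "Job and per operator max parallelism" section of `autoscaler.md` only states that the autoscaler considers each vertex's max parallelism to avoid unnecessary data skew, and the Limitations section notes that Kafka source max parallelism is limited to the number of partitions. The Java sketch below illustrates one way such alignment could work. The divisor search (prefer the smallest parallelism at or above the computed value, up to half the max parallelism, that divides the max parallelism evenly) is an assumption for illustration and is not specified by this commit. `MaxParallelismSketch` and `alignParallelism` are hypothetical names.

```java
/** Hypothetical sketch: align a computed parallelism with a vertex's max parallelism. */
final class MaxParallelismSketch {

    /**
     * Clamps the candidate to [1, maxParallelism], then prefers the smallest value
     * between the clamped candidate and maxParallelism / 2 that divides maxParallelism
     * evenly, so every subtask gets the same number of key groups (or Kafka partitions
     * for a source). Falls back to the clamped value if no such divisor exists.
     */
    static int alignParallelism(int candidate, int maxParallelism) {
        int p = Math.max(1, Math.min(candidate, maxParallelism));
        for (int q = p; q <= maxParallelism / 2; q++) {
            if (maxParallelism % q == 0) {
                return q;
            }
        }
        return p; // no even divisor in the searched range; keep the clamped value
    }

    public static void main(String[] args) {
        // Kafka source with 12 partitions, so max parallelism is limited to 12.
        System.out.println(alignParallelism(5, 12));   // 6: two partitions per subtask
        System.out.println(alignParallelism(20, 12));  // 12: capped at the partition count
        // Regular vertex with max parallelism 128.
        System.out.println(alignParallelism(50, 128)); // 64
    }
}
```

Rounding up rather than down keeps the capacity at least as large as the computed target, at the cost of sometimes over-provisioning (50 becomes 64 in the last example).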
diff --git a/docs/static/img/custom-resource/autoscaler_fig1.png b/docs/static/img/custom-resource/autoscaler_fig1.png
new file mode 100644
index 00000000..5cb37222
Binary files /dev/null and b/docs/static/img/custom-resource/autoscaler_fig1.png differ
diff --git a/docs/static/img/custom-resource/autoscaler_fig2.png b/docs/static/img/custom-resource/autoscaler_fig2.png
new file mode 100644
index 00000000..9047e96c
Binary files /dev/null and b/docs/static/img/custom-resource/autoscaler_fig2.png differ
diff --git a/docs/static/img/custom-resource/autoscaler_fig3.png b/docs/static/img/custom-resource/autoscaler_fig3.png
new file mode 100644
index 00000000..739c84f0
Binary files /dev/null and b/docs/static/img/custom-resource/autoscaler_fig3.png differ
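The Overview added to `autoscaler.md` (illustrated by `autoscaler_fig1.png` to `autoscaler_fig3.png`) describes propagating target data rates from the sources downstream, then choosing each vertex's parallelism so it runs at the target utilization. The Java sketch below shows that idea under simplifying assumptions. Each edge has a fixed output ratio, and a subtask's full capacity is estimated as its processed records/sec divided by its busy-time ratio, using the two per-vertex metrics listed in the docs. It is not the operator's implementation. `TargetRateSketch`, `Edge`, `targetRates` and `requiredParallelism` are hypothetical names, and the record syntax needs Java 16 or later.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of per-vertex target rates and parallelism. */
final class TargetRateSketch {

    /** ratio = records sent along this edge per record processed by the upstream vertex. */
    record Edge(String from, String to, double ratio) {}

    /** Propagates target rates downstream; vertices must be in topological order. */
    static Map<String, Double> targetRates(
            List<String> vertices, List<Edge> edges, Map<String, Double> sourceRates) {
        Map<String, Double> target = new LinkedHashMap<>();
        for (String v : vertices) {
            // Sources: target rate equals the incoming rate (e.g. from the Kafka topic).
            double rate = sourceRates.getOrDefault(v, 0.0);
            // Downstream vertices: sum of upstream output rates along each incoming edge.
            for (Edge e : edges) {
                if (e.to().equals(v)) {
                    rate += target.get(e.from()) * e.ratio();
                }
            }
            target.put(v, rate);
        }
        return target;
    }

    /** Subtasks needed so that each one runs at roughly the target utilization. */
    static int requiredParallelism(
            double targetRate, double processedRatePerSubtask, double busyRatio, double targetUtilization) {
        // What one subtask could process if it were 100% busy.
        double capacityPerSubtask = processedRatePerSubtask / busyRatio;
        return (int) Math.ceil(targetRate / (capacityPerSubtask * targetUtilization));
    }

    public static void main(String[] args) {
        List<String> vertices = List.of("source", "map", "sink");
        List<Edge> edges = List.of(
                new Edge("source", "map", 1.0),
                new Edge("map", "sink", 0.5)); // map emits one record per two it processes
        Map<String, Double> rates = targetRates(vertices, edges, Map.of("source", 10_000.0));
        System.out.println(rates); // {source=10000.0, map=10000.0, sink=5000.0}
        // A map subtask processing 1,000 rec/s while 50% busy could do about 2,000 rec/s.
        System.out.println(requiredParallelism(rates.get("map"), 1_000.0, 0.5, 0.7)); // 8
    }
}
```

Dividing by the busy-time ratio is what lets the sketch size a vertex without container CPU or memory metrics, matching the hint in the docs that high utilization shows up in the processing rate and busy time metrics.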
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index b31a9bec..df11e28a 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -109,7 +109,7 @@ public class AutoScalerOptions {
public static final ConfigOption<Duration> CATCH_UP_DURATION =
autoScalerConfig("catch-up.duration")
.durationType()
- .defaultValue(Duration.ofMinutes(5))
+ .defaultValue(Duration.ofMinutes(15))
.withDescription(
"The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.");
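This commit raises the default `kubernetes.operator.job.autoscaler.catch-up.duration` from 5 to 15 minutes. The docs say the extra capacity depends on the restart time and the catch-up duration, and that a shorter catch-up duration forces the autoscaler to reserve more capacity. The Java sketch below uses a back-of-the-envelope model that is assumed for illustration and not taken from the operator. Data arriving during the restart piles onto the existing backlog, and the total must be drained within the catch-up duration. `CatchUpSketch` and `extraCatchUpRate` are hypothetical names, and the backlog, input rate and restart time values are made up.

```java
import java.time.Duration;

/** Back-of-the-envelope model (an assumption) of the extra catch-up capacity. */
final class CatchUpSketch {

    /**
     * Extra records/sec needed on top of the normal input rate to drain the backlog
     * within the catch-up duration, assuming input keeps arriving during the restart
     * and is added to the existing backlog.
     */
    static double extraCatchUpRate(
            long backlogRecords, double inputRatePerSec, Duration restartTime, Duration catchUpDuration) {
        if (catchUpDuration.isZero()) {
            return 0.0; // a catch-up duration of 0 disables backlog based scaling
        }
        double pending = backlogRecords + inputRatePerSec * restartTime.toSeconds();
        return pending / catchUpDuration.toSeconds();
    }

    public static void main(String[] args) {
        long backlog = 840_000;
        double inputRate = 1_000.0;
        Duration restart = Duration.ofMinutes(1);
        // Old default (5 min) vs. new default (15 min) for catch-up.duration.
        System.out.println(extraCatchUpRate(backlog, inputRate, restart, Duration.ofMinutes(5)));  // 3000.0
        System.out.println(extraCatchUpRate(backlog, inputRate, restart, Duration.ofMinutes(15))); // 1000.0
    }
}
```

In this model the reserved rate is inversely proportional to the catch-up duration. Tripling the default therefore cuts the extra rate to a third: from 3000 to 1000 records/sec for the numbers in `main`.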