This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 86458aac1c1c6fc675a38ea54c93cb46900a2419 Author: Gyula Fora <[email protected]> AuthorDate: Thu Dec 15 11:27:00 2022 +0100 [docs][autoscaler] Add docs page + enable config doc generation for Autoscaler options --- .github/workflows/ci.yml | 2 +- docs/content/docs/custom-resource/autoscaler.md | 158 +++++++++++++++++++++ docs/content/docs/operations/configuration.md | 6 + .../generated/auto_scaler_configuration.html | 90 ++++++++++++ .../configuration/ConfigOptionsDocGenerator.java | 6 +- 5 files changed, 260 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c48b631c..c3891788 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -41,7 +41,7 @@ jobs: - name: Build with Maven run: | set -o pipefail; mvn clean install javadoc:javadoc -Pgenerate-docs | tee ./mvn.log; set +o pipefail - if [[ $(cat ./mvn.log | grep -E -v '(flink-runtime-.*.jar, flink-kubernetes-operator-.*.jar)|(flink-kubernetes-operator-.*.jar, flink-runtime-.*.jar) define 2 overlapping classes' | grep -c "overlapping classes" -) -gt 0 ]];then + if [[ $(cat ./mvn.log | grep -E -v '(flink-runtime-.*.jar, flink-kubernetes-operator-.*.jar)|(flink-kubernetes-operator-.*.jar, flink-runtime-.*.jar) define 3 overlapping classes' | grep -c "overlapping classes" -) -gt 0 ]];then echo "Found overlapping classes: " cat ./mvn.log | grep "overlapping classes" exit 1 diff --git a/docs/content/docs/custom-resource/autoscaler.md b/docs/content/docs/custom-resource/autoscaler.md new file mode 100644 index 00000000..db7a6d36 --- /dev/null +++ b/docs/content/docs/custom-resource/autoscaler.md @@ -0,0 +1,158 @@ +--- +title: "Autoscaler" +weight: 4 +type: docs +aliases: +- /custom-resource/autoscaler.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Autoscaler + +The operator provides a job autoscaler functionality that collects various metrics from running Flink jobs and automatically scales individual job vertexes (chained operator groups) to eliminate backpressure and satisfy the utilization and catch-up duration target set by the user. +By adjusting parallelism on a job vertex level (in contrast to job parallelism) we can efficiently autoscale complex and heterogeneous streaming applications. + +Key benefits to the user: + - Better cluster resource utilization and lower operating costs + - Automatic parallelism tuning for even complex streaming pipelines + - Automatic adaptation to changing load patterns + - Detailed utilization metrics for performance debugging + +Job requirements: + - The autoscaler currently only works with the latest [Flink 1.17 snapshot images](ghcr.io/apache/flink-docker:1.17-SNAPSHOT-scala_2.12-java11-debian) or after backporting the following fixes to your 1.15/1.16 Flink image + - [Job vertex parallelism overrides](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9) (must have) + - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have) + - All sources must use the new [Source API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) (most common connectors already do) + - Source scaling requires sources to expose the [standardized connector metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) for accessing backlog information (source scaling can be disabled) + +In the current state the autoscaler works best with Kafka sources, as they expose all the standardized metrics. It also comes with some additional benefits when using Kafka such as automatically detecting and limiting source max parallelism to the number of Kafka partitions. + +{{< hint info >}} +The autoscaler also supports a passive/metrics-only mode where it only collects and evaluates scaling related performance metrics but does not trigger any job upgrades. +This can be used to gain confidence in the module without any impact on the running applications. + +To disable scaling actions, set: `kubernetes.operator.job.autoscaler.scaling.enabled: "false"` +{{< /hint >}} + +## Configuration guide + +Depending on your environment and job characteristics there are a few very important configurations that will affect how well the autoscaler works. + +Key configuration areas + - Job and per operator max parallelism + - Stabilization and metrics collection intervals + - Target utilization and flexible boundaries + - Target catch-up duration and restart time + +The defaults might work reasonably well for many applications, but some tuning may be required in this early stage of the autoscaler module. + +### Job and per operator max parallelism + +When computing the scaled parallelism, the autoscaler always considers the max parallelism settings for each job vertex to ensure that it doesn't introduce unnecessary data skew. +The computed parallelism will always be a divisor of the max_parallelism number. + +To ensure flexible scaling it is therefore recommended to chose max parallelism settings that have a [lot of divisors](https://en.wikipedia.org/wiki/Highly_composite_number) instead of relying on the Flink provided defaults. +You can then use the `pipeline.max-parallelism` to configure this for your pipeline. + +Some good numbers for max-parallelism are: 120, 180, 240, 360, 720 etc. + +It is also possible to set maxParallelism on a per operator level, which can be useful if we want to avoid scaling some sources/sinks beyond a certain number. + +### Stabilization and metrics collection intervals + +The autoscaler always looks at average metrics in the collection time window defined by `kubernetes.operator.job.autoscaler.metrics.window`. +The size of this window determines how small fluctuations will affect the autoscaler. The larger the window, the more smoothing and stability we get, but we may be slower to react to sudden load changes. +We suggest you experiment with setting this anywhere between 3-60 minutes for best experience. + +To allow jobs to stabilize after recovery users can configure a stabilization window by setting `kubernetes.operator.job.autoscaler.stabilization.interval`. +During this time period no scaling actions will be taken. + +{{< hint warning >}} +Currently the autoscaler treats the collection window as the **maximum** window. Metric evaluation will start right after the stabilization period. +We also include metrics collected during the stabilization period at the moment which might cause some instability with very low stabilization periods. + +We are working on improving this. +{{< /hint >}} + +### Target utilization and flexible boundaries + +In order to provide stable job performance and some buffer for load fluctuations, the autoscaler allows users to set a target utilization level for the job (`kubernetes.operator.job.autoscaler.target.utilization`). +A target of `0.6` means we are targeting 60% utilization/load for the job vertexes. + +In general, it's not recommended to set target utilization close to 100% as performance usually degrades as we reach capacity limits in most real world systems. + +In addition to the utilization target we can set a utilization boundary, that serves as extra buffer to avoid immediate scaling on load fluctuations. +Setting `kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"` means that we allow 20% deviation from the target utilization before triggering a scaling action. + +### Target catch-up duration and restart time + +When taking scaling decisions the operator need to account for the extra capacity required to catch up the backlog created during scaling operations. +The amount of extra capacity is determined automatically by the following 2 configs: + + - `kubernetes.operator.job.autoscaler.restart.time` : Time it usually takes to restart the application + - `kubernetes.operator.job.autoscaler.catch-up.duration` : Time to job is expected to catch up after scaling + +In the future the autoscaler may be able to automatically determine the restart time, but the target catch-up duration depends on the users SLO. + +By lowering the catch-up duration the autoscaler will have to reserve more extra capacity for the scaling actions. +We suggest setting this based on your actual objective, such us 1, 5, 10 minutes etc. + +### Basic configuration example +```yaml +... +flinkVersion: v1_17 +flinkConfiguration: + kubernetes.operator.job.autoscaler.enabled: "true" + kubernetes.operator.job.autoscaler.stabilization.interval: "5m" + kubernetes.operator.job.autoscaler.metrics.window: "5m" + kubernetes.operator.job.autoscaler.target.utilization: "0.6" + kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2" + kubernetes.operator.job.autoscaler.restart.time: 2m + kubernetes.operator.job.autoscaler.catch-up.duration: 5m + + pipeline.max-parallelism: "720" +``` + +### Advanced config parameters + +The autoscaler also exposes various more advanced config parameters that affect scaling actions: + + - Minimum time before scaling down after scaling up a vertex + - Maximum parallelism change when scaling down + - Min/max parallelism + +The list of options will likely grow to cover more complex scaling scenarios. + +For a detailed config reference check the [general configuration page]({{< ref "docs/operations/configuration#autoscaler-configuration" >}}) + +## Metrics + +The operator reports detailed jobvertex level metrics about the evaluated Flink job metrics that are collected and used in the scaling decision. + +This includes: + - Utilization, input rate, target rate metrics + - Scaling thresholds + - Parallelism and max parallelism changes over time + +These metrics are reported under the Kubernetes Operator Resource metric group: + +``` +[resource_prefix].Autoscaler.[jobVertexID].[ScalingMetric].Current/Average +``` diff --git a/docs/content/docs/operations/configuration.md b/docs/content/docs/operations/configuration.md index 4a328399..5db2f1b5 100644 --- a/docs/content/docs/operations/configuration.md +++ b/docs/content/docs/operations/configuration.md @@ -95,6 +95,12 @@ These options can be configured on both an operator and a per-resource level. Wh {{< generated/dynamic_section >}} +### Autoscaler Configuration + +Like other resource options these can be configured on both an operator and a per-resource level. When set under `spec.flinkConfiguration` for the Flink resources it will override the default value provided in the operator default configuration (`flink-conf.yaml`). + +{{< generated/auto_scaler_configuration >}} + ### System Metrics Configuration Operator system metrics configuration. Cannot be overridden on a per-resource basis. diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html new file mode 100644 index 00000000..af9b4396 --- /dev/null +++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html @@ -0,0 +1,90 @@ +<table class="configuration table table-bordered"> + <thead> + <tr> + <th class="text-left" style="width: 20%">Key</th> + <th class="text-left" style="width: 15%">Default</th> + <th class="text-left" style="width: 10%">Type</th> + <th class="text-left" style="width: 55%">Description</th> + </tr> + </thead> + <tbody> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.catch-up.duration</h5></td> + <td style="word-wrap: break-word;">10 min</td> + <td>Duration</td> + <td>The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.enabled</h5></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Enable job autoscaler module.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.metrics.window</h5></td> + <td style="word-wrap: break-word;">5 min</td> + <td>Duration</td> + <td>Scaling metrics aggregation window size.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.restart.time</h5></td> + <td style="word-wrap: break-word;">5 min</td> + <td>Duration</td> + <td>Expected restart time to be used until the operator can determine it reliably from history.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.scale-down.max-factor</h5></td> + <td style="word-wrap: break-word;">0.6</td> + <td>Double</td> + <td>Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.scale-up.grace-period</h5></td> + <td style="word-wrap: break-word;">10 min</td> + <td>Duration</td> + <td>Period in which no scale down is allowed after a scale up</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.scaling.enabled</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.scaling.sources.enabled</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Whether to enable scaling source vertices. Source vertices set the baseline ingestion rate for the processing based on the backlog size. If disabled, only regular job vertices will be scaled and source vertices will be unchanged.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.stabilization.interval</h5></td> + <td style="word-wrap: break-word;">5 min</td> + <td>Duration</td> + <td>Stabilization period in which no new scaling will be executed</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.target.utilization</h5></td> + <td style="word-wrap: break-word;">0.7</td> + <td>Double</td> + <td>Target vertex utilization</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.target.utilization.boundary</h5></td> + <td style="word-wrap: break-word;">0.1</td> + <td>Double</td> + <td>Target vertex utilization boundary. Scaling won't be performed if utilization is within (target - boundary, target + boundary)</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.vertex.max-parallelism</h5></td> + <td style="word-wrap: break-word;">2147483647</td> + <td>Integer</td> + <td>The maximum parallelism the autoscaler can use. Note that this limit will be ignored if it is higher than the max parallelism configured in the Flink config or directly on each operator.</td> + </tr> + <tr> + <td><h5>kubernetes.operator.job.autoscaler.vertex.min-parallelism</h5></td> + <td style="word-wrap: break-word;">1</td> + <td>Integer</td> + <td>The minimum parallelism the autoscaler can use.</td> + </tr> + </tbody> +</table> diff --git a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java index 43c81b42..0f302dcd 100644 --- a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java +++ b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java @@ -74,7 +74,11 @@ public class ConfigOptionsDocGenerator { new OptionsClassLocation( "flink-kubernetes-operator", "org.apache.flink.kubernetes.operator.config"), new OptionsClassLocation( - "flink-kubernetes-operator", "org.apache.flink.kubernetes.operator.metrics") + "flink-kubernetes-operator", + "org.apache.flink.kubernetes.operator.metrics"), + new OptionsClassLocation( + "flink-kubernetes-operator", + "org.apache.flink.kubernetes.operator.autoscaler.config") }; static final String DEFAULT_PATH_PREFIX = "src/main/java";
