This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 86458aac1c1c6fc675a38ea54c93cb46900a2419
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Dec 15 11:27:00 2022 +0100

    [docs][autoscaler] Add docs page + enable config doc generation for 
Autoscaler options
---
 .github/workflows/ci.yml                           |   2 +-
 docs/content/docs/custom-resource/autoscaler.md    | 158 +++++++++++++++++++++
 docs/content/docs/operations/configuration.md      |   6 +
 .../generated/auto_scaler_configuration.html       |  90 ++++++++++++
 .../configuration/ConfigOptionsDocGenerator.java   |   6 +-
 5 files changed, 260 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c48b631c..c3891788 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -41,7 +41,7 @@ jobs:
       - name: Build with Maven
         run: |
           set -o pipefail; mvn clean install javadoc:javadoc -Pgenerate-docs | 
tee ./mvn.log; set +o pipefail
-          if [[ $(cat ./mvn.log | grep -E -v '(flink-runtime-.*.jar, 
flink-kubernetes-operator-.*.jar)|(flink-kubernetes-operator-.*.jar, 
flink-runtime-.*.jar) define 2 overlapping classes' | grep -c "overlapping 
classes" -) -gt 0 ]];then
+          if [[ $(cat ./mvn.log | grep -E -v '(flink-runtime-.*.jar, 
flink-kubernetes-operator-.*.jar)|(flink-kubernetes-operator-.*.jar, 
flink-runtime-.*.jar) define 3 overlapping classes' | grep -c "overlapping 
classes" -) -gt 0 ]];then
             echo "Found overlapping classes: "
             cat ./mvn.log | grep "overlapping classes"
             exit 1
diff --git a/docs/content/docs/custom-resource/autoscaler.md 
b/docs/content/docs/custom-resource/autoscaler.md
new file mode 100644
index 00000000..db7a6d36
--- /dev/null
+++ b/docs/content/docs/custom-resource/autoscaler.md
@@ -0,0 +1,158 @@
+---
+title: "Autoscaler"
+weight: 4
+type: docs
+aliases:
+- /custom-resource/autoscaler.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Autoscaler
+
+The operator provides job autoscaler functionality that collects various 
metrics from running Flink jobs and automatically scales individual job 
vertexes (chained operator groups) to eliminate backpressure and satisfy the 
utilization and catch-up duration target set by the user.
+By adjusting parallelism on a job vertex level (in contrast to job 
parallelism) we can efficiently autoscale complex and heterogeneous streaming 
applications.
+
+Key benefits to the user:
+ - Better cluster resource utilization and lower operating costs
+ - Automatic parallelism tuning for even complex streaming pipelines
+ - Automatic adaptation to changing load patterns
+ - Detailed utilization metrics for performance debugging
+
+Job requirements:
+ - The autoscaler currently only works with the latest [Flink 1.17 snapshot 
images](ghcr.io/apache/flink-docker:1.17-SNAPSHOT-scala_2.12-java11-debian) or 
after backporting the following fixes to your 1.15/1.16 Flink image
+   - [Job vertex parallelism 
overrides](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
 (must have)
+   - [Support timespan for busyTime 
metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35)
 (good to have)
+ - All sources must use the new [Source 
API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface)
 (most common connectors already do)
+ - Source scaling requires sources to expose the [standardized connector 
metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics)
 for accessing backlog information (source scaling can be disabled)
+
+In the current state the autoscaler works best with Kafka sources, as they 
expose all the standardized metrics. It also comes with some additional 
benefits when using Kafka such as automatically detecting and limiting source 
max parallelism to the number of Kafka partitions.
+
+{{< hint info >}}
+The autoscaler also supports a passive/metrics-only mode where it only 
collects and evaluates scaling related performance metrics but does not trigger 
any job upgrades.
+This can be used to gain confidence in the module without any impact on the 
running applications.
+
+To disable scaling actions, set: 
`kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< /hint >}}
+
+## Configuration guide
+
+Depending on your environment and job characteristics there are a few very 
important configurations that will affect how well the autoscaler works.
+
+Key configuration areas:
+ - Job and per operator max parallelism
+ - Stabilization and metrics collection intervals
+ - Target utilization and flexible boundaries
+ - Target catch-up duration and restart time
+
+The defaults might work reasonably well for many applications, but some tuning 
may be required in this early stage of the autoscaler module.
+
+### Job and per operator max parallelism
+
+When computing the scaled parallelism, the autoscaler always considers the max 
parallelism settings for each job vertex to ensure that it doesn't introduce 
unnecessary data skew.
+The computed parallelism will always be a divisor of the configured max 
parallelism.
+
+To ensure flexible scaling it is therefore recommended to choose max 
parallelism settings that have a [lot of 
divisors](https://en.wikipedia.org/wiki/Highly_composite_number) instead of 
relying on the Flink provided defaults.
+You can then use the `pipeline.max-parallelism` option to configure this for 
your pipeline.
+
+Some good numbers for max-parallelism are: 120, 180, 240, 360, 720 etc.
+
+It is also possible to set maxParallelism on a per operator level, which can 
be useful if we want to avoid scaling some sources/sinks beyond a certain 
number.
+
+### Stabilization and metrics collection intervals
+
+The autoscaler always looks at average metrics in the collection time window 
defined by `kubernetes.operator.job.autoscaler.metrics.window`.
+The size of this window determines how small fluctuations will affect the 
autoscaler. The larger the window, the more smoothing and stability we get, but 
we may be slower to react to sudden load changes.
+We suggest experimenting with values between 3 and 60 minutes for the best 
experience.
+
+To allow jobs to stabilize after recovery users can configure a stabilization 
window by setting `kubernetes.operator.job.autoscaler.stabilization.interval`. 
+During this time period no scaling actions will be taken.
+
+{{< hint warning >}}
+Currently the autoscaler treats the collection window as the **maximum** 
window. Metric evaluation will start right after the stabilization period.
+We also include metrics collected during the stabilization period at the 
moment which might cause some instability with very low stabilization periods.
+
+We are working on improving this.
+{{< /hint >}}
+
+### Target utilization and flexible boundaries
+
+In order to provide stable job performance and some buffer for load 
fluctuations, the autoscaler allows users to set a target utilization level for 
the job (`kubernetes.operator.job.autoscaler.target.utilization`).
+A target of `0.6` means we are targeting 60% utilization/load for the job 
vertexes.
+
+In general, it's not recommended to set target utilization close to 100% as 
performance usually degrades as we reach capacity limits in most real world 
systems.
+
+In addition to the utilization target we can set a utilization boundary that 
serves as an extra buffer to avoid immediate scaling on load fluctuations.
+Setting `kubernetes.operator.job.autoscaler.target.utilization.boundary: 
"0.2"` means that we allow 20% deviation from the target utilization before 
triggering a scaling action.
+
+### Target catch-up duration and restart time
+
+When making scaling decisions the operator needs to account for the extra 
capacity required to catch up the backlog created during scaling operations.
+The amount of extra capacity is determined automatically by the following 2 
configs:
+
+ - `kubernetes.operator.job.autoscaler.restart.time` : Time it usually takes 
to restart the application
+ - `kubernetes.operator.job.autoscaler.catch-up.duration` : Time in which the 
job is expected to catch up after scaling 
+
+In the future the autoscaler may be able to automatically determine the 
restart time, but the target catch-up duration depends on the user's SLO.
+
+Lowering the catch-up duration means the autoscaler has to reserve more 
extra capacity for the scaling actions.
+We suggest setting this based on your actual objective, such as 1, 5, 10 
minutes etc.
+
+### Basic configuration example
+```yaml
+...
+flinkVersion: v1_17
+flinkConfiguration:
+    kubernetes.operator.job.autoscaler.enabled: "true"
+    kubernetes.operator.job.autoscaler.stabilization.interval: "5m"
+    kubernetes.operator.job.autoscaler.metrics.window: "5m"
+    kubernetes.operator.job.autoscaler.target.utilization: "0.6"
+    kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
+    kubernetes.operator.job.autoscaler.restart.time: 2m
+    kubernetes.operator.job.autoscaler.catch-up.duration: 5m
+
+    pipeline.max-parallelism: "720"
+```
+
+### Advanced config parameters
+
+The autoscaler also exposes various more advanced config parameters that 
affect scaling actions:
+
+ - Minimum time before scaling down after scaling up a vertex
+ - Maximum parallelism change when scaling down
+ - Min/max parallelism
+
+The list of options will likely grow to cover more complex scaling scenarios.
+
+For a detailed config reference check the [general configuration page]({{< ref 
"docs/operations/configuration#autoscaler-configuration" >}})
+
+## Metrics
+
+The operator reports detailed job vertex level metrics about the evaluated 
Flink job metrics that are collected and used in the scaling decision.
+
+This includes:
+ - Utilization, input rate, target rate metrics
+ - Scaling thresholds
+ - Parallelism and max parallelism changes over time
+
+These metrics are reported under the Kubernetes Operator Resource metric group:
+
+```
+[resource_prefix].Autoscaler.[jobVertexID].[ScalingMetric].Current/Average
+```
diff --git a/docs/content/docs/operations/configuration.md 
b/docs/content/docs/operations/configuration.md
index 4a328399..5db2f1b5 100644
--- a/docs/content/docs/operations/configuration.md
+++ b/docs/content/docs/operations/configuration.md
@@ -95,6 +95,12 @@ These options can be configured on both an operator and a 
per-resource level. Wh
 
 {{< generated/dynamic_section >}}
 
+### Autoscaler Configuration
+
+Like other resource options these can be configured on both an operator and a 
per-resource level. When set under `spec.flinkConfiguration` for the Flink 
resources they override the default values provided in the operator default 
configuration (`flink-conf.yaml`).
+
+{{< generated/auto_scaler_configuration >}}
+
 ### System Metrics Configuration
 
 Operator system metrics configuration. Cannot be overridden on a per-resource 
basis.
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
new file mode 100644
index 00000000..af9b4396
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -0,0 +1,90 @@
+<table class="configuration table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 10%">Type</th>
+            <th class="text-left" style="width: 55%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.catch-up.duration</h5></td>
+            <td style="word-wrap: break-word;">10 min</td>
+            <td>Duration</td>
+            <td>The target duration for fully processing any backlog after a 
scaling operation. Set to 0 to disable backlog based scaling.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Enable job autoscaler module.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.metrics.window</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>Scaling metrics aggregation window size.</td>
+        </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.restart.time</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>Expected restart time to be used until the operator can 
determine it reliably from history.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.scale-down.max-factor</h5></td>
+            <td style="word-wrap: break-word;">0.6</td>
+            <td>Double</td>
+            <td>Max scale down factor. 1 means no limit on scale down, 0.6 
means job can only be scaled down with 60% of the original parallelism.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.scale-up.grace-period</h5></td>
+            <td style="word-wrap: break-word;">10 min</td>
+            <td>Duration</td>
+            <td>Period in which no scale down is allowed after a scale up</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.scaling.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Enable vertex scaling execution by the autoscaler. If 
disabled, the autoscaler will only collect metrics and evaluate the suggested 
parallelism for each vertex but will not upgrade the jobs.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.scaling.sources.enabled</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to enable scaling source vertices. Source vertices set 
the baseline ingestion rate for the processing based on the backlog size. If 
disabled, only regular job vertices will be scaled and source vertices will be 
unchanged.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.stabilization.interval</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>Stabilization period in which no new scaling will be 
executed</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.target.utilization</h5></td>
+            <td style="word-wrap: break-word;">0.7</td>
+            <td>Double</td>
+            <td>Target vertex utilization</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.target.utilization.boundary</h5></td>
+            <td style="word-wrap: break-word;">0.1</td>
+            <td>Double</td>
+            <td>Target vertex utilization boundary. Scaling won't be performed 
if utilization is within (target - boundary, target + boundary)</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.vertex.max-parallelism</h5></td>
+            <td style="word-wrap: break-word;">2147483647</td>
+            <td>Integer</td>
+            <td>The maximum parallelism the autoscaler can use. Note that this 
limit will be ignored if it is higher than the max parallelism configured in 
the Flink config or directly on each operator.</td>
+        </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.vertex.min-parallelism</h5></td>
+            <td style="word-wrap: break-word;">1</td>
+            <td>Integer</td>
+            <td>The minimum parallelism the autoscaler can use.</td>
+        </tr>
+    </tbody>
+</table>
diff --git 
a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
 
b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
index 43c81b42..0f302dcd 100644
--- 
a/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/flink-kubernetes-docs/src/main/java/org/apache/flink/kubernetes/operator/docs/configuration/ConfigOptionsDocGenerator.java
@@ -74,7 +74,11 @@ public class ConfigOptionsDocGenerator {
                 new OptionsClassLocation(
                         "flink-kubernetes-operator", 
"org.apache.flink.kubernetes.operator.config"),
                 new OptionsClassLocation(
-                        "flink-kubernetes-operator", 
"org.apache.flink.kubernetes.operator.metrics")
+                        "flink-kubernetes-operator",
+                        "org.apache.flink.kubernetes.operator.metrics"),
+                new OptionsClassLocation(
+                        "flink-kubernetes-operator",
+                        
"org.apache.flink.kubernetes.operator.autoscaler.config")
             };
     static final String DEFAULT_PATH_PREFIX = "src/main/java";
 
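
Note on the max parallelism guidance in the autoscaler.md hunk above: since the computed parallelism is always a divisor of the max parallelism, the number of divisors limits how many parallelism values the autoscaler can reach. The Python sketch below is illustrative only and is not the operator's implementation. The helper names `divisors` and `next_divisor_at_least`, and the policy of rounding a desired parallelism up to the next divisor, are assumptions made for this example. It contrasts 720 (one of the values suggested in the docs) with 128 (a power of two picked here purely for contrast).

```python
def divisors(n: int) -> list[int]:
    """Return all positive divisors of n in ascending order."""
    small, large = [], []
    d = 1
    while d * d <= n:
        if n % d == 0:
            small.append(d)
            if d != n // d:
                large.append(n // d)
        d += 1
    # 'large' was collected in descending order, so reverse it before joining.
    return small + large[::-1]


def next_divisor_at_least(max_parallelism: int, wanted: int) -> int:
    """Smallest divisor of max_parallelism that is >= wanted.

    Hypothetical rounding policy for illustration; falls back to
    max_parallelism itself when 'wanted' exceeds it.
    """
    for d in divisors(max_parallelism):
        if d >= wanted:
            return d
    return max_parallelism


if __name__ == "__main__":
    for max_parallelism in (128, 720):
        options = divisors(max_parallelism)
        chosen = next_divisor_at_least(max_parallelism, 9)
        print(f"max={max_parallelism}: {len(options)} divisors, "
              f"wanted 9 -> {chosen}")
```

With these assumptions, 128 offers 8 possible parallelism values and a desired parallelism of 9 has to jump to 16, while 720 offers 30 values and can use 9 directly. This is the kind of flexibility the "lot of divisors" recommendation is aiming for.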
