gyfora commented on code in PR #652:
URL: https://github.com/apache/flink-kubernetes-operator/pull/652#discussion_r1296783335
##########
docs/content/docs/custom-resource/autoscaler.md:
##########
@@ -35,26 +35,78 @@ Key benefits to the user:
 - Automatic adaptation to changing load patterns
 - Detailed utilization metrics for performance debugging
 
-Job requirements:
- - The autoscaler currently only works with the latest [Flink 1.17](https://hub.docker.com/_/flink) or after backporting the following fixes to your 1.15/1.16 Flink image
-   - Job vertex parallelism overrides (must have)
-     - [Add option to override job vertex parallelisms during job submission](https://github.com/apache/flink/commit/23ce2281a0bb4047c64def9af7ddd5f19d88e2a9)
-     - [Change ForwardPartitioner to RebalancePartitioner on parallelism changes](https://github.com/apache/flink/pull/21443) (consists of 5 commits)
-     - [Fix logic for determining downstream subtasks for partitioner replacement](https://github.com/apache/flink/commit/fb482fe39844efda33a4c05858903f5b64e158a3)
-   - [Support timespan for busyTime metrics](https://github.com/apache/flink/commit/a7fdab8b23cddf568fa32ee7eb804d7c3eb23a35) (good to have)
- - Source scaling only supports modern sources which
-   - use the new [Source API](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) that exposes the busy time metric (must have, most common connectors already do)
-   - expose the [standardized connector metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics) for accessing backlog information (good to have, extra capacity will be added for catching up with backlog)
+## Overview
 
-In the current state the autoscaler works best with Kafka sources, as they expose all the standardized metrics. It also comes with some additional benefits when using Kafka such as automatically detecting and limiting source max parallelism to the number of Kafka partitions.
+The autoscaler relies on the metrics exposed by the Flink metric system for the individual tasks. The metrics are queried directly from the Flink job.
 
-{{< hint info >}}
-The autoscaler also supports a passive/metrics-only mode where it only collects and evaluates scaling related performance metrics but does not trigger any job upgrades.
-This can be used to gain confidence in the module without any impact on the running applications.
+Collected metrics:
+ - Backlog information at each source
+ - Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
+ - Number of records processed per second in each job vertex
+ - Busy time per second of each job vertex (current utilization)
 
-To disable scaling actions, set: `kubernetes.operator.job.autoscaler.scaling.enabled: "false"`
+{{< hint info >}}
+Please note that we are not using any container memory / CPU utilization metrics directly here. High utilization will be reflected in the processing rate and busy time metrics of the individual job vertexes.
 {{< /hint >}}
 
+The algorithm starts from the sources and recursively computes the required processing capacity (target data rate) for each operator in the pipeline. At the source vertices, the target data rate is equal to the incoming data rate (from the Kafka topic).
+
+For downstream operators we compute the target data rate as the sum of the input (upstream) operators' output data rate along the given edge in the processing graph.
+
+{{< img src="/img/custom-resource/autoscaler_fig1.png" alt="Computing Target Data Rates" >}}
+
+Users configure the target utilization percentage of the operators in the pipeline, e.g. keep all operators between 60% - 80% busy. The autoscaler then finds a parallelism configuration such that the output rates of all operators match the input rates of all their downstream operators at the targeted utilization.
+
+In this example we see an upscale operation:
+
+{{< img src="/img/custom-resource/autoscaler_fig2.png" alt="Scaling Up" >}}
+
+Similarly, as load decreases, the autoscaler adjusts individual operator parallelism levels to match the current rate over time.
+
+{{< img src="/img/custom-resource/autoscaler_fig3.png" alt="Scaling Down" >}}
+
+The autoscaler approach is based on [Three steps is all you need: fast, accurate, automatic scaling decisions for distributed streaming dataflows](https://www.usenix.org/system/files/osdi18-kalavri.pdf) by Kalavri et al.
+
+## Executing rescaling operations
+
+By default the autoscaler uses the built-in job upgrade mechanism from the operator to perform the rescaling, as detailed in [Job Management and Stateful upgrades]({{< ref "docs/custom-resource/job-management" >}}).
+
+### Flink 1.18 and in-place scaling support
+
+The upcoming Flink 1.18 release brings very significant improvements to the speed of scaling operations through the new resource requirements REST endpoint.
+This allows the autoscaler to scale vertices in-place without performing a full job upgrade cycle.
+
+To try this experimental feature, please use the currently available Flink 1.18 snapshot base image to build your application Docker image.
+Furthermore, make sure you set the Flink version to `v1_18` in your FlinkDeployment yaml and enable the adaptive scheduler, which is required for this feature.
+
+```
+jobmanager.scheduler: adaptive
+```
+
+## Job Requirements and Limitations
+
+### Requirements
+
+The autoscaler currently only works with the latest [Flink 1.17](https://hub.docker.com/_/flink) or after backporting the following fixes to your 1.15/1.16 Flink image:

Review Comment:
   makes sense :)

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
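For readers who want to try the in-place scaling setup described in the quoted docs, a minimal `FlinkDeployment` sketch along those lines is shown below. It only illustrates how the pieces fit together; the image name, jar path, resource values and the `kubernetes.operator.job.autoscaler.enabled` flag are assumptions based on the operator's general examples and autoscaler config prefix, not values taken from this PR.

```yaml
# Illustrative sketch only - assumes a custom image built on the Flink 1.18
# snapshot base, as suggested in the quoted docs. All names are placeholders.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  image: my-registry/flink:1.18-snapshot-custom        # placeholder image name
  flinkVersion: v1_18                                   # needed for in-place scaling
  serviceAccount: flink
  flinkConfiguration:
    jobmanager.scheduler: adaptive                      # adaptive scheduler is required
    kubernetes.operator.job.autoscaler.enabled: "true"  # assumed autoscaler enable key for this operator version
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar        # placeholder jar path
    parallelism: 2
    upgradeMode: savepoint
```

With a configuration of this shape, the autoscaler can adjust vertex parallelism through the resource requirements endpoint instead of triggering a full job upgrade cycle, as described in the quoted section.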
