[ https://issues.apache.org/jira/browse/FLINK-38538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18038396#comment-18038396 ]

Michael Koepf commented on FLINK-38538:
---------------------------------------

Hi [~varunpathania],

> Can we have a configuration option or scaling policy/mode in the Flink autoscaler
> that bases scaling (up or down) {*}primarily or solely on the busy metric{*}?

In case you haven’t come across it, I want to point out that the Flink 
autoscaling mechanism is designed to be 
[extensible|https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/#extensibility-of-autoscaler].
However, I haven't used this feature myself yet.

Probably the quickest path forward would be to use the existing autoscaler 
logic as a starting point and tailor it to your needs.

--
Best,
Michael

> Flink Autoscaler: Need Option to Autoscale Based Primarily (or Solely) on 
> Busy Metric, Not Output Ratio
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-38538
>                 URL: https://issues.apache.org/jira/browse/FLINK-38538
>             Project: Flink
>          Issue Type: Technical Debt
>          Components: Autoscaler
>    Affects Versions: 1.19.0
>            Reporter: Varun
>            Priority: Minor
>
> *Components:* Autoscaler, Flink Kubernetes Operator
> *Affects Version:*
>  * Flink: 1.19.2
>  * Flink Kubernetes Operator: 1.13.0
> *Environment:*
>  * Flink job launched as a FlinkDeployment on an on-premise Kubernetes 
> cluster managed by Rancher, in application mode.
> Typical deployment config (excerpt):
> {code:java}
> jobmanager.scheduler: adaptive
> job.autoscaler.enabled: "true" 
> job.autoscaler.stabilization.interval: "5m"
> job.autoscaler.metrics.window: "5m"
> job.autoscaler.utilization.target: "0.4"
> job.autoscaler.utilization.max: "0.6"
> job.autoscaler.utilization.min: "0.2"
> job.autoscaler.vertex.min-parallelism: "10"
> job.autoscaler.vertex.max-parallelism: "200"
> job.autoscaler.scale-up.grace-period: "5m"
> job.autoscaler.metrics.busy-time.aggregator: "MAX"{code}
> *Problem Statement:*
> In the current autoscaler implementation, scaling decisions are made using 
> both operator busy time ({*}busyTimeMsPerSecond{*}) and the *output ratio* 
> (i.e., the ratio of output records to input records per edge), combined as 
> part of the recursive target data rate calculation.
> However, we observe cases where an operator/sub-job remains *100% busy*
> across all subtasks, yet is aggressively scaled _down_, sometimes all the way
> to the minimum parallelism, purely because the output ratio feeding the
> autoscaler's recursively computed target data rate is low. This arises in
> scenarios with heavy filtering, aggregations, or temporarily slow or blocked sinks.
> There is {*}currently no way to configure the Flink autoscaler to prioritize, 
> or exclusively use, the busy metric for scaling decisions{*}, even if this 
> would be more appropriate for certain classes of workloads.
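For reference, the recursive calculation described above can be sketched as follows. This is a simplified model, not Flink's implementation: `TargetRateSketch`, `Edge` and `propagate` are hypothetical names, vertices are assumed to be visited in topological order, and backlog catch-up is ignored. Each non-source vertex's target rate is the sum, over its input edges, of the upstream target rate times that edge's output ratio, so if a vertex's only input edge has an output ratio of 0.0, its target rate is 0.0 no matter how busy the vertex is.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified, hypothetical model of recursive target-data-rate propagation.
 * Not Flink's implementation; names are illustrative and backlog catch-up is ignored.
 */
final class TargetRateSketch {

    private TargetRateSketch() {}

    /**
     * An edge of the job graph. outputRatio = records the upstream vertex sends
     * on this edge per record it receives.
     */
    record Edge(String from, String to, double outputRatio) {}

    /**
     * @param topoOrder   vertex ids in topological order (sources first)
     * @param sourceRates target rate of each source vertex, in records/s
     * @param edges       all edges of the job graph
     * @return target rate per vertex, in records/s
     */
    static Map<String, Double> propagate(List<String> topoOrder,
                                         Map<String, Double> sourceRates,
                                         List<Edge> edges) {
        Map<String, Double> target = new HashMap<>();
        for (String vertex : topoOrder) {
            Double sourceRate = sourceRates.get(vertex);
            if (sourceRate != null) {
                target.put(vertex, sourceRate); // sources: taken as given
                continue;
            }
            double sum = 0.0;
            for (Edge e : edges) {
                if (e.to().equals(vertex)) {
                    // Upstream target rate scaled by the edge's output ratio;
                    // an output ratio of 0.0 contributes nothing.
                    sum += target.getOrDefault(e.from(), 0.0) * e.outputRatio();
                }
            }
            target.put(vertex, sum);
        }
        return target;
    }
}
```

For a chain source → upstream → hot with an assumed source target of 1000 records/s and output ratios of 1.0 and 0.0, `propagate` returns 1000.0 for upstream and 0.0 for hot, the same zero target that shows up in the logs below.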
> ----
> *Behavior Observed (with Log Snapshots):*
> We run a multi-stage pipeline in which one "hot" vertex is heavily loaded:
>  * *Vertex scaling metrics* (all subtasks busy):
> {code:java}
> Vertex scaling metrics for 20b73495bccfee0c65322e5852f3f496: 
> {ACCUMULATED_BUSY_TIME=4021203.0, NUM_RECORDS_IN=47229.0, LOAD=1.0, 
> NUM_RECORDS_OUT=7834.0}{code}
>  * *Autoscaler parallelism overrides:* (scaling up due to busy metric)
> {code:java}
> [DEBUG][flink/example-flink-pipeline] Applying parallelism overrides: {..., 20b73495bccfee0c65322e5852f3f496=44, ...}{code}
>  * *Output ratio computed near zero:*
> {code:java}
> Computed output ratio for edge (f134ca61df556898f135a2c691a13fc5 -> 
> 20b73495bccfee0c65322e5852f3f496): 0.0{code}
> * *Target processing capacity is forced to zero, which triggers a
> scale-down:*
> {code:java}
> Vertex 20b73495bccfee0c65322e5852f3f496 processing rate 404.562 is outside (0.0, 0.0)
> [DEBUG] Applying parallelism overrides: {..., 20b73495bccfee0c65322e5852f3f496=18, ...}
> [DEBUG] Applying parallelism overrides: {..., 20b73495bccfee0c65322e5852f3f496=10, ...}{code}
> * *Even at parallelism 10, the stage remains 100% busy:*
> {code:java}
> [{ "id": "busyTimeMsPerSecond", "min": 1000, "max": 1000, "avg": 1000, "sum": 10000 }]{code}
> ----
> *Interpretation:*
> Even though the operator is saturated, the autoscaler computes a required
> target rate of 0 because the output ratio on the vertex's input edge is 0.
> It therefore scales the vertex down to the minimum parallelism and leaves it
> overloaded.
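The logged bounds (0.0, 0.0) and the 44 → 18 → 10 sequence are consistent with a simplified model of one scaling step. This is a sketch under stated assumptions, not the autoscaler's code: `ScaleDownSketch` and its methods are hypothetical; lag catch-up, restart time and key-group alignment are ignored; the band is taken as targetRate / utilization.max to targetRate / utilization.min; and each scale-down step is assumed to be capped at a factor of 0.6 (the autoscaler's `job.autoscaler.scale-down.max-factor` option, which does not appear in the config excerpt above, so 0.6 is an assumed value).

```java
/**
 * Simplified, hypothetical model of one autoscaler decision for a single vertex.
 * Ignores lag catch-up, restart time and key-group alignment.
 */
final class ScaleDownSketch {

    private ScaleDownSketch() {}

    /** Processing-rate band {targetRate / utilMax, targetRate / utilMin}; outside it, rescale. */
    static double[] rateBounds(double targetRate, double utilMin, double utilMax) {
        return new double[] {targetRate / utilMax, targetRate / utilMin};
    }

    /**
     * Next parallelism: scale by requiredCapacity / processingRate, but never by a
     * factor below (1 - maxScaleDownFactor) in one step, then clamp to [min, max].
     */
    static int nextParallelism(int current, double targetRate, double processingRate,
                               double utilTarget, double maxScaleDownFactor,
                               int minParallelism, int maxParallelism) {
        double requiredCapacity = targetRate / utilTarget;
        double scaleFactor = requiredCapacity / processingRate;
        // Assumed cap on the size of a single scale-down step.
        scaleFactor = Math.max(scaleFactor, 1.0 - maxScaleDownFactor);
        int proposed = (int) Math.ceil(current * scaleFactor);
        return Math.max(minParallelism, Math.min(maxParallelism, proposed));
    }
}
```

With a target rate of 0.0 both bounds are 0.0, so the processing rate of 404.562 lands above the band and reads as over-provisioning. The scale factor is 0, and only the step cap keeps the first step at 18 (44 × 0.4 = 17.6, rounded up); on the next step 18 × 0.4 = 7.2 rounds up to 8 and is clamped to vertex.min-parallelism = 10.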
> *Request:*
> Can we have a configuration option or scaling policy/mode in the Flink autoscaler
> that bases scaling (up or down) {*}primarily or solely on the busy metric{*}?
> * Optionally, allow users to disable or strongly de-emphasize the output ratio
> and base scaling strictly on busyTime, either for selected vertices or globally.
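A minimal sketch of how the requested option could be structured, assuming the existing ratio-based parallelism is already computed per vertex. The `Mode` values, the opt-in vertex set, the `global` flag and all method names are hypothetical, not existing Flink settings or APIs. `BUSY_ONLY` corresponds to "disable" the output ratio; `MAX_OF_BOTH` corresponds to "strongly de-emphasize" it: the output ratio can still raise parallelism, but it can no longer push a vertex below what its busy time implies.

```java
import java.util.Set;

/**
 * Hypothetical sketch of the requested option. Mode names, the opt-in set and
 * all methods are illustrative; none of them exist in Flink.
 */
final class BusyAwareScalingSketch {

    private BusyAwareScalingSketch() {}

    enum Mode { RATIO_ONLY, BUSY_ONLY, MAX_OF_BOTH }

    /** Selected vertices (or all, if global) use the requested mode; others keep RATIO_ONLY. */
    static Mode modeFor(String vertexId, Set<String> selected, boolean global, Mode requested) {
        return (global || selected.contains(vertexId)) ? requested : Mode.RATIO_ONLY;
    }

    /** Parallelism that would bring the busy ratio to utilTarget, assuming constant total work. */
    static int busyBasedParallelism(int current, double busyRatio, double utilTarget) {
        return (int) Math.ceil(current * busyRatio / utilTarget);
    }

    /**
     * @param ratioBased parallelism proposed by the existing output-ratio-based logic
     * @param busyRatio  busyTimeMsPerSecond / 1000 for the vertex, in [0, 1]
     */
    static int decide(Mode mode, int current, int ratioBased, double busyRatio,
                      double utilTarget, int minParallelism, int maxParallelism) {
        int busyBased = busyBasedParallelism(current, busyRatio, utilTarget);
        int proposed = switch (mode) {
            case RATIO_ONLY -> ratioBased;
            case BUSY_ONLY -> busyBased;
            case MAX_OF_BOTH -> Math.max(ratioBased, busyBased); // busy time sets a floor
        };
        return Math.max(minParallelism, Math.min(maxParallelism, proposed));
    }
}
```

With the numbers from the logs (parallelism 10, busy ratio 1.0, utilization.target 0.4, ratio-based result 10), `decide(Mode.MAX_OF_BOTH, 10, 10, 1.0, 0.4, 10, 200)` returns 25 instead of 10. Because busyTimeMsPerSecond saturates at 1000, a 100%-busy vertex only reveals a lower bound on its real load, so a busy-based policy may need several rounds before the vertex drops below utilization.max.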



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
