[ 
https://issues.apache.org/jira/browse/FLINK-39306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dennis-Mircea Ciupitu updated FLINK-39306:
------------------------------------------
    Description: 
h1. Summary

The busy-time {{TRUE_PROCESSING_RATE}} is computed as a ratio, {{busyTimeTpr = 
inputRate / busyTime}}, but the numerator and denominator are estimated over 
the metric window with different methods. The denominator is computed 
consistently with the configured busy-time aggregator, while the numerator 
always uses {{getRate}} on the cumulative records counter. Under the default 
{{MAX}} aggregator the two halves of the ratio therefore use different temporal 
estimators, which is internally inconsistent and can skew the ratio under 
non-uniform metric sampling. This issue aligns the numerator estimator with the 
denominator so the ratio is internally consistent.

h1. Background

{{TRUE_PROCESSING_RATE}} (the vertex capacity used by the scaler) is the larger 
of two sub-paths, a busy-time based estimate and an observed estimate, selected 
by {{selectTprMetric}}.

The busy-time estimate is {{busyTimeTpr = inputRate / (busyTimeAvg / 1000)}}. 
The denominator {{busyTimeAvg}} depends on the 
{{kubernetes.operator.metrics.busy-time.aggregator}} option:

 - {{AVG}}: {{getRate(ACCUMULATED_BUSY_TIME) / parallelism}}, a cumulative 
(time-integral) rate.
 - {{MAX}} or {{MIN}} (default {{MAX}}): {{getAverage(LOAD) * 1000}}, an 
arithmetic mean of the per-second busy-time gauge samples.

The numerator {{inputRate}} is always {{getRate(NUM_RECORDS_IN)}}, a cumulative 
endpoint rate.

{{getRate}} (a time-weighted, time-integral average) and {{getAverage}} of a 
per-second gauge (an unweighted sample mean) are two different linear 
estimators. They agree under uniform sampling but diverge under non-uniform 
sampling (bursts, recovery, transients).
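
The divergence can be illustrated with a toy model. The sketch below is not the operator's code: the class {{EstimatorSketch}}, its method names, and the sample values are assumptions made for illustration. It models {{getRate}} as an endpoint rate on a cumulative counter and {{getAverage}} as an unweighted mean of per-second gauge samples. The window has three samples 10 s apart at 100 rec/s, followed by three samples 1 s apart during a 500 rec/s burst.

```java
final class EstimatorSketch {

    /** Endpoint rate on a cumulative counter: (last - first) / elapsed seconds. */
    static double endpointRate(long[] tsMillis, double[] cumulative) {
        int last = tsMillis.length - 1;
        double seconds = (tsMillis[last] - tsMillis[0]) / 1000.0;
        return (cumulative[last] - cumulative[0]) / seconds;
    }

    /** Unweighted arithmetic mean of gauge samples; sample spacing is ignored. */
    static double sampleMean(double[] gauge) {
        double sum = 0.0;
        for (double v : gauge) {
            sum += v;
        }
        return sum / gauge.length;
    }

    public static void main(String[] args) {
        // Hypothetical window: densely sampled burst at the end.
        long[] ts = {0, 10_000, 20_000, 21_000, 22_000, 23_000};
        double[] counter = {0, 1_000, 2_000, 2_500, 3_000, 3_500}; // cumulative records
        double[] gauge = {100, 100, 100, 500, 500, 500};           // records per second

        // 3500 records over 23 s, roughly 152.2 rec/s.
        System.out.println("endpoint rate = " + endpointRate(ts, counter));
        // Each sample counts equally, so the mean is 300 rec/s.
        System.out.println("sample mean   = " + sampleMean(gauge));
    }
}
```

In this toy window the densely sampled burst is over-represented in the unweighted mean, while the endpoint rate weights it by its 3 s duration. With evenly spaced samples of a constant rate both methods return the same value.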

h1. Problem

Under the default {{MAX}} aggregator, {{busyTimeTpr}}'s denominator is a 
per-second sample mean while its numerator is a cumulative endpoint rate. 
Because the value is a ratio of two co-varying quantities, using a single 
shared estimator for both lets their common sampling weighting cancel, whereas 
mixing estimators leaves the denominator's sampling artifact in the result. The 
numerator is also the odd one out relative to the observed estimate it is 
compared against in {{selectTprMetric}}, which is built entirely from 
per-second gauges.
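
The cancellation argument can be checked numerically. The following is a minimal sketch under stated assumptions, not the operator's implementation: {{RatioSketch}} and its methods are hypothetical, parallelism is taken as 1, and the vertex is assumed to spend exactly 1 ms of busy time per record, so its true capacity is 1000 rec/s. The window is non-uniform: three samples 10 s apart at 100 rec/s, then three samples 1 s apart at 500 rec/s.

```java
final class RatioSketch {

    /** Endpoint rate on a cumulative counter: (last - first) / elapsed seconds. */
    static double endpointRate(long[] tsMillis, double[] cumulative) {
        int last = tsMillis.length - 1;
        double seconds = (tsMillis[last] - tsMillis[0]) / 1000.0;
        return (cumulative[last] - cumulative[0]) / seconds;
    }

    /** Unweighted arithmetic mean of gauge samples. */
    static double sampleMean(double[] gauge) {
        double sum = 0.0;
        for (double v : gauge) {
            sum += v;
        }
        return sum / gauge.length;
    }

    /** busyTimeTpr = inputRate / (busyTimeAvg / 1000), busyTimeAvg in ms per second. */
    static double busyTimeTpr(double inputRate, double busyTimeAvgMs) {
        return inputRate / (busyTimeAvgMs / 1000.0);
    }

    public static void main(String[] args) {
        long[] ts = {0, 10_000, 20_000, 21_000, 22_000, 23_000};
        double[] recordsIn = {0, 1_000, 2_000, 2_500, 3_000, 3_500};  // cumulative records
        double[] rateGauge = {100, 100, 100, 500, 500, 500};          // records per second
        double[] busyAccum = {0, 1_000, 2_000, 2_500, 3_000, 3_500};  // cumulative busy ms
        double[] busyGauge = {100, 100, 100, 500, 500, 500};          // busy ms per second

        // Mixed: endpoint-rate numerator over sample-mean denominator (about 507 rec/s).
        double mixed = busyTimeTpr(endpointRate(ts, recordsIn), sampleMean(busyGauge));
        // Aligned on sample means: the shared weighting cancels (about 1000 rec/s).
        double alignedMean = busyTimeTpr(sampleMean(rateGauge), sampleMean(busyGauge));
        // Aligned on endpoint rates: also about 1000 rec/s.
        double alignedRate = busyTimeTpr(endpointRate(ts, recordsIn), endpointRate(ts, busyAccum));

        System.out.println("mixed        = " + mixed);
        System.out.println("aligned mean = " + alignedMean);
        System.out.println("aligned rate = " + alignedRate);
    }
}
```

In this toy data either aligned pairing recovers the assumed capacity, while the mixed pairing lands at roughly half of it because only the denominator is pulled toward the densely sampled burst.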

h1. Goal

Make the busy-time {{TRUE_PROCESSING_RATE}} numerator follow the same estimator 
as its busy-time denominator: per-second gauge mean under {{MAX}} or {{MIN}}, 
cumulative {{getRate}} under {{AVG}}. This is an internal-consistency fix for 
the ratio. It is scoped to that numerator only. It is not a change to the 
demand or edge data-rate paths, and it does not attempt to revalidate the 
underlying capacity model, the observed-rate formula, or the subtask 
aggregation, which are out of scope.
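
As a rough illustration of the intended selection logic, the sketch below picks the numerator estimator from the busy-time aggregator. It is a hypothetical outline, not the actual patch: the {{Aggregator}} enum, {{selectInputRate}}, and the supplier parameters are names invented here, and the real change may be structured differently.

```java
import java.util.function.DoubleSupplier;

final class NumeratorSelectionSketch {

    /** Hypothetical stand-in for the configured busy-time aggregator. */
    enum Aggregator { AVG, MAX, MIN }

    /**
     * Chooses the input-rate estimator that matches the busy-time denominator:
     * cumulative endpoint rate under AVG, per-second gauge mean under MAX or MIN.
     */
    static double selectInputRate(
            Aggregator aggregator, DoubleSupplier cumulativeRate, DoubleSupplier gaugeMean) {
        switch (aggregator) {
            case AVG:
                return cumulativeRate.getAsDouble();
            case MAX:
            case MIN:
                return gaugeMean.getAsDouble();
            default:
                throw new IllegalArgumentException("Unknown aggregator: " + aggregator);
        }
    }

    /** busyTimeTpr = inputRate / (busyTimeAvg / 1000), busyTimeAvg in ms per second. */
    static double busyTimeTpr(double inputRate, double busyTimeAvgMs) {
        return inputRate / (busyTimeAvgMs / 1000.0);
    }

    public static void main(String[] args) {
        // Illustrative values only: endpoint rate 152 rec/s, gauge mean 300 rec/s.
        double underMax = selectInputRate(Aggregator.MAX, () -> 152.0, () -> 300.0);
        double underAvg = selectInputRate(Aggregator.AVG, () -> 152.0, () -> 300.0);
        System.out.println("numerator under MAX = " + underMax);
        System.out.println("numerator under AVG = " + underAvg);
        System.out.println("busyTimeTpr under MAX = " + busyTimeTpr(underMax, 300.0));
    }
}
```

Passing the two estimators as suppliers means only the one that is actually selected gets evaluated, which is a design choice of this sketch rather than something stated in the issue.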

h1. Notes

Behavior is unchanged under {{AVG}} and unchanged whenever metric sampling is 
uniform (the common steady state). The new estimator only differs under 
non-uniform sampling. Covered by unit tests.

  was:
h1. 1. Summary

The autoscaler's metric evaluation pipeline does not use Flink's native 
per-second rate metrics ({{numRecordsInPerSecond}}, 
{{numRecordsOutPerSecond}}) for non-source vertices, even though equivalent 
source-specific variants ({{Source__.numRecordsInPerSecond*}}) are already 
used for sources. Instead, non-source vertices rely exclusively on accumulated 
counters ({{numRecordsIn}}, {{numRecordsOut}}) and compute rates via 
endpoint-based interpolation ({{getRate}}), which only considers the first 
and last values in the metric history window. This produces inaccurate rate 
estimates (particularly in the presence of spikes or uneven throughput) and 
propagates inconsistencies through the entire scaling decision chain.

The same endpoint-based interpolation problem also affects the {{LAG}} metric 
for source vertices. {{getRate(LAG, ...)}} computes 
{{(last_lag - first_lag) / timeDiff}}, which dilutes sudden lag spikes across 
the entire window. For example, a history of [0, 1M, 1M, ..., 1M] produces an 
artificially low lag rate because the spike between sample 1 and 2 is spread 
over the full window duration, rather than being properly weighted in its 
interval.
h1. 2. Root cause

 - Non-source vertices never collect {{numRecordsInPerSecond}} / 
{{numRecordsOutPerSecond}}: The {{ScalingMetricCollector}} only requests 
the source-specific per-second metrics 
({{SOURCE_TASK_NUM_RECORDS_IN_PER_SEC}}) for source vertices. For 
non-source vertices, only accumulated counters are collected, and rates are 
computed by {{getRate()}} (an endpoint-only interpolation: 
{{(last - first) / timeDiff}}).
 - Endpoint-based {{getRate}} on {{LAG}}: {{LAG}} is a gauge that can jump 
abruptly (e.g., 0 → 1M) and then plateau. {{getRate}} only looks at the first 
and last samples, so a spike that occurred early in the window is diluted 
across the entire window duration, producing an artificially low rate of 
change. The intermediate samples, which show the plateau, are completely 
ignored.
h1. 3. Impact

The inaccurate rate estimates cascade through the scaling formula, compounding 
at each step:
1. {*}{{TARGET_DATA_RATE}} underestimated for all vertices{*}: Since 
{{inputRateAvg}} is computed from accumulated counters via endpoint 
interpolation (which smooths out spikes and underestimates bursty throughput), 
and {{lagRate}} for sources is diluted by the same endpoint-only approach, the 
computed {{TARGET_DATA_RATE = inputRate + lagRate}} is systematically smaller 
than the true ingestion rate. This also means {{SCALE_UP_RATE_THRESHOLD}} and 
{{SCALE_DOWN_RATE_THRESHOLD}}, which are derived from 
{{TARGET_DATA_RATE}}, are too low.
2. {*}{{TRUE_PROCESSING_RATE}} overestimated for non-source vertices{*}: 
Non-source vertices that cannot rely on {{OBSERVED_TPR}} fall back to computing 
true processing rate from {{busyTimeAvg}}. When the underlying input rate 
is underestimated (due to endpoint-only interpolation), the ratio {{inputRate}} 
/ {{busyTimeFraction}} produces an inflated {{TRUE_PROCESSING_RATE}}.
3. {*}scalingFactor is too small{*}: The scaling factor is derived from 
{{TARGET_DATA_RATE}} / {{TRUE_PROCESSING_RATE}}. With the numerator 
underestimated and the denominator overestimated, the resulting scaling factor 
is significantly smaller than it should be.
4. End-user impact:
 - {*}Excessive scaling iterations{*}: The autoscaler cannot reach the correct 
target parallelism in a single scaling decision. Instead, it takes multiple 
subsequent scalings to converge, each one undershooting because of the same 
systematic bias.
 - {*}Source-to-downstream parallelism inversion in FORWARD shipping 
strategy{*}: Because source vertices use the more accurate per-second metrics 
while downstream vertices use the less accurate accumulated counters, the 
source can end up with a higher parallelism than its downstream 
FORWARD-connected vertex, resulting in a topology violation that can cause 
runtime failures or suboptimal data distribution.

h1. 4. Proposed solution
h2. 4.1. Collect per-second rate metrics for non-source vertices

Add {{FlinkMetric.NUM_RECORDS_IN_PER_SEC}} ({{numRecordsInPerSecond}}) and 
{{FlinkMetric.NUM_RECORDS_OUT_PER_SEC}} ({{numRecordsOutPerSecond}}) to the 
metric collection pipeline for non-source vertices in 
{{ScalingMetricCollector}}. Define corresponding 
{{ScalingMetric.NUM_RECORDS_IN_PER_SECOND}} and 
{{ScalingMetric.NUM_RECORDS_OUT_PER_SECOND}} entries, and store them in 
{{ScalingMetrics.computeDataRateMetrics}}.
h2. 4.2. Prefer per-second metrics with accumulated-counter fallback

Introduce {{getAverageWithRateFallback(perSecondMetric, accumulatedMetric, 
...)}} in {{ScalingMetricEvaluator}}:
 - First tries {{getAverage(perSecondMetric)}}, which is the average of 
Flink's native per-second rate gauge across the metric window. This uses all 
samples, not just endpoints.
 - Falls back to {{getRate(accumulatedMetric)}} if the per-second metric is 
unavailable (NaN or infinite), preserving backward compatibility with older 
Flink versions or custom sources that don't expose per-second metrics.

h2. 4.3. Use average rate of change for LAG

Replace {{getRate(LAG, ...)}} with {{getAverageRate(LAG, ...)}}, which computes 
the average of per-interval deltas across the full metric window, rather than 
endpoint-only interpolation. This correctly weights sudden lag spikes in their 
respective intervals instead of diluting them across the entire window.


> Non-source vertices do not use per-second rate metrics, producing inaccurate 
> scaling decisions
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39306
>                 URL: https://issues.apache.org/jira/browse/FLINK-39306
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.14.0
>            Reporter: Dennis-Mircea Ciupitu
>            Priority: Major
>              Labels: autoscaling, operator, pull-request-available
>             Fix For: kubernetes-operator-1.16.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
