Dennis-Mircea Ciupitu created FLINK-39306:
---------------------------------------------

             Summary: Non-source vertices do not use per-second rate metrics, 
producing inaccurate scaling decisions
                 Key: FLINK-39306
                 URL: https://issues.apache.org/jira/browse/FLINK-39306
             Project: Flink
          Issue Type: Bug
          Components: Autoscaler, Kubernetes Operator
    Affects Versions: 1.15.4
            Reporter: Dennis-Mircea Ciupitu
             Fix For: 1.15.5


h1. 1. Summary

The autoscaler's metric evaluation pipeline does not use Flink's native 
*per-second rate* metrics (*numRecordsInPerSecond*, *numRecordsOutPerSecond*) 
for non-source vertices, even though equivalent source-specific variants 
(*Source__\*.numRecordsInPerSecond*) are already used for sources. Instead, 
non-source vertices rely exclusively on accumulated counters (*numRecordsIn*, 
*numRecordsOut*) and compute rates via endpoint-based interpolation 
(*getRate*), which only considers the first and last values in the metric 
history window. This produces inaccurate rate estimates (particularly in the 
presence of spikes or uneven throughput) and propagates inconsistencies through 
the entire scaling decision chain.

The same endpoint-based interpolation problem also affects the *LAG* metric for 
source vertices. *getRate(LAG, ...)* computes *(last_lag - first_lag) / 
timeDiff*, which dilutes sudden lag spikes across the entire window. For 
example, a history of [0, 1M, 1M, ..., 1M] produces an artificially low lag 
rate because the spike between sample 1 and 2 is spread over the full window 
duration, rather than being properly weighted in its interval.

h1. 2. Root cause
1. *Non-source vertices never collect numRecordsInPerSecond / 
numRecordsOutPerSecond*: The *ScalingMetricCollector* only requests the 
source-specific per-second metrics (*SOURCE_TASK_NUM_RECORDS_IN_PER_SEC*) for 
source vertices. For non-source vertices, only accumulated counters are 
collected, and rates are computed by *getRate()* (an endpoint-only 
interpolation: *(last - first) / timeDiff*).
2. *Endpoint-based getRate on LAG*: *LAG* is a gauge that can jump abruptly 
(e.g., 0 → 1M) and then plateau. *getRate* only looks at the first and last 
samples, so a spike that occurred early in the window is diluted across the 
entire window duration, producing an artificially low rate of change. The 
intermediate samples, which show the plateau, are completely ignored.
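A minimal standalone sketch of the endpoint-only computation (illustrative sample layout and numbers, not the operator's actual code) shows how an early lag spike gets diluted:

```java
import java.util.List;

public class EndpointRateDemo {

    // Endpoint-only interpolation, analogous to the getRate described above:
    // (last - first) / (lastTs - firstTs). Intermediate samples are ignored.
    public static double endpointRate(List<double[]> samples) {
        double[] first = samples.get(0);
        double[] last = samples.get(samples.size() - 1);
        return (last[1] - first[1]) / (last[0] - first[0]);
    }

    public static void main(String[] args) {
        // LAG history: a 1M spike in the first 10s interval, then a plateau.
        // Each sample is {timestampSeconds, lagValue}.
        List<double[]> lag = List.of(
                new double[] {0, 0},
                new double[] {10, 1_000_000},
                new double[] {20, 1_000_000},
                new double[] {30, 1_000_000},
                new double[] {40, 1_000_000});

        // The spike (100k records/s within its own 10s interval) is spread
        // over the full 40s window, so the reported rate is only 25k/s and
        // the 30s plateau is invisible to the computation.
        System.out.println(endpointRate(lag)); // 25000.0
    }
}
```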

h1. 3. Impact

The inaccurate rate estimates cascade through the scaling formula, compounding 
at each step:
1. *TARGET_DATA_RATE underestimated for all vertices*: Since inputRateAvg is 
computed from accumulated counters via endpoint interpolation (which smooths 
out spikes and underestimates bursty throughput), and lagRate for sources is 
diluted by the same endpoint-only approach, the computed *TARGET_DATA_RATE = 
inputRate + lagRate* is systematically smaller than the true ingestion rate. 
This also means *SCALE_UP_RATE_THRESHOLD* and *SCALE_DOWN_RATE_THRESHOLD*, 
which are derived from *TARGET_DATA_RATE*, are too low.
2. *TRUE_PROCESSING_RATE overestimated for non-source vertices*: Non-source 
vertices that cannot rely on OBSERVED_TPR fall back to computing true 
processing rate from busyTimeAvg. When the underlying input rate is 
underestimated (due to endpoint-only interpolation), the ratio inputRate / 
busyTimeFraction produces an inflated TRUE_PROCESSING_RATE.
3. *scalingFactor is too small*: The scaling factor is derived from 
*TARGET_DATA_RATE / TRUE_PROCESSING_RATE*. With the numerator underestimated 
and the denominator overestimated, the resulting scaling factor is 
significantly smaller than it should be.
4. End-user impact:
   - *Excessive scaling iterations*: The autoscaler cannot reach the correct 
target parallelism in a single scaling decision. Instead, it takes multiple 
subsequent scalings to converge, each one undershooting because of the same 
systematic bias.
   - *Source-to-downstream parallelism inversion in FORWARD shipping strategy*: 
Because source vertices use the more accurate per-second metrics while 
downstream vertices use the less accurate accumulated counters, the source can 
end up with a higher parallelism than its downstream FORWARD-connected vertex, 
resulting in a topology violation that can cause runtime failures or suboptimal 
data distribution.
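To make the compounding concrete, here is a toy calculation with made-up numbers (the 20% biases are illustrative, not measured values):

```java
public class ScalingBiasDemo {

    // scalingFactor = TARGET_DATA_RATE / TRUE_PROCESSING_RATE
    public static double scalingFactor(double targetDataRate, double trueProcessingRate) {
        return targetDataRate / trueProcessingRate;
    }

    public static void main(String[] args) {
        // Hypothetical true values for one vertex (records/s):
        double trueTarget = 10_000; // actual ingestion rate
        double trueTpr = 5_000;     // actual per-vertex processing capacity

        // Illustrative biases from endpoint-only interpolation:
        double biasedTarget = 8_000; // numerator underestimated by 20%
        double biasedTpr = 6_250;    // denominator overestimated by 25%

        // The correct decision would double the parallelism; the biased one
        // scales by only 1.28x, so several iterations are needed to converge.
        System.out.println(scalingFactor(trueTarget, trueTpr));     // 2.0
        System.out.println(scalingFactor(biasedTarget, biasedTpr)); // 1.28
    }
}
```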

h1. 4. Proposed solution
h2. 4.1. Collect per-second rate metrics for non-source vertices

Add *FlinkMetric.NUM_RECORDS_IN_PER_SEC* (*numRecordsInPerSecond*) and 
*FlinkMetric.NUM_RECORDS_OUT_PER_SEC* (*numRecordsOutPerSecond*) to the metric 
collection pipeline for non-source vertices in *ScalingMetricCollector*. Define 
corresponding *ScalingMetric.NUM_RECORDS_IN_PER_SECOND* and 
*ScalingMetric.NUM_RECORDS_OUT_PER_SECOND* entries, and store them in 
*ScalingMetrics.computeDataRateMetrics*.
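A simplified sketch of what the new entries could look like; the enum shapes and field names here are assumptions for illustration, not the operator's actual *FlinkMetric* / *ScalingMetric* definitions:

```java
// Simplified sketch only: the operator's real FlinkMetric / ScalingMetric
// enums carry more structure (aggregation behavior, matching predicates, etc.).
public class MetricSketch {

    // Flink-side per-second gauges to additionally request for non-source vertices.
    public enum FlinkMetric {
        NUM_RECORDS_IN_PER_SEC("numRecordsInPerSecond"),
        NUM_RECORDS_OUT_PER_SEC("numRecordsOutPerSecond");

        public final String flinkName;

        FlinkMetric(String flinkName) {
            this.flinkName = flinkName;
        }
    }

    // Autoscaler-side scaling metrics the collected values would be stored under.
    public enum ScalingMetric {
        NUM_RECORDS_IN_PER_SECOND,
        NUM_RECORDS_OUT_PER_SECOND
    }

    public static void main(String[] args) {
        System.out.println(FlinkMetric.NUM_RECORDS_IN_PER_SEC.flinkName);
    }
}
```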

h2. 4.2. Prefer per-second metrics with accumulated-counter fallback

Introduce *getAverageWithRateFallback(perSecondMetric, accumulatedMetric, ...)* 
in *ScalingMetricEvaluator*:
- First tries *getAverage(perSecondMetric)*, which is the average of Flink's 
native per-second rate gauge across the metric window. This uses all samples, 
not just endpoints.
- Falls back to *getRate(accumulatedMetric)* if the per-second metric is 
unavailable (NaN or infinite), preserving backward compatibility with older 
Flink versions or custom sources that don't expose per-second metrics.
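A minimal sketch of the proposed fallback logic, with illustrative method and parameter names (the real helper would operate on the metric history rather than pre-computed doubles):

```java
public class RateFallbackDemo {

    // Prefer the averaged per-second gauge; fall back to the endpoint rate
    // computed from accumulated counters when the gauge is unusable.
    public static double averageWithRateFallback(double perSecondAvg, double accumulatedRate) {
        if (Double.isNaN(perSecondAvg) || Double.isInfinite(perSecondAvg)) {
            // Older Flink versions / custom sources without the gauge.
            return accumulatedRate;
        }
        return perSecondAvg;
    }

    public static void main(String[] args) {
        System.out.println(averageWithRateFallback(1500.0, 900.0));     // 1500.0 (gauge present)
        System.out.println(averageWithRateFallback(Double.NaN, 900.0)); // 900.0 (fallback)
    }
}
```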

h2. 4.3. Use average rate of change for LAG
Replace *getRate(LAG, ...)* with *getAverageRate(LAG, ...)*, which computes the 
average of per-interval deltas across the full metric window, rather than 
endpoint-only interpolation. This correctly weights sudden lag spikes in their 
respective intervals instead of diluting them across the entire window.
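A standalone sketch contrasting the two computations on a lag history with non-uniform sample spacing (illustrative numbers, not operator code):

```java
import java.util.List;

public class AverageRateDemo {

    // Endpoint-only interpolation: (last - first) / totalTime.
    public static double endpointRate(List<double[]> s) {
        double[] a = s.get(0), b = s.get(s.size() - 1);
        return (b[1] - a[1]) / (b[0] - a[0]);
    }

    // Proposed alternative (sketch): average of per-interval rates, so a
    // spike is weighted within its own (possibly short) interval.
    public static double averageRate(List<double[]> s) {
        double sum = 0;
        for (int i = 1; i < s.size(); i++) {
            sum += (s.get(i)[1] - s.get(i - 1)[1]) / (s.get(i)[0] - s.get(i - 1)[0]);
        }
        return sum / (s.size() - 1);
    }

    public static void main(String[] args) {
        // LAG samples {timestampSeconds, lagValue} with non-uniform spacing:
        // a 1M spike over 10s, then a 60s plateau.
        List<double[]> lag = List.of(
                new double[] {0, 0},
                new double[] {10, 1_000_000},
                new double[] {40, 1_000_000},
                new double[] {70, 1_000_000});

        System.out.println(endpointRate(lag)); // ~14285.7 (spike diluted over 70s)
        System.out.println(averageRate(lag));  // ~33333.3 (spike kept in its interval)
    }
}
```

Note that for evenly spaced samples the two computations coincide (both equal the sum of deltas over the total window), so the behavioral difference appears when collection intervals vary, as they do in practice.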



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
