Dennis-Mircea Ciupitu created FLINK-39306:
---------------------------------------------
Summary: Non-source vertices do not use per-second rate metrics,
producing inaccurate scaling decisions
Key: FLINK-39306
URL: https://issues.apache.org/jira/browse/FLINK-39306
Project: Flink
Issue Type: Bug
Components: Autoscaler, Kubernetes Operator
Affects Versions: 1.15.4
Reporter: Dennis-Mircea Ciupitu
Fix For: 1.15.5
h1. 1. Summary
The autoscaler's metric evaluation pipeline does not use Flink's native
*per-second rate* metrics (*numRecordsInPerSecond*, *numRecordsOutPerSecond*)
for non-source vertices, even though equivalent source-specific variants
(*Source__*.numRecordsInPerSecond*) are already used for sources. Instead,
non-source vertices rely exclusively on accumulated counters (*numRecordsIn*,
*numRecordsOut*) and compute rates via endpoint-based interpolation
(*getRate*), which only considers the first and last values in the metric
history window. This produces inaccurate rate estimates (particularly in the
presence of spikes or uneven throughput) and propagates inconsistencies through
the entire scaling decision chain.
The same endpoint-based interpolation problem also affects the *LAG* metric for
source vertices. *getRate(LAG, ...)* computes *(last_lag - first_lag) /
timeDiff*, which dilutes sudden lag spikes across the entire window. For
example, a history of [0, 1M, 1M, ..., 1M] produces an artificially low lag
rate because the spike between sample 1 and 2 is spread over the full window
duration, rather than being properly weighted in its interval.
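The dilution is easy to reproduce with a few lines. This is an illustrative sketch only: *endpointRate* is a stand-in for the endpoint-only *getRate* behaviour described above, not the actual autoscaler code.

```java
// Illustrative sketch: endpointRate mirrors the endpoint-only getRate behaviour
// described above; it is not the actual autoscaler code.
import java.util.List;

public class EndpointRateDemo {

    // (last - first) / timeDiff: only the endpoints of the window are used.
    static double endpointRate(List<Double> samples, double windowSeconds) {
        return (samples.get(samples.size() - 1) - samples.get(0)) / windowSeconds;
    }

    public static void main(String[] args) {
        // Lag sampled every 10s over a 60s window: a 1M spike in the first
        // interval, then a plateau.
        List<Double> lag = List.of(0.0, 1_000_000.0, 1_000_000.0, 1_000_000.0,
                1_000_000.0, 1_000_000.0, 1_000_000.0);
        // The spike is spread over the whole window: ~16,667 rec/s instead of
        // the ~100,000 rec/s of the interval in which it actually occurred.
        System.out.println(endpointRate(lag, 60.0));
    }
}
```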
h1. 2. Root cause
Non-source vertices never collect *numRecordsInPerSecond* /
*numRecordsOutPerSecond*: The *ScalingMetricCollector* only requests the
source-specific per-second metrics (*SOURCE_TASK_NUM_RECORDS_IN_PER_SEC*) for
source vertices. For non-source vertices, only accumulated counters are
collected, and rates are computed by *getRate()* (an endpoint-only
interpolation: *(last - first) / timeDiff*).
Endpoint-based *getRate* on *LAG*: *LAG* is a gauge that can jump abruptly
(e.g., 0 → 1M) and then plateau. *getRate* only looks at the first and last
samples, so a spike that occurred early in the window is diluted across the
entire window duration, producing an artificially low rate of change. The
intermediate samples, which show the plateau, are completely ignored.
h1. 3. Impact
The inaccurate rate estimates cascade through the scaling formula, compounding
at each step:
1. *TARGET_DATA_RATE underestimated for all vertices*: Since *inputRateAvg* is
computed from accumulated counters via endpoint interpolation (which smooths
out spikes and underestimates bursty throughput), and *lagRate* for sources is
diluted by the same endpoint-only approach, the computed *TARGET_DATA_RATE =
inputRate + lagRate* is systematically smaller than the true ingestion rate.
This also means *SCALE_UP_RATE_THRESHOLD* and *SCALE_DOWN_RATE_THRESHOLD*,
which are derived from *TARGET_DATA_RATE*, are too low.
2. *TRUE_PROCESSING_RATE overestimated for non-source vertices*: Non-source
vertices that cannot rely on *OBSERVED_TPR* fall back to computing the true
processing rate from *busyTimeAvg*. When the underlying input rate is
underestimated (due to endpoint-only interpolation), the ratio *inputRate /
busyTimeFraction* produces an inflated *TRUE_PROCESSING_RATE*.
3. *scalingFactor is too small*: The scaling factor is derived from
*TARGET_DATA_RATE / TRUE_PROCESSING_RATE*. With the numerator underestimated
and the denominator overestimated, the resulting scaling factor is
significantly smaller than it should be.
4. *End-user impact*:
- *Excessive scaling iterations*: The autoscaler cannot reach the correct
target parallelism in a single scaling decision. Instead, it takes multiple
subsequent scalings to converge, each one undershooting because of the same
systematic bias.
- *Source-to-downstream parallelism inversion in FORWARD shipping strategy*:
Because source vertices use the more accurate per-second metrics while
downstream vertices use the less accurate accumulated counters, the source can
end up with a higher parallelism than its downstream FORWARD-connected vertex,
resulting in a topology violation that can cause runtime failures or suboptimal
data distribution.
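With purely illustrative numbers (not taken from a real job), the compounding in steps 1–3 above can be sketched as:

```java
// Illustrative sketch of the bias described above; numbers are invented.
public class ScalingFactorBiasDemo {

    // Scaling factor as described above: TARGET_DATA_RATE / TRUE_PROCESSING_RATE.
    static double scalingFactor(double targetDataRate, double trueProcessingRate) {
        return targetDataRate / trueProcessingRate;
    }

    public static void main(String[] args) {
        // True state: the vertex must absorb 3000 rec/s and can truly process
        // 1000 rec/s at its current parallelism.
        double trueFactor = scalingFactor(3000.0, 1000.0);   // 3.0: triple parallelism

        // With the biases above: TARGET_DATA_RATE underestimated (2400 rec/s)
        // and TRUE_PROCESSING_RATE overestimated (1200 rec/s).
        double biasedFactor = scalingFactor(2400.0, 1200.0); // 2.0: undershoots

        // The undershoot persists in the next iteration, since the same
        // systematic bias applies again: hence multiple scalings to converge.
        System.out.println(trueFactor + " vs " + biasedFactor);
    }
}
```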
h1. 4. Proposed solution
h2. 4.1. Collect per-second rate metrics for non-source vertices
Add *FlinkMetric.NUM_RECORDS_IN_PER_SEC* (*numRecordsInPerSecond*) and
*FlinkMetric.NUM_RECORDS_OUT_PER_SEC* (*numRecordsOutPerSecond*) to the metric
collection pipeline for non-source vertices in *ScalingMetricCollector*. Define
corresponding *ScalingMetric.NUM_RECORDS_IN_PER_SECOND* and
*ScalingMetric.NUM_RECORDS_OUT_PER_SECOND* entries, and store them in
*ScalingMetrics.computeDataRateMetrics*.
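A sketch of what the new entries might look like. The enum shapes and constructors here are assumptions for illustration; the actual *FlinkMetric* and *ScalingMetric* definitions in the operator differ.

```java
// Hypothetical sketch only: the real enums in flink-kubernetes-operator have
// different constructors and matching predicates.
public class PerSecondMetricsSketch {

    // Flink REST metric names to request for non-source vertices as well.
    public enum FlinkMetric {
        NUM_RECORDS_IN_PER_SEC("numRecordsInPerSecond"),
        NUM_RECORDS_OUT_PER_SEC("numRecordsOutPerSecond");

        public final String metricName;

        FlinkMetric(String metricName) {
            this.metricName = metricName;
        }
    }

    // Corresponding entries stored per vertex in the collected scaling metrics.
    public enum ScalingMetric {
        NUM_RECORDS_IN_PER_SECOND,
        NUM_RECORDS_OUT_PER_SECOND
    }

    public static void main(String[] args) {
        System.out.println(FlinkMetric.NUM_RECORDS_IN_PER_SEC.metricName);
    }
}
```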
h2. 4.2. Prefer per-second metrics with accumulated-counter fallback
Introduce *getAverageWithRateFallback(perSecondMetric, accumulatedMetric, ...)*
in *ScalingMetricEvaluator*:
- First tries *getAverage(perSecondMetric)*, which is the average of Flink's
native per-second rate gauge across the metric window. This uses all samples,
not just endpoints.
- Falls back to *getRate(accumulatedMetric)* if the per-second metric is
unavailable (NaN or Infinite), preserving backward compatibility with older
Flink versions or custom sources that don't expose per-second metrics.
h2. 4.3. Use average rate of change for LAG
Replace *getRate(LAG, ...)* with *getAverageRate(LAG, ...)*, which computes the
average of the per-interval rates of change (each delta divided by its own
interval length) across the full metric window, rather than endpoint-only
interpolation. This correctly weights sudden lag spikes in their respective
intervals instead of diluting them across the entire window.
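The difference can be sketched with timestamped samples. This is a simplified illustration; the real metric history is keyed by timestamps, and the names *getRate* / *getAverageRate* here just mirror the description above.

```java
// Simplified sketch contrasting endpoint-only interpolation with the proposed
// average of per-interval rates; not the actual autoscaler code.
import java.util.List;

public class AverageRateDemo {

    record Sample(double timeSeconds, double value) {}

    // Endpoint-only interpolation: (last - first) / timeDiff.
    static double getRate(List<Sample> history) {
        Sample first = history.get(0);
        Sample last = history.get(history.size() - 1);
        return (last.value() - first.value())
                / (last.timeSeconds() - first.timeSeconds());
    }

    // Mean of the per-interval rates of change, so a spike is weighted in the
    // interval where it occurred instead of being spread over the window.
    static double getAverageRate(List<Sample> history) {
        double sum = 0;
        for (int i = 1; i < history.size(); i++) {
            Sample prev = history.get(i - 1);
            Sample cur = history.get(i);
            sum += (cur.value() - prev.value())
                    / (cur.timeSeconds() - prev.timeSeconds());
        }
        return sum / (history.size() - 1);
    }

    public static void main(String[] args) {
        // Lag jumps by 1M in the first 10s, then plateaus for the next 50s.
        List<Sample> lag = List.of(new Sample(0, 0),
                new Sample(10, 1_000_000),
                new Sample(60, 1_000_000));
        System.out.println(getRate(lag));        // ~16,667 rec/s: spike diluted
        System.out.println(getAverageRate(lag)); // 50,000 rec/s: spike kept in its interval
    }
}
```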
--
This message was sent by Atlassian Jira
(v8.20.10#820010)