[
https://issues.apache.org/jira/browse/FLINK-39306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dennis-Mircea Ciupitu updated FLINK-39306:
------------------------------------------
Description:
h1. Summary
The busy-time {{TRUE_PROCESSING_RATE}} is computed as a ratio, {{busyTimeTpr =
inputRate / busyTime}}, but the numerator and denominator are estimated over
the metric window with different methods. The denominator is computed
consistently with the configured busy-time aggregator, while the numerator
always uses {{getRate}} on the cumulative records counter. Under the default
{{MAX}} aggregator the two halves of the ratio therefore use different temporal
estimators, which is internally inconsistent and can skew the ratio under
non-uniform metric sampling. This issue aligns the numerator estimator with the
denominator so the ratio is internally consistent.
h1. Background
{{TRUE_PROCESSING_RATE}} (the vertex capacity used by the scaler) is the larger
of two sub-paths, a busy-time based estimate and an observed estimate, selected
by {{selectTprMetric}}.
The busy-time estimate is {{busyTimeTpr = inputRate / (busyTimeAvg / 1000)}}.
The denominator {{busyTimeAvg}} depends on the
{{kubernetes.operator.metrics.busy-time.aggregator}} option:
- {{AVG}}: {{getRate(ACCUMULATED_BUSY_TIME) / parallelism}}, a cumulative
(time-integral) rate.
- {{MAX}} or {{MIN}} (default {{MAX}}): {{getAverage(LOAD) * 1000}}, an
arithmetic mean of the per-second busy-time gauge samples.
The numerator {{inputRate}} is always {{getRate(NUM_RECORDS_IN)}}, a cumulative
endpoint rate of the form {{(last - first) / timeDiff}}.
{{getRate}} (a time-weighted, time-integral average) and {{getAverage}} of a
per-second gauge (an unweighted sample mean) are two different linear
estimators. They agree under uniform sampling but diverge under non-uniform
sampling (bursts, recovery, transients).
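The difference between the two estimators can be illustrated with a small numeric sketch. It is not the operator's Java code: {{get_rate}}, {{get_average}} and {{observe}} are hypothetical stand-ins, and the gauge is assumed to report the true rate of the interval that just ended.

```python
def get_rate(timestamps, counter):
    """Endpoint rate of a cumulative counter: (last - first) / elapsed seconds."""
    return (counter[-1] - counter[0]) / (timestamps[-1] - timestamps[0])


def get_average(samples):
    """Unweighted arithmetic mean of gauge samples."""
    return sum(samples) / len(samples)


def observe(timestamps, interval_rates):
    """Simulate one metric window (hypothetical helper).

    interval_rates[i] is the true records/s between timestamps[i] and
    timestamps[i + 1]. Returns the cumulative counter seen at every
    timestamp and the per-second gauge seen at the end of every interval.
    """
    counter = [0]
    for i, rate in enumerate(interval_rates):
        dt = timestamps[i + 1] - timestamps[i]
        counter.append(counter[-1] + rate * dt)
    gauge = list(interval_rates)
    return counter, gauge


rates = [100, 100, 400]  # a burst in the last interval

# Uniform sampling: every interval is 10 s long.
uniform_ts = [0, 10, 20, 30]
uniform_counter, uniform_gauge = observe(uniform_ts, rates)
uniform_rate = get_rate(uniform_ts, uniform_counter)
uniform_mean = get_average(uniform_gauge)

# Non-uniform sampling: the last sample arrives after only 5 s.
skewed_ts = [0, 10, 20, 25]
skewed_counter, skewed_gauge = observe(skewed_ts, rates)
skewed_rate = get_rate(skewed_ts, skewed_counter)
skewed_mean = get_average(skewed_gauge)

print(uniform_rate, uniform_mean)  # 200.0 200.0
print(skewed_rate, skewed_mean)    # 160.0 200.0
```

With equal intervals both estimators return 200 records/s. When the burst interval is shorter, the time-weighted endpoint rate drops to 160 while the unweighted sample mean stays at 200.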
h1. Problem
Under the default {{MAX}} aggregator, {{busyTimeTpr}}'s denominator is a
per-second sample mean while its numerator is a cumulative endpoint rate.
Because the value is a ratio of two co-varying quantities, using a single
shared estimator for both lets their common sampling weighting cancel, whereas
mixing estimators leaves the denominator's sampling artifact in the result. The
numerator is also the odd one out relative to the observed estimate it is
compared against in {{selectTprMetric}}, which is built entirely from
per-second gauges.
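A minimal sketch of the cancellation argument, under assumptions that are not part of the issue: a single subtask, a constant per-record cost (so the busy fraction in each interval is proportional to the input rate), and a hypothetical true capacity of 800 records per busy second.

```python
CAPACITY = 800.0  # hypothetical true capacity, records per busy second

timestamps = [0, 10, 20, 25]      # non-uniform: last interval is 5 s
interval_rates = [100, 100, 400]  # true records/s in each interval

# Assumed model: busy fraction in each interval = rate / capacity.
load_samples = [rate / CAPACITY for rate in interval_rates]

# Cumulative records counter at the end of the window.
records_total = sum(
    rate * (timestamps[i + 1] - timestamps[i])
    for i, rate in enumerate(interval_rates)
)
elapsed = timestamps[-1] - timestamps[0]

# Denominator as described for MAX/MIN: sample mean of the load gauge, in ms/s.
busy_time_avg = sum(load_samples) / len(load_samples) * 1000

# Mixed estimators: endpoint-rate numerator over sample-mean denominator.
endpoint_input_rate = records_total / elapsed
mixed_tpr = endpoint_input_rate / (busy_time_avg / 1000)

# Shared estimator: sample-mean numerator over sample-mean denominator.
gauge_input_rate = sum(interval_rates) / len(interval_rates)
aligned_tpr = gauge_input_rate / (busy_time_avg / 1000)

print(mixed_tpr)    # 640.0
print(aligned_tpr)  # 800.0
```

In this toy window the shared estimator recovers the assumed capacity exactly, while the mixed ratio is off by the same factor (160 / 200) by which the two numerator estimators disagree.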
h1. Goal
Make the busy-time {{TRUE_PROCESSING_RATE}} numerator follow the same estimator
as its busy-time denominator: per-second gauge mean under {{MAX}} or {{MIN}},
cumulative {{getRate}} under {{AVG}}. This is an internal-consistency fix for
the ratio. It is scoped to that numerator only. It is not a change to the
demand or edge data-rate paths, and it does not attempt to revalidate the
underlying capacity model, the observed-rate formula, or the subtask
aggregation, which are out of scope.
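The intended selection could be sketched roughly as follows. This is an illustration only, not the actual patch: the window layout, the key names and the {{busy_time_tpr}} function are hypothetical, and subtask aggregation is ignored by assuming a parallelism of 1.

```python
def endpoint_rate(timestamps, counter):
    """Cumulative endpoint rate: (last - first) / elapsed seconds."""
    return (counter[-1] - counter[0]) / (timestamps[-1] - timestamps[0])


def mean(samples):
    """Unweighted arithmetic mean of gauge samples."""
    return sum(samples) / len(samples)


def busy_time_tpr(aggregator, window, parallelism):
    """Busy-time TPR with numerator and denominator on the same estimator."""
    if aggregator == "AVG":
        # Both halves use the cumulative endpoint rate.
        input_rate = endpoint_rate(window["ts"], window["records_in"])
        busy_time_avg = endpoint_rate(window["ts"], window["busy_ms"]) / parallelism
    elif aggregator in ("MAX", "MIN"):
        # Both halves use the mean of per-second gauge samples.
        input_rate = mean(window["records_in_per_sec"])
        busy_time_avg = mean(window["load"]) * 1000
    else:
        raise ValueError(f"unknown aggregator: {aggregator}")
    return input_rate / (busy_time_avg / 1000)


# Hypothetical window with non-uniform sampling and a burst at the end.
window = {
    "ts": [0, 10, 20, 25],
    "records_in": [0, 1000, 2000, 4000],    # cumulative counter
    "busy_ms": [0, 1250, 2500, 5000],       # cumulative busy time in ms
    "records_in_per_sec": [100, 100, 400],  # per-second gauge samples
    "load": [0.125, 0.125, 0.5],            # busy fraction gauge samples
}

tpr_avg = busy_time_tpr("AVG", window, parallelism=1)
tpr_max = busy_time_tpr("MAX", window, parallelism=1)
print(tpr_avg, tpr_max)
```

In this example the data was generated with a capacity of 800 records per busy second, and both branches land on that value because each branch applies one estimator to both halves of the ratio.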
h1. Notes
Behavior is unchanged under {{AVG}} and unchanged whenever metric sampling is
uniform (the common steady state). The new estimator differs only under
non-uniform sampling. Covered by unit tests.
was:
h1. 1. Summary
The autoscaler's metric evaluation pipeline does not use Flink's native
per-second rate metrics ({{numRecordsInPerSecond}},
{{numRecordsOutPerSecond}}) for non-source vertices, even though equivalent
source-specific variants ({{Source__.numRecordsInPerSecond*}}) are already
used for sources. Instead, non-source vertices rely exclusively on accumulated
counters ({{numRecordsIn}}, {{numRecordsOut}}) and compute rates via
endpoint-based interpolation ({{getRate}}), which only considers the first
and last values in the metric history window. This produces inaccurate rate
estimates (particularly in the presence of spikes or uneven throughput) and
propagates inconsistencies through the entire scaling decision chain.
The same endpoint-based interpolation problem also affects the {{LAG}} metric
for source vertices. {{getRate(LAG, ...)}} computes {{(last_lag - first_lag)
/ timeDiff}}, which dilutes sudden lag spikes across the entire window. For
example, a history of [0, 1M, 1M, ..., 1M] produces an artificially low lag
rate because the spike between sample 1 and 2 is spread over the full window
duration, rather than being properly weighted in its interval.
h1. 2. Root cause
- Non-source vertices never collect {{numRecordsInPerSecond}} /
{{numRecordsOutPerSecond}}: The {{ScalingMetricCollector}} only requests
the source-specific per-second metrics
({{SOURCE_TASK_NUM_RECORDS_IN_PER_SEC}}) for source vertices. For
non-source vertices, only accumulated counters are collected, and rates are
computed by {{getRate()}} (an endpoint-only interpolation: {{(last - first) /
timeDiff}}).
- Endpoint-based {{getRate}} on {{LAG}}: {{LAG}} is a gauge that can jump
abruptly (e.g., 0 → 1M) and then plateau. {{getRate}} only looks at the first
and last samples, so a spike that occurred early in the window is diluted
across the entire window duration, producing an artificially low rate of
change. The intermediate samples, which show the plateau, are completely
ignored.
h1. 3. Impact
The inaccurate rate estimates cascade through the scaling formula, compounding
at each step:
1. *{{TARGET_DATA_RATE}} underestimated for all vertices*: Since
{{inputRateAvg}} is computed from accumulated counters via endpoint
interpolation (which smooths out spikes and underestimates bursty throughput),
and {{lagRate}} for sources is diluted by the same endpoint-only approach, the
computed {{TARGET_DATA_RATE = inputRate + lagRate}} is systematically smaller
than the true ingestion rate. This also means {{SCALE_UP_RATE_THRESHOLD}} and
{{SCALE_DOWN_RATE_THRESHOLD}}, which are derived from
{{TARGET_DATA_RATE}}, are too low.
2. *{{TRUE_PROCESSING_RATE}} overestimated for non-source vertices*:
Non-source vertices that cannot rely on {{OBSERVED_TPR}} fall back to computing
true processing rate from {{busyTimeAvg}}. When the underlying input rate
is underestimated (due to endpoint-only interpolation), the ratio {{inputRate}}
/ {{busyTimeFraction}} produces an inflated {{TRUE_PROCESSING_RATE}}.
3. *scalingFactor is too small*: The scaling factor is derived from
{{TARGET_DATA_RATE}} / {{TRUE_PROCESSING_RATE}}. With the numerator
underestimated and the denominator overestimated, the resulting scaling factor
is significantly smaller than it should be.
4. End-user impact:
- *Excessive scaling iterations*: The autoscaler cannot reach the correct
target parallelism in a single scaling decision. Instead, it takes multiple
subsequent scalings to converge, each one undershooting because of the same
systematic bias.
- *Source-to-downstream parallelism inversion in FORWARD shipping
strategy*: Because source vertices use the more accurate per-second metrics
while downstream vertices use the less accurate accumulated counters, the
source can end up with a higher parallelism than its downstream
FORWARD-connected vertex, resulting in a topology violation that can cause
runtime failures or suboptimal data distribution.
h1. 4. Proposed solution
h2. 4.1. Collect per-second rate metrics for non-source vertices
Add {{FlinkMetric.NUM_RECORDS_IN_PER_SEC}} ({{numRecordsInPerSecond}}) and
{{FlinkMetric.NUM_RECORDS_OUT_PER_SEC}} ({{numRecordsOutPerSecond}}) to the
metric collection pipeline for non-source vertices in
{{ScalingMetricCollector}}. Define corresponding
{{ScalingMetric.NUM_RECORDS_IN_PER_SECOND}} and
{{ScalingMetric.NUM_RECORDS_OUT_PER_SECOND}} entries, and store them in
{{ScalingMetrics.computeDataRateMetrics}}.
h2. 4.2. Prefer per-second metrics with accumulated-counter fallback
Introduce {{getAverageWithRateFallback(perSecondMetric, accumulatedMetric,
...)}} in {{ScalingMetricEvaluator}}:
- First tries {{getAverage(perSecondMetric)}}, which is the average of
Flink's native per-second rate gauge across the metric window. This uses all
samples, not just endpoints.
- Falls back to {{getRate(accumulatedMetric)}} if the per-second metric is
unavailable (NaN or infinite), preserving backward compatibility with older
Flink versions or custom sources that don't expose per-second metrics.
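The fallback described above might look roughly like this. It is a language-neutral sketch rather than the operator's Java implementation: the function signature and the way samples are passed in are assumptions made for illustration.

```python
import math


def get_average_with_rate_fallback(per_second_samples, timestamps, counter):
    """Prefer the mean of per-second gauge samples; otherwise fall back to
    the endpoint rate of the accumulated counter (hypothetical signature)."""
    if per_second_samples:
        average = sum(per_second_samples) / len(per_second_samples)
    else:
        average = math.nan  # metric not reported at all

    if math.isfinite(average):
        return average

    # Fallback: (last - first) / elapsed seconds on the cumulative counter.
    return (counter[-1] - counter[0]) / (timestamps[-1] - timestamps[0])


ts = [0, 10, 20, 30]
records_in = [0, 1000, 2000, 6000]

with_gauge = get_average_with_rate_fallback([100.0, 100.0, 400.0], ts, records_in)
without_gauge = get_average_with_rate_fallback([], ts, records_in)
with_nan = get_average_with_rate_fallback([100.0, math.nan, 400.0], ts, records_in)

print(with_gauge, without_gauge, with_nan)  # 200.0 200.0 200.0
```

In this sketch a single NaN sample makes the whole average NaN and therefore triggers the fallback. Whether the real evaluator skips such samples instead is not stated in the description.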
h2. 4.3. Use average rate of change for LAG
Replace {{getRate(LAG, ...)}} with {{getAverageRate(LAG, ...)}}, which computes
the average of per-interval deltas across the full metric window, rather than
endpoint-only interpolation. This correctly weights sudden lag spikes in their
respective intervals instead of diluting them across the entire window.
> Non-source vertices do not use per-second rate metrics, producing inaccurate
> scaling decisions
> ----------------------------------------------------------------------------------------------
>
> Key: FLINK-39306
> URL: https://issues.apache.org/jira/browse/FLINK-39306
> Project: Flink
> Issue Type: Bug
> Components: Autoscaler, Kubernetes Operator
> Affects Versions: kubernetes-operator-1.14.0
> Reporter: Dennis-Mircea Ciupitu
> Priority: Major
> Labels: autoscaling, operator, pull-request-available
> Fix For: kubernetes-operator-1.16.0
>
>