[ https://issues.apache.org/jira/browse/FLINK-39306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dennis-Mircea Ciupitu updated FLINK-39306:
------------------------------------------
    Description: 
h1. 1. Summary

The autoscaler's metric evaluation pipeline does not use Flink's native 
per-second rate metrics ({{numRecordsInPerSecond}}, 
{{numRecordsOutPerSecond}}) for non-source vertices, even though the equivalent 
source-specific variants ({{Source__*.numRecordsInPerSecond}}) are already 
used for sources. Instead, non-source vertices rely exclusively on accumulated 
counters ({{numRecordsIn}}, {{numRecordsOut}}) and compute rates via 
endpoint-based interpolation ({{getRate}}), which considers only the first 
and last values in the metric history window. This produces inaccurate rate 
estimates (particularly in the presence of spikes or uneven throughput), and 
the inaccuracy propagates through the entire scaling decision chain.

The same endpoint-based interpolation problem also affects the {{LAG}} metric 
for source vertices. {{getRate(LAG, ...)}} computes {{(last_lag - first_lag) / timeDiff}}, 
which dilutes sudden lag spikes across the entire window. For 
example, a history of [0, 1M, 1M, ..., 1M] produces an artificially low lag 
rate because the spike between samples 1 and 2 is spread over the full window 
duration instead of being attributed to the interval in which it occurred.
h1. 2. Root cause

Non-source vertices never collect {{numRecordsInPerSecond}} / 
{{numRecordsOutPerSecond}}: the {{ScalingMetricCollector}} requests the 
source-specific per-second metric ({{SOURCE_TASK_NUM_RECORDS_IN_PER_SEC}}) 
only for source vertices. For non-source vertices, only accumulated counters 
are collected, and rates are computed by {{getRate()}}, an endpoint-only 
interpolation: {{(last - first) / timeDiff}}.
Endpoint-based {{getRate}} on {{LAG}}: {{LAG}} is a gauge that can jump 
abruptly (e.g., 0 → 1M) and then plateau. {{getRate}} looks only at the first 
and last samples, so a spike early in the window is diluted across the entire 
window duration, producing an artificially low rate of change. The 
intermediate samples, which show the plateau, are ignored entirely.
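To make the dilution concrete, here is a minimal, hypothetical Java sketch of endpoint-only interpolation. It assumes the history is given as plain arrays of millisecond timestamps and values; the real {{getRate}} operates on the autoscaler's metric history, whose types are not shown here. It shows that only the endpoints are read: a lag jump at the start of the window and one at the very end yield the same rate.

```java
/**
 * Hypothetical sketch of endpoint-only rate interpolation, assuming the history is given as
 * parallel arrays of epoch-millisecond timestamps and values. Not the autoscaler's getRate.
 */
final class EndpointRateSketch {

    /** (last - first) / timeDiff in units per second; NaN with fewer than two samples. */
    static double getRate(long[] timestampsMs, double[] values) {
        int last = values.length - 1;
        if (last < 1) {
            return Double.NaN;
        }
        double timeDiffSec = (timestampsMs[last] - timestampsMs[0]) / 1000.0;
        return (values[last] - values[0]) / timeDiffSec;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 10_000L, 20_000L, 30_000L, 40_000L}; // five samples, 10 s apart
        double[] earlySpike = {0, 1_000_000, 1_000_000, 1_000_000, 1_000_000};
        double[] lateSpike = {0, 0, 0, 0, 1_000_000};
        // Interior samples are never read, so both histories give 1M / 40 s.
        System.out.println(getRate(ts, earlySpike)); // 25000.0
        System.out.println(getRate(ts, lateSpike)); // 25000.0
    }
}
```

In the early-spike history the 1M jump happens within a single 10 s interval (100000 records/s), yet the window-wide figure is a quarter of that.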
h1. 3. Impact

The inaccurate rate estimates cascade through the scaling formula, compounding 
at each step:
1. {*}{{TARGET_DATA_RATE}} underestimated for all vertices{*}: Since 
{{inputRateAvg}} is computed from accumulated counters via endpoint 
interpolation (which smooths out spikes and underestimates bursty throughput), 
and {{lagRate}} for sources is diluted by the same endpoint-only approach, the 
computed {{TARGET_DATA_RATE = inputRate + lagRate}} is systematically smaller 
than the true ingestion rate. As a consequence, {{SCALE_UP_RATE_THRESHOLD}} and 
{{SCALE_DOWN_RATE_THRESHOLD}}, which are derived from {{TARGET_DATA_RATE}}, 
are also too low.
2. {*}{{TRUE_PROCESSING_RATE}} overestimated for non-source vertices{*}: 
Non-source vertices that cannot rely on {{OBSERVED_TPR}} fall back to computing 
the true processing rate from {{busyTimeAvg}}. When the underlying input rate 
is underestimated (due to endpoint-only interpolation), the ratio 
{{inputRate / busyTimeFraction}} produces an inflated {{TRUE_PROCESSING_RATE}}.
3. {*}scalingFactor is too small{*}: The scaling factor is derived from 
{{TARGET_DATA_RATE / TRUE_PROCESSING_RATE}}. With the numerator 
underestimated and the denominator overestimated, the resulting scaling factor 
is significantly smaller than it should be.
4. End-user impact:
 - {*}Excessive scaling iterations{*}: The autoscaler cannot reach the correct 
target parallelism in a single scaling decision. Instead, it takes multiple 
subsequent scalings to converge, each one undershooting because of the same 
systematic bias.
 - {*}Source-to-downstream parallelism inversion in FORWARD shipping 
strategy{*}: Because source vertices use the more accurate per-second metrics 
while downstream vertices use the less accurate accumulated counters, the 
source can end up with a higher parallelism than its downstream 
FORWARD-connected vertex, resulting in a topology violation that can cause 
runtime failures or suboptimal data distribution.
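For reference, the simplified chain of formulas named above can be written out as a small Java sketch. The method names are hypothetical, and the actual evaluator and scaler may apply further adjustments (for example a target utilization) that are omitted here; the point is only that both rates feed directly into the factor.

```java
/** Hypothetical sketch of the simplified scaling chain described in this issue. */
final class ScalingChainSketch {

    /** TARGET_DATA_RATE = inputRate + lagRate (records/s). */
    static double targetDataRate(double inputRate, double lagRate) {
        return inputRate + lagRate;
    }

    /** TRUE_PROCESSING_RATE = inputRate / busyTimeFraction, with busyTimeFraction in (0, 1]. */
    static double trueProcessingRate(double inputRate, double busyTimeFraction) {
        return inputRate / busyTimeFraction;
    }

    /** scalingFactor = TARGET_DATA_RATE / TRUE_PROCESSING_RATE. */
    static double scalingFactor(double targetDataRate, double trueProcessingRate) {
        return targetDataRate / trueProcessingRate;
    }

    public static void main(String[] args) {
        double tdr = targetDataRate(1_000, 200); // 1200.0 records/s
        double tpr = trueProcessingRate(1_000, 0.5); // 2000.0 records/s
        System.out.println(scalingFactor(tdr, tpr)); // 0.6
    }
}
```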

h1. 4. Proposed solution
h2. 4.1. Collect per-second rate metrics for non-source vertices

Add {{FlinkMetric.NUM_RECORDS_IN_PER_SEC}} ({{numRecordsInPerSecond}}) and 
{{FlinkMetric.NUM_RECORDS_OUT_PER_SEC}} ({{numRecordsOutPerSecond}}) to the 
metric collection pipeline for non-source vertices in 
{{ScalingMetricCollector}}. Define the corresponding 
{{ScalingMetric.NUM_RECORDS_IN_PER_SECOND}} and 
{{ScalingMetric.NUM_RECORDS_OUT_PER_SECOND}} entries, and store them in 
{{ScalingMetrics.computeDataRateMetrics}}.
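A minimal sketch of what name-based metric definitions and per-vertex selection could look like under this proposal. {{FlinkMetricSketch}}, {{findMatching}} and {{requiredFor}} are hypothetical stand-ins, not the operator's actual {{FlinkMetric}} API; only the metric names come from this issue.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch of name-matching metric definitions; not the operator's FlinkMetric enum. */
enum FlinkMetricSketch {
    NUM_RECORDS_IN(s -> s.equals("numRecordsIn")),
    NUM_RECORDS_OUT(s -> s.equals("numRecordsOut")),
    // Proposed additions for non-source vertices.
    NUM_RECORDS_IN_PER_SEC(s -> s.equals("numRecordsInPerSecond")),
    NUM_RECORDS_OUT_PER_SEC(s -> s.equals("numRecordsOutPerSecond")),
    // Already used for sources: Source__<name>.numRecordsInPerSecond.
    SOURCE_TASK_NUM_RECORDS_IN_PER_SEC(
            s -> s.startsWith("Source__") && s.endsWith(".numRecordsInPerSecond"));

    private final Predicate<String> matcher;

    FlinkMetricSketch(Predicate<String> matcher) {
        this.matcher = matcher;
    }

    /** Returns the first available metric id matched by this definition, if any. */
    Optional<String> findMatching(List<String> availableMetricIds) {
        return availableMetricIds.stream().filter(matcher).findFirst();
    }

    /** Illustrative subset of metrics a collector would request per vertex under the proposal. */
    static List<FlinkMetricSketch> requiredFor(boolean isSource) {
        return isSource
                ? List.of(NUM_RECORDS_IN, NUM_RECORDS_OUT, SOURCE_TASK_NUM_RECORDS_IN_PER_SEC)
                : List.of(NUM_RECORDS_IN, NUM_RECORDS_OUT,
                        NUM_RECORDS_IN_PER_SEC, NUM_RECORDS_OUT_PER_SEC);
    }
}
```

Keeping the accumulated counters in the non-source selection is what makes a counter-based fallback possible when the per-second gauges are missing.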
h2. 4.2. Prefer per-second metrics with accumulated-counter fallback

Introduce {{getAverageWithRateFallback(perSecondMetric, accumulatedMetric, ...)}} 
in {{ScalingMetricEvaluator}}:
 - First tries {{getAverage(perSecondMetric)}}, the average of Flink's native 
per-second rate gauge across the metric window. This uses all samples, not 
just the endpoints.
 - Falls back to {{getRate(accumulatedMetric)}} if the per-second metric is 
unavailable (NaN or infinite), preserving backward compatibility with older 
Flink versions or custom sources that don't expose per-second metrics.

h2. 4.3. Use average rate of change for LAG

Replace {{getRate(LAG, ...)}} with {{getAverageRate(LAG, ...)}}, which computes 
the average of per-interval deltas across the full metric window instead of 
relying on endpoint-only interpolation. This weights sudden lag spikes in the 
intervals where they occur instead of diluting them across the entire window.
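A hypothetical sketch contrasting the two computations on a {{LAG}} history. It assumes {{getAverageRate}} takes the unweighted mean of per-interval rates, which is one reading of "average of per-interval deltas"; the exact weighting is not spelled out here. The timestamps are deliberately uneven.

```java
/** Hypothetical sketch contrasting endpoint and per-interval rates for a LAG gauge. */
final class LagRateSketch {

    /** Endpoint-only: (last - first) / (t_last - t_first), per second. */
    static double getRate(long[] timestampsMs, double[] lag) {
        int last = lag.length - 1;
        if (last < 1) {
            return Double.NaN;
        }
        return (lag[last] - lag[0]) / ((timestampsMs[last] - timestampsMs[0]) / 1000.0);
    }

    /** Unweighted mean of per-interval rates (lag[i] - lag[i-1]) / (t[i] - t[i-1]), per second. */
    static double getAverageRate(long[] timestampsMs, double[] lag) {
        int intervals = lag.length - 1;
        if (intervals < 1) {
            return Double.NaN;
        }
        double sum = 0;
        for (int i = 1; i < lag.length; i++) {
            double dtSec = (timestampsMs[i] - timestampsMs[i - 1]) / 1000.0;
            sum += (lag[i] - lag[i - 1]) / dtSec;
        }
        return sum / intervals;
    }

    public static void main(String[] args) {
        // Lag jumps 0 -> 1M within the first 5 s, then plateaus until the next sample 55 s later.
        long[] ts = {0L, 5_000L, 60_000L};
        double[] lag = {0, 1_000_000, 1_000_000};
        System.out.println(getRate(ts, lag)); // about 16667: 1M spread over 60 s
        System.out.println(getAverageRate(ts, lag)); // 100000.0: mean of 200000/s and 0/s
    }
}
```

With uniformly spaced samples, the unweighted mean in this sketch telescopes to exactly the endpoint rate, so the difference shown here comes from the uneven spacing; how strongly spikes are emphasized in practice depends on the weighting the actual implementation chooses.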


> Non-source vertices do not use per-second rate metrics, producing inaccurate 
> scaling decisions
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39306
>                 URL: https://issues.apache.org/jira/browse/FLINK-39306
>             Project: Flink
>          Issue Type: Bug
>          Components: Autoscaler, Kubernetes Operator
>    Affects Versions: kubernetes-operator-1.14.0
>            Reporter: Dennis-Mircea Ciupitu
>            Priority: Major
>              Labels: autoscaling, operator, pull-request-available
>             Fix For: kubernetes-operator-1.16.0
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
