This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 0c709754 [FLINK-39306][flink-autoscaler] Align the busy-time
TRUE_PROCESSING_RATE numerator estimator with the busy-time aggregator (#1078)
0c709754 is described below
commit 0c7097549ef72a5d9092bce15f6010e1bfa368a2
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Mon Jun 8 18:16:52 2026 +0300
[FLINK-39306][flink-autoscaler] Align the busy-time TRUE_PROCESSING_RATE
numerator estimator with the busy-time aggregator (#1078)
---
.../flink/autoscaler/ScalingMetricCollector.java | 4 ++
.../flink/autoscaler/ScalingMetricEvaluator.java | 42 +++++++++++++++++++-
.../flink/autoscaler/metrics/FlinkMetric.java | 1 +
.../flink/autoscaler/metrics/ScalingMetric.java | 2 +
.../flink/autoscaler/metrics/ScalingMetrics.java | 5 +++
.../flink/autoscaler/JobAutoScalerImplTest.java | 5 +++
.../autoscaler/ScalingMetricEvaluatorTest.java | 46 ++++++++++++++++++++++
7 files changed, 104 insertions(+), 1 deletion(-)
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index 310e063b..d538ab63 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -484,6 +484,10 @@ public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerC
.findAny(allMetricNames)
.ifPresent(
m -> filteredMetrics.put(m,
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT));
+ } else {
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC
+ .findAny(allMetricNames)
+ .ifPresent(m -> filteredMetrics.put(m,
FlinkMetric.NUM_RECORDS_IN_PER_SEC));
}
for (FlinkMetric flinkMetric : requiredMetrics) {
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index b2a4a277..17956ec0 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -158,7 +158,11 @@ public class ScalingMetricEvaluator {
TRUE_PROCESSING_RATE,
EvaluatedScalingMetric.avg(
computeTrueProcessingRate(
- busyTimeAvg, inputRateAvg, metricsHistory,
vertex, conf)));
+ busyTimeAvg,
+ computeTprInputRate(conf, vertex,
metricsHistory),
+ metricsHistory,
+ vertex,
+ conf)));
evaluatedMetrics.put(LOAD, EvaluatedScalingMetric.avg(busyTimeAvg /
1000.));
@@ -494,6 +498,42 @@ public class ScalingMetricEvaluator {
return n < minElements ? Double.NaN : sum / n;
}
+ /**
+ * Estimate the input record rate used as the {@link ScalingMetric#TRUE_PROCESSING_RATE}
+ * numerator. The busy-time TPR is {@code rate / busyTime}, so to keep the ratio internally
+ * consistent the numerator must be estimated the same way as the busy-time denominator (see
+ * {@link #computeBusyTimeAvg}). The default {@code MAX} (and {@code MIN}) busy-time aggregator
+ * derives busy time from the per-second {@code LOAD} gauge averaged over the window, so we use
+ * the averaged per-second record-rate gauge here too. The {@code AVG} aggregator derives busy
+ * time from the cumulative accumulated busy-time counter via {@link #getRate}, so we keep the
+ * cumulative {@code getRate} in that case.
+ */
+ @VisibleForTesting
+ static double computeTprInputRate(
+ Configuration conf,
+ JobVertexID vertex,
+ SortedMap<Instant, CollectedMetrics> metricsHistory) {
+ if (conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR) ==
MetricAggregator.AVG) {
+ return getRate(ScalingMetric.NUM_RECORDS_IN, vertex,
metricsHistory);
+ }
+ return getAverageWithRateFallback(
+ ScalingMetric.NUM_RECORDS_IN_PER_SECOND,
+ ScalingMetric.NUM_RECORDS_IN,
+ vertex,
+ metricsHistory);
+ }
+
+ public static double getAverageWithRateFallback(
+ ScalingMetric metric,
+ ScalingMetric fallbackMetric,
+ @Nullable JobVertexID jobVertexId,
+ SortedMap<Instant, CollectedMetrics> metricsHistory) {
+ double average = getAverage(metric, jobVertexId, metricsHistory);
+ return Double.isInfinite(average) || Double.isNaN(average)
+ ? getRate(fallbackMetric, jobVertexId, metricsHistory)
+ : average;
+ }
+
/**
* Compute the In/Out ratio between the (from, to) vertices. The rate estimates the number of
* output records produced to the downstream vertex for every input received for the upstream
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 12e99ddc..e968716c 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -40,6 +40,7 @@ public enum FlinkMetric {
SOURCE_TASK_NUM_RECORDS_IN(s -> s.startsWith("Source__") &&
s.endsWith(".numRecordsIn")),
PENDING_RECORDS(s -> s.endsWith(".pendingRecords")),
BACKPRESSURE_TIME_PER_SEC(s -> s.equals("backPressuredTimeMsPerSecond")),
+ NUM_RECORDS_IN_PER_SEC(s -> s.equals("numRecordsInPerSecond")),
HEAP_MEMORY_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
HEAP_MEMORY_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 2b98bdcc..81eb39dc 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -68,6 +68,8 @@ public enum ScalingMetric {
NUM_RECORDS_IN(false),
+ NUM_RECORDS_IN_PER_SECOND(false),
+
NUM_RECORDS_OUT(false),
ACCUMULATED_BUSY_TIME(false),
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 9069978f..2b0d78c5 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -98,6 +98,11 @@ public class ScalingMetrics {
.orElseGet(observedTprAvg);
scalingMetrics.put(ScalingMetric.OBSERVED_TPR, observedTprOpt);
}
+ } else {
+ var inPerSec =
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+ if (inPerSec != null) {
+ scalingMetrics.put(ScalingMetric.NUM_RECORDS_IN_PER_SECOND,
inPerSec.getSum());
+ }
}
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index fba97f99..334a27a4 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -48,8 +48,10 @@ import org.junit.jupiter.api.Test;
import javax.annotation.Nullable;
+import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
+import java.time.ZoneId;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -114,9 +116,12 @@ public class JobAutoScalerImplTest {
scalingRealizer,
stateStore);
+ var now = Instant.now();
+ autoscaler.setClock(Clock.fixed(now, ZoneId.systemDefault()));
autoscaler.scale(context);
metricsCollector.updateMetrics(jobVertexID, m ->
m.setNumRecordsIn(100));
+ autoscaler.setClock(Clock.fixed(now.plus(Duration.ofSeconds(10)),
ZoneId.systemDefault()));
autoscaler.scale(context);
MetricGroup metricGroup =
autoscaler.flinkMetrics.get(context.getJobKey()).getMetricGroup();
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 03e8fce5..6b530ddf 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -806,6 +806,52 @@ public class ScalingMetricEvaluatorTest {
assertEquals(Double.NaN, ScalingMetricEvaluator.getRate(m2, v2,
history));
}
+ @Test
+ public void computeTprInputRateTest() {
+ var vertex = new JobVertexID();
+ var cumulative = ScalingMetric.NUM_RECORDS_IN;
+ var perSecond = ScalingMetric.NUM_RECORDS_IN_PER_SECOND;
+
+ // Cumulative getRate = 1000 * (100 - 0) / 1000ms = 100/s.
+ // Per-second average = (40 + 60) / 2 = 50/s. The two intentionally differ so we can tell
+ // which estimator is used.
+ var history = new TreeMap<Instant, CollectedMetrics>();
+ history.put(
+ Instant.ofEpochMilli(1000),
+ new CollectedMetrics(Map.of(vertex, Map.of(cumulative, 0.,
perSecond, 40.)), null));
+ history.put(
+ Instant.ofEpochMilli(2000),
+ new CollectedMetrics(
+ Map.of(vertex, Map.of(cumulative, 100., perSecond,
60.)), null));
+
+ // The default MAX (and MIN) aggregator derives busy time from the per-second LOAD gauge
+ // mean, so the TPR numerator uses the per-second record-rate gauge mean to stay consistent.
+ for (var aggregator : new MetricAggregator[] {MetricAggregator.MAX,
MetricAggregator.MIN}) {
+ var conf = new Configuration();
+ conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, aggregator);
+ assertEquals(50., ScalingMetricEvaluator.computeTprInputRate(conf,
vertex, history));
+ }
+
+ // The AVG aggregator derives busy time from the cumulative counter via getRate, so the TPR
+ // numerator keeps the cumulative getRate.
+ var avgConf = new Configuration();
+ avgConf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR,
MetricAggregator.AVG);
+ assertEquals(100., ScalingMetricEvaluator.computeTprInputRate(avgConf,
vertex, history));
+
+ // When the per-second gauge is unavailable, MAX falls back to the cumulative getRate.
+ var noPerSecond = new TreeMap<Instant, CollectedMetrics>();
+ noPerSecond.put(
+ Instant.ofEpochMilli(1000),
+ new CollectedMetrics(Map.of(vertex, Map.of(cumulative, 0.)),
null));
+ noPerSecond.put(
+ Instant.ofEpochMilli(2000),
+ new CollectedMetrics(Map.of(vertex, Map.of(cumulative, 100.)),
null));
+ var maxConf = new Configuration();
+ maxConf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR,
MetricAggregator.MAX);
+ assertEquals(
+ 100., ScalingMetricEvaluator.computeTprInputRate(maxConf,
vertex, noPerSecond));
+ }
+
private Tuple2<Double, Double> getThresholds(
double inputTargetRate, double catchUpRate, Configuration conf) {
return getThresholds(inputTargetRate, catchUpRate, conf, false);