This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c709754 [FLINK-39306][flink-autoscaler] Align the busy-time TRUE_PROCESSING_RATE numerator estimator with the busy-time aggregator (#1078)
0c709754 is described below

commit 0c7097549ef72a5d9092bce15f6010e1bfa368a2
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Mon Jun 8 18:16:52 2026 +0300

    [FLINK-39306][flink-autoscaler] Align the busy-time TRUE_PROCESSING_RATE numerator estimator with the busy-time aggregator (#1078)
---
 .../flink/autoscaler/ScalingMetricCollector.java   |  4 ++
 .../flink/autoscaler/ScalingMetricEvaluator.java   | 42 +++++++++++++++++++-
 .../flink/autoscaler/metrics/FlinkMetric.java      |  1 +
 .../flink/autoscaler/metrics/ScalingMetric.java    |  2 +
 .../flink/autoscaler/metrics/ScalingMetrics.java   |  5 +++
 .../flink/autoscaler/JobAutoScalerImplTest.java    |  5 +++
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 46 ++++++++++++++++++++++
 7 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index 310e063b..d538ab63 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -484,6 +484,10 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                     .findAny(allMetricNames)
                     .ifPresent(
                             m -> filteredMetrics.put(m, 
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT));
+        } else {
+            FlinkMetric.NUM_RECORDS_IN_PER_SEC
+                    .findAny(allMetricNames)
+                    .ifPresent(m -> filteredMetrics.put(m, 
FlinkMetric.NUM_RECORDS_IN_PER_SEC));
         }
 
         for (FlinkMetric flinkMetric : requiredMetrics) {
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
index b2a4a277..17956ec0 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricEvaluator.java
@@ -158,7 +158,11 @@ public class ScalingMetricEvaluator {
                 TRUE_PROCESSING_RATE,
                 EvaluatedScalingMetric.avg(
                         computeTrueProcessingRate(
-                                busyTimeAvg, inputRateAvg, metricsHistory, 
vertex, conf)));
+                                busyTimeAvg,
+                                computeTprInputRate(conf, vertex, 
metricsHistory),
+                                metricsHistory,
+                                vertex,
+                                conf)));
 
         evaluatedMetrics.put(LOAD, EvaluatedScalingMetric.avg(busyTimeAvg / 
1000.));
 
@@ -494,6 +498,42 @@ public class ScalingMetricEvaluator {
         return n < minElements ? Double.NaN : sum / n;
     }
 
+    /**
+     * Estimate the input record rate used as the {@link ScalingMetric#TRUE_PROCESSING_RATE}
+     * numerator. The busy-time TPR is {@code rate / busyTime}, so to keep the ratio internally
+     * consistent the numerator must be estimated the same way as the busy-time denominator (see
+     * {@link #computeBusyTimeAvg}). The default {@code MAX} (and {@code MIN}) busy-time aggregator
+     * derives busy time from the per-second {@code LOAD} gauge averaged over the window, so we use
+     * the averaged per-second record-rate gauge here too. The {@code AVG} aggregator derives busy
+     * time from the cumulative accumulated busy-time counter via {@link #getRate}, so we keep the
+     * cumulative {@code getRate} in that case.
+     */
+    @VisibleForTesting
+    static double computeTprInputRate(
+            Configuration conf,
+            JobVertexID vertex,
+            SortedMap<Instant, CollectedMetrics> metricsHistory) {
+        if (conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR) == 
MetricAggregator.AVG) {
+            return getRate(ScalingMetric.NUM_RECORDS_IN, vertex, 
metricsHistory);
+        }
+        return getAverageWithRateFallback(
+                ScalingMetric.NUM_RECORDS_IN_PER_SECOND,
+                ScalingMetric.NUM_RECORDS_IN,
+                vertex,
+                metricsHistory);
+    }
+
+    public static double getAverageWithRateFallback(
+            ScalingMetric metric,
+            ScalingMetric fallbackMetric,
+            @Nullable JobVertexID jobVertexId,
+            SortedMap<Instant, CollectedMetrics> metricsHistory) {
+        double average = getAverage(metric, jobVertexId, metricsHistory);
+        return Double.isInfinite(average) || Double.isNaN(average)
+                ? getRate(fallbackMetric, jobVertexId, metricsHistory)
+                : average;
+    }
+
     /**
      * Compute the In/Out ratio between the (from, to) vertices. The rate 
estimates the number of
      * output records produced to the downstream vertex for every input 
received for the upstream
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
index 12e99ddc..e968716c 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/FlinkMetric.java
@@ -40,6 +40,7 @@ public enum FlinkMetric {
     SOURCE_TASK_NUM_RECORDS_IN(s -> s.startsWith("Source__") && 
s.endsWith(".numRecordsIn")),
     PENDING_RECORDS(s -> s.endsWith(".pendingRecords")),
     BACKPRESSURE_TIME_PER_SEC(s -> s.equals("backPressuredTimeMsPerSecond")),
+    NUM_RECORDS_IN_PER_SEC(s -> s.equals("numRecordsInPerSecond")),
 
     HEAP_MEMORY_MAX(s -> s.equals("Status.JVM.Memory.Heap.Max")),
     HEAP_MEMORY_USED(s -> s.equals("Status.JVM.Memory.Heap.Used")),
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
index 2b98bdcc..81eb39dc 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetric.java
@@ -68,6 +68,8 @@ public enum ScalingMetric {
 
     NUM_RECORDS_IN(false),
 
+    NUM_RECORDS_IN_PER_SECOND(false),
+
     NUM_RECORDS_OUT(false),
 
     ACCUMULATED_BUSY_TIME(false),
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
index 9069978f..2b0d78c5 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/ScalingMetrics.java
@@ -98,6 +98,11 @@ public class ScalingMetrics {
                                 .orElseGet(observedTprAvg);
                 scalingMetrics.put(ScalingMetric.OBSERVED_TPR, observedTprOpt);
             }
+        } else {
+            var inPerSec = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+            if (inPerSec != null) {
+                scalingMetrics.put(ScalingMetric.NUM_RECORDS_IN_PER_SECOND, 
inPerSec.getSum());
+            }
         }
     }
 
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index fba97f99..334a27a4 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -48,8 +48,10 @@ import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nullable;
 
+import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
+import java.time.ZoneId;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -114,9 +116,12 @@ public class JobAutoScalerImplTest {
                         scalingRealizer,
                         stateStore);
 
+        var now = Instant.now();
+        autoscaler.setClock(Clock.fixed(now, ZoneId.systemDefault()));
         autoscaler.scale(context);
 
         metricsCollector.updateMetrics(jobVertexID, m -> 
m.setNumRecordsIn(100));
+        autoscaler.setClock(Clock.fixed(now.plus(Duration.ofSeconds(10)), 
ZoneId.systemDefault()));
         autoscaler.scale(context);
 
         MetricGroup metricGroup = 
autoscaler.flinkMetrics.get(context.getJobKey()).getMetricGroup();
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index 03e8fce5..6b530ddf 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -806,6 +806,52 @@ public class ScalingMetricEvaluatorTest {
         assertEquals(Double.NaN, ScalingMetricEvaluator.getRate(m2, v2, 
history));
     }
 
+    @Test
+    public void computeTprInputRateTest() {
+        var vertex = new JobVertexID();
+        var cumulative = ScalingMetric.NUM_RECORDS_IN;
+        var perSecond = ScalingMetric.NUM_RECORDS_IN_PER_SECOND;
+
+        // Cumulative getRate = 1000 * (100 - 0) / 1000ms = 100/s.
+        // Per-second average = (40 + 60) / 2 = 50/s. The two intentionally 
differ so we can tell
+        // which estimator is used.
+        var history = new TreeMap<Instant, CollectedMetrics>();
+        history.put(
+                Instant.ofEpochMilli(1000),
+                new CollectedMetrics(Map.of(vertex, Map.of(cumulative, 0., 
perSecond, 40.)), null));
+        history.put(
+                Instant.ofEpochMilli(2000),
+                new CollectedMetrics(
+                        Map.of(vertex, Map.of(cumulative, 100., perSecond, 
60.)), null));
+
+        // The default MAX (and MIN) aggregator derives busy time from the 
per-second LOAD gauge
+        // mean, so the TPR numerator uses the per-second record-rate gauge 
mean to stay consistent.
+        for (var aggregator : new MetricAggregator[] {MetricAggregator.MAX, 
MetricAggregator.MIN}) {
+            var conf = new Configuration();
+            conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, aggregator);
+            assertEquals(50., ScalingMetricEvaluator.computeTprInputRate(conf, 
vertex, history));
+        }
+
+        // The AVG aggregator derives busy time from the cumulative counter 
via getRate, so the TPR
+        // numerator keeps the cumulative getRate.
+        var avgConf = new Configuration();
+        avgConf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, 
MetricAggregator.AVG);
+        assertEquals(100., ScalingMetricEvaluator.computeTprInputRate(avgConf, 
vertex, history));
+
+        // When the per-second gauge is unavailable, MAX falls back to the 
cumulative getRate.
+        var noPerSecond = new TreeMap<Instant, CollectedMetrics>();
+        noPerSecond.put(
+                Instant.ofEpochMilli(1000),
+                new CollectedMetrics(Map.of(vertex, Map.of(cumulative, 0.)), 
null));
+        noPerSecond.put(
+                Instant.ofEpochMilli(2000),
+                new CollectedMetrics(Map.of(vertex, Map.of(cumulative, 100.)), 
null));
+        var maxConf = new Configuration();
+        maxConf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, 
MetricAggregator.MAX);
+        assertEquals(
+                100., ScalingMetricEvaluator.computeTprInputRate(maxConf, 
vertex, noPerSecond));
+    }
+
     private Tuple2<Double, Double> getThresholds(
             double inputTargetRate, double catchUpRate, Configuration conf) {
         return getThresholds(inputTargetRate, catchUpRate, conf, false);

Reply via email to