This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 8933435e [FLINK-31318] Improve stability during backlog processing
8933435e is described below

commit 8933435ecc647438967e6f522056b8e560d7750b
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Mar 7 17:03:59 2023 +0100

    [FLINK-31318] Improve stability during backlog processing
---
 .../generated/auto_scaler_configuration.html       |  6 ++
 .../autoscaler/ScalingMetricEvaluator.java         | 81 +++++++++++++++-----
 .../autoscaler/config/AutoScalerOptions.java       |  7 ++
 .../operator/autoscaler/metrics/ScalingMetric.java |  3 +
 .../autoscaler/metrics/ScalingMetrics.java         |  2 +
 .../autoscaler/BacklogBasedScalingTest.java        |  1 -
 .../operator/autoscaler/JobVertexScalerTest.java   |  2 +-
 .../MetricsCollectionAndEvaluationTest.java        |  3 -
 .../operator/autoscaler/ScalingExecutorTest.java   |  2 +-
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 88 +++++++++++++++++++++-
 .../autoscaler/metrics/ScalingMetricsTest.java     | 24 ++++--
 11 files changed, 187 insertions(+), 32 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html 
b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 85d30e78..f21a16c8 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -8,6 +8,12 @@
         </tr>
     </thead>
     <tbody>
+        <tr>
+            
<td><h5>kubernetes.operator.job.autoscaler.backlog-processing.lag-threshold</h5></td>
+            <td style="word-wrap: break-word;">5 min</td>
+            <td>Duration</td>
+            <td>Lag threshold which will prevent unnecessary scalings while 
removing the pending messages responsible for the lag.</td>
+        </tr>
         <tr>
             
<td><h5>kubernetes.operator.job.autoscaler.catch-up.duration</h5></td>
             <td style="word-wrap: break-word;">10 min</td>
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index 36b11d49..6b61416b 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -26,22 +26,22 @@ import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.math3.stat.StatUtils;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.time.Clock;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 
+import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
@@ -58,8 +58,6 @@ public class ScalingMetricEvaluator {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ScalingMetricEvaluator.class);
 
-    private Clock clock = Clock.systemDefaultZone();
-
     public Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluate(
             Configuration conf, CollectedMetrics collectedMetrics) {
 
@@ -67,23 +65,58 @@ public class ScalingMetricEvaluator {
         var metricsHistory = collectedMetrics.getMetricHistory();
         var topology = collectedMetrics.getJobTopology();
 
+        boolean processingBacklog = isProcessingBacklog(topology, 
metricsHistory, conf);
+
         for (var vertex : topology.getVerticesInTopologicalOrder()) {
             scalingOutput.put(
                     vertex,
                     computeVertexScalingSummary(
-                            conf, scalingOutput, metricsHistory, topology, 
vertex));
+                            conf,
+                            scalingOutput,
+                            metricsHistory,
+                            topology,
+                            vertex,
+                            processingBacklog));
         }
 
         return scalingOutput;
     }
 
+    @VisibleForTesting
+    protected static boolean isProcessingBacklog(
+            JobTopology topology,
+            SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricsHistory,
+            Configuration conf) {
+        var lastMetrics = metricsHistory.get(metricsHistory.lastKey());
+        return topology.getVerticesInTopologicalOrder().stream()
+                .filter(topology::isSource)
+                .anyMatch(
+                        vertex -> {
+                            double lag = 
lastMetrics.get(vertex).getOrDefault(LAG, 0.0);
+                            double avgProcRate =
+                                    getAverage(CURRENT_PROCESSING_RATE, 
vertex, metricsHistory);
+                            if (Double.isNaN(avgProcRate)) {
+                                return false;
+                            }
+                            double lagSeconds = lag / avgProcRate;
+                            if (lagSeconds
+                                    > 
conf.get(BACKLOG_PROCESSING_LAG_THRESHOLD).toSeconds()) {
+                                LOG.info("Currently processing backlog at 
source {}", vertex);
+                                return true;
+                            } else {
+                                return false;
+                            }
+                        });
+    }
+
     @NotNull
     private Map<ScalingMetric, EvaluatedScalingMetric> 
computeVertexScalingSummary(
             Configuration conf,
             HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
scalingOutput,
             SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricsHistory,
             JobTopology topology,
-            JobVertexID vertex) {
+            JobVertexID vertex,
+            boolean processingBacklog) {
 
         var latestVertexMetrics = 
metricsHistory.get(metricsHistory.lastKey()).get(vertex);
 
@@ -109,7 +142,7 @@ public class ScalingMetricEvaluator {
                 MAX_PARALLELISM,
                 
EvaluatedScalingMetric.of(topology.getMaxParallelisms().get(vertex)));
 
-        computeProcessingRateThresholds(evaluatedMetrics, conf);
+        computeProcessingRateThresholds(evaluatedMetrics, conf, 
processingBacklog);
 
         var isSink = topology.getOutputs().get(vertex).isEmpty();
         if (!isSink) {
@@ -130,17 +163,31 @@ public class ScalingMetricEvaluator {
 
     @VisibleForTesting
     protected static void computeProcessingRateThresholds(
-            Map<ScalingMetric, EvaluatedScalingMetric> metrics, Configuration 
conf) {
+            Map<ScalingMetric, EvaluatedScalingMetric> metrics,
+            Configuration conf,
+            boolean processingBacklog) {
 
         double utilizationBoundary = 
conf.getDouble(TARGET_UTILIZATION_BOUNDARY);
+        double targetUtilization = conf.get(TARGET_UTILIZATION);
+
+        double upperUtilization;
+        double lowerUtilization;
+
+        if (processingBacklog) {
+            // When we are processing backlog we allow max utilization and we 
do not trigger scale
+            // down on under utilization to avoid creating more lag.
+            upperUtilization = 1.0;
+            lowerUtilization = 0.0;
+        } else {
+            upperUtilization = targetUtilization + utilizationBoundary;
+            lowerUtilization = targetUtilization - utilizationBoundary;
+        }
 
         double scaleUpThreshold =
-                AutoScalerUtils.getTargetProcessingCapacity(
-                        metrics, conf, conf.get(TARGET_UTILIZATION) + 
utilizationBoundary, false);
+                AutoScalerUtils.getTargetProcessingCapacity(metrics, conf, 
upperUtilization, false);
 
         double scaleDownThreshold =
-                AutoScalerUtils.getTargetProcessingCapacity(
-                        metrics, conf, conf.get(TARGET_UTILIZATION) - 
utilizationBoundary, true);
+                AutoScalerUtils.getTargetProcessingCapacity(metrics, conf, 
lowerUtilization, true);
 
         metrics.put(SCALE_UP_RATE_THRESHOLD, 
EvaluatedScalingMetric.of(scaleUpThreshold));
         metrics.put(SCALE_DOWN_RATE_THRESHOLD, 
EvaluatedScalingMetric.of(scaleDownThreshold));
@@ -155,8 +202,7 @@ public class ScalingMetricEvaluator {
             Map<ScalingMetric, Double> latestVertexMetrics,
             Map<ScalingMetric, EvaluatedScalingMetric> out) {
 
-        boolean isSource = topology.getInputs().get(vertex).isEmpty();
-        if (isSource) {
+        if (topology.isSource(vertex)) {
             double catchUpTargetSec = 
conf.get(AutoScalerOptions.CATCH_UP_DURATION).toSeconds();
 
             var sourceRateMetric =
@@ -205,7 +251,7 @@ public class ScalingMetricEvaluator {
         }
     }
 
-    private double getAverage(
+    private static double getAverage(
             ScalingMetric metric,
             JobVertexID jobVertexId,
             SortedMap<Instant, Map<JobVertexID, Map<ScalingMetric, Double>>> 
metricsHistory) {
@@ -217,9 +263,4 @@ public class ScalingMetricEvaluator {
                         .filter(d -> !Double.isNaN(d))
                         .toArray());
     }
-
-    @VisibleForTesting
-    protected void setClock(Clock clock) {
-        this.clock = Preconditions.checkNotNull(clock);
-    }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index 0d5e5435..64643433 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -121,6 +121,13 @@ public class AutoScalerOptions {
                     .withDescription(
                             "Expected restart time to be used until the 
operator can determine it reliably from history.");
 
+    public static final ConfigOption<Duration> 
BACKLOG_PROCESSING_LAG_THRESHOLD =
+            autoScalerConfig("backlog-processing.lag-threshold")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Lag threshold which will prevent unnecessary 
scalings while removing the pending messages responsible for the lag.");
+
     public static final ConfigOption<Boolean> 
SCALING_EFFECTIVENESS_DETECTION_ENABLED =
             autoScalerConfig("scaling.effectiveness.detection.enabled")
                     .booleanType()
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index aad95d39..1b543ed4 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -35,6 +35,9 @@ public enum ScalingMetric {
     /** Output rate at full capacity (records/sec). */
     TRUE_OUTPUT_RATE(true),
 
+    /** Current processing rate. */
+    CURRENT_PROCESSING_RATE(true),
+
     /**
      * Incoming data rate to the source, e.g. rate of records written to the 
Kafka topic
      * (records/sec).
diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 7e99577e..03fb3ea6 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -96,6 +96,7 @@ public class ScalingMetrics {
                 targetDataRate =
                         
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum();
             }
+            scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, 
targetDataRate);
             scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
             scalingMetrics.put(ScalingMetric.OUTPUT_RATIO, outputPerSecond / 
targetDataRate);
             var trueOutputRate = busyTimeMultiplier * outputPerSecond;
@@ -128,6 +129,7 @@ public class ScalingMetrics {
                     trueProcessingRate = Double.NaN;
                 }
                 scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, 
trueProcessingRate);
+                scalingMetrics.put(ScalingMetric.CURRENT_PROCESSING_RATE, 
numRecordsInPerSecond);
             } else {
                 LOG.error("Cannot compute true processing rate without 
numRecordsInPerSecond");
             }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 26a1c614..8dba5636 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -337,7 +337,6 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
     private void setClocksTo(Instant time) {
         var clock = Clock.fixed(time, ZoneId.systemDefault());
         metricsCollector.setClock(clock);
-        evaluator.setClock(clock);
         scalingExecutor.setClock(clock);
     }
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index 6e933d1c..d4c3d6a0 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -345,7 +345,7 @@ public class JobVertexScalerTest {
         metrics.put(
                 ScalingMetric.TRUE_PROCESSING_RATE,
                 new EvaluatedScalingMetric(trueProcessingRate, 
trueProcessingRate));
-        ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+        ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf, 
false);
         return metrics;
     }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 93a3097b..6cc0d839 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -173,7 +173,6 @@ public class MetricsCollectionAndEvaluationTest {
         collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo, 
service, conf);
         assertEquals(3, collectedMetrics.getMetricHistory().size());
 
-        evaluator.setClock(clock);
         var evaluation = evaluator.evaluate(conf, collectedMetrics);
         scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);
 
@@ -258,7 +257,6 @@ public class MetricsCollectionAndEvaluationTest {
 
         var clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), 
ZoneId.systemDefault());
         metricsCollector.setClock(clock);
-        evaluator.setClock(clock);
 
         var collectedMetrics = metricsCollector.updateMetrics(app, 
scalingInfo, service, conf);
 
@@ -267,7 +265,6 @@ public class MetricsCollectionAndEvaluationTest {
 
         clock = Clock.fixed(Instant.now().plus(Duration.ofSeconds(3)), 
ZoneId.systemDefault());
         metricsCollector.setClock(clock);
-        evaluator.setClock(clock);
 
         metricsCollector.setMetricNames(
                 Map.of(
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 0bc1ecb7..cfe0695e 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -252,7 +252,7 @@ public class ScalingExecutorTest {
         metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, 
EvaluatedScalingMetric.of(catchupRate));
         metrics.put(
                 ScalingMetric.TRUE_PROCESSING_RATE, new 
EvaluatedScalingMetric(procRate, procRate));
-        ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
+        ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf, 
false);
         return metrics;
     }
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
index 7a4af640..3a535763 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -42,6 +42,7 @@ import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
 import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.OUTPUT_RATIO;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
@@ -51,6 +52,8 @@ import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMet
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_OUTPUT_RATE;
 import static 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Scaling evaluator test. */
 public class ScalingMetricEvaluatorTest {
@@ -210,16 +213,99 @@ public class ScalingMetricEvaluatorTest {
         conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2));
         assertEquals(Tuple2.of(1128.0, 1700.0), getThresholds(700, 350, conf));
         assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf));
+
+        // Test thresholds during catchup periods
+        assertEquals(
+                Tuple2.of(1050., Double.POSITIVE_INFINITY), getThresholds(700, 
350, conf, true));
+        assertEquals(Tuple2.of(700., Double.POSITIVE_INFINITY), 
getThresholds(700, 0, conf, true));
+    }
+
+    @Test
+    public void testBacklogProcessingEvaluation() {
+        var source = new JobVertexID();
+        var sink = new JobVertexID();
+        var conf = new Configuration();
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>();
+
+        // 0 lag
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(LAG, 0., CURRENT_PROCESSING_RATE, 100.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+        assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
+
+        // Missing lag
+        metricHistory.clear();
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(CURRENT_PROCESSING_RATE, 100.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
+
+        // Catch up time is more than a minute at avg proc rate (200)
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                LAG,
+                                250.
+                                        * conf.get(
+                                                        AutoScalerOptions
+                                                                
.BACKLOG_PROCESSING_LAG_THRESHOLD)
+                                                .toSeconds(),
+                                CURRENT_PROCESSING_RATE,
+                                300.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        assertTrue(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
+
+        // Catch up time is less than a minute at avg proc rate (200)
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                LAG,
+                                180.
+                                        * conf.get(
+                                                        AutoScalerOptions
+                                                                
.BACKLOG_PROCESSING_LAG_THRESHOLD)
+                                                .toSeconds(),
+                                CURRENT_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+        assertFalse(ScalingMetricEvaluator.isProcessingBacklog(topology, 
metricHistory, conf));
     }
 
     private Tuple2<Double, Double> getThresholds(
             double inputTargetRate, double catchUpRate, Configuration conf) {
+        return getThresholds(inputTargetRate, catchUpRate, conf, false);
+    }
+
+    private Tuple2<Double, Double> getThresholds(
+            double inputTargetRate, double catchUpRate, Configuration conf, 
boolean catchingUp) {
         var map = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
 
         map.put(TARGET_DATA_RATE, new EvaluatedScalingMetric(Double.NaN, 
inputTargetRate));
         map.put(CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpRate));
 
-        ScalingMetricEvaluator.computeProcessingRateThresholds(map, conf);
+        ScalingMetricEvaluator.computeProcessingRateThresholds(map, conf, 
catchingUp);
         return Tuple2.of(
                 map.get(SCALE_UP_RATE_THRESHOLD).getCurrent(),
                 map.get(SCALE_DOWN_RATE_THRESHOLD).getCurrent());
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 9622d0ba..55567cbe 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -73,7 +73,9 @@ public class ScalingMetricsTest {
                         ScalingMetric.OUTPUT_RATIO,
                         2.,
                         ScalingMetric.SOURCE_DATA_RATE,
-                        1015.),
+                        1015.,
+                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                        1000.),
                 scalingMetrics);
 
         // test negative lag growth (catch up)
@@ -101,7 +103,9 @@ public class ScalingMetricsTest {
                         ScalingMetric.OUTPUT_RATIO,
                         2.,
                         ScalingMetric.SOURCE_DATA_RATE,
-                        950.),
+                        950.,
+                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                        1000.),
                 scalingMetrics);
 
         scalingMetrics.clear();
@@ -126,7 +130,9 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_OUTPUT_RATE,
                         20000.,
                         ScalingMetric.OUTPUT_RATIO,
-                        2.),
+                        2.,
+                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                        1000.),
                 scalingMetrics);
 
         // Test using avg busyTime aggregator
@@ -154,7 +160,9 @@ public class ScalingMetricsTest {
                         ScalingMetric.TRUE_OUTPUT_RATE,
                         20000.,
                         ScalingMetric.OUTPUT_RATIO,
-                        2.),
+                        2.,
+                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                        1000.),
                 scalingMetrics);
     }
 
@@ -217,7 +225,9 @@ public class ScalingMetricsTest {
                         ScalingMetric.OUTPUT_RATIO,
                         1.,
                         ScalingMetric.SOURCE_DATA_RATE,
-                        dataRate),
+                        dataRate,
+                        ScalingMetric.CURRENT_PROCESSING_RATE,
+                        10.),
                 scalingMetrics);
     }
 
@@ -235,6 +245,8 @@ public class ScalingMetricsTest {
                         ScalingMetric.OUTPUT_RATIO,
                         1.,
                         ScalingMetric.SOURCE_DATA_RATE,
+                        ScalingMetrics.EFFECTIVELY_ZERO,
+                        ScalingMetric.CURRENT_PROCESSING_RATE,
                         ScalingMetrics.EFFECTIVELY_ZERO),
                 scalingMetrics);
     }
@@ -251,6 +263,8 @@ public class ScalingMetricsTest {
                         ScalingMetric.OUTPUT_RATIO,
                         1.,
                         ScalingMetric.SOURCE_DATA_RATE,
+                        ScalingMetrics.EFFECTIVELY_ZERO,
+                        ScalingMetric.CURRENT_PROCESSING_RATE,
                         ScalingMetrics.EFFECTIVELY_ZERO),
                 scalingMetrics);
     }

Reply via email to