This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 51d7111c [FLINK-30575] Ensure zero rates lead to scale down (#543)
51d7111c is described below

commit 51d7111c03dfa9744638765869d917142d1cf401
Author: Maximilian Michels <[email protected]>
AuthorDate: Mon Mar 6 19:06:33 2023 +0100

    [FLINK-30575] Ensure zero rates lead to scale down (#543)
    
    The logic for computing true processing rates relies on recordsProcessed /
    recordProcessingTime style computations which do not really work well when
    everything is 0. This leads to no scaling actions when the load suddenly 
drops
    to 0.
    
    The change here takes a pragmatic approach. Instead of special-casing zero
    rates throughout the code, we set rates and busyness to near zero values. 
This
    keeps the code functional as-is while ensuring scale down happens.
---
 .../autoscaler/metrics/ScalingMetrics.java         | 78 +++++++++++++++------
 .../MetricsCollectionAndEvaluationTest.java        | 39 +++++++++++
 .../autoscaler/metrics/ScalingMetricsTest.java     | 81 ++++++++++++++++++++++
 3 files changed, 175 insertions(+), 23 deletions(-)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index adadb365..7e99577e 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -36,6 +36,12 @@ public class ScalingMetrics {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ScalingMetrics.class);
 
+    /**
+     * The minimum value to avoid using zero values which cause side effects 
like division by zero
+     * or out of bounds (infinite) floats.
+     */
+    public static final double EFFECTIVELY_ZERO = 1e-10;
+
     public static void computeLoadMetrics(
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
             Map<ScalingMetric, Double> scalingMetrics) {
@@ -73,19 +79,13 @@ public class ScalingMetrics {
             return;
         }
 
-        var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
-        if (numRecordsInPerSecond == null) {
-            numRecordsInPerSecond =
-                    
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
-        }
-
-        var outputPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+        Double numRecordsInPerSecond = getNumRecordsInPerSecond(flinkMetrics, 
jobVertexID);
+        Double outputPerSecond = getNumRecordsOutPerSecond(flinkMetrics, 
jobVertexID, sink);
 
-        double busyTimeMultiplier = 1000 / busyTimeOpt.get();
+        double busyTimeMultiplier = 1000 / keepAboveZero(busyTimeOpt.get());
 
         if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) {
-            double sourceInputRate =
-                    numRecordsInPerSecond != null ? 
numRecordsInPerSecond.getSum() : Double.NaN;
+            double sourceInputRate = numRecordsInPerSecond;
 
             double targetDataRate;
             if (!Double.isNaN(sourceInputRate) && sourceInputRate > 0) {
@@ -97,9 +97,8 @@ public class ScalingMetrics {
                         
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC).getSum();
             }
             scalingMetrics.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
-            scalingMetrics.put(
-                    ScalingMetric.OUTPUT_RATIO, outputPerSecond.getSum() / 
targetDataRate);
-            var trueOutputRate = busyTimeMultiplier * outputPerSecond.getSum();
+            scalingMetrics.put(ScalingMetric.OUTPUT_RATIO, outputPerSecond / 
targetDataRate);
+            var trueOutputRate = busyTimeMultiplier * outputPerSecond;
             scalingMetrics.put(ScalingMetric.TRUE_OUTPUT_RATE, trueOutputRate);
             scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, trueOutputRate);
             LOG.info(
@@ -108,14 +107,13 @@ public class ScalingMetrics {
                     trueOutputRate);
         } else {
             if (source) {
-                if (!lagGrowthOpt.isPresent() || 
numRecordsInPerSecond.getSum().isNaN()) {
+                if (!lagGrowthOpt.isPresent() || 
numRecordsInPerSecond.isNaN()) {
                     LOG.error(
                             "Cannot compute source target data rate without 
numRecordsInPerSecond and pendingRecords (lag) metric for {}.",
                             jobVertexID);
                     scalingMetrics.put(ScalingMetric.TARGET_DATA_RATE, 
Double.NaN);
                 } else {
-                    double sourceDataRate =
-                            Math.max(0, numRecordsInPerSecond.getSum() + 
lagGrowthOpt.get());
+                    double sourceDataRate = Math.max(0, numRecordsInPerSecond 
+ lagGrowthOpt.get());
                     LOG.info(
                             "Using computed source data rate {} for {}",
                             sourceDataRate,
@@ -124,8 +122,8 @@ public class ScalingMetrics {
                 }
             }
 
-            if (!numRecordsInPerSecond.getSum().isNaN()) {
-                double trueProcessingRate = busyTimeMultiplier * 
numRecordsInPerSecond.getSum();
+            if (!numRecordsInPerSecond.isNaN()) {
+                double trueProcessingRate = busyTimeMultiplier * 
numRecordsInPerSecond;
                 if (trueProcessingRate <= 0 || 
!Double.isFinite(trueProcessingRate)) {
                     trueProcessingRate = Double.NaN;
                 }
@@ -135,13 +133,11 @@ public class ScalingMetrics {
             }
 
             if (!sink) {
-                if (!outputPerSecond.getSum().isNaN()) {
+                if (!outputPerSecond.isNaN()) {
                     scalingMetrics.put(
-                            ScalingMetric.OUTPUT_RATIO,
-                            outputPerSecond.getSum() / 
numRecordsInPerSecond.getSum());
+                            ScalingMetric.OUTPUT_RATIO, outputPerSecond / 
numRecordsInPerSecond);
                     scalingMetrics.put(
-                            ScalingMetric.TRUE_OUTPUT_RATE,
-                            busyTimeMultiplier * outputPerSecond.getSum());
+                            ScalingMetric.TRUE_OUTPUT_RATE, busyTimeMultiplier 
* outputPerSecond);
                 } else {
                     LOG.error(
                             "Cannot compute processing and input rate without 
numRecordsOutPerSecond");
@@ -160,4 +156,40 @@ public class ScalingMetrics {
             scalingMetrics.put(ScalingMetric.LAG, 0.);
         }
     }
+
+    private static Double getNumRecordsInPerSecond(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID 
jobVertexID) {
+        var numRecordsInPerSecond = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
+        if (numRecordsInPerSecond == null) {
+            numRecordsInPerSecond =
+                    
flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
+        }
+        if (numRecordsInPerSecond == null) {
+            LOG.warn("Received null input rate for {}. Returning NaN.", 
jobVertexID);
+            return Double.NaN;
+        }
+        return keepAboveZero(numRecordsInPerSecond.getSum());
+    }
+
+    private static Double getNumRecordsOutPerSecond(
+            Map<FlinkMetric, AggregatedMetric> flinkMetrics,
+            JobVertexID jobVertexID,
+            boolean isSink) {
+        AggregatedMetric aggregatedMetric = 
flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
+        if (aggregatedMetric == null) {
+            if (!isSink) {
+                LOG.warn("Received null output rate for {}. Returning NaN.", 
jobVertexID);
+            }
+            return Double.NaN;
+        }
+        return keepAboveZero(aggregatedMetric.getSum());
+    }
+
+    private static Double keepAboveZero(Double number) {
+        if (number <= 0) {
+            // Make the value really tiny but not zero
+            return EFFECTIVELY_ZERO;
+        }
+        return number;
+    }
 }
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index ac5724e1..93a3097b 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetrics;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -397,6 +398,44 @@ public class MetricsCollectionAndEvaluationTest {
         assertEquals(1, scaledParallelism.get(source1));
     }
 
+    @Test
+    public void testScaleDownWithZeroProcessingRate() throws Exception {
+        var topology = new JobTopology(new VertexInfo(source1, Set.of(), 10, 
720));
+
+        metricsCollector = new TestingMetricsCollector(topology);
+        metricsCollector.setCurrentMetrics(
+                Map.of(
+                        // Set source1 metrics without the PENDING_RECORDS 
metric
+                        source1,
+                        Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 0.1, 
Double.NaN, Double.NaN),
+                                FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 0.),
+                                FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 0.))));
+
+        var collectedMetrics = collectMetrics();
+
+        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluation =
+                evaluator.evaluate(conf, collectedMetrics);
+        assertEquals(
+                ScalingMetrics.EFFECTIVELY_ZERO,
+                
evaluation.get(source1).get(ScalingMetric.TARGET_DATA_RATE).getCurrent());
+        assertEquals(
+                1E-6, 
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent());
+        assertEquals(
+                0.,
+                
evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent());
+        assertEquals(
+                0.,
+                
evaluation.get(source1).get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent());
+
+        scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);
+        var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+        assertEquals(1, scaledParallelism.get(source1));
+    }
+
     private CollectedMetrics collectMetrics() throws Exception {
         conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
         conf.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2));
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index de349285..9622d0ba 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -203,4 +203,85 @@ public class ScalingMetricsTest {
         assertEquals(0.2, scalingMetrics.get(ScalingMetric.LOAD_MAX));
         assertEquals(0.1, scalingMetrics.get(ScalingMetric.LOAD_AVG));
     }
+
+    @Test
+    public void testZeroValuesForBusyness() {
+        double dataRate = 10;
+        Map<ScalingMetric, Double> scalingMetrics = 
testZeroValuesForRatesOrBusyness(dataRate, 0);
+        assertEquals(
+                Map.of(
+                        ScalingMetric.TRUE_PROCESSING_RATE,
+                        1.0E14,
+                        ScalingMetric.TRUE_OUTPUT_RATE,
+                        1.0E14,
+                        ScalingMetric.OUTPUT_RATIO,
+                        1.,
+                        ScalingMetric.SOURCE_DATA_RATE,
+                        dataRate),
+                scalingMetrics);
+    }
+
+    @Test
+    public void testZeroValuesForRates() {
+        double busyMillisecondPerSec = 100;
+        Map<ScalingMetric, Double> scalingMetrics =
+                testZeroValuesForRatesOrBusyness(0, busyMillisecondPerSec);
+        assertEquals(
+                Map.of(
+                        ScalingMetric.TRUE_PROCESSING_RATE,
+                        1.0E-9,
+                        ScalingMetric.TRUE_OUTPUT_RATE,
+                        1.0E-9,
+                        ScalingMetric.OUTPUT_RATIO,
+                        1.,
+                        ScalingMetric.SOURCE_DATA_RATE,
+                        ScalingMetrics.EFFECTIVELY_ZERO),
+                scalingMetrics);
+    }
+
+    @Test
+    public void testZeroValuesForRateAndBusyness() {
+        Map<ScalingMetric, Double> scalingMetrics = 
testZeroValuesForRatesOrBusyness(0, 0);
+        assertEquals(
+                Map.of(
+                        ScalingMetric.TRUE_PROCESSING_RATE,
+                        1000.0,
+                        ScalingMetric.TRUE_OUTPUT_RATE,
+                        1000.0,
+                        ScalingMetric.OUTPUT_RATIO,
+                        1.,
+                        ScalingMetric.SOURCE_DATA_RATE,
+                        ScalingMetrics.EFFECTIVELY_ZERO),
+                scalingMetrics);
+    }
+
+    private static Map<ScalingMetric, Double> testZeroValuesForRatesOrBusyness(
+            double rate, double busyness) {
+        var source = new JobVertexID();
+        var op = new JobVertexID();
+        var sink = new JobVertexID();
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(op, Set.of(source), 1, 1),
+                        new VertexInfo(sink, Set.of(op), 1, 1));
+
+        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+        ScalingMetrics.computeDataRateMetrics(
+                source,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, busyness, 
Double.NaN, Double.NaN),
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, rate),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, rate)),
+                scalingMetrics,
+                topology,
+                Optional.of(0.),
+                new Configuration());
+
+        return scalingMetrics;
+    }
 }

Reply via email to