This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e40f108f [FLINK-32272] Expose LOAD as autoscaler metric
e40f108f is described below

commit e40f108fbafc80d48971f883531be5137f3f6d1e
Author: Matyas Orhidi <[email protected]>
AuthorDate: Fri Jun 9 22:58:33 2023 -0700

    [FLINK-32272] Expose LOAD as autoscaler metric
---
 .../autoscaler/ScalingMetricCollector.java         |  4 +-
 .../autoscaler/ScalingMetricEvaluator.java         |  6 +++
 .../operator/autoscaler/metrics/ScalingMetric.java |  7 +--
 .../autoscaler/metrics/ScalingMetrics.java         | 18 ++-----
 .../autoscaler/BacklogBasedScalingTest.java        |  4 ++
 .../autoscaler/ScalingMetricEvaluatorTest.java     | 55 ++++++++++++++++------
 .../autoscaler/metrics/ScalingMetricsTest.java     | 32 +++++++++++--
 7 files changed, 89 insertions(+), 37 deletions(-)

diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 033029a5..99c0c0da 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -245,7 +245,9 @@ public abstract class ScalingMetricCollector {
                     if (jobTopology.isSource(jobVertexID)) {
                         ScalingMetrics.computeLagMetrics(vertexFlinkMetrics, vertexScalingMetrics);
                     }
-                    ScalingMetrics.computeLoadMetrics(vertexFlinkMetrics, vertexScalingMetrics);
+
+                    ScalingMetrics.computeLoadMetrics(
+                            jobVertexID, vertexFlinkMetrics, vertexScalingMetrics, conf);
 
                     double lagGrowthRate =
                             computeLagGrowthRate(
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index ebf87e25..af6d1732 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -45,6 +45,7 @@ import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
@@ -137,6 +138,11 @@ public class ScalingMetricEvaluator {
                         latestVertexMetrics.get(TRUE_PROCESSING_RATE),
                         getAverage(TRUE_PROCESSING_RATE, vertex, metricsHistory)));
 
+        evaluatedMetrics.put(
+                LOAD,
+                new EvaluatedScalingMetric(
+                        latestVertexMetrics.get(LOAD), getAverage(LOAD, vertex, metricsHistory)));
+
         evaluatedMetrics.put(
                 PARALLELISM, EvaluatedScalingMetric.of(topology.getParallelisms().get(vertex)));
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index 59f0d26a..608e4f1c 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -23,11 +23,8 @@ package org.apache.flink.kubernetes.operator.autoscaler.metrics;
  */
 public enum ScalingMetric {
 
-    /** Max subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */
-    LOAD_MAX(true),
-
-    /** Average subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */
-    LOAD_AVG(true),
+    /** Subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */
+    LOAD(true),
 
     /** Processing rate at full capacity (records/sec). */
     TRUE_PROCESSING_RATE(true),
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index dd92c780..624a57ec 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -39,21 +39,13 @@ public class ScalingMetrics {
     private static final Logger LOG = LoggerFactory.getLogger(ScalingMetrics.class);
 
     public static void computeLoadMetrics(
+            JobVertexID jobVertexID,
             Map<FlinkMetric, AggregatedMetric> flinkMetrics,
-            Map<ScalingMetric, Double> scalingMetrics) {
-
-        var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
-        if (busyTime == null) {
-            return;
-        }
-
-        if (!busyTime.getAvg().isNaN()) {
-            scalingMetrics.put(ScalingMetric.LOAD_AVG, busyTime.getAvg() / 1000);
-        }
+            Map<ScalingMetric, Double> scalingMetrics,
+            Configuration conf) {
 
-        if (!busyTime.getMax().isNaN()) {
-            scalingMetrics.put(ScalingMetric.LOAD_MAX, busyTime.getMax() / 1000);
-        }
+        double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID);
+        scalingMetrics.put(ScalingMetric.LOAD, busyTimeMsPerSecond / 1000);
     }
 
     public static void computeDataRateMetrics(
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 4ce069ca..a4506d7b 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -377,12 +377,16 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
                 Map.of(
                         source1,
                         Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 850., Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.)),
                         sink,
                         Map.of(
+                                FlinkMetric.BUSY_TIME_PER_SEC,
+                                new AggregatedMetric("", Double.NaN, 850., Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, Double.NaN, 500.))));
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
index e1bc3d65..c67b27fe 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -46,6 +46,7 @@ import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
 import static org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
@@ -77,9 +78,17 @@ public class ScalingMetricEvaluatorTest {
                 new CollectedMetrics(
                         Map.of(
                                 source,
-                                Map.of(SOURCE_DATA_RATE, 100., LAG, 0., TRUE_PROCESSING_RATE, 200.),
+                                Map.of(
+                                        SOURCE_DATA_RATE,
+                                        100.,
+                                        LAG,
+                                        0.,
+                                        TRUE_PROCESSING_RATE,
+                                        200.,
+                                        LOAD,
+                                        .8),
                                 sink,
-                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                                Map.of(TRUE_PROCESSING_RATE, 2000., LOAD, .4)),
                         Map.of(new Edge(source, sink), 2.)));
 
         metricHistory.put(
@@ -88,19 +97,29 @@ public class ScalingMetricEvaluatorTest {
                         Map.of(
                                 source,
                                 Map.of(
-                                        SOURCE_DATA_RATE, 200.,
-                                        LAG, 1000.,
-                                        TRUE_PROCESSING_RATE, 200.),
+                                        SOURCE_DATA_RATE,
+                                        200.,
+                                        LAG,
+                                        1000.,
+                                        TRUE_PROCESSING_RATE,
+                                        200.,
+                                        LOAD,
+                                        .6),
                                 sink,
-                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                                Map.of(TRUE_PROCESSING_RATE, 2000., LOAD, .3)),
                         Map.of(new Edge(source, sink), 2.)));
 
         var conf = new Configuration();
 
-        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
-        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2));
+        conf.set(RESTART_TIME, Duration.ZERO);
         var evaluatedMetrics =
                 evaluator.evaluate(conf, new CollectedMetricHistory(topology, metricHistory));
+
+        assertEquals(new EvaluatedScalingMetric(.6, .7), evaluatedMetrics.get(source).get(LOAD));
+
+        assertEquals(new EvaluatedScalingMetric(.3, .35), evaluatedMetrics.get(sink).get(LOAD));
+
         assertEquals(
                 new EvaluatedScalingMetric(200, 150),
                 evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -114,7 +133,7 @@ public class ScalingMetricEvaluatorTest {
                 EvaluatedScalingMetric.of(1000),
                 evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
 
-        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1));
+        conf.set(CATCH_UP_DURATION, Duration.ofSeconds(1));
         evaluatedMetrics =
                 evaluator.evaluate(conf, new CollectedMetricHistory(topology, metricHistory));
         assertEquals(
@@ -131,7 +150,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
 
         // Restart time should not affect evaluated metrics
-        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2));
+        conf.set(RESTART_TIME, Duration.ofSeconds(2));
 
         evaluatedMetrics =
                 evaluator.evaluate(conf, new CollectedMetricHistory(topology, metricHistory));
@@ -149,7 +168,7 @@ public class ScalingMetricEvaluatorTest {
                 evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
 
         // Turn off lag based scaling
-        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+        conf.set(CATCH_UP_DURATION, Duration.ZERO);
         evaluatedMetrics =
                 evaluator.evaluate(conf, new CollectedMetricHistory(topology, metricHistory));
         assertEquals(
@@ -170,12 +189,20 @@ public class ScalingMetricEvaluatorTest {
                 new CollectedMetrics(
                         Map.of(
                                 source,
-                                Map.of(SOURCE_DATA_RATE, 100., LAG, 0., TRUE_PROCESSING_RATE, 200.),
+                                Map.of(
+                                        SOURCE_DATA_RATE,
+                                        100.,
+                                        LAG,
+                                        0.,
+                                        TRUE_PROCESSING_RATE,
+                                        200.,
+                                        LOAD,
+                                        .85),
                                 sink,
-                                Map.of(TRUE_PROCESSING_RATE, 2000.)),
+                                Map.of(TRUE_PROCESSING_RATE, 2000., LOAD, .85)),
                         Map.of(new Edge(source, sink), 2.)));
 
-        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1));
+        conf.set(CATCH_UP_DURATION, Duration.ofMinutes(1));
         evaluatedMetrics =
                 evaluator.evaluate(conf, new CollectedMetricHistory(topology, metricHistory));
         assertEquals(
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 4783bd98..0ae3e2b7 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -194,15 +194,39 @@ public class ScalingMetricsTest {
 
     @Test
     public void testLoadMetrics() {
+        var source = new JobVertexID();
         Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+        var conf = new Configuration();
+
+        conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.MAX);
         ScalingMetrics.computeLoadMetrics(
+                source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, 200., 100., Double.NaN)),
-                scalingMetrics);
+                        new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+                scalingMetrics,
+                conf);
+        assertEquals(.2, scalingMetrics.get(ScalingMetric.LOAD));
 
-        assertEquals(0.2, scalingMetrics.get(ScalingMetric.LOAD_MAX));
-        assertEquals(0.1, scalingMetrics.get(ScalingMetric.LOAD_AVG));
+        conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.MIN);
+        ScalingMetrics.computeLoadMetrics(
+                source,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+                scalingMetrics,
+                conf);
+        assertEquals(.1, scalingMetrics.get(ScalingMetric.LOAD));
+
+        conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.AVG);
+        ScalingMetrics.computeLoadMetrics(
+                source,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+                scalingMetrics,
+                conf);
+        assertEquals(.15, scalingMetrics.get(ScalingMetric.LOAD));
     }
 
     @Test

Reply via email to