This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 92034fa9 [FLINK-30652] Use max busytime to compute true processing rate
92034fa9 is described below

commit 92034fa912f39f5c8bd57632295c7ca85801f33a
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Jan 12 17:14:30 2023 +0100

    [FLINK-30652] Use max busytime to compute true processing rate
---
 .../generated/auto_scaler_configuration.html       |  6 ++++
 .../autoscaler/config/AutoScalerOptions.java       |  8 +++++
 .../autoscaler/metrics/MetricAggregator.java       | 40 ++++++++++++++++++++++
 .../autoscaler/metrics/ScalingMetrics.java         |  8 +++--
 .../autoscaler/BacklogBasedScalingTest.java        | 28 +++++++--------
 .../MetricsCollectionAndEvaluationTest.java        |  8 ++---
 .../autoscaler/metrics/ScalingMetricsTest.java     | 36 ++++++++++++++++---
 7 files changed, 109 insertions(+), 25 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index e1e98204..ca468948 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -32,6 +32,12 @@
             <td>Integer</td>
             <td>Maximum number of past scaling decisions to retain per vertex.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.metrics.busy-time.aggregator</h5></td>
+            <td style="word-wrap: break-word;">MAX</td>
+            <td><p>Enum</p></td>
+            <td>Metric aggregator to use for busyTime metrics. This affects how true processing/output rate will be computed. Using max allows us to handle jobs with data skew more robustly, while avg may provide better stability when we know that the load distribution is even.<br /><br />Possible values:<ul><li>"AVG"</li><li>"MAX"</li><li>"MIN"</li></ul></td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.autoscaler.metrics.window</h5></td>
             <td style="word-wrap: break-word;">5 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index 07fa6069..c457b462 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.autoscaler.config;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
 
 import java.time.Duration;
 
@@ -145,4 +146,11 @@ public class AutoScalerOptions {
                     .durationType()
                     .defaultValue(Duration.ofHours(24))
                     .withDescription("Maximum age for past scaling decisions to retain.");
+
+    public static final ConfigOption<MetricAggregator> BUSY_TIME_AGGREGATOR =
+            autoScalerConfig("metrics.busy-time.aggregator")
+                    .enumType(MetricAggregator.class)
+                    .defaultValue(MetricAggregator.MAX)
+                    .withDescription(
+                            "Metric aggregator to use for busyTime metrics. This affects how true processing/output rate will be computed. Using max allows us to handle jobs with data skew more robustly, while avg may provide better stability when we know that the load distribution is even.");
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
new file mode 100644
index 00000000..07765fe8
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** Enum specifying which aggregator to use when getting a metric value. */
+public enum MetricAggregator {
+    AVG(AggregatedMetric::getAvg),
+    MAX(AggregatedMetric::getMax),
+    MIN(AggregatedMetric::getMin);
+
+    private final Function<AggregatedMetric, Double> getter;
+
+    MetricAggregator(Function<AggregatedMetric, Double> getter) {
+        this.getter = getter;
+    }
+
+    public Optional<Double> get(AggregatedMetric metric) {
+        return Optional.ofNullable(metric).map(getter).filter(d -> !d.isNaN());
+    }
+}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 27b3d97d..c7c189ef 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.autoscaler.metrics;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
@@ -64,9 +65,10 @@ public class ScalingMetrics {
         var source = topology.getInputs().get(jobVertexID).isEmpty();
         var sink = topology.getOutputs().get(jobVertexID).isEmpty();
 
-        var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
+        var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
+        var busyTimeOpt = busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
 
-        if (busyTime == null || busyTime.getAvg().isNaN()) {
+        if (busyTimeOpt.isEmpty()) {
             LOG.error("Cannot compute true processing/output rate without busyTimeMsPerSecond");
             return;
         }
@@ -79,7 +81,7 @@ public class ScalingMetrics {
 
         var outputPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
 
-        double busyTimeMultiplier = 1000 / busyTime.getAvg();
+        double busyTimeMultiplier = 1000 / busyTimeOpt.get();
 
         if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) {
             double sourceInputRate =
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 6c919a77..af8662b6 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -116,7 +116,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         source1,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 850., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 850., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 500.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -127,7 +127,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         sink,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 850., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 850., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 500.))));
@@ -152,7 +152,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         source1,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 1800.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -163,7 +163,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         sink,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 1800.))));
@@ -182,7 +182,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         source1,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 1800.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -193,7 +193,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         sink,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 1800.))));
@@ -212,7 +212,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         source1,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 600., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 600., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 800.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -222,7 +222,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         sink,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 600., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 600., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 800.))));
@@ -243,7 +243,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         source1,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 900.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -253,7 +253,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         sink,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 900.))));
@@ -270,7 +270,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         source1,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 900.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -280,7 +280,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         sink,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 900.))));
@@ -296,7 +296,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         source1,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 500., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 500., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 500.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -306,7 +306,7 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
                         sink,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 500., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 500., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 500.))));
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 8387c620..d12e6bc3 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -205,7 +205,7 @@ public class MetricsCollectionAndEvaluationTest {
                         source1,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 500.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -215,7 +215,7 @@ public class MetricsCollectionAndEvaluationTest {
                         source2,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 1000., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 1000., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 500.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -225,7 +225,7 @@ public class MetricsCollectionAndEvaluationTest {
                         map,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 500., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 500., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
                                 new AggregatedMetric("", Double.NaN, 
Double.NaN, Double.NaN, 2000.),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -234,7 +234,7 @@ public class MetricsCollectionAndEvaluationTest {
                         sink,
                         Map.of(
                                 FlinkMetric.BUSY_TIME_PER_SEC,
-                                new AggregatedMetric("", Double.NaN, 
Double.NaN, 500., Double.NaN),
+                                new AggregatedMetric("", Double.NaN, 500., 
Double.NaN, Double.NaN),
                                 FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                                 new AggregatedMetric(
                                         "", Double.NaN, Double.NaN, 
Double.NaN, 2000.))));
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 5aa84b1c..de349285 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -54,7 +54,7 @@ public class ScalingMetricsTest {
                 source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., 
Double.NaN),
+                        new AggregatedMetric("", Double.NaN, 100., Double.NaN, 
Double.NaN),
                         FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -82,7 +82,7 @@ public class ScalingMetricsTest {
                 source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., 
Double.NaN),
+                        new AggregatedMetric("", Double.NaN, 100., Double.NaN, 
Double.NaN),
                         FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -109,7 +109,7 @@ public class ScalingMetricsTest {
                 op,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., 
Double.NaN),
+                        new AggregatedMetric("", Double.NaN, 100., Double.NaN, 
Double.NaN),
                         FlinkMetric.NUM_RECORDS_IN_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -128,6 +128,34 @@ public class ScalingMetricsTest {
                         ScalingMetric.OUTPUT_RATIO,
                         2.),
                 scalingMetrics);
+
+        // Test using avg busyTime aggregator
+        scalingMetrics.clear();
+        var conf = new Configuration();
+        conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.AVG);
+        ScalingMetrics.computeDataRateMetrics(
+                op,
+                Map.of(
+                        FlinkMetric.BUSY_TIME_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 100., 
Double.NaN),
+                        FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 1000.),
+                        FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+                        new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.)),
+                scalingMetrics,
+                topology,
+                Optional.empty(),
+                conf);
+
+        assertEquals(
+                Map.of(
+                        ScalingMetric.TRUE_PROCESSING_RATE,
+                        10000.,
+                        ScalingMetric.TRUE_OUTPUT_RATE,
+                        20000.,
+                        ScalingMetric.OUTPUT_RATIO,
+                        2.),
+                scalingMetrics);
     }
 
     @Test
@@ -145,7 +173,7 @@ public class ScalingMetricsTest {
                 source,
                 Map.of(
                         FlinkMetric.BUSY_TIME_PER_SEC,
-                        new AggregatedMetric("", Double.NaN, Double.NaN, 500., 
Double.NaN),
+                        new AggregatedMetric("", Double.NaN, 500., Double.NaN, 
Double.NaN),
                         FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
                         new AggregatedMetric("", Double.NaN, Double.NaN, 
Double.NaN, 2000.),
                         FlinkMetric.NUM_RECORDS_OUT_PER_SEC,

Reply via email to