This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 92034fa9 [FLINK-30652] Use max busytime to compute true processing rate
92034fa9 is described below
commit 92034fa912f39f5c8bd57632295c7ca85801f33a
Author: Gyula Fora <[email protected]>
AuthorDate: Thu Jan 12 17:14:30 2023 +0100
[FLINK-30652] Use max busytime to compute true processing rate
---
.../generated/auto_scaler_configuration.html | 6 ++++
.../autoscaler/config/AutoScalerOptions.java | 8 +++++
.../autoscaler/metrics/MetricAggregator.java | 40 ++++++++++++++++++++++
.../autoscaler/metrics/ScalingMetrics.java | 8 +++--
.../autoscaler/BacklogBasedScalingTest.java | 28 +++++++--------
.../MetricsCollectionAndEvaluationTest.java | 8 ++---
.../autoscaler/metrics/ScalingMetricsTest.java | 36 ++++++++++++++++---
7 files changed, 109 insertions(+), 25 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index e1e98204..ca468948 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -32,6 +32,12 @@
<td>Integer</td>
<td>Maximum number of past scaling decisions to retain per vertex.</td>
</tr>
+ <tr>
+        <td><h5>kubernetes.operator.job.autoscaler.metrics.busy-time.aggregator</h5></td>
+ <td style="word-wrap: break-word;">MAX</td>
+ <td><p>Enum</p></td>
+        <td>Metric aggregator to use for busyTime metrics. This affects how true processing/output rate will be computed. Using max allows us to handle jobs with data skew more robustly, while avg may provide better stability when we know that the load distribution is even.<br /><br />Possible values:<ul><li>"AVG"</li><li>"MAX"</li><li>"MIN"</li></ul></td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.autoscaler.metrics.window</h5></td>
<td style="word-wrap: break-word;">5 min</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index 07fa6069..c457b462 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.autoscaler.config;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.MetricAggregator;
import java.time.Duration;
@@ -145,4 +146,11 @@ public class AutoScalerOptions {
.durationType()
.defaultValue(Duration.ofHours(24))
.withDescription("Maximum age for past scaling decisions to retain.");
+
+ public static final ConfigOption<MetricAggregator> BUSY_TIME_AGGREGATOR =
+ autoScalerConfig("metrics.busy-time.aggregator")
+ .enumType(MetricAggregator.class)
+ .defaultValue(MetricAggregator.MAX)
+ .withDescription(
+ "Metric aggregator to use for busyTime metrics. This affects how true processing/output rate will be computed. Using max allows us to handle jobs with data skew more robustly, while avg may provide better stability when we know that the load distribution is even.");
}
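
Usage note: the option defaults to MAX, and the pre-patch behaviour corresponds to AVG. A minimal
sketch of switching it back, using only the key and enum added in this commit (how the
Configuration instance is obtained is up to the caller):

    // Hedged sketch, not part of this commit: assumes an
    // org.apache.flink.configuration.Configuration instance that the autoscaler reads.
    Configuration conf = new Configuration();
    // Programmatic equivalent of setting
    // kubernetes.operator.job.autoscaler.metrics.busy-time.aggregator: AVG
    conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.AVG);
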
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
new file mode 100644
index 00000000..07765fe8
--- /dev/null
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/MetricAggregator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler.metrics;
+
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+/** Enum specifying which aggregator to use when getting a metric value. */
+public enum MetricAggregator {
+ AVG(AggregatedMetric::getAvg),
+ MAX(AggregatedMetric::getMax),
+ MIN(AggregatedMetric::getMin);
+
+ private final Function<AggregatedMetric, Double> getter;
+
+ MetricAggregator(Function<AggregatedMetric, Double> getter) {
+ this.getter = getter;
+ }
+
+ public Optional<Double> get(AggregatedMetric metric) {
+ return Optional.ofNullable(metric).map(getter).filter(d -> !d.isNaN());
+ }
+}
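
For illustration, a small sketch of how the new enum behaves, assuming the five-argument
AggregatedMetric constructor (id, min, max, avg, sum) that the test updates below rely on:

    // Illustrative only, not part of this commit.
    var metric = new AggregatedMetric("busyTimeMsPerSecond", Double.NaN, 850., Double.NaN, Double.NaN);
    MetricAggregator.MAX.get(metric); // Optional.of(850.0)
    MetricAggregator.AVG.get(metric); // Optional.empty(), because the avg field is NaN
    MetricAggregator.MAX.get(null);   // Optional.empty(), missing metrics are tolerated
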
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index 27b3d97d..c7c189ef 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.autoscaler.metrics;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
@@ -64,9 +65,10 @@ public class ScalingMetrics {
var source = topology.getInputs().get(jobVertexID).isEmpty();
var sink = topology.getOutputs().get(jobVertexID).isEmpty();
- var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
+ var busyTimeAggregator = conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
+ var busyTimeOpt = busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
- if (busyTime == null || busyTime.getAvg().isNaN()) {
+ if (busyTimeOpt.isEmpty()) {
LOG.error("Cannot compute true processing/output rate without busyTimeMsPerSecond");
return;
}
@@ -79,7 +81,7 @@ public class ScalingMetrics {
var outputPerSecond = flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
- double busyTimeMultiplier = 1000 / busyTime.getAvg();
+ double busyTimeMultiplier = 1000 / busyTimeOpt.get();
if (source && !conf.getBoolean(SOURCE_SCALING_ENABLED)) {
double sourceInputRate =
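
The effect on the computed rates, as a rough sketch: the true processing rate scales the observed
input rate by 1000 / busyTimeMsPerSecond, so with the numbers used in ScalingMetricsTest below
(1000 records/s in, 2000 records/s out, busyTime 100 ms/s):

    // Hedged sketch of the arithmetic in computeDataRateMetrics, values taken from the test.
    double busyTimeMsPerSecond = 100.;                      // MAX (or AVG) of busyTimeMsPerSecond
    double busyTimeMultiplier = 1000 / busyTimeMsPerSecond; // 10.0
    double trueProcessingRate = busyTimeMultiplier * 1000.; // 10000 records/s
    double trueOutputRate = busyTimeMultiplier * 2000.;     // 20000 records/s

With MAX as the aggregator the busiest subtask drives the estimate, which keeps skewed vertices
from being sized off an optimistic average.
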
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 6c919a77..af8662b6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -116,7 +116,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
source1,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 850., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 850., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -127,7 +127,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
sink,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 850., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 850., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 500.))));
@@ -152,7 +152,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
source1,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -163,7 +163,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
sink,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 1800.))));
@@ -182,7 +182,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
source1,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1800.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -193,7 +193,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
sink,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 1800.))));
@@ -212,7 +212,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
source1,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 600., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 600., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 800.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -222,7 +222,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
sink,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 600., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 600., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 800.))));
@@ -243,7 +243,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
source1,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -253,7 +253,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
sink,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 900.))));
@@ -270,7 +270,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
source1,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 900.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -280,7 +280,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
sink,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 900.))));
@@ -296,7 +296,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
source1,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 500., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -306,7 +306,7 @@ public class BacklogBasedScalingTest extends OperatorTestBase {
sink,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 500., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 500.))));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 8387c620..d12e6bc3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -205,7 +205,7 @@ public class MetricsCollectionAndEvaluationTest {
source1,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -215,7 +215,7 @@ public class MetricsCollectionAndEvaluationTest {
source2,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 1000., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 1000., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 500.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -225,7 +225,7 @@ public class MetricsCollectionAndEvaluationTest {
map,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 500., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
@@ -234,7 +234,7 @@ public class MetricsCollectionAndEvaluationTest {
sink,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 500., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 2000.))));
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 5aa84b1c..de349285 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -54,7 +54,7 @@ public class ScalingMetricsTest {
source,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 100., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -82,7 +82,7 @@ public class ScalingMetricsTest {
source,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 100., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -109,7 +109,7 @@ public class ScalingMetricsTest {
op,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 100., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
@@ -128,6 +128,34 @@ public class ScalingMetricsTest {
ScalingMetric.OUTPUT_RATIO,
2.),
scalingMetrics);
+
+ // Test using avg busyTime aggregator
+ scalingMetrics.clear();
+ var conf = new Configuration();
+ conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.AVG);
+ ScalingMetrics.computeDataRateMetrics(
+ op,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN, 100., Double.NaN),
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 1000.),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.)),
+ scalingMetrics,
+ topology,
+ Optional.empty(),
+ conf);
+
+ assertEquals(
+ Map.of(
+ ScalingMetric.TRUE_PROCESSING_RATE,
+ 10000.,
+ ScalingMetric.TRUE_OUTPUT_RATE,
+ 20000.,
+ ScalingMetric.OUTPUT_RATIO,
+ 2.),
+ scalingMetrics);
}
@Test
@@ -145,7 +173,7 @@ public class ScalingMetricsTest {
source,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, Double.NaN, 500., Double.NaN),
+ new AggregatedMetric("", Double.NaN, 500., Double.NaN, Double.NaN),
FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, 2000.),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,