This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new e40f108f [FLINK-32272] Expose LOAD as autoscaler metric
e40f108f is described below
commit e40f108fbafc80d48971f883531be5137f3f6d1e
Author: Matyas Orhidi <[email protected]>
AuthorDate: Fri Jun 9 22:58:33 2023 -0700
[FLINK-32272] Expose LOAD as autoscaler metric
---
.../autoscaler/ScalingMetricCollector.java | 4 +-
.../autoscaler/ScalingMetricEvaluator.java | 6 +++
.../operator/autoscaler/metrics/ScalingMetric.java | 7 +--
.../autoscaler/metrics/ScalingMetrics.java | 18 ++-----
.../autoscaler/BacklogBasedScalingTest.java | 4 ++
.../autoscaler/ScalingMetricEvaluatorTest.java | 55 ++++++++++++++++------
.../autoscaler/metrics/ScalingMetricsTest.java | 32 +++++++++++--
7 files changed, 89 insertions(+), 37 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 033029a5..99c0c0da 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -245,7 +245,9 @@ public abstract class ScalingMetricCollector {
if (jobTopology.isSource(jobVertexID)) {
ScalingMetrics.computeLagMetrics(vertexFlinkMetrics,
vertexScalingMetrics);
}
- ScalingMetrics.computeLoadMetrics(vertexFlinkMetrics, vertexScalingMetrics);
+
+ ScalingMetrics.computeLoadMetrics(
+ jobVertexID, vertexFlinkMetrics, vertexScalingMetrics, conf);
double lagGrowthRate =
computeLagGrowthRate(
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
index ebf87e25..af6d1732 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluator.java
@@ -45,6 +45,7 @@ import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
@@ -137,6 +138,11 @@ public class ScalingMetricEvaluator {
latestVertexMetrics.get(TRUE_PROCESSING_RATE),
getAverage(TRUE_PROCESSING_RATE, vertex,
metricsHistory)));
+ evaluatedMetrics.put(
+ LOAD,
+ new EvaluatedScalingMetric(
+ latestVertexMetrics.get(LOAD), getAverage(LOAD, vertex, metricsHistory)));
+
evaluatedMetrics.put(
PARALLELISM,
EvaluatedScalingMetric.of(topology.getParallelisms().get(vertex)));
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
index 59f0d26a..608e4f1c 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetric.java
@@ -23,11 +23,8 @@ package
org.apache.flink.kubernetes.operator.autoscaler.metrics;
*/
public enum ScalingMetric {
- /** Max subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */
- LOAD_MAX(true),
-
- /** Average subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */
- LOAD_AVG(true),
+ /** Subtask load (busy time ratio 0 (idle) to 1 (fully utilized)). */
+ LOAD(true),
/** Processing rate at full capacity (records/sec). */
TRUE_PROCESSING_RATE(true),
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index dd92c780..624a57ec 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -39,21 +39,13 @@ public class ScalingMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(ScalingMetrics.class);
public static void computeLoadMetrics(
+ JobVertexID jobVertexID,
Map<FlinkMetric, AggregatedMetric> flinkMetrics,
- Map<ScalingMetric, Double> scalingMetrics) {
-
- var busyTime = flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC);
- if (busyTime == null) {
- return;
- }
-
- if (!busyTime.getAvg().isNaN()) {
- scalingMetrics.put(ScalingMetric.LOAD_AVG, busyTime.getAvg() / 1000);
- }
+ Map<ScalingMetric, Double> scalingMetrics,
+ Configuration conf) {
- if (!busyTime.getMax().isNaN()) {
- scalingMetrics.put(ScalingMetric.LOAD_MAX, busyTime.getMax() / 1000);
- }
+ double busyTimeMsPerSecond = getBusyTimeMsPerSecond(flinkMetrics, conf, jobVertexID);
+ scalingMetrics.put(ScalingMetric.LOAD, busyTimeMsPerSecond / 1000);
}
public static void computeDataRateMetrics(
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 4ce069ca..a4506d7b 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -377,12 +377,16 @@ public class BacklogBasedScalingTest extends
OperatorTestBase {
Map.of(
source1,
Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, 850., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.)),
sink,
Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, 850., Double.NaN, Double.NaN),
FlinkMetric.NUM_RECORDS_IN_PER_SEC,
new AggregatedMetric(
"", Double.NaN, Double.NaN,
Double.NaN, 500.))));
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
index e1bc3d65..c67b27fe 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java
@@ -46,6 +46,7 @@ import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerO
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CATCH_UP_DATA_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.CURRENT_PROCESSING_RATE;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LAG;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.LOAD;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.SOURCE_DATA_RATE;
@@ -77,9 +78,17 @@ public class ScalingMetricEvaluatorTest {
new CollectedMetrics(
Map.of(
source,
- Map.of(SOURCE_DATA_RATE, 100., LAG, 0., TRUE_PROCESSING_RATE, 200.),
+ Map.of(
+ SOURCE_DATA_RATE,
+ 100.,
+ LAG,
+ 0.,
+ TRUE_PROCESSING_RATE,
+ 200.,
+ LOAD,
+ .8),
sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Map.of(TRUE_PROCESSING_RATE, 2000., LOAD, .4)),
Map.of(new Edge(source, sink), 2.)));
metricHistory.put(
@@ -88,19 +97,29 @@ public class ScalingMetricEvaluatorTest {
Map.of(
source,
Map.of(
- SOURCE_DATA_RATE, 200.,
- LAG, 1000.,
- TRUE_PROCESSING_RATE, 200.),
+ SOURCE_DATA_RATE,
+ 200.,
+ LAG,
+ 1000.,
+ TRUE_PROCESSING_RATE,
+ 200.,
+ LOAD,
+ .6),
sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Map.of(TRUE_PROCESSING_RATE, 2000., LOAD, .3)),
Map.of(new Edge(source, sink), 2.)));
var conf = new Configuration();
- conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
- conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+ conf.set(CATCH_UP_DURATION, Duration.ofSeconds(2));
+ conf.set(RESTART_TIME, Duration.ZERO);
var evaluatedMetrics =
evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
+
+ assertEquals(new EvaluatedScalingMetric(.6, .7), evaluatedMetrics.get(source).get(LOAD));
+
+ assertEquals(new EvaluatedScalingMetric(.3, .35), evaluatedMetrics.get(sink).get(LOAD));
+
assertEquals(
new EvaluatedScalingMetric(200, 150),
evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
@@ -114,7 +133,7 @@ public class ScalingMetricEvaluatorTest {
EvaluatedScalingMetric.of(1000),
evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
- conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1));
+ conf.set(CATCH_UP_DURATION, Duration.ofSeconds(1));
evaluatedMetrics =
evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
assertEquals(
@@ -131,7 +150,7 @@ public class ScalingMetricEvaluatorTest {
evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
// Restart time should not affect evaluated metrics
- conf.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2));
+ conf.set(RESTART_TIME, Duration.ofSeconds(2));
evaluatedMetrics =
evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
@@ -149,7 +168,7 @@ public class ScalingMetricEvaluatorTest {
evaluatedMetrics.get(sink).get(CATCH_UP_DATA_RATE));
// Turn off lag based scaling
- conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
+ conf.set(CATCH_UP_DURATION, Duration.ZERO);
evaluatedMetrics =
evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
assertEquals(
@@ -170,12 +189,20 @@ public class ScalingMetricEvaluatorTest {
new CollectedMetrics(
Map.of(
source,
- Map.of(SOURCE_DATA_RATE, 100., LAG, 0., TRUE_PROCESSING_RATE, 200.),
+ Map.of(
+ SOURCE_DATA_RATE,
+ 100.,
+ LAG,
+ 0.,
+ TRUE_PROCESSING_RATE,
+ 200.,
+ LOAD,
+ .85),
sink,
- Map.of(TRUE_PROCESSING_RATE, 2000.)),
+ Map.of(TRUE_PROCESSING_RATE, 2000., LOAD, .85)),
Map.of(new Edge(source, sink), 2.)));
- conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1));
+ conf.set(CATCH_UP_DURATION, Duration.ofMinutes(1));
evaluatedMetrics =
evaluator.evaluate(conf, new CollectedMetricHistory(topology,
metricHistory));
assertEquals(
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
index 4783bd98..0ae3e2b7 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetricsTest.java
@@ -194,15 +194,39 @@ public class ScalingMetricsTest {
@Test
public void testLoadMetrics() {
+ var source = new JobVertexID();
Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
+ var conf = new Configuration();
+
+ conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.MAX);
ScalingMetrics.computeLoadMetrics(
+ source,
Map.of(
FlinkMetric.BUSY_TIME_PER_SEC,
- new AggregatedMetric("", Double.NaN, 200., 100., Double.NaN)),
- scalingMetrics);
+ new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+ scalingMetrics,
+ conf);
+ assertEquals(.2, scalingMetrics.get(ScalingMetric.LOAD));
- assertEquals(0.2, scalingMetrics.get(ScalingMetric.LOAD_MAX));
- assertEquals(0.1, scalingMetrics.get(ScalingMetric.LOAD_AVG));
+ conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.MIN);
+ ScalingMetrics.computeLoadMetrics(
+ source,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+ scalingMetrics,
+ conf);
+ assertEquals(.1, scalingMetrics.get(ScalingMetric.LOAD));
+
+ conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.AVG);
+ ScalingMetrics.computeLoadMetrics(
+ source,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", 100., 200., 150., Double.NaN)),
+ scalingMetrics,
+ conf);
+ assertEquals(.15, scalingMetrics.get(ScalingMetric.LOAD));
}
@Test