This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 7fe557b6 [FLINK-32690] report Double.NaN instead of null for missing
autoscaler metrics
7fe557b6 is described below
commit 7fe557b69e353ecbd77896228cb471dba86c069d
Author: Matyas Orhidi <[email protected]>
AuthorDate: Wed Jun 14 11:44:39 2023 -0700
[FLINK-32690] report Double.NaN instead of null for missing autoscaler
metrics
---
.../autoscaler/AutoscalerFlinkMetrics.java | 35 ++++-
.../operator/autoscaler/JobAutoScalerImpl.java | 20 +--
.../autoscaler/AutoScalerFlinkMetricsTest.java | 161 +++++++++++++++++++++
3 files changed, 193 insertions(+), 23 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
index fab3cb32..b443935c 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoscalerFlinkMetrics.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.autoscaler;
+import org.apache.flink.annotation.VisibleForTesting;
import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.metrics.Counter;
@@ -32,10 +33,16 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+
/** Autoscaler metrics for observability. */
public class AutoscalerFlinkMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(AutoscalerFlinkMetrics.class);
+ @VisibleForTesting static final String CURRENT = "Current";
+ @VisibleForTesting static final String AVERAGE = "Average";
+ @VisibleForTesting static final String JOB_VERTEX_ID = "jobVertexID";
final Counter numScalings;
@@ -66,14 +73,14 @@ public class AutoscalerFlinkMetrics {
}
LOG.info("Registering scaling metrics for job
vertex {}", jobVertexID);
var jobVertexMg =
- metricGroup.addGroup("jobVertexID",
jobVertexID.toHexString());
+ metricGroup.addGroup(JOB_VERTEX_ID,
jobVertexID.toHexString());
evaluated.forEach(
(sm, esm) -> {
var smGroup =
jobVertexMg.addGroup(sm.name());
smGroup.gauge(
- "Current",
+ CURRENT,
() ->
Optional.ofNullable(
currentVertexMetrics.get())
@@ -82,11 +89,11 @@ public class AutoscalerFlinkMetrics {
.map(
EvaluatedScalingMetric
::getCurrent)
- .orElse(null));
+
.orElse(Double.NaN));
if (sm.isCalculateAverage()) {
smGroup.gauge(
- "Average",
+ AVERAGE,
() ->
Optional.ofNullable(
currentVertexMetrics
@@ -96,9 +103,27 @@ public class AutoscalerFlinkMetrics {
.map(
EvaluatedScalingMetric
::getAverage)
-
.orElse(null));
+
.orElse(Double.NaN));
}
});
});
}
+
+ @VisibleForTesting
+ static void initRecommendedParallelism(
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics) {
+ evaluatedMetrics.forEach(
+ (jobVertexID, evaluatedScalingMetricMap) ->
+ evaluatedScalingMetricMap.put(
+ RECOMMENDED_PARALLELISM,
+ evaluatedScalingMetricMap.get(PARALLELISM)));
+ }
+
+ @VisibleForTesting
+ static void resetRecommendedParallelism(
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics) {
+ evaluatedMetrics.forEach(
+ (jobVertexID, evaluatedScalingMetricMap) ->
+ evaluatedScalingMetricMap.put(RECOMMENDED_PARALLELISM,
null));
+ }
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index fbefd15a..75ef9cd0 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -36,9 +36,9 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import static
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism;
import static
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.AUTOSCALER_ENABLED;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
-import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
/** Application and SessionJob autoscaler. */
public class JobAutoScalerImpl implements JobAutoScaler {
@@ -191,20 +191,4 @@ public class JobAutoScalerImpl implements JobAutoScaler {
new AutoscalerFlinkMetrics(
ctx.getResourceMetricGroup().addGroup("AutoScaler")));
}
-
- private void initRecommendedParallelism(
- Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics) {
- evaluatedMetrics.forEach(
- (jobVertexID, evaluatedScalingMetricMap) ->
- evaluatedScalingMetricMap.put(
- RECOMMENDED_PARALLELISM,
- evaluatedScalingMetricMap.get(PARALLELISM)));
- }
-
- private void resetRecommendedParallelism(
- Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluatedMetrics) {
- evaluatedMetrics.forEach(
- (jobVertexID, evaluatedScalingMetricMap) ->
- evaluatedScalingMetricMap.put(RECOMMENDED_PARALLELISM,
null));
- }
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
new file mode 100644
index 00000000..b5760ec7
--- /dev/null
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerFlinkMetricsTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.autoscaler;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.metrics.TestingMetricListener;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.AVERAGE;
+import static
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.CURRENT;
+import static
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.JOB_VERTEX_ID;
+import static
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.initRecommendedParallelism;
+import static
org.apache.flink.kubernetes.operator.autoscaler.AutoscalerFlinkMetrics.resetRecommendedParallelism;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** {@link AutoscalerFlinkMetrics} tests. */
+public class AutoScalerFlinkMetricsTest {
+
+ private final Configuration configuration = new Configuration();
+ private final JobVertexID jobVertexID = new JobVertexID();
+ private ResourceID resourceID;
+ private TestingMetricListener listener;
+ private AutoscalerFlinkMetrics metrics;
+
+ @BeforeEach
+ public void init() {
+ listener = new TestingMetricListener(configuration);
+ metrics = new AutoscalerFlinkMetrics(listener.getMetricGroup());
+ resourceID =
ResourceID.fromResource(TestUtils.buildApplicationCluster());
+ }
+
+ @Test
+ public void testMetricsRegistration() {
+ var evaluatedMetrics = Map.of(jobVertexID, testMetrics());
+ var lastEvaluatedMetrics =
+ new HashMap<
+ ResourceID, Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>();
+ initRecommendedParallelism(evaluatedMetrics);
+ lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
+
+ metrics.registerScalingMetrics(() ->
lastEvaluatedMetrics.get(resourceID));
+ metrics.registerScalingMetrics(() ->
lastEvaluatedMetrics.get(resourceID));
+
+ assertEquals(1.0, getCurrentMetricValue(PARALLELISM));
+ assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+ assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE));
+ assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE));
+ }
+
+ @Test
+ public void testMetricsCleanup() {
+ var evaluatedMetrics = Map.of(jobVertexID, testMetrics());
+ var lastEvaluatedMetrics =
+ new HashMap<
+ ResourceID, Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>();
+ initRecommendedParallelism(evaluatedMetrics);
+ lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
+ metrics.registerScalingMetrics(() ->
lastEvaluatedMetrics.get(resourceID));
+
+ assertEquals(1.0, getCurrentMetricValue(PARALLELISM));
+ assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+ assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE));
+ assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE));
+
+ lastEvaluatedMetrics.remove(resourceID);
+ assertEquals(Double.NaN, getCurrentMetricValue(PARALLELISM));
+ assertEquals(Double.NaN,
getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+ assertEquals(Double.NaN, getCurrentMetricValue(TRUE_PROCESSING_RATE));
+ assertEquals(Double.NaN, getAverageMetricValue(TRUE_PROCESSING_RATE));
+ }
+
+ @Test
+ public void testRecommendedParallelismWithinMetricWindow() {
+ var evaluatedMetrics = Map.of(jobVertexID, testMetrics());
+ var lastEvaluatedMetrics =
+ new HashMap<
+ ResourceID, Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>();
+ initRecommendedParallelism(evaluatedMetrics);
+ resetRecommendedParallelism(evaluatedMetrics);
+ lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
+
+ metrics.registerScalingMetrics(() ->
lastEvaluatedMetrics.get(resourceID));
+ assertEquals(1.0, getCurrentMetricValue(PARALLELISM));
+ assertEquals(Double.NaN,
getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+ assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE));
+ assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE));
+ }
+
+ @Test
+ public void testRecommendedParallelismPastMetricWindow() {
+ var evaluatedMetrics = Map.of(jobVertexID, testMetrics());
+ var lastEvaluatedMetrics =
+ new HashMap<
+ ResourceID, Map<JobVertexID, Map<ScalingMetric,
EvaluatedScalingMetric>>>();
+ initRecommendedParallelism(evaluatedMetrics);
+ lastEvaluatedMetrics.put(resourceID, evaluatedMetrics);
+
+ metrics.registerScalingMetrics(() ->
lastEvaluatedMetrics.get(resourceID));
+ assertEquals(1.0, getCurrentMetricValue(PARALLELISM));
+ assertEquals(1.0, getCurrentMetricValue(RECOMMENDED_PARALLELISM));
+ assertEquals(1000., getCurrentMetricValue(TRUE_PROCESSING_RATE));
+ assertEquals(2000., getAverageMetricValue(TRUE_PROCESSING_RATE));
+ }
+
+ private static Map<ScalingMetric, EvaluatedScalingMetric> testMetrics() {
+ var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
+ metrics.put(PARALLELISM, EvaluatedScalingMetric.of(1));
+ metrics.put(ScalingMetric.TRUE_PROCESSING_RATE, new
EvaluatedScalingMetric(1000., 2000.));
+
+ return metrics;
+ }
+
+ private Object getCurrentMetricValue(ScalingMetric metric) {
+ return listener.getGauge(getCurrentMetricId(metric)).orElse(() ->
Double.NaN).getValue();
+ }
+
+ private Object getAverageMetricValue(ScalingMetric metric) {
+ return listener.getGauge(getAverageMetricId(metric)).get().getValue();
+ }
+
+ private String getCurrentMetricId(ScalingMetric metric) {
+ return getMetricId(metric, CURRENT);
+ }
+
+ private String getAverageMetricId(ScalingMetric metric) {
+ return getMetricId(metric, AVERAGE);
+ }
+
+ private String getMetricId(ScalingMetric metric, String classifier) {
+ return listener.getMetricId(
+ JOB_VERTEX_ID, jobVertexID.toHexString(), metric.name(),
classifier);
+ }
+}