This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 45209817 [FLINK-31299] PendingRecords metric might not be available (#542)
45209817 is described below
commit 452098178aac60bb535021695021f7ae4735adb2
Author: Maximilian Michels <[email protected]>
AuthorDate: Mon Mar 6 17:33:55 2023 +0100
[FLINK-31299] PendingRecords metric might not be available (#542)
The Kafka pendingRecords metric is only initialized on receiving the first
record. For empty topics or checkpointed topics without any incoming data,
the metric won't appear.
We need to handle this case in the autoscaler and allow downscaling.
---
.../autoscaler/ScalingMetricCollector.java | 38 +++++++++-----
.../autoscaler/metrics/ScalingMetrics.java | 2 +
.../MetricsCollectionAndEvaluationTest.java | 60 ++++++++++++++++++++++
3 files changed, 86 insertions(+), 14 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
index 0fd8c491..66869829 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricCollector.java
@@ -244,7 +244,9 @@ public abstract class ScalingMetricCollector {
var vertexScalingMetrics = new HashMap<ScalingMetric,
Double>();
out.put(jobVertexID, vertexScalingMetrics);
- ScalingMetrics.computeLagMetrics(vertexFlinkMetrics,
vertexScalingMetrics);
+ if (jobTopology.isSource(jobVertexID)) {
+ ScalingMetrics.computeLagMetrics(vertexFlinkMetrics,
vertexScalingMetrics);
+ }
ScalingMetrics.computeLoadMetrics(vertexFlinkMetrics,
vertexScalingMetrics);
Optional<Double> lagGrowthRate =
@@ -391,19 +393,27 @@ public abstract class ScalingMetricCollector {
requiredMetrics.add(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
}
- requiredMetrics.forEach(
- flinkMetric ->
- filteredMetrics.put(
- flinkMetric
- .findAny(allMetricNames)
- .orElseThrow(
- () ->
- new RuntimeException(
- "Could not
find required metric "
- +
flinkMetric.name()
- + "
for "
- +
jobVertexID)),
- flinkMetric));
+ for (FlinkMetric flinkMetric : requiredMetrics) {
+ Optional<String> flinkMetricName =
flinkMetric.findAny(allMetricNames);
+ if (flinkMetricName.isPresent()) {
+ // Add actual Flink metric name to list
+ filteredMetrics.put(flinkMetricName.get(), flinkMetric);
+ } else if (flinkMetric == FlinkMetric.PENDING_RECORDS) {
+ // Pending records metric won't be available for some sources.
+ // The Kafka source, for instance, lazily initializes this
metric on receiving
+ // the first record. If this is a fresh topic or no new data
has been read since
+ // the last checkpoint, the pendingRecords metrics won't be
available.
+ LOG.warn(
+ "pendingRecords metric for {} could not be found. This
usually means the source hasn't read data. Assuming 0 pending records.",
+ jobVertexID);
+ } else {
+ throw new RuntimeException(
+ "Could not find required metric "
+ + flinkMetric.name()
+ + " for "
+ + jobVertexID);
+ }
+ }
return filteredMetrics;
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
index c7c189ef..adadb365 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/metrics/ScalingMetrics.java
@@ -156,6 +156,8 @@ public class ScalingMetrics {
var pendingRecords = flinkMetrics.get(FlinkMetric.PENDING_RECORDS);
if (pendingRecords != null) {
scalingMetrics.put(ScalingMetric.LAG, pendingRecords.getSum());
+ } else {
+ scalingMetrics.put(ScalingMetric.LAG, 0.);
}
}
}
diff --git
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index e87167e1..ac5724e1 100644
---
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -24,7 +24,10 @@ import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.CollectedMetrics;
+import
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
@@ -54,6 +57,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -353,4 +357,60 @@ public class MetricsCollectionAndEvaluationTest {
var collectedMetrics = metricsCollector.updateMetrics(app,
scalingInfo, service, conf);
assertTrue(collectedMetrics.getMetricHistory().isEmpty());
}
+
+ @Test
+ public void testTolerateAbsenceOfPendingRecordsMetric() throws Exception {
+ var topology = new JobTopology(new VertexInfo(source1, Set.of(), 5,
720));
+
+ metricsCollector = new TestingMetricsCollector(topology);
+ metricsCollector.setCurrentMetrics(
+ Map.of(
+ // Set source1 metrics without the PENDING_RECORDS
metric
+ source1,
+ Map.of(
+ FlinkMetric.BUSY_TIME_PER_SEC,
+ new AggregatedMetric("", Double.NaN, 100.,
Double.NaN, Double.NaN),
+ FlinkMetric.NUM_RECORDS_OUT_PER_SEC,
+ new AggregatedMetric("", Double.NaN,
Double.NaN, Double.NaN, 500.),
+ FlinkMetric.NUM_RECORDS_IN_PER_SEC,
+ new AggregatedMetric(
+ "", Double.NaN, Double.NaN,
Double.NaN, 500.))));
+
+ var collectedMetrics = collectMetrics();
+
+ Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>
evaluation =
+ evaluator.evaluate(conf, collectedMetrics);
+ assertEquals(
+ 500.,
evaluation.get(source1).get(ScalingMetric.TARGET_DATA_RATE).getCurrent());
+ assertEquals(
+ 5000.,
+
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent());
+ assertEquals(
+ 833.,
+
evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent());
+ assertEquals(
+ 625.,
+
evaluation.get(source1).get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent());
+
+ scalingExecutor.scaleResource(app, scalingInfo, conf, evaluation);
+ var scaledParallelism = ScalingExecutorTest.getScaledParallelism(app);
+ assertEquals(1, scaledParallelism.get(source1));
+ }
+
+ private CollectedMetrics collectMetrics() throws Exception {
+ conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
+ conf.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2));
+
+ metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(1)));
+
+ var collectedMetrics = metricsCollector.updateMetrics(app,
scalingInfo, service, conf);
+ assertTrue(collectedMetrics.getMetricHistory().isEmpty());
+
+ metricsCollector.setClock(Clock.offset(clock, Duration.ofSeconds(2)));
+
+ collectedMetrics = metricsCollector.updateMetrics(app, scalingInfo,
service, conf);
+ assertFalse(collectedMetrics.getMetricHistory().isEmpty());
+
+ return collectedMetrics;
+ }
}