This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 03c9eb6f [FLINK-33429] Tolerate missing metrics in the stabilization phase (#699)
03c9eb6f is described below
commit 03c9eb6f57d5d61f68cd93ff765b663d13b01ffe
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Nov 2 14:02:02 2023 +0100
[FLINK-33429] Tolerate missing metrics in the stabilization phase (#699)
The new code for the 1.7.0 release introduces metric collection during the
stabilization phase to allow sampling the observed true processing rate.
Metrics might not be fully initialized during that phase, as evidenced by the
error metrics. The following error is thrown:
```
java.lang.RuntimeException: Could not find required metric NUM_RECORDS_OUT_PER_SEC for 667f5d5aa757fb217b92c06f0f5d2bf2
```
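To prevent these errors from shadowing actual errors, we should detect and
ignore this recoverable exception while the job is still in the stabilization
phase; outside that phase, a missing metric is still reported as an error.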
---
.../apache/flink/autoscaler/JobAutoScalerImpl.java | 3 ++
.../flink/autoscaler/ScalingMetricCollector.java | 42 ++++++++++++----------
.../autoscaler/exceptions/NotReadyException.java | 26 ++++++++++++++
.../metrics/MetricNotFoundException.java | 28 +++++++++++++++
.../flink/autoscaler/JobAutoScalerImplTest.java | 34 ++++++++++++++++++
.../MetricsCollectionAndEvaluationTest.java | 31 ++++++++++++++++
.../autoscaler/ScalingMetricCollectorTest.java | 22 +++++++++++-
7 files changed, 166 insertions(+), 20 deletions(-)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 3196a3de..ca640371 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
@@ -96,6 +97,8 @@ public class JobAutoScalerImpl<KEY, Context extends
JobAutoScalerContext<KEY>>
runScalingLogic(ctx, autoscalerMetrics);
stateStore.flush(ctx);
+ } catch (NotReadyException e) {
+ LOG.debug("Not ready for scaling", e);
} catch (Throwable e) {
onError(ctx, autoscalerMetrics, e);
} finally {
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index df5f8ad6..51f5931b 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -20,9 +20,11 @@ package org.apache.flink.autoscaler;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
@@ -112,6 +114,7 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
}
var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
var stableTime =
jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
+ final boolean isStabilizing = now.isBefore(stableTime);
// Calculate timestamp when the metric windows is full
var metricWindowSize = getMetricWindowSize(conf);
@@ -119,7 +122,7 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
getWindowFullTime(metricHistory.tailMap(stableTime), now,
metricWindowSize);
// The filtered list of metrics we want to query for each vertex
- var filteredVertexMetricNames = queryFilteredMetricNames(ctx,
topology);
+ var filteredVertexMetricNames = queryFilteredMetricNames(ctx,
topology, isStabilizing);
// Aggregated job vertex metrics collected from Flink based on the
filtered metric names
var collectedVertexMetrics = queryAllAggregatedMetrics(ctx,
filteredVertexMetricNames);
@@ -131,7 +134,7 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
// Add scaling metrics to history if they were computed successfully
metricHistory.put(now, scalingMetrics);
- if (now.isBefore(stableTime)) {
+ if (isStabilizing) {
LOG.info("Stabilizing until {}", stableTime);
stateStore.storeCollectedMetrics(ctx, metricHistory);
return new CollectedMetricHistory(topology,
Collections.emptySortedMap());
@@ -343,8 +346,19 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
return (currentLag - lastLag) / timeDiff;
}
+ private Map<JobVertexID, Map<String, FlinkMetric>>
queryFilteredMetricNames(
+ Context ctx, JobTopology topology, boolean isStabilizing) {
+ try {
+ return queryFilteredMetricNames(ctx, topology);
+ } catch (MetricNotFoundException e) {
+ if (isStabilizing) {
+ throw new NotReadyException(e);
+ }
+ throw e;
+ }
+ }
+
/** Query the available metric names for each job vertex. */
- @SneakyThrows
protected Map<JobVertexID, Map<String, FlinkMetric>>
queryFilteredMetricNames(
Context ctx, JobTopology topology) {
@@ -375,6 +389,7 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
return names;
}
+ @SneakyThrows
private Map<JobVertexID, Map<String, FlinkMetric>>
queryFilteredMetricNames(
Context ctx, JobTopology topology, Stream<JobVertexID>
vertexStream) {
try (var restClient = ctx.getRestClusterClient()) {
@@ -386,21 +401,11 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
v ->
getFilteredVertexMetricNames(
restClient,
ctx.getJobID(), v, topology)));
- } catch (Exception e) {
- throw new RuntimeException(e);
}
}
- /**
- * Query and filter metric names for a given job vertex.
- *
- * @param restClient Flink rest client.
- * @param jobID Job Id.
- * @param jobVertexID Job Vertex Id.
- * @return Map of filtered metric names.
- */
- @SneakyThrows
- protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
+ /** Query and filter metric names for a given job vertex. */
+ Map<String, FlinkMetric> getFilteredVertexMetricNames(
RestClusterClient<?> restClient,
JobID jobID,
JobVertexID jobVertexID,
@@ -449,8 +454,7 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
// Add actual Flink metric name to list
filteredMetrics.put(flinkMetricName.get(), flinkMetric);
} else {
- throw new RuntimeException(
- "Could not find required metric " + flinkMetric + "
for " + jobVertexID);
+ throw new MetricNotFoundException(flinkMetric, jobVertexID);
}
}
@@ -458,9 +462,9 @@ public abstract class ScalingMetricCollector<KEY, Context
extends JobAutoScalerC
}
@VisibleForTesting
+ @SneakyThrows
protected Collection<AggregatedMetric> queryAggregatedMetricNames(
- RestClusterClient<?> restClient, JobID jobID, JobVertexID
jobVertexID)
- throws Exception {
+ RestClusterClient<?> restClient, JobID jobID, JobVertexID
jobVertexID) {
var parameters = new AggregatedSubtaskMetricsParameters();
var pathIt = parameters.getPathParameters().iterator();
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
new file mode 100644
index 00000000..f19f6ed7
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.exceptions;
+
+/** An exception to indicate the called code wasn't ready but will be at a
later point in time. */
+public class NotReadyException extends RuntimeException {
+
+ public NotReadyException(Exception cause) {
+ super(cause);
+ }
+}
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricNotFoundException.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricNotFoundException.java
new file mode 100644
index 00000000..2c438b77
--- /dev/null
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/** Exception for when an expected metric was not found. */
+public class MetricNotFoundException extends RuntimeException {
+
+ public MetricNotFoundException(FlinkMetric flinkMetric, JobVertexID
jobVertexID) {
+ super("Could not find required metric " + flinkMetric + " for " +
jobVertexID);
+ }
+}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index d041c384..2d2ed88d 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
@@ -27,6 +28,7 @@ import
org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
@@ -41,6 +43,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -142,6 +146,36 @@ public class JobAutoScalerImplTest {
assertEquals(0,
autoscaler.flinkMetrics.get(context.getJobKey()).getNumScalingsCount());
}
+ @Test
+ public void testTolerateRecoverableExceptions() throws Exception {
+ TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>>
+ collectorWhichThrowsRecoverableException =
+ new TestingMetricsCollector<>(new
JobTopology(Collections.emptySet())) {
+ @Override
+ protected Collection<AggregatedMetric>
queryAggregatedMetricNames(
+ RestClusterClient<?> restClient,
+ JobID jobID,
+ JobVertexID jobVertexID) {
+ throw new NotReadyException(new Exception());
+ }
+ };
+ collectorWhichThrowsRecoverableException.setJobUpdateTs(Instant.now());
+
+ var autoscaler =
+ new JobAutoScalerImpl<>(
+ collectorWhichThrowsRecoverableException,
+ null,
+ null,
+ eventCollector,
+ scalingRealizer,
+ stateStore);
+
+ // Should not produce an error
+ autoscaler.scale(context);
+ Assertions.assertEquals(
+ 0,
autoscaler.flinkMetrics.get(context.getJobKey()).getNumErrorsCount());
+ }
+
@Test
void testParallelismOverrides() throws Exception {
var autoscaler =
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index 24cc9e2b..ef84c52b 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
@@ -46,12 +48,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Supplier;
import static
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
/** Test for scaling metrics collection logic. */
@@ -570,6 +574,8 @@ public class MetricsCollectionAndEvaluationTest {
metricsCollector.updateMetrics(context,
stateStore).getMetricHistory().isEmpty());
assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
+ testTolerateMetricsMissingDuringStabilizationPhase(topology);
+
// Until window is full (time=200) we keep returning stabilizing
metrics
metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150),
ZoneId.systemDefault()));
assertEquals(
@@ -588,6 +594,31 @@ public class MetricsCollectionAndEvaluationTest {
assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
}
+ private void
testTolerateMetricsMissingDuringStabilizationPhase(JobTopology topology) {
+ var collectorWithMissingMetrics =
+ new TestingMetricsCollector<JobID,
JobAutoScalerContext<JobID>>(topology) {
+ @Override
+ protected Map<JobVertexID, Map<String, FlinkMetric>>
queryFilteredMetricNames(
+ JobAutoScalerContext<JobID> ctx, JobTopology
topology) {
+ throw new MetricNotFoundException(
+ FlinkMetric.BUSY_TIME_PER_SEC, new
JobVertexID());
+ }
+ };
+ collectorWithMissingMetrics.setClock(
+ Clock.fixed(
+ Instant.ofEpochMilli(startTime.toEpochMilli()),
ZoneId.systemDefault()));
+ collectorWithMissingMetrics.setJobUpdateTs(startTime);
+
+ Supplier<Integer> numCollectedMetricsSupplier =
+ () -> stateStore.getCollectedMetrics(context).get().size();
+
+ int numCollectedMetricsBeforeTest = numCollectedMetricsSupplier.get();
+ assertThrows(
+ NotReadyException.class,
+ () -> collectorWithMissingMetrics.updateMetrics(context,
stateStore));
+ assertEquals(numCollectedMetricsBeforeTest,
numCollectedMetricsSupplier.get());
+ }
+
@Test
public void testScaleDownWithZeroProcessingRate() throws Exception {
var topology = new JobTopology(new VertexInfo(source1, Set.of(), 2,
720));
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
index 724aa218..b578ff6c 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.autoscaler;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.client.program.rest.RestClusterClient;
@@ -43,6 +44,7 @@ import java.util.Set;
import static
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -178,7 +180,7 @@ public class ScalingMetricCollectorTest {
private void testRequiredMetrics(
List<AggregatedMetric> metricList,
List<AggregatedMetric> requiredMetrics,
- RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>>
testCollector,
+ ScalingMetricCollector<JobID, JobAutoScalerContext<JobID>>
testCollector,
JobVertexID vertex,
JobTopology topology) {
for (var m : requiredMetrics) {
@@ -206,4 +208,22 @@ public class ScalingMetricCollectorTest {
new AggregatedMetric("busyTimeMsPerSecond"),
new AggregatedMetric("numRecordsInPerSecond"));
}
+
+ @Test
+ public void testThrowsMetricNotFoundException() {
+ var source = new JobVertexID();
+ var sink = new JobVertexID();
+ var topology =
+ new JobTopology(
+ new VertexInfo(source, Set.of(), 1, 1),
+ new VertexInfo(sink, Set.of(source), 1, 1));
+
+ var metricCollector = new TestingMetricsCollector<>(topology);
+
+ assertThrows(
+ MetricNotFoundException.class,
+ () ->
+ metricCollector.getFilteredVertexMetricNames(
+ null, new JobID(), source, topology));
+ }
}