This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 03c9eb6f [FLINK-33429] Tolerate missing metrics in the stabilization phase (#699)
03c9eb6f is described below

commit 03c9eb6f57d5d61f68cd93ff765b663d13b01ffe
Author: Maximilian Michels <[email protected]>
AuthorDate: Thu Nov 2 14:02:02 2023 +0100

    [FLINK-33429] Tolerate missing metrics in the stabilization phase (#699)
    
    The new code for the 1.7.0 release introduces metric collection during the
    stabilization phase to allow sampling the observed true processing rate.
    Metrics might not be fully initialized during that phase, as is evident from
    the error metrics. The following error is thrown:
    
    ```
    java.lang.RuntimeException: Could not find required metric
    NUM_RECORDS_OUT_PER_SEC for 667f5d5aa757fb217b92c06f0f5d2bf2
    ```
    
    To prevent these errors from shadowing actual errors, we should detect and
    ignore this recoverable exception.
---
 .../apache/flink/autoscaler/JobAutoScalerImpl.java |  3 ++
 .../flink/autoscaler/ScalingMetricCollector.java   | 42 ++++++++++++----------
 .../autoscaler/exceptions/NotReadyException.java   | 26 ++++++++++++++
 .../metrics/MetricNotFoundException.java           | 28 +++++++++++++++
 .../flink/autoscaler/JobAutoScalerImplTest.java    | 34 ++++++++++++++++++
 .../MetricsCollectionAndEvaluationTest.java        | 31 ++++++++++++++++
 .../autoscaler/ScalingMetricCollectorTest.java     | 22 +++++++++++-
 7 files changed, 166 insertions(+), 20 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index 3196a3de..ca640371 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
 import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
@@ -96,6 +97,8 @@ public class JobAutoScalerImpl<KEY, Context extends 
JobAutoScalerContext<KEY>>
 
             runScalingLogic(ctx, autoscalerMetrics);
             stateStore.flush(ctx);
+        } catch (NotReadyException e) {
+            LOG.debug("Not ready for scaling", e);
         } catch (Throwable e) {
             onError(ctx, autoscalerMetrics, e);
         } finally {
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
index df5f8ad6..51f5931b 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingMetricCollector.java
@@ -20,9 +20,11 @@ package org.apache.flink.autoscaler;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
 import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetrics;
 import org.apache.flink.autoscaler.state.AutoScalerStateStore;
@@ -112,6 +114,7 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         }
         var topology = getJobTopology(ctx, stateStore, jobDetailsInfo);
         var stableTime = 
jobUpdateTs.plus(conf.get(AutoScalerOptions.STABILIZATION_INTERVAL));
+        final boolean isStabilizing = now.isBefore(stableTime);
 
         // Calculate timestamp when the metric windows is full
         var metricWindowSize = getMetricWindowSize(conf);
@@ -119,7 +122,7 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                 getWindowFullTime(metricHistory.tailMap(stableTime), now, 
metricWindowSize);
 
         // The filtered list of metrics we want to query for each vertex
-        var filteredVertexMetricNames = queryFilteredMetricNames(ctx, 
topology);
+        var filteredVertexMetricNames = queryFilteredMetricNames(ctx, 
topology, isStabilizing);
 
         // Aggregated job vertex metrics collected from Flink based on the 
filtered metric names
         var collectedVertexMetrics = queryAllAggregatedMetrics(ctx, 
filteredVertexMetricNames);
@@ -131,7 +134,7 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         // Add scaling metrics to history if they were computed successfully
         metricHistory.put(now, scalingMetrics);
 
-        if (now.isBefore(stableTime)) {
+        if (isStabilizing) {
             LOG.info("Stabilizing until {}", stableTime);
             stateStore.storeCollectedMetrics(ctx, metricHistory);
             return new CollectedMetricHistory(topology, 
Collections.emptySortedMap());
@@ -343,8 +346,19 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         return (currentLag - lastLag) / timeDiff;
     }
 
+    private Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
+            Context ctx, JobTopology topology, boolean isStabilizing) {
+        try {
+            return queryFilteredMetricNames(ctx, topology);
+        } catch (MetricNotFoundException e) {
+            if (isStabilizing) {
+                throw new NotReadyException(e);
+            }
+            throw e;
+        }
+    }
+
     /** Query the available metric names for each job vertex. */
-    @SneakyThrows
     protected Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
             Context ctx, JobTopology topology) {
 
@@ -375,6 +389,7 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
         return names;
     }
 
+    @SneakyThrows
     private Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
             Context ctx, JobTopology topology, Stream<JobVertexID> 
vertexStream) {
         try (var restClient = ctx.getRestClusterClient()) {
@@ -386,21 +401,11 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                                     v ->
                                             getFilteredVertexMetricNames(
                                                     restClient, 
ctx.getJobID(), v, topology)));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
         }
     }
 
-    /**
-     * Query and filter metric names for a given job vertex.
-     *
-     * @param restClient Flink rest client.
-     * @param jobID Job Id.
-     * @param jobVertexID Job Vertex Id.
-     * @return Map of filtered metric names.
-     */
-    @SneakyThrows
-    protected Map<String, FlinkMetric> getFilteredVertexMetricNames(
+    /** Query and filter metric names for a given job vertex. */
+    Map<String, FlinkMetric> getFilteredVertexMetricNames(
             RestClusterClient<?> restClient,
             JobID jobID,
             JobVertexID jobVertexID,
@@ -449,8 +454,7 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
                 // Add actual Flink metric name to list
                 filteredMetrics.put(flinkMetricName.get(), flinkMetric);
             } else {
-                throw new RuntimeException(
-                        "Could not find required metric " + flinkMetric + " 
for " + jobVertexID);
+                throw new MetricNotFoundException(flinkMetric, jobVertexID);
             }
         }
 
@@ -458,9 +462,9 @@ public abstract class ScalingMetricCollector<KEY, Context 
extends JobAutoScalerC
     }
 
     @VisibleForTesting
+    @SneakyThrows
     protected Collection<AggregatedMetric> queryAggregatedMetricNames(
-            RestClusterClient<?> restClient, JobID jobID, JobVertexID 
jobVertexID)
-            throws Exception {
+            RestClusterClient<?> restClient, JobID jobID, JobVertexID 
jobVertexID) {
         var parameters = new AggregatedSubtaskMetricsParameters();
         var pathIt = parameters.getPathParameters().iterator();
 
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
new file mode 100644
index 00000000..f19f6ed7
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.exceptions;
+
+/** An exception to indicate the called code wasn't ready but will be at a 
later point in time. */
+public class NotReadyException extends RuntimeException {
+
+    public NotReadyException(Exception cause) {
+        super(cause);
+    }
+}
diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricNotFoundException.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricNotFoundException.java
new file mode 100644
index 00000000..2c438b77
--- /dev/null
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/metrics/MetricNotFoundException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.metrics;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+/** Exception for when an expected metric was not found. */
+public class MetricNotFoundException extends RuntimeException {
+
+    public MetricNotFoundException(FlinkMetric flinkMetric, JobVertexID 
jobVertexID) {
+        super("Could not find required metric " + flinkMetric + " for " + 
jobVertexID);
+    }
+}
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index d041c384..2d2ed88d 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.autoscaler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
 import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
@@ -27,6 +28,7 @@ import 
org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
 import org.apache.flink.autoscaler.state.TestingAutoscalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Metric;
@@ -41,6 +43,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.time.Instant;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -142,6 +146,36 @@ public class JobAutoScalerImplTest {
         assertEquals(0, 
autoscaler.flinkMetrics.get(context.getJobKey()).getNumScalingsCount());
     }
 
+    @Test
+    public void testTolerateRecoverableExceptions() throws Exception {
+        TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>>
+                collectorWhichThrowsRecoverableException =
+                        new TestingMetricsCollector<>(new 
JobTopology(Collections.emptySet())) {
+                            @Override
+                            protected Collection<AggregatedMetric> 
queryAggregatedMetricNames(
+                                    RestClusterClient<?> restClient,
+                                    JobID jobID,
+                                    JobVertexID jobVertexID) {
+                                throw new NotReadyException(new Exception());
+                            }
+                        };
+        collectorWhichThrowsRecoverableException.setJobUpdateTs(Instant.now());
+
+        var autoscaler =
+                new JobAutoScalerImpl<>(
+                        collectorWhichThrowsRecoverableException,
+                        null,
+                        null,
+                        eventCollector,
+                        scalingRealizer,
+                        stateStore);
+
+        // Should not produce an error
+        autoscaler.scale(context);
+        Assertions.assertEquals(
+                0, 
autoscaler.flinkMetrics.get(context.getJobKey()).getNumErrorsCount());
+    }
+
     @Test
     void testParallelismOverrides() throws Exception {
         var autoscaler =
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index 24cc9e2b..ef84c52b 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.autoscaler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
 import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
 import org.apache.flink.autoscaler.metrics.CollectedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
 import org.apache.flink.autoscaler.topology.JobTopology;
@@ -46,12 +48,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Supplier;
 
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for scaling metrics collection logic. */
@@ -570,6 +574,8 @@ public class MetricsCollectionAndEvaluationTest {
                 metricsCollector.updateMetrics(context, 
stateStore).getMetricHistory().isEmpty());
         assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
 
+        testTolerateMetricsMissingDuringStabilizationPhase(topology);
+
         // Until window is full (time=200) we keep returning stabilizing 
metrics
         metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150), 
ZoneId.systemDefault()));
         assertEquals(
@@ -588,6 +594,31 @@ public class MetricsCollectionAndEvaluationTest {
         assertEquals(2, stateStore.getCollectedMetrics(context).get().size());
     }
 
+    private void 
testTolerateMetricsMissingDuringStabilizationPhase(JobTopology topology) {
+        var collectorWithMissingMetrics =
+                new TestingMetricsCollector<JobID, 
JobAutoScalerContext<JobID>>(topology) {
+                    @Override
+                    protected Map<JobVertexID, Map<String, FlinkMetric>> 
queryFilteredMetricNames(
+                            JobAutoScalerContext<JobID> ctx, JobTopology 
topology) {
+                        throw new MetricNotFoundException(
+                                FlinkMetric.BUSY_TIME_PER_SEC, new 
JobVertexID());
+                    }
+                };
+        collectorWithMissingMetrics.setClock(
+                Clock.fixed(
+                        Instant.ofEpochMilli(startTime.toEpochMilli()), 
ZoneId.systemDefault()));
+        collectorWithMissingMetrics.setJobUpdateTs(startTime);
+
+        Supplier<Integer> numCollectedMetricsSupplier =
+                () -> stateStore.getCollectedMetrics(context).get().size();
+
+        int numCollectedMetricsBeforeTest = numCollectedMetricsSupplier.get();
+        assertThrows(
+                NotReadyException.class,
+                () -> collectorWithMissingMetrics.updateMetrics(context, 
stateStore));
+        assertEquals(numCollectedMetricsBeforeTest, 
numCollectedMetricsSupplier.get());
+    }
+
     @Test
     public void testScaleDownWithZeroProcessingRate() throws Exception {
         var topology = new JobTopology(new VertexInfo(source1, Set.of(), 2, 
720));
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
index 724aa218..b578ff6c 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.autoscaler;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
+import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
 import org.apache.flink.autoscaler.topology.JobTopology;
 import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.client.program.rest.RestClusterClient;
@@ -43,6 +44,7 @@ import java.util.Set;
 
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -178,7 +180,7 @@ public class ScalingMetricCollectorTest {
     private void testRequiredMetrics(
             List<AggregatedMetric> metricList,
             List<AggregatedMetric> requiredMetrics,
-            RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> 
testCollector,
+            ScalingMetricCollector<JobID, JobAutoScalerContext<JobID>> 
testCollector,
             JobVertexID vertex,
             JobTopology topology) {
         for (var m : requiredMetrics) {
@@ -206,4 +208,22 @@ public class ScalingMetricCollectorTest {
                 new AggregatedMetric("busyTimeMsPerSecond"),
                 new AggregatedMetric("numRecordsInPerSecond"));
     }
+
+    @Test
+    public void testThrowsMetricNotFoundException() {
+        var source = new JobVertexID();
+        var sink = new JobVertexID();
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Set.of(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var metricCollector = new TestingMetricsCollector<>(topology);
+
+        assertThrows(
+                MetricNotFoundException.class,
+                () ->
+                        metricCollector.getFilteredVertexMetricNames(
+                                null, new JobID(), source, topology));
+    }
 }

Reply via email to