This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new dfd66c29 [FLINK-36645] [flink-autoscaler] Gracefully handle null execution plan from autoscaler dfd66c29 is described below commit dfd66c29cb936f711cee79270f8d11e64ffce377 Author: dsaisharath <dsaishar...@uber.com> AuthorDate: Thu Jan 9 13:30:03 2025 -0800 [FLINK-36645] [flink-autoscaler] Gracefully handle null execution plan from autoscaler --- .../autoscaler/exceptions/NotReadyException.java | 4 + .../flink/autoscaler/topology/JobTopology.java | 5 + .../autoscaler/ScalingMetricCollectorTest.java | 126 +++++++++++++++++++++ 3 files changed, 135 insertions(+) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java index f19f6ed7..d3703591 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java @@ -23,4 +23,8 @@ public class NotReadyException extends RuntimeException { public NotReadyException(Exception cause) { super(cause); } + + public NotReadyException(String message) { + super(message); + } } diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java index 851db6b0..83e81117 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java @@ -17,6 +17,7 @@ package org.apache.flink.autoscaler.topology; +import org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -150,6 +151,10 @@ public class JobTopology { ObjectNode plan = 
objectMapper.readValue(jsonPlan, ObjectNode.class); ArrayNode nodes = (ArrayNode) plan.get("nodes"); + if (nodes == null || nodes.isEmpty()) { + throw new NotReadyException("No nodes found in the plan, job is not ready yet"); + } + var vertexInfo = new HashSet<VertexInfo>(); for (JsonNode node : nodes) { diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java index d5a05b40..70e3f539 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.autoscaler; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.autoscaler.exceptions.NotReadyException; import org.apache.flink.autoscaler.metrics.FlinkMetric; import org.apache.flink.autoscaler.metrics.MetricNotFoundException; import org.apache.flink.autoscaler.topology.IOMetrics; @@ -228,6 +229,131 @@ public class ScalingMetricCollectorTest { new JobTopology(source, sink), metricsCollector.getJobTopology(jobDetailsInfo)); } + @Test + public void testJobTopologyParsingThrowsNotReadyException() throws Exception { + String s = + "{\n" + + " \"jid\": \"bb8f15efbb37f2ce519f55cdc0e049bf\",\n" + + " \"name\": \"State machine job\",\n" + + " \"isStoppable\": false,\n" + + " \"state\": \"RUNNING\",\n" + + " \"start-time\": 1707893512027,\n" + + " \"end-time\": -1,\n" + + " \"duration\": 214716,\n" + + " \"maxParallelism\": -1,\n" + + " \"now\": 1707893726743,\n" + + " \"timestamps\": {\n" + + " \"SUSPENDED\": 0,\n" + + " \"CREATED\": 1707893512139,\n" + + " \"FAILING\": 0,\n" + + " \"FAILED\": 0,\n" + + " \"INITIALIZING\": 1707893512027,\n" + + " \"RECONCILING\": 0,\n" + + " \"RUNNING\": 1707893512217,\n" + + " \"RESTARTING\": 0,\n" + + " \"CANCELLING\": 
0,\n" + + " \"FINISHED\": 0,\n" + + " \"CANCELED\": 0\n" + + " },\n" + + " \"vertices\": [\n" + + " {\n" + + " \"id\": \"bc764cd8ddf7a0cff126f51c16239658\",\n" + + " \"name\": \"Source: Custom Source\",\n" + + " \"maxParallelism\": 128,\n" + + " \"parallelism\": 2,\n" + + " \"status\": \"FINISHED\",\n" + + " \"start-time\": 1707893517277,\n" + + " \"end-time\": -1,\n" + + " \"duration\": 209466,\n" + + " \"tasks\": {\n" + + " \"DEPLOYING\": 0,\n" + + " \"INITIALIZING\": 0,\n" + + " \"SCHEDULED\": 0,\n" + + " \"CANCELING\": 0,\n" + + " \"CANCELED\": 0,\n" + + " \"RECONCILING\": 0,\n" + + " \"RUNNING\": 2,\n" + + " \"FAILED\": 0,\n" + + " \"CREATED\": 0,\n" + + " \"FINISHED\": 0\n" + + " },\n" + + " \"metrics\": {\n" + + " \"read-bytes\": 0,\n" + + " \"read-bytes-complete\": true,\n" + + " \"write-bytes\": 4036982,\n" + + " \"write-bytes-complete\": true,\n" + + " \"read-records\": 0,\n" + + " \"read-records-complete\": true,\n" + + " \"write-records\": 291629,\n" + + " \"write-records-complete\": true,\n" + + " \"accumulated-backpressured-time\": 0,\n" + + " \"accumulated-idle-time\": 0,\n" + + " \"accumulated-busy-time\": \"NaN\"\n" + + " }\n" + + " },\n" + + " {\n" + + " \"id\": \"20ba6b65f97481d5570070de90e4e791\",\n" + + " \"name\": \"Flat Map -> Sink: Print to Std. 
Out\",\n" + + " \"maxParallelism\": 128,\n" + + " \"parallelism\": 2,\n" + + " \"status\": \"RUNNING\",\n" + + " \"start-time\": 1707893517280,\n" + + " \"end-time\": -1,\n" + + " \"duration\": 209463,\n" + + " \"tasks\": {\n" + + " \"DEPLOYING\": 0,\n" + + " \"INITIALIZING\": 0,\n" + + " \"SCHEDULED\": 0,\n" + + " \"CANCELING\": 0,\n" + + " \"CANCELED\": 0,\n" + + " \"RECONCILING\": 0,\n" + + " \"RUNNING\": 2,\n" + + " \"FAILED\": 0,\n" + + " \"CREATED\": 0,\n" + + " \"FINISHED\": 0\n" + + " },\n" + + " \"metrics\": {\n" + + " \"read-bytes\": 4078629,\n" + + " \"read-bytes-complete\": true,\n" + + " \"write-bytes\": 0,\n" + + " \"write-bytes-complete\": true,\n" + + " \"read-records\": 291532,\n" + + " \"read-records-complete\": true,\n" + + " \"write-records\": 1,\n" + + " \"write-records-complete\": true,\n" + + " \"accumulated-backpressured-time\": 0,\n" + + " \"accumulated-idle-time\": 407702,\n" + + " \"accumulated-busy-time\": 2\n" + + " }\n" + + " }\n" + + " ],\n" + + " \"status-counts\": {\n" + + " \"DEPLOYING\": 0,\n" + + " \"INITIALIZING\": 0,\n" + + " \"SCHEDULED\": 0,\n" + + " \"CANCELING\": 0,\n" + + " \"CANCELED\": 0,\n" + + " \"RECONCILING\": 0,\n" + + " \"RUNNING\": 2,\n" + + " \"FAILED\": 0,\n" + + " \"CREATED\": 0,\n" + + " \"FINISHED\": 0\n" + + " },\n" + + " \"plan\": {\n" + + " \"jid\": \"bb8f15efbb37f2ce519f55cdc0e049bf\",\n" + + " \"name\": \"State machine job\",\n" + + " \"type\": \"STREAMING\",\n" + + " \"nodes\": [\n" + + " ]\n" + + " }\n" + + "}"; + JobDetailsInfo jobDetailsInfo = new ObjectMapper().readValue(s, JobDetailsInfo.class); + + var metricsCollector = new RestApiMetricsCollector<>(); + assertThrows( + NotReadyException.class, () -> metricsCollector.getJobTopology(jobDetailsInfo)); + } + @Test public void testJobTopologyParsingFromJobDetailsWithSlotSharingGroup() throws Exception { String s =