This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new dfd66c29 [FLINK-36645] [flink-autoscaler] Gracefully handle null execution plan from autoscaler
dfd66c29 is described below

commit dfd66c29cb936f711cee79270f8d11e64ffce377
Author: dsaisharath <dsaishar...@uber.com>
AuthorDate: Thu Jan 9 13:30:03 2025 -0800

    [FLINK-36645] [flink-autoscaler] Gracefully handle null execution plan from autoscaler
---
 .../autoscaler/exceptions/NotReadyException.java   |   4 +
 .../flink/autoscaler/topology/JobTopology.java     |   5 +
 .../autoscaler/ScalingMetricCollectorTest.java     | 126 +++++++++++++++++++++
 3 files changed, 135 insertions(+)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
index f19f6ed7..d3703591 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/exceptions/NotReadyException.java
@@ -23,4 +23,8 @@ public class NotReadyException extends RuntimeException {
     public NotReadyException(Exception cause) {
         super(cause);
     }
+
+    public NotReadyException(String message) {
+        super(message);
+    }
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
index 851db6b0..83e81117 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.autoscaler.topology;
 
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
@@ -150,6 +151,10 @@ public class JobTopology {
         ObjectNode plan = objectMapper.readValue(jsonPlan, ObjectNode.class);
         ArrayNode nodes = (ArrayNode) plan.get("nodes");
 
+        if (nodes == null || nodes.isEmpty()) {
+            throw new NotReadyException("No nodes found in the plan, job is not ready yet");
+        }
+
         var vertexInfo = new HashSet<VertexInfo>();
 
         for (JsonNode node : nodes) {
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
index d5a05b40..70e3f539 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.autoscaler;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.autoscaler.exceptions.NotReadyException;
 import org.apache.flink.autoscaler.metrics.FlinkMetric;
 import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
 import org.apache.flink.autoscaler.topology.IOMetrics;
@@ -228,6 +229,131 @@ public class ScalingMetricCollectorTest {
                 new JobTopology(source, sink), metricsCollector.getJobTopology(jobDetailsInfo));
     }
 
+    @Test
+    public void testJobTopologyParsingThrowsNotReadyException() throws Exception {
+        String s =
+                "{\n"
+                        + "  \"jid\": \"bb8f15efbb37f2ce519f55cdc0e049bf\",\n"
+                        + "  \"name\": \"State machine job\",\n"
+                        + "  \"isStoppable\": false,\n"
+                        + "  \"state\": \"RUNNING\",\n"
+                        + "  \"start-time\": 1707893512027,\n"
+                        + "  \"end-time\": -1,\n"
+                        + "  \"duration\": 214716,\n"
+                        + "  \"maxParallelism\": -1,\n"
+                        + "  \"now\": 1707893726743,\n"
+                        + "  \"timestamps\": {\n"
+                        + "    \"SUSPENDED\": 0,\n"
+                        + "    \"CREATED\": 1707893512139,\n"
+                        + "    \"FAILING\": 0,\n"
+                        + "    \"FAILED\": 0,\n"
+                        + "    \"INITIALIZING\": 1707893512027,\n"
+                        + "    \"RECONCILING\": 0,\n"
+                        + "    \"RUNNING\": 1707893512217,\n"
+                        + "    \"RESTARTING\": 0,\n"
+                        + "    \"CANCELLING\": 0,\n"
+                        + "    \"FINISHED\": 0,\n"
+                        + "    \"CANCELED\": 0\n"
+                        + "  },\n"
+                        + "  \"vertices\": [\n"
+                        + "    {\n"
+                        + "      \"id\": \"bc764cd8ddf7a0cff126f51c16239658\",\n"
+                        + "      \"name\": \"Source: Custom Source\",\n"
+                        + "      \"maxParallelism\": 128,\n"
+                        + "      \"parallelism\": 2,\n"
+                        + "      \"status\": \"FINISHED\",\n"
+                        + "      \"start-time\": 1707893517277,\n"
+                        + "      \"end-time\": -1,\n"
+                        + "      \"duration\": 209466,\n"
+                        + "      \"tasks\": {\n"
+                        + "        \"DEPLOYING\": 0,\n"
+                        + "        \"INITIALIZING\": 0,\n"
+                        + "        \"SCHEDULED\": 0,\n"
+                        + "        \"CANCELING\": 0,\n"
+                        + "        \"CANCELED\": 0,\n"
+                        + "        \"RECONCILING\": 0,\n"
+                        + "        \"RUNNING\": 2,\n"
+                        + "        \"FAILED\": 0,\n"
+                        + "        \"CREATED\": 0,\n"
+                        + "        \"FINISHED\": 0\n"
+                        + "      },\n"
+                        + "      \"metrics\": {\n"
+                        + "        \"read-bytes\": 0,\n"
+                        + "        \"read-bytes-complete\": true,\n"
+                        + "        \"write-bytes\": 4036982,\n"
+                        + "        \"write-bytes-complete\": true,\n"
+                        + "        \"read-records\": 0,\n"
+                        + "        \"read-records-complete\": true,\n"
+                        + "        \"write-records\": 291629,\n"
+                        + "        \"write-records-complete\": true,\n"
+                        + "        \"accumulated-backpressured-time\": 0,\n"
+                        + "        \"accumulated-idle-time\": 0,\n"
+                        + "        \"accumulated-busy-time\": \"NaN\"\n"
+                        + "      }\n"
+                        + "    },\n"
+                        + "    {\n"
+                        + "      \"id\": \"20ba6b65f97481d5570070de90e4e791\",\n"
+                        + "      \"name\": \"Flat Map -> Sink: Print to Std. Out\",\n"
+                        + "      \"maxParallelism\": 128,\n"
+                        + "      \"parallelism\": 2,\n"
+                        + "      \"status\": \"RUNNING\",\n"
+                        + "      \"start-time\": 1707893517280,\n"
+                        + "      \"end-time\": -1,\n"
+                        + "      \"duration\": 209463,\n"
+                        + "      \"tasks\": {\n"
+                        + "        \"DEPLOYING\": 0,\n"
+                        + "        \"INITIALIZING\": 0,\n"
+                        + "        \"SCHEDULED\": 0,\n"
+                        + "        \"CANCELING\": 0,\n"
+                        + "        \"CANCELED\": 0,\n"
+                        + "        \"RECONCILING\": 0,\n"
+                        + "        \"RUNNING\": 2,\n"
+                        + "        \"FAILED\": 0,\n"
+                        + "        \"CREATED\": 0,\n"
+                        + "        \"FINISHED\": 0\n"
+                        + "      },\n"
+                        + "      \"metrics\": {\n"
+                        + "        \"read-bytes\": 4078629,\n"
+                        + "        \"read-bytes-complete\": true,\n"
+                        + "        \"write-bytes\": 0,\n"
+                        + "        \"write-bytes-complete\": true,\n"
+                        + "        \"read-records\": 291532,\n"
+                        + "        \"read-records-complete\": true,\n"
+                        + "        \"write-records\": 1,\n"
+                        + "        \"write-records-complete\": true,\n"
+                        + "        \"accumulated-backpressured-time\": 0,\n"
+                        + "        \"accumulated-idle-time\": 407702,\n"
+                        + "        \"accumulated-busy-time\": 2\n"
+                        + "      }\n"
+                        + "    }\n"
+                        + "  ],\n"
+                        + "  \"status-counts\": {\n"
+                        + "    \"DEPLOYING\": 0,\n"
+                        + "    \"INITIALIZING\": 0,\n"
+                        + "    \"SCHEDULED\": 0,\n"
+                        + "    \"CANCELING\": 0,\n"
+                        + "    \"CANCELED\": 0,\n"
+                        + "    \"RECONCILING\": 0,\n"
+                        + "    \"RUNNING\": 2,\n"
+                        + "    \"FAILED\": 0,\n"
+                        + "    \"CREATED\": 0,\n"
+                        + "    \"FINISHED\": 0\n"
+                        + "  },\n"
+                        + "  \"plan\": {\n"
+                        + "    \"jid\": \"bb8f15efbb37f2ce519f55cdc0e049bf\",\n"
+                        + "    \"name\": \"State machine job\",\n"
+                        + "    \"type\": \"STREAMING\",\n"
+                        + "    \"nodes\": [\n"
+                        + "    ]\n"
+                        + "  }\n"
+                        + "}";
+        JobDetailsInfo jobDetailsInfo = new ObjectMapper().readValue(s, JobDetailsInfo.class);
+
+        var metricsCollector = new RestApiMetricsCollector<>();
+        assertThrows(
+                NotReadyException.class, () -> metricsCollector.getJobTopology(jobDetailsInfo));
+    }
+
     @Test
     public void testJobTopologyParsingFromJobDetailsWithSlotSharingGroup() throws Exception {
         String s =

Reply via email to