This is an automated email from the ASF dual-hosted git repository.

junrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 08990c7d212 [FLINK-36069][runtime/rest] Extending job detail rest API 
to expose json stream graph
08990c7d212 is described below

commit 08990c7d21260dff55e3a499354f5a5980ae68e6
Author: Yu Chen <yuchen.e...@gmail.com>
AuthorDate: Sun Sep 1 14:12:53 2024 +0800

    [FLINK-36069][runtime/rest] Extending job detail rest API to expose json 
stream graph
---
 .../client/program/rest/RestClusterClientTest.java |   4 +-
 .../runtime/webmonitor/WebFrontendITCase.java      | 119 +++++++++++
 .../src/test/resources/rest_api_v1.snapshot        |   7 +
 .../executiongraph/AccessExecutionGraph.java       |  18 ++
 .../executiongraph/ArchivedExecutionGraph.java     |  28 ++-
 .../executiongraph/DefaultExecutionGraph.java      |  10 +
 .../jobgraph/jsonplan/JsonPlanGenerator.java       |  54 +++++
 .../rest/handler/job/JobDetailsHandler.java        |   9 +-
 .../runtime/rest/messages/job/JobDetailsInfo.java  |  48 ++++-
 .../AdaptiveExecutionPlanSchedulingContext.java    |   5 +
 .../ExecutionPlanSchedulingContext.java            |  11 +
 .../NonAdaptiveExecutionPlanSchedulingContext.java |   5 +
 .../streaming/api/graph/AdaptiveGraphManager.java  |  29 ++-
 .../api/graph/DefaultStreamGraphContext.java       |  26 +++
 .../streaming/api/graph/StreamGraphContext.java    |   7 +
 .../jobgraph/jsonplan/JsonGeneratorTest.java       |  77 +++++++
 .../jobgraph/jsonplan/StreamGraphJsonSchema.java   | 228 +++++++++++++++++++++
 .../rest/handler/job/JobDetailsHandlerTest.java    | 119 +++++++++++
 .../utils/ArchivedExecutionGraphBuilder.java       |  10 +-
 .../rest/messages/job/JobDetailsInfoTest.java      |   4 +-
 .../adapter/DefaultExecutionTopologyTest.java      |   5 +
 .../adaptive/StateTrackingMockExecutionGraph.java  |  10 +
 22 files changed, 821 insertions(+), 12 deletions(-)

diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 29968f192b9..19e5b2684c8 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -1277,7 +1277,9 @@ class RestClusterClientTest {
                         Collections.singletonMap(JobStatus.RUNNING, 1L),
                         jobVertexDetailsInfos,
                         Collections.singletonMap(ExecutionState.RUNNING, 1),
-                        new JobPlanInfo.RawJson("{\"id\":\"1234\"}"));
+                        new JobPlanInfo.RawJson("{\"id\":\"1234\"}"),
+                        new JobPlanInfo.RawJson("{\"id\":\"1234\"}"),
+                        0);
         final TestJobDetailsInfoHandler jobDetailsInfoHandler =
                 new TestJobDetailsInfoHandler(jobDetailsInfo);
 
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index 0f70ba2f5a2..5fd80337473 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -19,13 +19,22 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -35,9 +44,13 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import 
org.apache.flink.runtime.rest.messages.job.coordination.ClientCoordinationRequestBody;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
 import 
org.apache.flink.streaming.api.operators.collect.CollectCoordinationRequest;
 import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator;
@@ -523,6 +536,63 @@ class WebFrontendITCase {
         BlockingInvokable.reset();
     }
 
+    @Test
+    void getStreamGraphFromBatchJobDetailsHandler(
+            @InjectClusterClient ClusterClient<?> clusterClient,
+            @InjectClusterRESTAddress URI restAddress)
+            throws Exception {
+        // this only works if there is no active job at this point
+        assertThat(getRunningJobs(clusterClient)).isEmpty();
+
+        // Build and submit a batch job with a blocking source
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.fromSource(
+                        new BlockingNumberSequenceSource(),
+                        WatermarkStrategy.noWatermarks(),
+                        "block-source")
+                .setParallelism(2)
+                .addSink(new SinkFunction<>() {});
+        StreamGraph streamGraph = env.getStreamGraph();
+        final JobID jid = streamGraph.getJobID();
+
+        clusterClient.submitJob(streamGraph).get();
+
+        // wait for job to show up
+        while (getRunningJobs(clusterClient).isEmpty()) {
+            Thread.sleep(10);
+        }
+
+        // wait for tasks to be properly running
+        BlockingInvokable.latch.await();
+
+        final Duration testTimeout = Duration.ofMinutes(2);
+        final Deadline deadline = Deadline.fromNow(testTimeout);
+        try (HttpTestClient client = new HttpTestClient("localhost", 
restAddress.getPort())) {
+            // Request the job details from the REST endpoint
+            client.sendGetRequest("/jobs/" + jid, deadline.timeLeft());
+
+            HttpTestClient.SimpleHttpResponse response =
+                    client.getNextResponse(deadline.timeLeft());
+
+            assertThat(response.getStatus()).isEqualTo(HttpResponseStatus.OK);
+            assertThat(response.getType()).isEqualTo("application/json; 
charset=UTF-8");
+            String content = response.getContent();
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(content);
+            assertThat(jsonNode.has("stream-graph")).isTrue();
+            assertThat(jsonNode.has("pending-operators")).isTrue();
+        }
+
+        clusterClient.cancel(jid).get();
+
+        // ensure cancellation is finished
+        while (!getRunningJobs(clusterClient).isEmpty()) {
+            Thread.sleep(20);
+        }
+
+        BlockingInvokable.reset();
+    }
+
     private static List<JobID> getRunningJobs(ClusterClient<?> client) throws 
Exception {
         Collection<JobStatusMessage> statusMessages = client.listJobs().get();
         return statusMessages.stream()
@@ -616,6 +686,55 @@ class WebFrontendITCase {
                 "Could not get HTTP response in time since the service is 
still unavailable.");
     }
 
+    /**
+     * Test number sequence source whose reader blocks once it reaches end of 
input and only exits
+     * when the job is cancelled.
+     */
+    private static class BlockingNumberSequenceSource extends 
NumberSequenceSource {
+        public BlockingNumberSequenceSource() {
+            super(0, 1);
+        }
+
+        @Override
+        public SourceReader<Long, NumberSequenceSplit> createReader(
+                SourceReaderContext readerContext) {
+            return new BlockingIteratorSourceReader<>(readerContext);
+        }
+    }
+
+    private static class BlockingIteratorSourceReader<
+                    E, IterT extends Iterator<E>, SplitT extends 
IteratorSourceSplit<E, IterT>>
+            extends IteratorSourceReader<E, IterT, SplitT> {
+        private transient BlockingInvokable blocker;
+
+        public BlockingIteratorSourceReader(SourceReaderContext context) {
+            super(context);
+        }
+
+        @Override
+        public void start() {
+            super.start();
+            blocker = new BlockingInvokable(new DummyEnvironment());
+        }
+
+        @Override
+        public InputStatus pollNext(ReaderOutput<E> output) {
+            InputStatus inputStatus = super.pollNext(output);
+            if (inputStatus == InputStatus.END_OF_INPUT) {
+                try {
+                    blocker.invoke();
+                } catch (Exception ignored) {
+                }
+            }
+            return inputStatus;
+        }
+
+        @Override
+        public void close() {
+            blocker.cancel();
+        }
+    }
+
     /** Test invokable that allows waiting for all subtasks to be running. */
     public static class BlockingInvokable extends AbstractInvokable {
 
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 2c3601bf15b..c1801407d1c 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -967,6 +967,13 @@
         "plan" : {
           "type" : "object",
           "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson"
+        },
+        "stream-graph" : {
+          "type" : "object",
+          "$ref" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:JobPlanInfo:RawJson"
+        },
+        "pending-operators" : {
+          "type" : "integer"
         }
       }
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index e9998b1a78f..1cac672a896 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -47,6 +47,15 @@ public interface AccessExecutionGraph extends 
JobStatusProvider {
      */
     String getJsonPlan();
 
+    /**
+     * Returns the stream graph as a JSON string.
+     *
+     * @return stream graph as a JSON string, or null if the job is submitted 
with a JobGraph or if
+     *     it's a streaming job.
+     */
+    @Nullable
+    String getStreamGraphJson();
+
     /**
      * Returns the {@link JobID} for this execution graph.
      *
@@ -199,4 +208,13 @@ public interface AccessExecutionGraph extends 
JobStatusProvider {
      * @return The changelog storage name, or an empty Optional in the case of 
batch jobs
      */
     Optional<String> getChangelogStorageName();
+
+    /**
+     * Retrieves the count of pending operators waiting to be transferred to 
job vertices in the
+     * adaptive execution of batch jobs. This value will be zero if the job is 
submitted with a
+     * JobGraph or if it's a streaming job.
+     *
+     * @return the number of pending operators.
+     */
+    int getPendingOperatorCount();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index f322523bcbf..f93eaa4a6ae 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -113,6 +113,10 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
 
     @Nullable private final String changelogStorageName;
 
+    @Nullable private final String streamGraphJson;
+
+    private final int pendingOperatorCount;
+
     public ArchivedExecutionGraph(
             JobID jobID,
             String jobName,
@@ -132,7 +136,9 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
             @Nullable String stateBackendName,
             @Nullable String checkpointStorageName,
             @Nullable TernaryBoolean stateChangelogEnabled,
-            @Nullable String changelogStorageName) {
+            @Nullable String changelogStorageName,
+            @Nullable String streamGraphJson,
+            int pendingOperatorCount) {
 
         this.jobID = Preconditions.checkNotNull(jobID);
         this.jobName = Preconditions.checkNotNull(jobName);
@@ -153,6 +159,8 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
         this.checkpointStorageName = checkpointStorageName;
         this.stateChangelogEnabled = stateChangelogEnabled;
         this.changelogStorageName = changelogStorageName;
+        this.streamGraphJson = streamGraphJson;
+        this.pendingOperatorCount = pendingOperatorCount;
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -162,6 +170,11 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
         return jsonPlan;
     }
 
+    @Override
+    public String getStreamGraphJson() {
+        return streamGraphJson;
+    }
+
     @Override
     public JobID getJobID() {
         return jobID;
@@ -298,6 +311,11 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
         return Optional.ofNullable(changelogStorageName);
     }
 
+    @Override
+    public int getPendingOperatorCount() {
+        return pendingOperatorCount;
+    }
+
     /**
      * Create a {@link ArchivedExecutionGraph} from the given {@link 
ExecutionGraph}.
      *
@@ -366,7 +384,9 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                 executionGraph.getStateBackendName().orElse(null),
                 executionGraph.getCheckpointStorageName().orElse(null),
                 executionGraph.isChangelogStateBackendEnabled(),
-                executionGraph.getChangelogStorageName().orElse(null));
+                executionGraph.getChangelogStorageName().orElse(null),
+                executionGraph.getStreamGraphJson(),
+                executionGraph.getPendingOperatorCount());
     }
 
     /**
@@ -487,6 +507,8 @@ public class ArchivedExecutionGraph implements 
AccessExecutionGraph, Serializabl
                 checkpointingSettings == null
                         ? TernaryBoolean.UNDEFINED
                         : 
checkpointingSettings.isChangelogStateBackendEnabled(),
-                checkpointingSettings == null ? null : "Unknown");
+                checkpointingSettings == null ? null : "Unknown",
+                null,
+                0);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 1114671a1b5..2d551ec6dfd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -457,6 +457,11 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
         return Optional.ofNullable(changelogStorageName);
     }
 
+    @Override
+    public int getPendingOperatorCount() {
+        return executionPlanSchedulingContext.getPendingOperatorCount();
+    }
+
     @Override
     public void enableCheckpointing(
             CheckpointCoordinatorConfiguration chkConfig,
@@ -612,6 +617,11 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
         this.jsonPlan = jsonPlan;
     }
 
+    @Override
+    public String getStreamGraphJson() {
+        return executionPlanSchedulingContext.getStreamGraphJson();
+    }
+
     @Override
     public String getJsonPlan() {
         return jsonPlan;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
index 6cdcc77cc53..6005c324642 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java
@@ -26,6 +26,9 @@ import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -34,6 +37,7 @@ import org.apache.commons.text.StringEscapeUtils;
 
 import java.io.StringWriter;
 import java.util.List;
+import java.util.Map;
 
 @Internal
 public class JsonPlanGenerator {
@@ -170,4 +174,54 @@ public class JsonPlanGenerator {
             throw new RuntimeException("Failed to generate plan", e);
         }
     }
+
+    public static String generateStreamGraphJson(
+            StreamGraph sg, Map<Integer, JobVertexID> jobVertexIdMap) {
+        try (final StringWriter writer = new StringWriter(1024)) {
+            try (final JsonGenerator gen = new 
JsonFactory().createGenerator(writer)) {
+                // root JSON object
+                gen.writeStartObject();
+
+                gen.writeArrayFieldStart("nodes");
+
+                // one JSON object per stream node
+                for (StreamNode node : sg.getStreamNodes()) {
+                    gen.writeStartObject();
+                    gen.writeStringField("id", String.valueOf(node.getId()));
+                    gen.writeNumberField("parallelism", node.getParallelism());
+                    gen.writeStringField("operator", node.getOperatorName());
+                    gen.writeStringField("description", 
node.getOperatorDescription());
+                    if (jobVertexIdMap.containsKey(node.getId())) {
+                        gen.writeStringField(
+                                "job_vertex_id", 
jobVertexIdMap.get(node.getId()).toString());
+                    }
+
+                    // describe every incoming edge of this node
+                    gen.writeArrayFieldStart("inputs");
+
+                    List<StreamEdge> inEdges = node.getInEdges();
+                    for (int inputNum = 0; inputNum < inEdges.size(); 
inputNum++) {
+                        StreamEdge edge = inEdges.get(inputNum);
+                        gen.writeStartObject();
+                        gen.writeNumberField("num", inputNum);
+                        gen.writeStringField("id", 
String.valueOf(edge.getSourceId()));
+                        gen.writeStringField("ship_strategy", 
edge.getPartitioner().toString());
+                        gen.writeStringField("exchange", 
edge.getExchangeMode().name());
+                        gen.writeEndObject();
+                    }
+
+                    gen.writeEndArray();
+
+                    gen.writeEndObject();
+                }
+
+                // close the "nodes" array and the root object
+                gen.writeEndArray();
+                gen.writeEndObject();
+            }
+            return writer.toString();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to generate json stream plan", 
e);
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index efc49c1d8c7..0ee703cfa51 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -141,6 +141,11 @@ public class JobDetailsHandler
                     executionState, 
jobVerticesPerState[executionState.ordinal()]);
         }
 
+        JobPlanInfo.RawJson streamGraphJson = null;
+        if (executionGraph.getStreamGraphJson() != null) {
+            streamGraphJson = new 
JobPlanInfo.RawJson(executionGraph.getStreamGraphJson());
+        }
+
         return new JobDetailsInfo(
                 executionGraph.getJobID(),
                 executionGraph.getJobName(),
@@ -155,7 +160,9 @@ public class JobDetailsHandler
                 timestamps,
                 jobVertexInfos,
                 jobVerticesPerStateMap,
-                new JobPlanInfo.RawJson(executionGraph.getJsonPlan()));
+                new JobPlanInfo.RawJson(executionGraph.getJsonPlan()),
+                streamGraphJson,
+                executionGraph.getPendingOperatorCount());
     }
 
     private static JobDetailsInfo.JobVertexDetailsInfo 
createJobVertexDetailsInfo(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
index 250407005c1..250afd3a041 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -37,12 +37,15 @@ import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import io.swagger.v3.oas.annotations.media.Schema;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
@@ -79,6 +82,14 @@ public class JobDetailsInfo implements ResponseBody {
 
     public static final String FIELD_NAME_JSON_PLAN = "plan";
 
+    /**
+     * Field name for the {@link JobPlanInfo.RawJson} of the submitted stream 
graph, which is null
+     * if the job is submitted with a JobGraph or if it's a streaming job.
+     */
+    public static final String FIELD_NAME_STREAM_GRAPH_JSON = "stream-graph";
+
+    public static final String FIELD_NAME_PENDING_OPERATORS = 
"pending-operators";
+
     @JsonProperty(FIELD_NAME_JOB_ID)
     @JsonSerialize(using = JobIDSerializer.class)
     private final JobID jobId;
@@ -122,6 +133,14 @@ public class JobDetailsInfo implements ResponseBody {
     @JsonProperty(FIELD_NAME_JSON_PLAN)
     private final JobPlanInfo.RawJson jsonPlan;
 
+    @JsonProperty(FIELD_NAME_STREAM_GRAPH_JSON)
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @Nullable
+    private final JobPlanInfo.RawJson streamGraphJson;
+
+    @JsonProperty(FIELD_NAME_PENDING_OPERATORS)
+    private final int pendingOperators;
+
     @JsonCreator
     public JobDetailsInfo(
             @JsonDeserialize(using = JobIDDeserializer.class) 
@JsonProperty(FIELD_NAME_JOB_ID)
@@ -140,7 +159,10 @@ public class JobDetailsInfo implements ResponseBody {
                     Collection<JobVertexDetailsInfo> jobVertexInfos,
             @JsonProperty(FIELD_NAME_JOB_VERTICES_PER_STATE)
                     Map<ExecutionState, Integer> jobVerticesPerState,
-            @JsonProperty(FIELD_NAME_JSON_PLAN) JobPlanInfo.RawJson jsonPlan) {
+            @JsonProperty(FIELD_NAME_JSON_PLAN) JobPlanInfo.RawJson jsonPlan,
+            @JsonProperty(FIELD_NAME_STREAM_GRAPH_JSON) @Nullable
+                    JobPlanInfo.RawJson streamGraphJson,
+            @JsonProperty(FIELD_NAME_PENDING_OPERATORS) int pendingOperators) {
         this.jobId = Preconditions.checkNotNull(jobId);
         this.name = Preconditions.checkNotNull(name);
         this.isStoppable = isStoppable;
@@ -155,6 +177,8 @@ public class JobDetailsInfo implements ResponseBody {
         this.jobVertexInfos = Preconditions.checkNotNull(jobVertexInfos);
         this.jobVerticesPerState = 
Preconditions.checkNotNull(jobVerticesPerState);
         this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+        this.streamGraphJson = streamGraphJson;
+        this.pendingOperators = pendingOperators;
     }
 
     @Override
@@ -179,7 +203,9 @@ public class JobDetailsInfo implements ResponseBody {
                 && Objects.equals(timestamps, that.timestamps)
                 && Objects.equals(jobVertexInfos, that.jobVertexInfos)
                 && Objects.equals(jobVerticesPerState, 
that.jobVerticesPerState)
-                && Objects.equals(jsonPlan, that.jsonPlan);
+                && Objects.equals(jsonPlan, that.jsonPlan)
+                && Objects.equals(streamGraphJson, that.streamGraphJson)
+                && Objects.equals(pendingOperators, that.pendingOperators);
     }
 
     @Override
@@ -198,7 +224,9 @@ public class JobDetailsInfo implements ResponseBody {
                 timestamps,
                 jobVertexInfos,
                 jobVerticesPerState,
-                jsonPlan);
+                jsonPlan,
+                streamGraphJson,
+                pendingOperators);
     }
 
     @JsonIgnore
@@ -271,6 +299,20 @@ public class JobDetailsInfo implements ResponseBody {
         return jsonPlan.toString();
     }
 
+    @JsonIgnore
+    @Nullable
+    public String getStreamGraphJson() {
+        if (streamGraphJson != null) {
+            return streamGraphJson.toString();
+        }
+        return null;
+    }
+
+    @JsonIgnore
+    public int getPendingOperators() {
+        return pendingOperators;
+    }
+
     // ---------------------------------------------------
     // Static inner classes
     // ---------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java
index 5575f4e27be..e96c97c1c82 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java
@@ -118,6 +118,11 @@ public class AdaptiveExecutionPlanSchedulingContext 
implements ExecutionPlanSche
         return adaptiveGraphManager.getPendingOperatorsCount();
     }
 
+    @Override
+    public String getStreamGraphJson() {
+        return adaptiveGraphManager.getStreamGraphJson();
+    }
+
     private int getParallelism(int streamNodeId) {
         return adaptiveGraphManager
                 .getStreamGraphContext()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java
index 86919814e9c..ed00382dc48 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import javax.annotation.Nullable;
+
 import java.util.function.Function;
 
 /** Interface for retrieving stream graph context details for adaptive batch 
jobs. */
@@ -58,4 +60,13 @@ public interface ExecutionPlanSchedulingContext {
      * @return the number of pending operators.
      */
     int getPendingOperatorCount();
+
+    /**
+     * Retrieves the JSON representation of the stream graph for the original 
job.
+     *
+     * @return the JSON representation of the stream graph, or null if the 
stream graph is not
+     *     available.
+     */
+    @Nullable
+    String getStreamGraphJson();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java
index 72de8edec66..53fae3075ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionPlanSchedulingContext.java
@@ -107,4 +107,9 @@ public final class NonAdaptiveExecutionPlanSchedulingContext
     public int getPendingOperatorCount() {
         return 0;
     }
+
+    @Override
+    public String getStreamGraphJson() {
+        return null;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
index f683f586d4e..965187fae5b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
 import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
 import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
@@ -74,7 +75,8 @@ import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.va
 
 /** Default implementation for {@link AdaptiveGraphGenerator}. */
 @Internal
-public class AdaptiveGraphManager implements AdaptiveGraphGenerator {
+public class AdaptiveGraphManager
+        implements AdaptiveGraphGenerator, 
StreamGraphContext.StreamGraphUpdateListener {
 
     private final StreamGraph streamGraph;
 
@@ -128,6 +130,8 @@ public class AdaptiveGraphManager implements 
AdaptiveGraphGenerator {
     // We need cache all job vertices to create JobEdge for downstream vertex.
     private final Map<Integer, JobVertex> startNodeToJobVertexMap;
 
+    private final Map<Integer, JobVertexID> streamNodeIdsToJobVertexMap;
+
     // Records the ID of the job vertex that has completed execution.
     private final Set<JobVertexID> finishedJobVertices;
 
@@ -135,6 +139,8 @@ public class AdaptiveGraphManager implements 
AdaptiveGraphGenerator {
 
     private final SlotSharingGroup defaultSlotSharingGroup;
 
+    private String streamGraphJson;
+
     public AdaptiveGraphManager(
             ClassLoader userClassloader, StreamGraph streamGraph, Executor 
serializationExecutor) {
         preValidate(streamGraph, userClassloader);
@@ -162,6 +168,7 @@ public class AdaptiveGraphManager implements 
AdaptiveGraphGenerator {
 
         this.jobVertexToStartNodeMap = new HashMap<>();
         this.jobVertexToChainedStreamNodeIdsMap = new HashMap<>();
+        this.streamNodeIdsToJobVertexMap = new HashMap<>();
 
         this.finishedJobVertices = new HashSet<>();
 
@@ -171,7 +178,8 @@ public class AdaptiveGraphManager implements 
AdaptiveGraphGenerator {
                         steamNodeIdToForwardGroupMap,
                         frozenNodeToStartNodeMap,
                         intermediateOutputsCaches,
-                        consumerEdgeIdToIntermediateDataSetMap);
+                        consumerEdgeIdToIntermediateDataSetMap,
+                        this);
 
         this.jobGraph = createAndInitializeJobGraph(streamGraph, 
streamGraph.getJobID());
 
@@ -295,6 +303,8 @@ public class AdaptiveGraphManager implements 
AdaptiveGraphGenerator {
 
         generateConfigForJobVertices(jobVertexBuildContext);
 
+        generateStreamGraphJson();
+
         return new 
ArrayList<>(jobVertexBuildContext.getJobVerticesInOrder().values());
     }
 
@@ -438,6 +448,7 @@ public class AdaptiveGraphManager implements 
AdaptiveGraphGenerator {
                                         .computeIfAbsent(
                                                 jobVertex.getID(), key -> new 
ArrayList<>())
                                         .add(node.getId());
+                                streamNodeIdsToJobVertexMap.put(node.getId(), 
jobVertex.getID());
                             });
         }
     }
@@ -661,4 +672,18 @@ public class AdaptiveGraphManager implements 
AdaptiveGraphGenerator {
     private Integer getStartNodeId(Integer streamNodeId) {
         return frozenNodeToStartNodeMap.get(streamNodeId);
     }
+
+    private void generateStreamGraphJson() {
+        streamGraphJson =
+                JsonPlanGenerator.generateStreamGraphJson(streamGraph, 
streamNodeIdsToJobVertexMap);
+    }
+
+    public String getStreamGraphJson() {
+        return streamGraphJson;
+    }
+
+    @Override
+    public void onStreamGraphUpdated() {
+        generateStreamGraphJson();
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java
index 324f3466e7c..f0ce8f8cbd7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
@@ -74,12 +75,31 @@ public class DefaultStreamGraphContext implements 
StreamGraphContext {
 
     private final Map<String, IntermediateDataSet> 
consumerEdgeIdToIntermediateDataSetMap;
 
+    @Nullable private final StreamGraphUpdateListener 
streamGraphUpdateListener;
+
+    @VisibleForTesting
     public DefaultStreamGraphContext(
             StreamGraph streamGraph,
             Map<Integer, StreamNodeForwardGroup> steamNodeIdToForwardGroupMap,
             Map<Integer, Integer> frozenNodeToStartNodeMap,
             Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputsCaches,
             Map<String, IntermediateDataSet> 
consumerEdgeIdToIntermediateDataSetMap) {
+        this(
+                streamGraph,
+                steamNodeIdToForwardGroupMap,
+                frozenNodeToStartNodeMap,
+                opIntermediateOutputsCaches,
+                consumerEdgeIdToIntermediateDataSetMap,
+                null);
+    }
+
+    public DefaultStreamGraphContext(
+            StreamGraph streamGraph,
+            Map<Integer, StreamNodeForwardGroup> steamNodeIdToForwardGroupMap,
+            Map<Integer, Integer> frozenNodeToStartNodeMap,
+            Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputsCaches,
+            Map<String, IntermediateDataSet> 
consumerEdgeIdToIntermediateDataSetMap,
+            @Nullable StreamGraphUpdateListener streamGraphUpdateListener) {
         this.streamGraph = checkNotNull(streamGraph);
         this.steamNodeIdToForwardGroupMap = 
checkNotNull(steamNodeIdToForwardGroupMap);
         this.frozenNodeToStartNodeMap = checkNotNull(frozenNodeToStartNodeMap);
@@ -87,6 +107,7 @@ public class DefaultStreamGraphContext implements 
StreamGraphContext {
         this.immutableStreamGraph = new ImmutableStreamGraph(this.streamGraph);
         this.consumerEdgeIdToIntermediateDataSetMap =
                 checkNotNull(consumerEdgeIdToIntermediateDataSetMap);
+        this.streamGraphUpdateListener = streamGraphUpdateListener;
     }
 
     @Override
@@ -121,6 +142,11 @@ public class DefaultStreamGraphContext implements 
StreamGraphContext {
             }
         }
 
+        // Notify the listener that the StreamGraph has been updated.
+        if (streamGraphUpdateListener != null) {
+            streamGraphUpdateListener.onStreamGraphUpdated();
+        }
+
         return true;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java
index 82cbac56135..13d1c8aa513 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java
@@ -62,4 +62,11 @@ public interface StreamGraphContext {
      * @return true if all modifications were successful and applied 
atomically, false otherwise.
      */
     boolean modifyStreamEdge(List<StreamEdgeUpdateRequestInfo> requestInfos);
+
+    /** Interface for observers that monitor the status of a StreamGraph. */
+    @Internal
+    interface StreamGraphUpdateListener {
+        /** This method is called whenever the StreamGraph is updated. */
+        void onStreamGraphUpdated();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
index 5eaf28a7bc1..b187e997f00 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonGeneratorTest.java
@@ -25,15 +25,25 @@ import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.util.jackson.JacksonMapperFactory;
 
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.TextNode;
 
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.runtime.util.JobVertexConnectionUtils.connectNewDataSetAsInput;
 import static org.junit.Assert.assertEquals;
@@ -151,4 +161,71 @@ public class JsonGeneratorTest {
         }
         fail("could not find vertex with id " + vertexId + " in JobGraph");
     }
+
+    @Test
+    public void testGenerateStreamGraphJson() throws JsonProcessingException {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.fromSequence(0L, 1L).disableChaining().print();
+        StreamGraph streamGraph = env.getStreamGraph();
+        Map<Integer, JobVertexID> jobVertexIdMap = new HashMap<>();
+        String streamGraphJson =
+                JsonPlanGenerator.generateStreamGraphJson(streamGraph, 
jobVertexIdMap);
+
+        ObjectMapper mapper = JacksonMapperFactory.createObjectMapper();
+        StreamGraphJsonSchema parsedStreamGraph =
+                mapper.readValue(streamGraphJson, StreamGraphJsonSchema.class);
+
+        List<String> expectedJobVertexIds = new ArrayList<>();
+        expectedJobVertexIds.add(null);
+        expectedJobVertexIds.add(null);
+        validateStreamGraph(streamGraph, parsedStreamGraph, 
expectedJobVertexIds);
+
+        jobVertexIdMap.put(1, new JobVertexID());
+        jobVertexIdMap.put(2, new JobVertexID());
+        streamGraphJson = 
JsonPlanGenerator.generateStreamGraphJson(streamGraph, jobVertexIdMap);
+
+        parsedStreamGraph = mapper.readValue(streamGraphJson, 
StreamGraphJsonSchema.class);
+        validateStreamGraph(
+                streamGraph,
+                parsedStreamGraph,
+                jobVertexIdMap.values().stream()
+                        .map(JobVertexID::toString)
+                        .collect(Collectors.toList()));
+    }
+
+    private static void validateStreamGraph(
+            StreamGraph streamGraph,
+            StreamGraphJsonSchema parsedStreamGraph,
+            List<String> expectedJobVertexIds) {
+        List<String> realJobVertexIds = new ArrayList<>();
+        parsedStreamGraph
+                .getNodes()
+                .forEach(
+                        node -> {
+                            StreamNode streamNode =
+                                    
streamGraph.getStreamNode(Integer.parseInt(node.getId()));
+                            assertEquals(node.getOperator(), 
streamNode.getOperatorName());
+                            assertEquals(
+                                    node.getParallelism(), (Integer) 
streamNode.getParallelism());
+                            assertEquals(
+                                    node.getDescription(), 
streamNode.getOperatorDescription());
+                            validateStreamEdge(node.getInputs(), 
streamNode.getInEdges());
+                            realJobVertexIds.add(node.getJobVertexId());
+                        });
+        assertEquals(expectedJobVertexIds, realJobVertexIds);
+    }
+
+    private static void validateStreamEdge(
+            List<StreamGraphJsonSchema.JsonStreamEdgeSchema> jsonStreamEdges,
+            List<StreamEdge> streamEdges) {
+        assertEquals(jsonStreamEdges.size(), streamEdges.size());
+        for (int i = 0; i < jsonStreamEdges.size(); i++) {
+            StreamGraphJsonSchema.JsonStreamEdgeSchema edgeToValidate = 
jsonStreamEdges.get(i);
+            StreamEdge expectedEdge = streamEdges.get(i);
+            assertEquals(String.valueOf(expectedEdge.getSourceId()), 
edgeToValidate.getId());
+            assertEquals(
+                    expectedEdge.getPartitioner().toString(), 
edgeToValidate.getShipStrategy());
+            assertEquals(expectedEdge.getExchangeMode().name(), 
edgeToValidate.getExchange());
+        }
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/StreamGraphJsonSchema.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/StreamGraphJsonSchema.java
new file mode 100644
index 00000000000..37774266e83
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/jsonplan/StreamGraphJsonSchema.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.jsonplan;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/** A utility class for deserializing the JSON string of a stream graph. */
+public class StreamGraphJsonSchema {
+    public static final String FIELD_NAME_NODES = "nodes";
+
+    @JsonProperty(FIELD_NAME_NODES)
+    private final List<JsonStreamNodeSchema> nodes;
+
+    @JsonCreator
+    public StreamGraphJsonSchema(@JsonProperty(FIELD_NAME_NODES) 
List<JsonStreamNodeSchema> nodes) {
+        this.nodes = nodes;
+    }
+
+    @JsonIgnore
+    public List<JsonStreamNodeSchema> getNodes() {
+        return nodes;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        StreamGraphJsonSchema that = (StreamGraphJsonSchema) o;
+        return Objects.equals(nodes, that.nodes);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nodes);
+    }
+
+    public static class JsonStreamNodeSchema {
+        public static final String FIELD_NAME_NODE_ID = "id";
+        public static final String FIELD_NAME_NODE_PARALLELISM = "parallelism";
+        public static final String FIELD_NAME_NODE_OPERATOR = "operator";
+        public static final String FIELD_NAME_NODE_DESCRIPTION = "description";
+        public static final String FIELD_NAME_NODE_JOB_VERTEX_ID = 
"job_vertex_id";
+        public static final String FIELD_NAME_NODE_INPUTS = "inputs";
+
+        @JsonProperty(FIELD_NAME_NODE_ID)
+        private final String id;
+
+        @JsonProperty(FIELD_NAME_NODE_PARALLELISM)
+        private final Integer parallelism;
+
+        @JsonProperty(FIELD_NAME_NODE_OPERATOR)
+        private final String operator;
+
+        @JsonProperty(FIELD_NAME_NODE_DESCRIPTION)
+        private final String description;
+
+        @JsonProperty(FIELD_NAME_NODE_JOB_VERTEX_ID)
+        private final String jobVertexId;
+
+        @JsonProperty(FIELD_NAME_NODE_INPUTS)
+        private final List<JsonStreamEdgeSchema> inputs;
+
+        @JsonCreator
+        public JsonStreamNodeSchema(
+                @JsonProperty(FIELD_NAME_NODE_ID) String id,
+                @JsonProperty(FIELD_NAME_NODE_PARALLELISM) Integer parallelism,
+                @JsonProperty(FIELD_NAME_NODE_OPERATOR) String operator,
+                @JsonProperty(FIELD_NAME_NODE_DESCRIPTION) String description,
+                @JsonProperty(FIELD_NAME_NODE_JOB_VERTEX_ID) String 
jobVertexId,
+                @JsonProperty(FIELD_NAME_NODE_INPUTS) 
List<JsonStreamEdgeSchema> inputs) {
+            this.id = id;
+            this.parallelism = parallelism;
+            this.operator = operator;
+            this.description = description;
+            this.jobVertexId = jobVertexId;
+            this.inputs = inputs;
+        }
+
+        @JsonIgnore
+        public String getId() {
+            return id;
+        }
+
+        @JsonIgnore
+        public Integer getParallelism() {
+            return parallelism;
+        }
+
+        @JsonIgnore
+        public String getOperator() {
+            return operator;
+        }
+
+        @JsonIgnore
+        public String getDescription() {
+            return description;
+        }
+
+        @JsonIgnore
+        public String getJobVertexId() {
+            return jobVertexId;
+        }
+
+        @JsonIgnore
+        public List<JsonStreamEdgeSchema> getInputs() {
+            return inputs;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            JsonStreamNodeSchema that = (JsonStreamNodeSchema) o;
+            return Objects.equals(id, that.id)
+                    && Objects.equals(parallelism, that.parallelism)
+                    && Objects.equals(operator, that.operator)
+                    && Objects.equals(description, that.description)
+                    && Objects.equals(jobVertexId, that.jobVertexId)
+                    && Objects.equals(inputs, that.inputs);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(id, parallelism, operator, description, 
jobVertexId, inputs);
+        }
+    }
+
+    public static class JsonStreamEdgeSchema {
+        public static final String FIELD_NAME_EDGE_INPUT_NUM = "num";
+        public static final String FIELD_NAME_EDGE_ID = "id";
+        public static final String FIELD_NAME_EDGE_SHIP_STRATEGY = 
"ship_strategy";
+        public static final String FIELD_NAME_EDGE_EXCHANGE = "exchange";
+
+        @JsonProperty(FIELD_NAME_EDGE_INPUT_NUM)
+        private final Integer num;
+
+        @JsonProperty(FIELD_NAME_EDGE_ID)
+        private final String id;
+
+        @JsonProperty(FIELD_NAME_EDGE_SHIP_STRATEGY)
+        private final String shipStrategy;
+
+        @JsonProperty(FIELD_NAME_EDGE_EXCHANGE)
+        private final String exchange;
+
+        @JsonCreator
+        public JsonStreamEdgeSchema(
+                @JsonProperty(FIELD_NAME_EDGE_INPUT_NUM) Integer num,
+                @JsonProperty(FIELD_NAME_EDGE_ID) String id,
+                @JsonProperty(FIELD_NAME_EDGE_SHIP_STRATEGY) String 
shipStrategy,
+                @JsonProperty(FIELD_NAME_EDGE_EXCHANGE) String exchange) {
+            this.num = num;
+            this.id = id;
+            this.shipStrategy = shipStrategy;
+            this.exchange = exchange;
+        }
+
+        @JsonIgnore
+        public Integer getNum() {
+            return num;
+        }
+
+        @JsonIgnore
+        public String getId() {
+            return id;
+        }
+
+        @JsonIgnore
+        public String getShipStrategy() {
+            return shipStrategy;
+        }
+
+        @JsonIgnore
+        public String getExchange() {
+            return exchange;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            JsonStreamEdgeSchema that = (JsonStreamEdgeSchema) o;
+            return Objects.equals(num, that.num)
+                    && Objects.equals(id, that.id)
+                    && Objects.equals(shipStrategy, that.shipStrategy)
+                    && Objects.equals(exchange, that.exchange);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(num, id, shipStrategy, exchange);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
new file mode 100644
index 00000000000..1ecb68cdf7d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandlerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionConfigBuilder;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.util.concurrent.Executors;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for the {@link JobDetailsHandler}. */
+class JobDetailsHandlerTest {
+    private JobDetailsHandler jobDetailsHandler;
+    private HandlerRequest<EmptyRequestBody> handlerRequest;
+    private AccessExecutionGraph archivedExecutionGraph;
+    private final String expectedStreamGraphJson =
+            
"{\"pending_operators\":2,\"nodes\":[{\"id\":\"1\",\"parallelism\":1,\"operator\":\"Source:
 Sequence Source\",\"description\":\"Source: Sequence 
Source\",\"inputs\":[]},{\"id\":\"2\",\"parallelism\":1,\"operator\":\"Sink: 
Print to Std. Out\",\"description\":\"Sink: Print to Std. 
Out\",\"inputs\":[{\"num\":0,\"id\":\"1\",\"ship_strategy\":\"FORWARD\",\"exchange\":\"UNDEFINED\"}]}]}";
+
+    private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId)
+            throws HandlerRequestException {
+        Map<String, String> pathParameters = new HashMap<>();
+        pathParameters.put(JobIDPathParameter.KEY, jobId.toString());
+        return HandlerRequest.resolveParametersAndCreate(
+                EmptyRequestBody.getInstance(),
+                new TaskManagerMessageParameters(),
+                pathParameters,
+                Collections.emptyMap(),
+                Collections.emptyList());
+    }
+
+    @BeforeEach
+    void setUp() throws HandlerRequestException {
+        GatewayRetriever<RestfulGateway> leaderRetriever =
+                () -> CompletableFuture.completedFuture(null);
+        final RestHandlerConfiguration restHandlerConfiguration =
+                RestHandlerConfiguration.fromConfiguration(new 
Configuration());
+        final MetricFetcher metricFetcher =
+                new MetricFetcherImpl<>(
+                        () -> null,
+                        address -> null,
+                        Executors.directExecutor(),
+                        Duration.ofMillis(1000L),
+                        
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue().toMillis());
+        final ArchivedExecutionConfig archivedExecutionConfig =
+                new ArchivedExecutionConfigBuilder().build();
+
+        archivedExecutionGraph =
+                new ArchivedExecutionGraphBuilder()
+                        .setArchivedExecutionConfig(archivedExecutionConfig)
+                        .setStreamGraphJson(expectedStreamGraphJson)
+                        .build();
+        jobDetailsHandler =
+                new JobDetailsHandler(
+                        leaderRetriever,
+                        TestingUtils.TIMEOUT,
+                        Collections.emptyMap(),
+                        JobDetailsHeaders.getInstance(),
+                        new DefaultExecutionGraphCache(
+                                restHandlerConfiguration.getTimeout(),
+                                
Duration.ofMillis(restHandlerConfiguration.getRefreshInterval())),
+                        Executors.directExecutor(),
+                        metricFetcher);
+        handlerRequest = createRequest(archivedExecutionGraph.getJobID());
+    }
+
+    @Test
+    void testGetJobDetailsWithStreamGraphJson() throws RestHandlerException {
+        JobDetailsInfo jobDetailsInfo =
+                jobDetailsHandler.handleRequest(handlerRequest, 
archivedExecutionGraph);
+        assertThat(jobDetailsInfo.getStreamGraphJson())
+                .isEqualTo(new 
JobPlanInfo.RawJson(expectedStreamGraphJson).toString());
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
index 58255a5b6dc..da2b84c3f31 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
@@ -57,6 +57,7 @@ public class ArchivedExecutionGraphBuilder {
     private boolean isStoppable;
     private Map<String, SerializedValue<OptionalFailure<Object>>> 
serializedUserAccumulators;
     private CheckpointStatsSnapshot checkpointStatsSnapshot;
+    private String streamGraphJson;
 
     public ArchivedExecutionGraphBuilder setJobID(JobID jobID) {
         this.jobID = jobID;
@@ -101,6 +102,11 @@ public class ArchivedExecutionGraphBuilder {
         return this;
     }
 
+    public ArchivedExecutionGraphBuilder setStreamGraphJson(String 
streamGraphJson) {
+        this.streamGraphJson = streamGraphJson;
+        return this;
+    }
+
     public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(
             StringifiedAccumulatorResult[] archivedUserAccumulators) {
         this.archivedUserAccumulators = archivedUserAccumulators;
@@ -171,6 +177,8 @@ public class ArchivedExecutionGraphBuilder {
                 "stateBackendName",
                 "checkpointStorageName",
                 TernaryBoolean.UNDEFINED,
-                "changelogStorageName");
+                "changelogStorageName",
+                streamGraphJson,
+                0);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index 6bb69b357e0..fa346bdfbad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -84,7 +84,9 @@ class JobDetailsInfoTest extends 
RestResponseMarshallingTestBase<JobDetailsInfo>
                 timestamps,
                 jobVertexInfos,
                 jobVerticesPerState,
-                new JobPlanInfo.RawJson(jsonPlan));
+                new JobPlanInfo.RawJson(jsonPlan),
+                null,
+                0);
     }
 
     private JobDetailsInfo.JobVertexDetailsInfo 
createJobVertexDetailsInfo(Random random) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
index 2b4813b2e93..e01bc6f2487 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopologyTest.java
@@ -449,5 +449,10 @@ class DefaultExecutionTopologyTest {
         public int getPendingOperatorCount() {
             return 0;
         }
+
+        @Override
+        public String getStreamGraphJson() {
+            return null;
+        }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
index ce49ff5bded..becb10fdb8a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java
@@ -187,6 +187,11 @@ class StateTrackingMockExecutionGraph implements 
ExecutionGraph {
         return "";
     }
 
+    @Override
+    public String getStreamGraphJson() {
+        return null;
+    }
+
     @Override
     public void setJsonPlan(String jsonPlan) {}
 
@@ -249,6 +254,11 @@ class StateTrackingMockExecutionGraph implements 
ExecutionGraph {
         return Optional.empty();
     }
 
+    @Override
+    public int getPendingOperatorCount() {
+        return 0;
+    }
+
     @Override
     public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
         return new StringifiedAccumulatorResult[0];

Reply via email to