yuchen-ecnu commented on code in PR #25798:
URL: https://github.com/apache/flink/pull/25798#discussion_r1890425998


##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java:
##########
@@ -58,4 +58,11 @@ int getConsumersMaxParallelism(
      * @return the number of pending operators.
      */
     int getPendingOperatorCount();
+
+    /**
+     * Retrieves the JSON representation of the stream graph for the original 
job.
+     *
+     * @return the JSON representation of the stream graph.
+     */
+    String getJsonStreamGraph();

Review Comment:
   Updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java:
##########
@@ -170,4 +174,58 @@ public static String generatePlan(
             throw new RuntimeException("Failed to generate plan", e);
         }
     }
+
+    public static String generateJsonStreamGraph(
+            ImmutableStreamGraph sg,

Review Comment:
   Yes, it has been updated.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java:
##########
@@ -70,6 +70,8 @@ public enum ExecutionState {
 
     RECONCILING,
 
+    PENDING,

Review Comment:
   Okay, I have added `pending-operator-counts` to `JobDetailsInfo`. Since 
`JobDetailsHandler` needs to handle `pendingOperatorCounts`, I have also added 
a `getPendingOperatorCounts()` function to `AccessExecutionGraph`.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java:
##########
@@ -118,6 +119,14 @@ public int getPendingOperatorCount() {
         return adaptiveGraphManager.getPendingOperatorsCount();
     }
 
+    @Override
+    public String getJsonStreamGraph() {
+        return JsonPlanGenerator.generateJsonStreamGraph(
+                adaptiveGraphManager.getStreamGraphContext().getStreamGraph(),
+                adaptiveGraphManager.getStreamNodeIdsToJobVertexMap(),

Review Comment:
   Good point! I made the following three changes in the revised version:
   
   1. Add `getStreamGraphJson` and `onStreamGraphUpdated` method to the 
`AdaptiveGraphManager`
   2. Regenerate StreamGraphJson only after the new JobVertices added or the 
stream edge changed.
   3. I noticed that `DefaultStreamGraphContext#modifyStreamEdge` has not being 
used right now. To track changes in the stream graph of 
`DefaultStreamGraphContext` in the `AdaptiveGraphManager`, I added a listener 
to `DefaultStreamGraphContext` and set it up during creation. This way, we can 
update the stream graph JSON whenever it gets modified. 
   
   Please let me know if this approach is not appropriate.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to