yuchen-ecnu commented on code in PR #25798: URL: https://github.com/apache/flink/pull/25798#discussion_r1890425998
########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/ExecutionPlanSchedulingContext.java: ########## @@ -58,4 +58,11 @@ int getConsumersMaxParallelism( * @return the number of pending operators. */ int getPendingOperatorCount(); + + /** + * Retrieves the JSON representation of the stream graph for the original job. + * + * @return the JSON representation of the stream graph. + */ + String getJsonStreamGraph(); Review Comment: Updated. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/jsonplan/JsonPlanGenerator.java: ########## @@ -170,4 +174,58 @@ public static String generatePlan( throw new RuntimeException("Failed to generate plan", e); } } + + public static String generateJsonStreamGraph( + ImmutableStreamGraph sg, Review Comment: Yes, it has been updated. ########## flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java: ########## @@ -70,6 +70,8 @@ public enum ExecutionState { RECONCILING, + PENDING, Review Comment: Okay, I have added `pending-operator-counts` to `JobDetailsInfo`. Since `JobDetailsHandler` needs to handle `pendingOperatorCounts`, I have also added a `getPendingOperatorCounts()` function to `AccessExecutionGraph`. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionPlanSchedulingContext.java: ########## @@ -118,6 +119,14 @@ public int getPendingOperatorCount() { return adaptiveGraphManager.getPendingOperatorsCount(); } + @Override + public String getJsonStreamGraph() { + return JsonPlanGenerator.generateJsonStreamGraph( + adaptiveGraphManager.getStreamGraphContext().getStreamGraph(), + adaptiveGraphManager.getStreamNodeIdsToJobVertexMap(), Review Comment: Good point! I made the following three changes in the revised version: 1. Add `getStreamGraphJson` and `onStreamGraphUpdated` method to the `AdaptiveGraphManager` 2. Regenerate StreamGraphJson only after the new JobVertices added or the stream edge changed. 3. I noticed that `DefaultStreamGraphContext#modifyStreamEdge` has not being used right now. To track changes in the stream graph of `DefaultStreamGraphContext` in the `AdaptiveGraphManager`, I added a listener to `DefaultStreamGraphContext` and set it up during creation. This way, we can update the stream graph JSON whenever it gets modified. Please let me know if this approach is not appropriate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org