noorall commented on code in PR #25366:
URL: https://github.com/apache/flink/pull/25366#discussion_r1798307016
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -355,32 +337,46 @@ private void
waitForSerializationFuturesAndUpdateJobVertices()
}
}
- private void addVertexIndexPrefixInVertexName() {
- if (!streamGraph.isVertexNameIncludeIndexPrefix()) {
+ public static void addVertexIndexPrefixInVertexName(
+ JobVertexBuildContext jobVertexBuildContext,
+ AtomicInteger vertexIndexId,
+ JobGraph jobGraph) {
+ if
(!jobVertexBuildContext.getStreamGraph().isVertexNameIncludeIndexPrefix()) {
return;
}
- final AtomicInteger vertexIndexId = new AtomicInteger(0);
+ Set<JobVertexID> jobVertexIds =
+ jobVertexBuildContext.getJobVertices().values().stream()
+ .map(JobVertex::getID)
+ .collect(Collectors.toSet());
+ // JobVertexBuildContext only contains incrementally generated
jobVertex instances. The
Review Comment:
> The `JobVertexBuildContext` does not contain all generated job vertices at
that time?
Yes, in the implementation of incremental generator, it only includes the
job vertices generated in the current phase, whereas in the
StreamingJobGraphGenerator, it includes all job vertices.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]