RocMarshal commented on code in PR #27404:
URL: https://github.com/apache/flink/pull/27404#discussion_r2717151403
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java:
##########
@@ -111,6 +111,20 @@ public JobEdge(
this.intraInputKeyCorrelated = intraInputKeyCorrelated;
}
+ public void forceUpdate(
+ boolean isForward,
+ DistributionPattern distributionPattern,
+ SubtaskStateMapper upstreamSubtaskStateMapper,
+ SubtaskStateMapper downstreamSubtaskStateMapper,
+ String shipStrategyName) {
+ this.isForward = isForward;
+ this.distributionPattern = distributionPattern;
+ this.upstreamSubtaskStateMapper = checkNotNull(upstreamSubtaskStateMapper);
+ this.downstreamSubtaskStateMapper = checkNotNull(downstreamSubtaskStateMapper);
+ this.shipStrategyName = shipStrategyName;
+ this.source.forceUpdate(isForward, distributionPattern);
+ }
Review Comment:
Hi @mxm,
Sorry, this is based on my limited reading of the code:
Currently, the JobGraph does not carry any StreamGraph-related information,
so a new JobGraph cannot be created from the StreamGraph the way it is
during the initial generation.
We can only generate a new JobGraph from the original JobGraph. Frankly
speaking, when constructing the JobEdges of the new graph, we have no way to
modify the JobEdge#distributionPattern and JobEdge#isForward attributes
unless we introduce an additional method or resort to reflection.
Please correct me if anything here is wrong.
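
To illustrate the point, here is a minimal sketch, not Flink's actual code. `ForceUpdateSketch`, `SketchEdge`, and `Pattern` are hypothetical stand-ins for `JobEdge` and `DistributionPattern`, reduced to the two attributes discussed above. It assumes the fields are private and otherwise set only in the constructor, so an explicit mutator is the only non-reflective way to change them on an existing edge.

```java
// Hypothetical, simplified stand-ins for illustration only; these are NOT Flink classes.
class ForceUpdateSketch {

    // Stand-in for DistributionPattern.
    enum Pattern { ALL_TO_ALL, POINTWISE }

    // Stand-in for JobEdge, reduced to the two attributes under discussion.
    static final class SketchEdge {
        private boolean isForward;
        private Pattern distributionPattern;

        SketchEdge(boolean isForward, Pattern distributionPattern) {
            this.isForward = isForward;
            this.distributionPattern = distributionPattern;
        }

        boolean isForward() {
            return isForward;
        }

        Pattern getDistributionPattern() {
            return distributionPattern;
        }

        // Same shape as the forceUpdate(...) in the diff, without the state mappers
        // and ship strategy name. Without a method like this, a caller holding an
        // existing edge could change these private fields only via reflection.
        void forceUpdate(boolean isForward, Pattern distributionPattern) {
            this.isForward = isForward;
            this.distributionPattern = distributionPattern;
        }
    }

    public static void main(String[] args) {
        // An edge as it might look in the original graph.
        SketchEdge edge = new SketchEdge(true, Pattern.POINTWISE);

        // Rewriting the edge while deriving a new graph from the original one.
        edge.forceUpdate(false, Pattern.ALL_TO_ALL);

        System.out.println(edge.isForward() + " " + edge.getDistributionPattern());
        // prints "false ALL_TO_ALL"
    }
}
```

An alternative under the same assumptions would be to build a fresh edge through the constructor instead of mutating the existing one; whether that is practical for the real `JobEdge` depends on how its source and target are wired, which this sketch does not model.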