RocMarshal commented on code in PR #27404:
URL: https://github.com/apache/flink/pull/27404#discussion_r2716911325
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java:
##########
@@ -111,6 +111,20 @@ public JobEdge(
this.intraInputKeyCorrelated = intraInputKeyCorrelated;
}
+ public void forceUpdate(
+ boolean isForward,
+ DistributionPattern distributionPattern,
+ SubtaskStateMapper upstreamSubtaskStateMapper,
+ SubtaskStateMapper downstreamSubtaskStateMapper,
+ String shipStrategyName) {
+ this.isForward = isForward;
+ this.distributionPattern = distributionPattern;
+ this.upstreamSubtaskStateMapper =
checkNotNull(upstreamSubtaskStateMapper);
+ this.downstreamSubtaskStateMapper =
checkNotNull(downstreamSubtaskStateMapper);
+ this.shipStrategyName = shipStrategyName;
+ this.source.forceUpdate(isForward, distributionPattern);
+ }
Review Comment:
Hi, @mxm Please let me have a clarify:
The mutated JogGraph is a copy from the original jog graph of
JobGraphInformation, which is almost equivalent to what you expressed. And the
action didn't change the original existing JobGraph instance.
IIUC, Did you mean that instead of introducing this method `forceUpdate ` ,
we would regenerate the JobGraph based on the existing JobGraph and reuse the
methods already defined in it?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]