mxm commented on code in PR #27404:
URL: https://github.com/apache/flink/pull/27404#discussion_r2717040404
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobEdge.java:
##########
@@ -111,6 +111,20 @@ public JobEdge(
this.intraInputKeyCorrelated = intraInputKeyCorrelated;
}
+ public void forceUpdate(
+ boolean isForward,
+ DistributionPattern distributionPattern,
+ SubtaskStateMapper upstreamSubtaskStateMapper,
+ SubtaskStateMapper downstreamSubtaskStateMapper,
+ String shipStrategyName) {
+ this.isForward = isForward;
+ this.distributionPattern = distributionPattern;
+ this.upstreamSubtaskStateMapper = checkNotNull(upstreamSubtaskStateMapper);
+ this.downstreamSubtaskStateMapper = checkNotNull(downstreamSubtaskStateMapper);
+ this.shipStrategyName = shipStrategyName;
+ this.source.forceUpdate(isForward, distributionPattern);
+ }
Review Comment:
> IIUC, did you mean that instead of introducing this `forceUpdate` method, we
> would regenerate the JobGraph based on the existing JobGraph and reuse the
> methods already defined in it?

Yes, exactly. I want to avoid adding any mutation methods to JobGraph /
JobVertex / JobEdge. Let me know if that is feasible.
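
To make the suggestion concrete, here is a minimal sketch of the "rebuild instead of mutate" idea. It does not use Flink's classes: `EdgeSketch`, `Pattern`, and `withPartitioning` are hypothetical names invented for illustration, and the real JobEdge carries more state (source, target, state mappers) than is shown here.

```java
import java.util.Objects;

/** Hypothetical stand-in for an edge. This is NOT Flink's JobEdge. */
final class EdgeSketch {

    /** Hypothetical stand-in for a distribution pattern. */
    enum Pattern { POINTWISE, ALL_TO_ALL }

    // All fields are final, so an instance cannot change after construction.
    private final boolean forward;
    private final Pattern pattern;
    private final String shipStrategyName;

    EdgeSketch(boolean forward, Pattern pattern, String shipStrategyName) {
        this.forward = forward;
        this.pattern = Objects.requireNonNull(pattern);
        this.shipStrategyName = shipStrategyName;
    }

    boolean isForward() {
        return forward;
    }

    Pattern getPattern() {
        return pattern;
    }

    String getShipStrategyName() {
        return shipStrategyName;
    }

    /**
     * Returns a new edge built through the existing constructor.
     * The receiver is left untouched, so no setter or forceUpdate is needed.
     */
    EdgeSketch withPartitioning(boolean newForward, Pattern newPattern, String newName) {
        return new EdgeSketch(newForward, newPattern, newName);
    }
}
```

With this shape, a caller that needs different partitioning builds a replacement edge (and, by extension, a replacement graph) through the existing construction path, while anything still holding the old instance keeps seeing consistent values.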