zhuzhurk commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1871058207
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java:
##########
@@ -206,6 +204,36 @@ private void modifyOutputPartitioner(
            targetEdge.getPartitioner());
    }
+   private void tryConvertForwardPartitionerAndMergeForwardGroup(StreamEdge targetEdge) {
+       checkState(targetEdge.getPartitioner() instanceof ForwardPartitioner);
+       Integer sourceNodeId = targetEdge.getSourceId();
+       Integer targetNodeId = targetEdge.getTargetId();
+       if (targetEdge.getPartitioner() instanceof ForwardForUnspecifiedPartitioner
+               || targetEdge.getPartitioner() instanceof ForwardForConsecutiveHashPartitioner) {
+           // We change the ForwardPartitioner to RescalePartitioner or HashPartitioner to
+           // avoid the forward edge limiting the parallelism of the downstream node, in the
+           // following cases:
+           // 1. If the upstream job vertex is already created.
+           // 2. If the source and target are non-chainable or the forward group cannot be merged.
+           if (frozenNodeToStartNodeMap.containsKey(sourceNodeId)
+                   || !StreamingJobGraphGenerator.isChainable(targetEdge, streamGraph, true)
+                   || !mergeForwardGroups(sourceNodeId, targetNodeId)) {
+               if (targetEdge.getPartitioner() instanceof ForwardForUnspecifiedPartitioner) {
+                   targetEdge.setPartitioner(new RescalePartitioner<>());
+               } else {
+                   targetEdge.setPartitioner(
+                           ((ForwardForConsecutiveHashPartitioner<?>) targetEdge.getPartitioner())
+                                   .getHashPartitioner());
Review Comment:
Unlike `RescalePartitioner`, a `HashPartitioner` performs worse than a `ForwardPartitioner`, since it forces a full network shuffle. So the fallback from `ForwardPartitioner` to `HashPartitioner` should happen in this case only if the nodes end up with different parallelisms (i.e., the forward group merge fails); otherwise the cheap forward connection should be kept. A sketch of one possible restructuring follows.
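A minimal sketch only, not this PR's code: `haveSameParallelism` is a hypothetical helper standing in for however the class compares the parallelisms of the two nodes; all other names are taken from the diff above. The block is meant as a drop-in for the inner conversion logic of the method shown above.

```java
if (frozenNodeToStartNodeMap.containsKey(sourceNodeId)
        || !StreamingJobGraphGenerator.isChainable(targetEdge, streamGraph, true)
        || !mergeForwardGroups(sourceNodeId, targetNodeId)) {
    if (targetEdge.getPartitioner() instanceof ForwardForUnspecifiedPartitioner) {
        targetEdge.setPartitioner(new RescalePartitioner<>());
    } else if (haveSameParallelism(sourceNodeId, targetNodeId)) {
        // Hypothetical check: with equal parallelisms a plain forward
        // connection is still valid and avoids the hash shuffle.
        targetEdge.setPartitioner(new ForwardPartitioner<>());
    } else {
        // Different parallelisms: restore the original hash distribution.
        targetEdge.setPartitioner(
                ((ForwardForConsecutiveHashPartitioner<?>) targetEdge.getPartitioner())
                        .getHashPartitioner());
    }
}
```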