zhuzhurk commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1871058207


##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java:
##########
@@ -206,6 +204,36 @@ private void modifyOutputPartitioner(
                 targetEdge.getPartitioner());
     }
 
+    private void tryConvertForwardPartitionerAndMergeForwardGroup(StreamEdge 
targetEdge) {
+        checkState(targetEdge.getPartitioner() instanceof ForwardPartitioner);
+        Integer sourceNodeId = targetEdge.getSourceId();
+        Integer targetNodeId = targetEdge.getTargetId();
+        if (targetEdge.getPartitioner() instanceof 
ForwardForUnspecifiedPartitioner
+                || targetEdge.getPartitioner() instanceof 
ForwardForConsecutiveHashPartitioner) {
+            // For non-chainable edges, we change the ForwardPartitioner to 
RescalePartitioner or
+            // HashPartitioner to avoid limiting the parallelism of the 
downstream node by the
+            // forward edge:
+            // 1. If the upstream job vertex is created.
+            // 2. If the source and target are non-chainable or the forward 
group cannot be merged.
+            if (frozenNodeToStartNodeMap.containsKey(sourceNodeId)
+                    || !StreamingJobGraphGenerator.isChainable(targetEdge, 
streamGraph, true)
+                    || !mergeForwardGroups(sourceNodeId, targetNodeId)) {
+                if (targetEdge.getPartitioner() instanceof 
ForwardForUnspecifiedPartitioner) {
+                    targetEdge.setPartitioner(new RescalePartitioner<>());
+                } else {
+                    targetEdge.setPartitioner(
+                            ((ForwardForConsecutiveHashPartitioner<?>) 
targetEdge.getPartitioner())
+                                    .getHashPartitioner());

Review Comment:
   Unlike `RescalePartitioner`,  `HashPartitioner` will be worse than 
`ForwardPartitioner`. So a `ForwardPartitioner` should be used in this case 
only if the nodes have different parallelisms (the forward group merge fails).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to