noorall commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1871064446
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/DefaultStreamGraphContext.java:
##########
@@ -166,33 +181,16 @@ private void modifyOutputPartitioner(
if (newPartitioner == null || targetEdge == null) {
return;
}
- Integer sourceNodeId = targetEdge.getSourceId();
- Integer targetNodeId = targetEdge.getTargetId();
-
StreamPartitioner<?> oldPartitioner = targetEdge.getPartitioner();
-
targetEdge.setPartitioner(newPartitioner);
- // For non-chainable edges, we change the ForwardPartitioner to
RescalePartitioner to avoid
- // limiting the parallelism of the downstream node by the forward edge.
- // 1. If the upstream job vertex is created.
- // 2. If the source and target are non-chainable.
- // 3. If the forward group cannot be merged.
- if (targetEdge.getPartitioner() instanceof ForwardPartitioner) {
- if (frozenNodeToStartNodeMap.containsKey(sourceNodeId)) {
- targetEdge.setPartitioner(new RescalePartitioner<>());
- } else if (!StreamingJobGraphGenerator.isChainable(targetEdge,
streamGraph)) {
- targetEdge.setPartitioner(new RescalePartitioner<>());
- } else if (!mergeForwardGroups(sourceNodeId, targetNodeId)) {
- targetEdge.setPartitioner(new RescalePartitioner<>());
- }
- }
+ tryConvertForwardPartitionerAndMergeForwardGroup(targetEdge);
Review Comment:
if (targetEdge.getPartitioner() instanceof ForwardPartitioner) {
tryConvertForwardPartitionerAndMergeForwardGroup(targetEdge);
}
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]