Repository: flink Updated Branches: refs/heads/master ac8225fd5 -> 71dee4e6f
[hotfix] Modify the classification of partitioner in StreamingJobGraphGenerator#connect This closes #5358. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e7f281c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e7f281c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e7f281c Branch: refs/heads/master Commit: 4e7f281c2e635e53d5a477bcfb5cafe3593b71be Parents: ac8225f Author: maqingxiang-it <[email protected]> Authored: Thu Jan 25 16:52:17 2018 +0800 Committer: Till Rohrmann <[email protected]> Committed: Fri Jan 26 14:04:23 2018 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/graph/StreamingJobGraphGenerator.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f281c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index aa9c685..4e4f84d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -495,12 +495,7 @@ public class StreamingJobGraphGenerator { StreamPartitioner<?> partitioner = edge.getPartitioner(); JobEdge jobEdge; - if (partitioner instanceof ForwardPartitioner) { - jobEdge = downStreamVertex.connectNewDataSetAsInput( - headVertex, - DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED_BOUNDED); - } else if (partitioner instanceof RescalePartitioner){ + if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE,
