Repository: flink
Updated Branches:
  refs/heads/master ac8225fd5 -> 71dee4e6f


[hotfix] Modify the classification of partitioner in StreamingJobGraphGenerator#connect

This closes #5358.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e7f281c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e7f281c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e7f281c

Branch: refs/heads/master
Commit: 4e7f281c2e635e53d5a477bcfb5cafe3593b71be
Parents: ac8225f
Author: maqingxiang-it <[email protected]>
Authored: Thu Jan 25 16:52:17 2018 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Fri Jan 26 14:04:23 2018 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamingJobGraphGenerator.java | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e7f281c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index aa9c685..4e4f84d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -495,12 +495,7 @@ public class StreamingJobGraphGenerator {
 
                StreamPartitioner<?> partitioner = edge.getPartitioner();
                JobEdge jobEdge;
-               if (partitioner instanceof ForwardPartitioner) {
-                       jobEdge = downStreamVertex.connectNewDataSetAsInput(
-                               headVertex,
-                               DistributionPattern.POINTWISE,
-                               ResultPartitionType.PIPELINED_BOUNDED);
-               } else if (partitioner instanceof RescalePartitioner){
+               if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) {
                        jobEdge = downStreamVertex.connectNewDataSetAsInput(
                                headVertex,
                                DistributionPattern.POINTWISE,

Reply via email to