This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ef4daeb  [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for 
JobGraph generation
ef4daeb is described below

commit ef4daeba7881cecc1548e387ab68d829f998dc67
Author: Zhu Zhu <reed...@gmail.com>
AuthorDate: Fri Apr 24 11:39:36 2020 +0800

    [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph 
generation
---
 .../api/graph/GlobalDataExchangeMode.java          |  48 +++++++
 .../flink/streaming/api/graph/StreamGraph.java     |  22 +--
 .../streaming/api/graph/StreamGraphGenerator.java  |  12 +-
 .../api/graph/StreamingJobGraphGenerator.java      |  32 ++++-
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  73 ----------
 ...aphGeneratorWithGlobalDataExchangeModeTest.java | 153 +++++++++++++++++++++
 .../flink/table/planner/utils/ExecutorUtils.java   |   3 +-
 7 files changed, 241 insertions(+), 102 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
new file mode 100644
index 0000000..a74d9f3
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+
+/**
+ * This mode decides the default {@link ResultPartitionType} of job edges.
+ * Note that this only affects job edges which are {@link 
ShuffleMode#UNDEFINED}.
+ */
+public enum GlobalDataExchangeMode {
+       /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
+       ALL_EDGES_BLOCKING,
+
+       /**
+        * Set job edges with {@link ForwardPartitioner} to be {@link 
ResultPartitionType#PIPELINED_BOUNDED}
+        * and other edges to be {@link ResultPartitionType#BLOCKING}.
+        **/
+       FORWARD_EDGES_PIPELINED,
+
+       /**
+        * Set job edges with {@link ForwardPartitioner} or {@link 
RescalePartitioner} to be
+        * {@link ResultPartitionType#PIPELINED_BOUNDED} and other edges to be 
{@link ResultPartitionType#BLOCKING}.
+        **/
+       POINTWISE_EDGES_PIPELINED,
+
+       /** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */
+       ALL_EDGES_PIPELINED
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 39393fc..83d9aca 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -96,11 +96,7 @@ public class StreamGraph implements Pipeline {
 
        private TimeCharacteristic timeCharacteristic;
 
-       /**
-        * If there are some stream edges that can not be chained and the 
shuffle mode of edge is not
-        * specified, translate these edges into {@code BLOCKING} result 
partition type.
-        */
-       private boolean blockingConnectionsBetweenChains;
+       private GlobalDataExchangeMode globalDataExchangeMode;
 
        /** Flag to indicate whether to put all vertices into the same slot 
sharing group by default. */
        private boolean allVerticesInSameSlotSharingGroupByDefault = true;
@@ -201,20 +197,12 @@ public class StreamGraph implements Pipeline {
                this.timeCharacteristic = timeCharacteristic;
        }
 
-       /**
-        * If there are some stream edges that can not be chained and the 
shuffle mode of edge is not
-        * specified, translate these edges into {@code BLOCKING} result 
partition type.
-        */
-       public boolean isBlockingConnectionsBetweenChains() {
-               return blockingConnectionsBetweenChains;
+       public GlobalDataExchangeMode getGlobalDataExchangeMode() {
+               return globalDataExchangeMode;
        }
 
-       /**
-        * If there are some stream edges that can not be chained and the 
shuffle mode of edge is not
-        * specified, translate these edges into {@code BLOCKING} result 
partition type.
-        */
-       public void setBlockingConnectionsBetweenChains(boolean 
blockingConnectionsBetweenChains) {
-               this.blockingConnectionsBetweenChains = 
blockingConnectionsBetweenChains;
+       public void setGlobalDataExchangeMode(GlobalDataExchangeMode 
globalDataExchangeMode) {
+               this.globalDataExchangeMode = globalDataExchangeMode;
        }
 
        /**
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index dab9bb7..4cf427b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -130,11 +130,7 @@ public class StreamGraphGenerator {
 
        private String jobName = DEFAULT_JOB_NAME;
 
-       /**
-        * If there are some stream edges that can not be chained and the 
shuffle mode of edge is not
-        * specified, translate these edges into {@code BLOCKING} result 
partition type.
-        */
-       private boolean blockingConnectionsBetweenChains = false;
+       private GlobalDataExchangeMode globalDataExchangeMode = 
GlobalDataExchangeMode.ALL_EDGES_PIPELINED;
 
        // This is used to assign a unique ID to iteration source/sink
        protected static Integer iterationIdCounter = 0;
@@ -190,8 +186,8 @@ public class StreamGraphGenerator {
                return this;
        }
 
-       public StreamGraphGenerator setBlockingConnectionsBetweenChains(boolean 
blockingConnectionsBetweenChains) {
-               this.blockingConnectionsBetweenChains = 
blockingConnectionsBetweenChains;
+       public StreamGraphGenerator 
setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) {
+               this.globalDataExchangeMode = globalDataExchangeMode;
                return this;
        }
 
@@ -207,7 +203,7 @@ public class StreamGraphGenerator {
                streamGraph.setUserArtifacts(userArtifacts);
                streamGraph.setTimeCharacteristic(timeCharacteristic);
                streamGraph.setJobName(jobName);
-               
streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);
+               streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);
 
                alreadyTransformed = new HashMap<>();
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index ec58aed..f84242b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -575,8 +575,7 @@ public class StreamingJobGraphGenerator {
                                resultPartitionType = 
ResultPartitionType.BLOCKING;
                                break;
                        case UNDEFINED:
-                               resultPartitionType = 
streamGraph.isBlockingConnectionsBetweenChains() ?
-                                               ResultPartitionType.BLOCKING : 
ResultPartitionType.PIPELINED_BOUNDED;
+                               resultPartitionType = 
determineResultPartitionType(partitioner);
                                break;
                        default:
                                throw new UnsupportedOperationException("Data 
exchange mode " +
@@ -584,7 +583,7 @@ public class StreamingJobGraphGenerator {
                }
 
                JobEdge jobEdge;
-               if (partitioner instanceof ForwardPartitioner || partitioner 
instanceof RescalePartitioner) {
+               if (isPointwisePartitioner(partitioner)) {
                        jobEdge = downStreamVertex.connectNewDataSetAsInput(
                                headVertex,
                                DistributionPattern.POINTWISE,
@@ -604,6 +603,33 @@ public class StreamingJobGraphGenerator {
                }
        }
 
+       private static boolean isPointwisePartitioner(StreamPartitioner<?> 
partitioner) {
+               return partitioner instanceof ForwardPartitioner || partitioner 
instanceof RescalePartitioner;
+       }
+
+       private ResultPartitionType 
determineResultPartitionType(StreamPartitioner<?> partitioner) {
+               switch (streamGraph.getGlobalDataExchangeMode()) {
+                       case ALL_EDGES_BLOCKING:
+                               return ResultPartitionType.BLOCKING;
+                       case FORWARD_EDGES_PIPELINED:
+                               if (partitioner instanceof ForwardPartitioner) {
+                                       return 
ResultPartitionType.PIPELINED_BOUNDED;
+                               } else {
+                                       return ResultPartitionType.BLOCKING;
+                               }
+                       case POINTWISE_EDGES_PIPELINED:
+                               if (isPointwisePartitioner(partitioner)) {
+                                       return 
ResultPartitionType.PIPELINED_BOUNDED;
+                               } else {
+                                       return ResultPartitionType.BLOCKING;
+                               }
+                       case ALL_EDGES_PIPELINED:
+                               return ResultPartitionType.PIPELINED_BOUNDED;
+                       default:
+                               throw new RuntimeException("Unrecognized global 
data exchange mode " + streamGraph.getGlobalDataExchangeMode());
+               }
+       }
+
        public static boolean isChainable(StreamEdge edge, StreamGraph 
streamGraph) {
                StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
                StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4eb15a6..3ee0002 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -589,79 +589,6 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                assertEquals(ScheduleMode.LAZY_FROM_SOURCES, 
jobGraph.getScheduleMode());
        }
 
-       /**
-        * Verify that "blockingConnectionsBetweenChains" is off by default.
-        */
-       @Test
-       public void testBlockingAfterChainingOffDisabled() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               // fromElements -> Filter -> Print
-               DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 
3);
-
-               // partition transformation with an undefined shuffle mode 
between source and filter
-               DataStream<Integer> partitionAfterSourceDataStream = new 
DataStream<>(env, new PartitionTransformation<>(
-                       sourceDataStream.getTransformation(), new 
RescalePartitioner<>(), ShuffleMode.UNDEFINED));
-               DataStream<Integer> filterDataStream = 
partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
-
-               DataStream<Integer> partitionAfterFilterDataStream = new 
DataStream<>(env, new PartitionTransformation<>(
-                       filterDataStream.getTransformation(), new 
ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
-
-               partitionAfterFilterDataStream.print().setParallelism(2);
-
-               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
-
-               List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
-               assertEquals(2, verticesSorted.size());
-
-               JobVertex sourceVertex = verticesSorted.get(0);
-               JobVertex filterAndPrintVertex = verticesSorted.get(1);
-
-               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
-               assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
-                               
filterAndPrintVertex.getInputs().get(0).getSource().getResultType());
-       }
-
-       /**
-        * Test enabling the property "blockingConnectionsBetweenChains".
-        */
-       @Test
-       public void testBlockingConnectionsBetweenChainsEnabled() {
-               StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-               // fromElements -> Filter -> Map -> Print
-               DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 
3);
-
-               // partition transformation with an undefined shuffle mode 
between source and filter
-               DataStream<Integer> partitionAfterSourceDataStream = new 
DataStream<>(env, new PartitionTransformation<>(
-                       sourceDataStream.getTransformation(), new 
RescalePartitioner<>(), ShuffleMode.UNDEFINED));
-               DataStream<Integer> filterDataStream = 
partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
-
-               DataStream<Integer> partitionAfterFilterDataStream = new 
DataStream<>(env, new PartitionTransformation<>(
-                       filterDataStream.getTransformation(), new 
ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
-               partitionAfterFilterDataStream.map(value -> 
value).setParallelism(2);
-
-               DataStream<Integer> partitionAfterMapDataStream = new 
DataStream<>(env, new PartitionTransformation<>(
-                       filterDataStream.getTransformation(), new 
RescalePartitioner<>(), ShuffleMode.PIPELINED));
-               partitionAfterMapDataStream.print().setParallelism(1);
-
-               StreamGraph streamGraph = env.getStreamGraph();
-               streamGraph.setBlockingConnectionsBetweenChains(true);
-               JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
-
-               List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
-               assertEquals(3, verticesSorted.size());
-
-               JobVertex sourceVertex = verticesSorted.get(0);
-               // still can be chained
-               JobVertex filterAndMapVertex = verticesSorted.get(1);
-               JobVertex printVertex = verticesSorted.get(2);
-
-               // the edge with undefined shuffle mode is translated into 
BLOCKING
-               assertEquals(ResultPartitionType.BLOCKING, 
sourceVertex.getProducedDataSets().get(0).getResultType());
-               // the edge with PIPELINED shuffle mode is translated into 
PIPELINED_BOUNDED
-               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
filterAndMapVertex.getProducedDataSets().get(0).getResultType());
-               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
printVertex.getInputs().get(0).getSource().getResultType());
-       }
-
        @Test
        public void 
testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
                StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
new file mode 100644
index 0000000..0123df7
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link StreamingJobGraphGenerator} on different {@link 
GlobalDataExchangeMode} settings.
+ */
+public class StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends 
TestLogger {
+
+       @Test
+       public void testDefaultGlobalDataExchangeModeIsAllEdgesPipelined() {
+               final StreamGraph streamGraph = createStreamGraph();
+               assertThat(streamGraph.getGlobalDataExchangeMode(), 
is(GlobalDataExchangeMode.ALL_EDGES_PIPELINED));
+       }
+
+       @Test
+       public void testAllEdgesBlockingMode() {
+               final StreamGraph streamGraph = createStreamGraph();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.BLOCKING, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testAllEdgesPipelinedMode() {
+               final StreamGraph streamGraph = createStreamGraph();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testForwardEdgesPipelinedMode() {
+               final StreamGraph streamGraph = createStreamGraph();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testPointwiseEdgesPipelinedMode() {
+               final StreamGraph streamGraph = createStreamGraph();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void 
testGlobalDataExchangeModeDoesNotOverrideSpecifiedShuffleMode() {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               final DataStream<Integer> source = env.fromElements(1, 2, 
3).setParallelism(1);
+               final DataStream<Integer> forward = new DataStream<>(env, new 
PartitionTransformation<>(
+                       source.getTransformation(), new ForwardPartitioner<>(), 
ShuffleMode.PIPELINED));
+               forward.map(i -> i).startNewChain().setParallelism(1);
+               final StreamGraph streamGraph = env.getStreamGraph();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       /**
+        * Topology: source(parallelism=1) --(forward)--> map1(parallelism=1)
+        *           --(rescale)--> map2(parallelism=2) --(rebalance)--> 
sink(parallelism=2).
+        */
+       private static StreamGraph createStreamGraph() {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               final DataStream<Integer> source = env.fromElements(1, 2, 
3).setParallelism(1);
+
+               final DataStream<Integer> forward = new DataStream<>(env, new 
PartitionTransformation<>(
+                       source.getTransformation(), new ForwardPartitioner<>(), 
ShuffleMode.UNDEFINED));
+               final DataStream<Integer> map1 = forward.map(i -> 
i).startNewChain().setParallelism(1);
+
+               final DataStream<Integer> rescale = new DataStream<>(env, new 
PartitionTransformation<>(
+                       map1.getTransformation(), new RescalePartitioner<>(), 
ShuffleMode.UNDEFINED));
+               final DataStream<Integer> map2 = rescale.map(i -> 
i).setParallelism(2);
+
+               map2.rebalance().print().setParallelism(2);
+
+               return env.getStreamGraph();
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
index b636320..43028aa 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.transformations.ShuffleMode;
@@ -85,7 +86,7 @@ public class ExecutorUtils {
                        throw new IllegalArgumentException("Checkpoint is not 
supported for batch jobs.");
                }
                if (ExecutorUtils.isShuffleModeAllBatch(tableConfig)) {
-                       streamGraph.setBlockingConnectionsBetweenChains(true);
+                       
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
                }
        }
 

Reply via email to