This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ef4daeb [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for
JobGraph generation
ef4daeb is described below
commit ef4daeba7881cecc1548e387ab68d829f998dc67
Author: Zhu Zhu <[email protected]>
AuthorDate: Fri Apr 24 11:39:36 2020 +0800
[FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph
generation
---
.../api/graph/GlobalDataExchangeMode.java | 48 +++++++
.../flink/streaming/api/graph/StreamGraph.java | 22 +--
.../streaming/api/graph/StreamGraphGenerator.java | 12 +-
.../api/graph/StreamingJobGraphGenerator.java | 32 ++++-
.../api/graph/StreamingJobGraphGeneratorTest.java | 73 ----------
...aphGeneratorWithGlobalDataExchangeModeTest.java | 153 +++++++++++++++++++++
.../flink/table/planner/utils/ExecutorUtils.java | 3 +-
7 files changed, 241 insertions(+), 102 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
new file mode 100644
index 0000000..a74d9f3
--- /dev/null
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+
+/**
+ * This mode decides the default {@link ResultPartitionType} of job edges.
+ * Note that this only affects job edges whose shuffle mode is {@link
ShuffleMode#UNDEFINED}.
+ */
+public enum GlobalDataExchangeMode {
+ /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
+ ALL_EDGES_BLOCKING,
+
+ /**
+ * Set job edges with {@link ForwardPartitioner} to be {@link
ResultPartitionType#PIPELINED_BOUNDED}
+ * and other edges to be {@link ResultPartitionType#BLOCKING}.
+ **/
+ FORWARD_EDGES_PIPELINED,
+
+ /**
+ * Set job edges with {@link ForwardPartitioner} or {@link
RescalePartitioner} to be
+ * {@link ResultPartitionType#PIPELINED_BOUNDED} and other edges to be
{@link ResultPartitionType#BLOCKING}.
+ **/
+ POINTWISE_EDGES_PIPELINED,
+
+ /** Set all job edges to be {@link ResultPartitionType#PIPELINED_BOUNDED}. */
+ ALL_EDGES_PIPELINED
+}
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 39393fc..83d9aca 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -96,11 +96,7 @@ public class StreamGraph implements Pipeline {
private TimeCharacteristic timeCharacteristic;
- /**
- * If there are some stream edges that can not be chained and the
shuffle mode of edge is not
- * specified, translate these edges into {@code BLOCKING} result
partition type.
- */
- private boolean blockingConnectionsBetweenChains;
+ private GlobalDataExchangeMode globalDataExchangeMode;
/** Flag to indicate whether to put all vertices into the same slot
sharing group by default. */
private boolean allVerticesInSameSlotSharingGroupByDefault = true;
@@ -201,20 +197,12 @@ public class StreamGraph implements Pipeline {
this.timeCharacteristic = timeCharacteristic;
}
- /**
- * If there are some stream edges that can not be chained and the
shuffle mode of edge is not
- * specified, translate these edges into {@code BLOCKING} result
partition type.
- */
- public boolean isBlockingConnectionsBetweenChains() {
- return blockingConnectionsBetweenChains;
+ public GlobalDataExchangeMode getGlobalDataExchangeMode() {
+ return globalDataExchangeMode;
}
- /**
- * If there are some stream edges that can not be chained and the
shuffle mode of edge is not
- * specified, translate these edges into {@code BLOCKING} result
partition type.
- */
- public void setBlockingConnectionsBetweenChains(boolean
blockingConnectionsBetweenChains) {
- this.blockingConnectionsBetweenChains =
blockingConnectionsBetweenChains;
+ public void setGlobalDataExchangeMode(GlobalDataExchangeMode
globalDataExchangeMode) {
+ this.globalDataExchangeMode = globalDataExchangeMode;
}
/**
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index dab9bb7..4cf427b 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -130,11 +130,7 @@ public class StreamGraphGenerator {
private String jobName = DEFAULT_JOB_NAME;
- /**
- * If there are some stream edges that can not be chained and the
shuffle mode of edge is not
- * specified, translate these edges into {@code BLOCKING} result
partition type.
- */
- private boolean blockingConnectionsBetweenChains = false;
+ private GlobalDataExchangeMode globalDataExchangeMode =
GlobalDataExchangeMode.ALL_EDGES_PIPELINED;
// This is used to assign a unique ID to iteration source/sink
protected static Integer iterationIdCounter = 0;
@@ -190,8 +186,8 @@ public class StreamGraphGenerator {
return this;
}
- public StreamGraphGenerator setBlockingConnectionsBetweenChains(boolean
blockingConnectionsBetweenChains) {
- this.blockingConnectionsBetweenChains =
blockingConnectionsBetweenChains;
+ public StreamGraphGenerator
setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) {
+ this.globalDataExchangeMode = globalDataExchangeMode;
return this;
}
@@ -207,7 +203,7 @@ public class StreamGraphGenerator {
streamGraph.setUserArtifacts(userArtifacts);
streamGraph.setTimeCharacteristic(timeCharacteristic);
streamGraph.setJobName(jobName);
-
streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains);
+ streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode);
alreadyTransformed = new HashMap<>();
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index ec58aed..f84242b 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -575,8 +575,7 @@ public class StreamingJobGraphGenerator {
resultPartitionType =
ResultPartitionType.BLOCKING;
break;
case UNDEFINED:
- resultPartitionType =
streamGraph.isBlockingConnectionsBetweenChains() ?
- ResultPartitionType.BLOCKING :
ResultPartitionType.PIPELINED_BOUNDED;
+ resultPartitionType =
determineResultPartitionType(partitioner);
break;
default:
throw new UnsupportedOperationException("Data
exchange mode " +
@@ -584,7 +583,7 @@ public class StreamingJobGraphGenerator {
}
JobEdge jobEdge;
- if (partitioner instanceof ForwardPartitioner || partitioner
instanceof RescalePartitioner) {
+ if (isPointwisePartitioner(partitioner)) {
jobEdge = downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
@@ -604,6 +603,33 @@ public class StreamingJobGraphGenerator {
}
}
+ private static boolean isPointwisePartitioner(StreamPartitioner<?>
partitioner) {
+ return partitioner instanceof ForwardPartitioner || partitioner
instanceof RescalePartitioner;
+ }
+
+ private ResultPartitionType
determineResultPartitionType(StreamPartitioner<?> partitioner) {
+ switch (streamGraph.getGlobalDataExchangeMode()) {
+ case ALL_EDGES_BLOCKING:
+ return ResultPartitionType.BLOCKING;
+ case FORWARD_EDGES_PIPELINED:
+ if (partitioner instanceof ForwardPartitioner) {
+ return
ResultPartitionType.PIPELINED_BOUNDED;
+ } else {
+ return ResultPartitionType.BLOCKING;
+ }
+ case POINTWISE_EDGES_PIPELINED:
+ if (isPointwisePartitioner(partitioner)) {
+ return
ResultPartitionType.PIPELINED_BOUNDED;
+ } else {
+ return ResultPartitionType.BLOCKING;
+ }
+ case ALL_EDGES_PIPELINED:
+ return ResultPartitionType.PIPELINED_BOUNDED;
+ default:
+ throw new RuntimeException("Unrecognized global
data exchange mode " + streamGraph.getGlobalDataExchangeMode());
+ }
+ }
+
public static boolean isChainable(StreamEdge edge, StreamGraph
streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 4eb15a6..3ee0002 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -589,79 +589,6 @@ public class StreamingJobGraphGeneratorTest extends
TestLogger {
assertEquals(ScheduleMode.LAZY_FROM_SOURCES,
jobGraph.getScheduleMode());
}
- /**
- * Verify that "blockingConnectionsBetweenChains" is off by default.
- */
- @Test
- public void testBlockingAfterChainingOffDisabled() {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- // fromElements -> Filter -> Print
- DataStream<Integer> sourceDataStream = env.fromElements(1, 2,
3);
-
- // partition transformation with an undefined shuffle mode
between source and filter
- DataStream<Integer> partitionAfterSourceDataStream = new
DataStream<>(env, new PartitionTransformation<>(
- sourceDataStream.getTransformation(), new
RescalePartitioner<>(), ShuffleMode.UNDEFINED));
- DataStream<Integer> filterDataStream =
partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
-
- DataStream<Integer> partitionAfterFilterDataStream = new
DataStream<>(env, new PartitionTransformation<>(
- filterDataStream.getTransformation(), new
ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
-
- partitionAfterFilterDataStream.print().setParallelism(2);
-
- JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
-
- List<JobVertex> verticesSorted =
jobGraph.getVerticesSortedTopologicallyFromSources();
- assertEquals(2, verticesSorted.size());
-
- JobVertex sourceVertex = verticesSorted.get(0);
- JobVertex filterAndPrintVertex = verticesSorted.get(1);
-
- assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
sourceVertex.getProducedDataSets().get(0).getResultType());
- assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
-
filterAndPrintVertex.getInputs().get(0).getSource().getResultType());
- }
-
- /**
- * Test enabling the property "blockingConnectionsBetweenChains".
- */
- @Test
- public void testBlockingConnectionsBetweenChainsEnabled() {
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- // fromElements -> Filter -> Map -> Print
- DataStream<Integer> sourceDataStream = env.fromElements(1, 2,
3);
-
- // partition transformation with an undefined shuffle mode
between source and filter
- DataStream<Integer> partitionAfterSourceDataStream = new
DataStream<>(env, new PartitionTransformation<>(
- sourceDataStream.getTransformation(), new
RescalePartitioner<>(), ShuffleMode.UNDEFINED));
- DataStream<Integer> filterDataStream =
partitionAfterSourceDataStream.filter(value -> true).setParallelism(2);
-
- DataStream<Integer> partitionAfterFilterDataStream = new
DataStream<>(env, new PartitionTransformation<>(
- filterDataStream.getTransformation(), new
ForwardPartitioner<>(), ShuffleMode.UNDEFINED));
- partitionAfterFilterDataStream.map(value ->
value).setParallelism(2);
-
- DataStream<Integer> partitionAfterMapDataStream = new
DataStream<>(env, new PartitionTransformation<>(
- filterDataStream.getTransformation(), new
RescalePartitioner<>(), ShuffleMode.PIPELINED));
- partitionAfterMapDataStream.print().setParallelism(1);
-
- StreamGraph streamGraph = env.getStreamGraph();
- streamGraph.setBlockingConnectionsBetweenChains(true);
- JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
-
- List<JobVertex> verticesSorted =
jobGraph.getVerticesSortedTopologicallyFromSources();
- assertEquals(3, verticesSorted.size());
-
- JobVertex sourceVertex = verticesSorted.get(0);
- // still can be chained
- JobVertex filterAndMapVertex = verticesSorted.get(1);
- JobVertex printVertex = verticesSorted.get(2);
-
- // the edge with undefined shuffle mode is translated into
BLOCKING
- assertEquals(ResultPartitionType.BLOCKING,
sourceVertex.getProducedDataSets().get(0).getResultType());
- // the edge with PIPELINED shuffle mode is translated into
PIPELINED_BOUNDED
- assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
filterAndMapVertex.getProducedDataSets().get(0).getResultType());
- assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
printVertex.getInputs().get(0).getSource().getResultType());
- }
-
@Test
public void
testYieldingOperatorNotChainableToTaskChainedToLegacySource() {
StreamExecutionEnvironment chainEnv =
StreamExecutionEnvironment.createLocalEnvironment(1);
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
new file mode 100644
index 0000000..0123df7
--- /dev/null
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link StreamingJobGraphGenerator} on different {@link
GlobalDataExchangeMode} settings.
+ */
+public class StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends
TestLogger {
+
+ @Test
+ public void testDefaultGlobalDataExchangeModeIsAllEdgesPipelined() {
+ final StreamGraph streamGraph = createStreamGraph();
+ assertThat(streamGraph.getGlobalDataExchangeMode(),
is(GlobalDataExchangeMode.ALL_EDGES_PIPELINED));
+ }
+
+ @Test
+ public void testAllEdgesBlockingMode() {
+ final StreamGraph streamGraph = createStreamGraph();
+
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+ final JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ final List<JobVertex> verticesSorted =
jobGraph.getVerticesSortedTopologicallyFromSources();
+ final JobVertex sourceVertex = verticesSorted.get(0);
+ final JobVertex map1Vertex = verticesSorted.get(1);
+ final JobVertex map2Vertex = verticesSorted.get(2);
+
+ assertEquals(ResultPartitionType.BLOCKING,
sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.BLOCKING,
map1Vertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.BLOCKING,
map2Vertex.getProducedDataSets().get(0).getResultType());
+ }
+
+ @Test
+ public void testAllEdgesPipelinedMode() {
+ final StreamGraph streamGraph = createStreamGraph();
+
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+ final JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ final List<JobVertex> verticesSorted =
jobGraph.getVerticesSortedTopologicallyFromSources();
+ final JobVertex sourceVertex = verticesSorted.get(0);
+ final JobVertex map1Vertex = verticesSorted.get(1);
+ final JobVertex map2Vertex = verticesSorted.get(2);
+
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
map1Vertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
map2Vertex.getProducedDataSets().get(0).getResultType());
+ }
+
+ @Test
+ public void testForwardEdgesPipelinedMode() {
+ final StreamGraph streamGraph = createStreamGraph();
+
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
+ final JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ final List<JobVertex> verticesSorted =
jobGraph.getVerticesSortedTopologicallyFromSources();
+ final JobVertex sourceVertex = verticesSorted.get(0);
+ final JobVertex map1Vertex = verticesSorted.get(1);
+ final JobVertex map2Vertex = verticesSorted.get(2);
+
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.BLOCKING,
map1Vertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.BLOCKING,
map2Vertex.getProducedDataSets().get(0).getResultType());
+ }
+
+ @Test
+ public void testPointwiseEdgesPipelinedMode() {
+ final StreamGraph streamGraph = createStreamGraph();
+
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
+ final JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ final List<JobVertex> verticesSorted =
jobGraph.getVerticesSortedTopologicallyFromSources();
+ final JobVertex sourceVertex = verticesSorted.get(0);
+ final JobVertex map1Vertex = verticesSorted.get(1);
+ final JobVertex map2Vertex = verticesSorted.get(2);
+
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
sourceVertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
map1Vertex.getProducedDataSets().get(0).getResultType());
+ assertEquals(ResultPartitionType.BLOCKING,
map2Vertex.getProducedDataSets().get(0).getResultType());
+ }
+
+ @Test
+ public void
testGlobalDataExchangeModeDoesNotOverrideSpecifiedShuffleMode() {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ final DataStream<Integer> source = env.fromElements(1, 2,
3).setParallelism(1);
+ final DataStream<Integer> forward = new DataStream<>(env, new
PartitionTransformation<>(
+ source.getTransformation(), new ForwardPartitioner<>(),
ShuffleMode.PIPELINED));
+ forward.map(i -> i).startNewChain().setParallelism(1);
+ final StreamGraph streamGraph = env.getStreamGraph();
+
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+
+ final JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+ final List<JobVertex> verticesSorted =
jobGraph.getVerticesSortedTopologicallyFromSources();
+ final JobVertex sourceVertex = verticesSorted.get(0);
+
+ assertEquals(ResultPartitionType.PIPELINED_BOUNDED,
sourceVertex.getProducedDataSets().get(0).getResultType());
+ }
+
+ /**
+ * Topology: source(parallelism=1) --(forward)--> map1(parallelism=1)
+ * --(rescale)--> map2(parallelism=2) --(rebalance)-->
sink(parallelism=2).
+ */
+ private static StreamGraph createStreamGraph() {
+ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+ final DataStream<Integer> source = env.fromElements(1, 2,
3).setParallelism(1);
+
+ final DataStream<Integer> forward = new DataStream<>(env, new
PartitionTransformation<>(
+ source.getTransformation(), new ForwardPartitioner<>(),
ShuffleMode.UNDEFINED));
+ final DataStream<Integer> map1 = forward.map(i ->
i).startNewChain().setParallelism(1);
+
+ final DataStream<Integer> rescale = new DataStream<>(env, new
PartitionTransformation<>(
+ map1.getTransformation(), new RescalePartitioner<>(),
ShuffleMode.UNDEFINED));
+ final DataStream<Integer> map2 = rescale.map(i ->
i).setParallelism(2);
+
+ map2.rebalance().print().setParallelism(2);
+
+ return env.getStreamGraph();
+ }
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
index b636320..43028aa 100644
---
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
+++
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
@@ -85,7 +86,7 @@ public class ExecutorUtils {
throw new IllegalArgumentException("Checkpoint is not
supported for batch jobs.");
}
if (ExecutorUtils.isShuffleModeAllBatch(tableConfig)) {
- streamGraph.setBlockingConnectionsBetweenChains(true);
+
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
}
}