This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new ef4daeb [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph generation ef4daeb is described below commit ef4daeba7881cecc1548e387ab68d829f998dc67 Author: Zhu Zhu <reed...@gmail.com> AuthorDate: Fri Apr 24 11:39:36 2020 +0800 [FLINK-17020][runtime] Introduce GlobalDataExchangeMode for JobGraph generation --- .../api/graph/GlobalDataExchangeMode.java | 48 +++++++ .../flink/streaming/api/graph/StreamGraph.java | 22 +-- .../streaming/api/graph/StreamGraphGenerator.java | 12 +- .../api/graph/StreamingJobGraphGenerator.java | 32 ++++- .../api/graph/StreamingJobGraphGeneratorTest.java | 73 ---------- ...aphGeneratorWithGlobalDataExchangeModeTest.java | 153 +++++++++++++++++++++ .../flink/table/planner/utils/ExecutorUtils.java | 3 +- 7 files changed, 241 insertions(+), 102 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java new file mode 100644 index 0000000..a74d9f3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; + +/** + * This mode decides the default {@link ResultPartitionType} of job edges. + * Note that this only affects job edges which are {@link ShuffleMode#UNDEFINED}. + */ +public enum GlobalDataExchangeMode { + /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */ + ALL_EDGES_BLOCKING, + + /** + * Set job edges with {@link ForwardPartitioner} to be {@link ResultPartitionType#PIPELINED_BOUNDED} + * and other edges to be {@link ResultPartitionType#BLOCKING}. + **/ + FORWARD_EDGES_PIPELINED, + + /** + * Set job edges with {@link ForwardPartitioner} or {@link RescalePartitioner} to be + * {@link ResultPartitionType#PIPELINED_BOUNDED} and other edges to be {@link ResultPartitionType#BLOCKING}. + **/ + POINTWISE_EDGES_PIPELINED, + + /** Set all job edges {@link ResultPartitionType#PIPELINED_BOUNDED}. */ + ALL_EDGES_PIPELINED +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 39393fc..83d9aca 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -96,11 +96,7 @@ public class StreamGraph implements Pipeline { private TimeCharacteristic timeCharacteristic; - /** - * If there are some stream edges that can not be chained and the shuffle mode of edge is not - * specified, translate these edges into {@code BLOCKING} result partition type. - */ - private boolean blockingConnectionsBetweenChains; + private GlobalDataExchangeMode globalDataExchangeMode; /** Flag to indicate whether to put all vertices into the same slot sharing group by default. */ private boolean allVerticesInSameSlotSharingGroupByDefault = true; @@ -201,20 +197,12 @@ public class StreamGraph implements Pipeline { this.timeCharacteristic = timeCharacteristic; } - /** - * If there are some stream edges that can not be chained and the shuffle mode of edge is not - * specified, translate these edges into {@code BLOCKING} result partition type. - */ - public boolean isBlockingConnectionsBetweenChains() { - return blockingConnectionsBetweenChains; + public GlobalDataExchangeMode getGlobalDataExchangeMode() { + return globalDataExchangeMode; } - /** - * If there are some stream edges that can not be chained and the shuffle mode of edge is not - * specified, translate these edges into {@code BLOCKING} result partition type. - */ - public void setBlockingConnectionsBetweenChains(boolean blockingConnectionsBetweenChains) { - this.blockingConnectionsBetweenChains = blockingConnectionsBetweenChains; + public void setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) { + this.globalDataExchangeMode = globalDataExchangeMode; } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index dab9bb7..4cf427b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -130,11 +130,7 @@ public class StreamGraphGenerator { private String jobName = DEFAULT_JOB_NAME; - /** - * If there are some stream edges that can not be chained and the shuffle mode of edge is not - * specified, translate these edges into {@code BLOCKING} result partition type. - */ - private boolean blockingConnectionsBetweenChains = false; + private GlobalDataExchangeMode globalDataExchangeMode = GlobalDataExchangeMode.ALL_EDGES_PIPELINED; // This is used to assign a unique ID to iteration source/sink protected static Integer iterationIdCounter = 0; @@ -190,8 +186,8 @@ public class StreamGraphGenerator { return this; } - public StreamGraphGenerator setBlockingConnectionsBetweenChains(boolean blockingConnectionsBetweenChains) { - this.blockingConnectionsBetweenChains = blockingConnectionsBetweenChains; + public StreamGraphGenerator setGlobalDataExchangeMode(GlobalDataExchangeMode globalDataExchangeMode) { + this.globalDataExchangeMode = globalDataExchangeMode; return this; } @@ -207,7 +203,7 @@ public class StreamGraphGenerator { streamGraph.setUserArtifacts(userArtifacts); streamGraph.setTimeCharacteristic(timeCharacteristic); streamGraph.setJobName(jobName); - streamGraph.setBlockingConnectionsBetweenChains(blockingConnectionsBetweenChains); + streamGraph.setGlobalDataExchangeMode(globalDataExchangeMode); alreadyTransformed = new HashMap<>(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index ec58aed..f84242b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -575,8 +575,7 @@ public class StreamingJobGraphGenerator { resultPartitionType = ResultPartitionType.BLOCKING; break; case UNDEFINED: - resultPartitionType = streamGraph.isBlockingConnectionsBetweenChains() ? - ResultPartitionType.BLOCKING : ResultPartitionType.PIPELINED_BOUNDED; + resultPartitionType = determineResultPartitionType(partitioner); break; default: throw new UnsupportedOperationException("Data exchange mode " + @@ -584,7 +583,7 @@ public class StreamingJobGraphGenerator { } JobEdge jobEdge; - if (partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner) { + if (isPointwisePartitioner(partitioner)) { jobEdge = downStreamVertex.connectNewDataSetAsInput( headVertex, DistributionPattern.POINTWISE, @@ -604,6 +603,33 @@ public class StreamingJobGraphGenerator { } } + private static boolean isPointwisePartitioner(StreamPartitioner<?> partitioner) { + return partitioner instanceof ForwardPartitioner || partitioner instanceof RescalePartitioner; + } + + private ResultPartitionType determineResultPartitionType(StreamPartitioner<?> partitioner) { + switch (streamGraph.getGlobalDataExchangeMode()) { + case ALL_EDGES_BLOCKING: + return ResultPartitionType.BLOCKING; + case FORWARD_EDGES_PIPELINED: + if (partitioner instanceof ForwardPartitioner) { + return ResultPartitionType.PIPELINED_BOUNDED; + } else { + return ResultPartitionType.BLOCKING; + } + case POINTWISE_EDGES_PIPELINED: + if (isPointwisePartitioner(partitioner)) { + return ResultPartitionType.PIPELINED_BOUNDED; + } else { + return ResultPartitionType.BLOCKING; + } + case ALL_EDGES_PIPELINED: + return ResultPartitionType.PIPELINED_BOUNDED; + default: + throw new RuntimeException("Unrecognized global data exchange mode " + streamGraph.getGlobalDataExchangeMode()); + } + } + public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 4eb15a6..3ee0002 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -589,79 +589,6 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertEquals(ScheduleMode.LAZY_FROM_SOURCES, jobGraph.getScheduleMode()); } - /** - * Verify that "blockingConnectionsBetweenChains" is off by default. - */ - @Test - public void testBlockingAfterChainingOffDisabled() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // fromElements -> Filter -> Print - DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3); - - // partition transformation with an undefined shuffle mode between source and filter - DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>( - sourceDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED)); - DataStream<Integer> filterDataStream = partitionAfterSourceDataStream.filter(value -> true).setParallelism(2); - - DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(env, new PartitionTransformation<>( - filterDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED)); - - partitionAfterFilterDataStream.print().setParallelism(2); - - JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); - - List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); - assertEquals(2, verticesSorted.size()); - - JobVertex sourceVertex = verticesSorted.get(0); - JobVertex filterAndPrintVertex = verticesSorted.get(1); - - assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); - assertEquals(ResultPartitionType.PIPELINED_BOUNDED, - filterAndPrintVertex.getInputs().get(0).getSource().getResultType()); - } - - /** - * Test enabling the property "blockingConnectionsBetweenChains". - */ - @Test - public void testBlockingConnectionsBetweenChainsEnabled() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - // fromElements -> Filter -> Map -> Print - DataStream<Integer> sourceDataStream = env.fromElements(1, 2, 3); - - // partition transformation with an undefined shuffle mode between source and filter - DataStream<Integer> partitionAfterSourceDataStream = new DataStream<>(env, new PartitionTransformation<>( - sourceDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED)); - DataStream<Integer> filterDataStream = partitionAfterSourceDataStream.filter(value -> true).setParallelism(2); - - DataStream<Integer> partitionAfterFilterDataStream = new DataStream<>(env, new PartitionTransformation<>( - filterDataStream.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED)); - partitionAfterFilterDataStream.map(value -> value).setParallelism(2); - - DataStream<Integer> partitionAfterMapDataStream = new DataStream<>(env, new PartitionTransformation<>( - filterDataStream.getTransformation(), new RescalePartitioner<>(), ShuffleMode.PIPELINED)); - partitionAfterMapDataStream.print().setParallelism(1); - - StreamGraph streamGraph = env.getStreamGraph(); - streamGraph.setBlockingConnectionsBetweenChains(true); - JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); - - List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); - assertEquals(3, verticesSorted.size()); - - JobVertex sourceVertex = verticesSorted.get(0); - // still can be chained - JobVertex filterAndMapVertex = verticesSorted.get(1); - JobVertex printVertex = verticesSorted.get(2); - - // the edge with undefined shuffle mode is translated into BLOCKING - assertEquals(ResultPartitionType.BLOCKING, sourceVertex.getProducedDataSets().get(0).getResultType()); - // the edge with PIPELINED shuffle mode is translated into PIPELINED_BOUNDED - assertEquals(ResultPartitionType.PIPELINED_BOUNDED, filterAndMapVertex.getProducedDataSets().get(0).getResultType()); - assertEquals(ResultPartitionType.PIPELINED_BOUNDED, printVertex.getInputs().get(0).getSource().getResultType()); - } - @Test public void testYieldingOperatorNotChainableToTaskChainedToLegacySource() { StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java new file mode 100644 index 0000000..0123df7 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.List; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link StreamingJobGraphGenerator} on different {@link GlobalDataExchangeMode} settings. + */ +public class StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends TestLogger { + + @Test + public void testDefaultGlobalDataExchangeModeIsAllEdgesPipelined() { + final StreamGraph streamGraph = createStreamGraph(); + assertThat(streamGraph.getGlobalDataExchangeMode(), is(GlobalDataExchangeMode.ALL_EDGES_PIPELINED)); + } + + @Test + public void testAllEdgesBlockingMode() { + final StreamGraph streamGraph = createStreamGraph(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + final JobVertex map1Vertex = verticesSorted.get(1); + final JobVertex map2Vertex = verticesSorted.get(2); + + assertEquals(ResultPartitionType.BLOCKING, sourceVertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map1Vertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map2Vertex.getProducedDataSets().get(0).getResultType()); + } + + @Test + public void testAllEdgesPipelinedMode() { + final StreamGraph streamGraph = createStreamGraph(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + final JobVertex map1Vertex = verticesSorted.get(1); + final JobVertex map2Vertex = verticesSorted.get(2); + + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, map1Vertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, map2Vertex.getProducedDataSets().get(0).getResultType()); + } + + @Test + public void testForwardEdgesPipelinedMode() { + final StreamGraph streamGraph = createStreamGraph(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + final JobVertex map1Vertex = verticesSorted.get(1); + final JobVertex map2Vertex = verticesSorted.get(2); + + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map1Vertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map2Vertex.getProducedDataSets().get(0).getResultType()); + } + + @Test + public void testPointwiseEdgesPipelinedMode() { + final StreamGraph streamGraph = createStreamGraph(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED); + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + final JobVertex map1Vertex = verticesSorted.get(1); + final JobVertex map2Vertex = verticesSorted.get(2); + + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, map1Vertex.getProducedDataSets().get(0).getResultType()); + assertEquals(ResultPartitionType.BLOCKING, map2Vertex.getProducedDataSets().get(0).getResultType()); + } + + @Test + public void testGlobalDataExchangeModeDoesNotOverrideSpecifiedShuffleMode() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + final DataStream<Integer> source = env.fromElements(1, 2, 3).setParallelism(1); + final DataStream<Integer> forward = new DataStream<>(env, new PartitionTransformation<>( + source.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.PIPELINED)); + forward.map(i -> i).startNewChain().setParallelism(1); + final StreamGraph streamGraph = env.getStreamGraph(); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); + + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + + final List<JobVertex> verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources(); + final JobVertex sourceVertex = verticesSorted.get(0); + + assertEquals(ResultPartitionType.PIPELINED_BOUNDED, sourceVertex.getProducedDataSets().get(0).getResultType()); + } + + /** + * Topology: source(parallelism=1) --(forward)--> map1(parallelism=1) + * --(rescale)--> map2(parallelism=2) --(rebalance)--> sink(parallelism=2). + */ + private static StreamGraph createStreamGraph() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + final DataStream<Integer> source = env.fromElements(1, 2, 3).setParallelism(1); + + final DataStream<Integer> forward = new DataStream<>(env, new PartitionTransformation<>( + source.getTransformation(), new ForwardPartitioner<>(), ShuffleMode.UNDEFINED)); + final DataStream<Integer> map1 = forward.map(i -> i).startNewChain().setParallelism(1); + + final DataStream<Integer> rescale = new DataStream<>(env, new PartitionTransformation<>( + map1.getTransformation(), new RescalePartitioner<>(), ShuffleMode.UNDEFINED)); + final DataStream<Integer> map2 = rescale.map(i -> i).setParallelism(2); + + map2.rebalance().print().setParallelism(2); + + return env.getStreamGraph(); + } +} diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java index b636320..43028aa 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.transformations.ShuffleMode; @@ -85,7 +86,7 @@ public class ExecutorUtils { throw new IllegalArgumentException("Checkpoint is not supported for batch jobs."); } if (ExecutorUtils.isShuffleModeAllBatch(tableConfig)) { - streamGraph.setBlockingConnectionsBetweenChains(true); + streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); } }