GJL commented on a change in pull request #11774:
URL: https://github.com/apache/flink/pull/11774#discussion_r413926186



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -898,6 +825,116 @@ public void 
testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled(
                assertDistinctSharingGroups(source1Vertex, source2Vertex, 
map2Vertex);
        }
 
+       @Test
+       public void testDefaultGlobalDataExchangeModeIsAllEdgesPipelined() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               assertThat(streamGraph.getGlobalDataExchangeMode(), 
is(GlobalDataExchangeMode.ALL_EDGES_PIPELINED));
+       }
+
+       @Test
+       public void testAllEdgesBlockingMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.BLOCKING, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testAllEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testForwardEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testPointwiseEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void 
testGlobalDataExchangeModeDoesNotOverrideSpecifiedShuffleMode() {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               final DataStream<Integer> source = env.fromElements(1, 2, 
3).setParallelism(1);
+               final DataStream<Integer> forward = new DataStream<>(env, new 
PartitionTransformation<>(
+                       source.getTransformation(), new ForwardPartitioner<>(), 
ShuffleMode.PIPELINED));
+               forward.map(i -> i).startNewChain().setParallelism(1);
+               final StreamGraph streamGraph = env.getStreamGraph();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       /**
+        * Topology: source(parallelism=1) --(forward)--> map1(parallelism=1)
+        *           --(rescale)--> map2(parallelism=2) --(rebalance)--> 
sink(parallelism=2).
+        */
+       private StreamGraph createStreamGraphForGlobalDataExchangeModeTests() {

Review comment:
       Maybe it makes sense to move the added tests to a new class.
   Also, this helper method can be declared `static`.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -898,6 +825,116 @@ public void 
testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled(
                assertDistinctSharingGroups(source1Vertex, source2Vertex, 
map2Vertex);
        }
 
+       @Test
+       public void testDefaultGlobalDataExchangeModeIsAllEdgesPipelined() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               assertThat(streamGraph.getGlobalDataExchangeMode(), 
is(GlobalDataExchangeMode.ALL_EDGES_PIPELINED));
+       }
+
+       @Test
+       public void testAllEdgesBlockingMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.BLOCKING, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testAllEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testForwardEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testPointwiseEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void 
testGlobalDataExchangeModeDoesNotOverrideSpecifiedShuffleMode() {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               final DataStream<Integer> source = env.fromElements(1, 2, 
3).setParallelism(1);
+               final DataStream<Integer> forward = new DataStream<>(env, new 
PartitionTransformation<>(
+                       source.getTransformation(), new ForwardPartitioner<>(), 
ShuffleMode.PIPELINED));
+               forward.map(i -> i).startNewChain().setParallelism(1);
+               final StreamGraph streamGraph = env.getStreamGraph();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       /**
+        * Topology: source(parallelism=1) --(forward)--> map1(parallelism=1)
+        *           --(rescale)--> map2(parallelism=2) --(rebalance)--> 
sink(parallelism=2).
+        */
+       private StreamGraph createStreamGraphForGlobalDataExchangeModeTests() {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+               final DataStream<Integer> source = env.fromElements(1, 2, 
3).setParallelism(1);
+
+               final DataStream<Integer> forward = new DataStream<>(env, new 
PartitionTransformation<>(
+                       source.getTransformation(), new ForwardPartitioner<>(), 
ShuffleMode.UNDEFINED));
+               final DataStream<Integer> map1 = forward.map(i -> 
i).startNewChain().setParallelism(1);
+
+               final DataStream<Integer> rescale = new DataStream<>(env, new 
PartitionTransformation<>(
+                       map1.getTransformation(), new RescalePartitioner<>(), 
ShuffleMode.UNDEFINED));
+               final DataStream<Integer> map2 = rescale.map(i -> 
i).setParallelism(2);

Review comment:
       Is there a benefit to this compared to calling
`map1.rescale().map(i -> i).setParallelism(2)`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to