alexandru-tetelea commented on code in PR #21765:
URL: https://github.com/apache/flink/pull/21765#discussion_r1088736932
##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java:
##########
@@ -1733,6 +1745,176 @@ public void testCoordinatedSerializationException() {
.hasRootCauseMessage("This provider is not serializable.");
}
+ @Test
+ void testSupportConcurrentExecutionAttempts() {
+ final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(new
Configuration());
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+
+ final DataStream<Integer> source = env.fromElements(1, 2,
3).name("source");
+ // source -> (map1 -> map2) -> sink
+ source.rebalance()
+ .map(v -> v)
+ .name("map1")
+ .map(v -> v)
+ .name("map2")
+ .rebalance()
+ .sinkTo(new PrintSink<>())
+ .name("sink");
+
+ final StreamGraph streamGraph = env.getStreamGraph();
+ final List<StreamNode> streamNodes =
+ streamGraph.getStreamNodes().stream()
+ .sorted(Comparator.comparingInt(StreamNode::getId))
+ .collect(Collectors.toList());
+
+ final StreamNode sourceNode = streamNodes.get(0);
+ final StreamNode map1Node = streamNodes.get(1);
+ final StreamNode map2Node = streamNodes.get(2);
+ final StreamNode sinkNode = streamNodes.get(3);
+ streamGraph.setSupportsConcurrentExecutionAttempts(sourceNode.getId(),
true);
+ // map1 and map2 are chained
+ // map1 supports concurrent execution attempt however map2 does not
+ streamGraph.setSupportsConcurrentExecutionAttempts(map1Node.getId(),
true);
+ streamGraph.setSupportsConcurrentExecutionAttempts(map2Node.getId(),
false);
+ streamGraph.setSupportsConcurrentExecutionAttempts(sinkNode.getId(),
false);
+
+ final JobGraph jobGraph =
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+ assertThat(jobGraph.getNumberOfVertices()).isEqualTo(3);
+ for (JobVertex jobVertex : jobGraph.getVertices()) {
+ if (jobVertex.getName().contains("source")) {
Review Comment:
nit: What do you think about extracting these strings to constants?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]