This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 242987c75e519ae7b084b19226e598e9663de555 Author: ifndef-SleePy <[email protected]> AuthorDate: Mon Jan 23 01:00:27 2023 +0800 [FLINK-30755][runtime] Support SupportsConcurrentExecutionAttempts property of StreamGraph and JobGraph --- .../executiongraph/SpeculativeExecutionVertex.java | 4 ++ .../apache/flink/runtime/jobgraph/JobVertex.java | 15 ++++++ .../flink/streaming/api/graph/StreamGraph.java | 8 ++++ .../flink/streaming/api/graph/StreamNode.java | 11 +++++ .../api/graph/StreamingJobGraphGenerator.java | 26 +++++++++++ .../api/graph/StreamingJobGraphGeneratorTest.java | 53 ++++++++++++++++++++++ 6 files changed, 117 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java index 66d9625b103..48171a81d57 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java @@ -80,6 +80,10 @@ public class SpeculativeExecutionVertex extends ExecutionVertex { return getJobVertex().getJobVertex().containsSinks(); } + public boolean isSupportsConcurrentExecutionAttempts() { + return getJobVertex().getJobVertex().isSupportsConcurrentExecutionAttempts(); + } + public Execution createNewSpeculativeExecution(final long timestamp) { final Execution newExecution = createNewExecution(timestamp); getExecutionGraphAccessor().registerExecution(newExecution); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java index a28a3c43a88..eb322fe72bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java @@ -155,6 +155,12 @@ public 
class JobVertex implements java.io.Serializable { /** Indicates whether this job vertex contains sink operators. */ private boolean containsSinkOperators = false; + /** + * Indicates whether this job vertex supports multiple attempts of the same subtask executing at + * the same time. + */ + private boolean supportsConcurrentExecutionAttempts = true; + // -------------------------------------------------------------------------------------------- /** @@ -553,6 +559,15 @@ public class JobVertex implements java.io.Serializable { return containsSinkOperators; } + public void setSupportsConcurrentExecutionAttempts( + boolean supportsConcurrentExecutionAttempts) { + this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; + } + + public boolean isSupportsConcurrentExecutionAttempts() { + return supportsConcurrentExecutionAttempts; + } + // -------------------------------------------------------------------------------------------- /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index ad7a91a9bf4..6b082ffd45d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -1066,4 +1066,12 @@ public class StreamGraph implements Pipeline { public List<JobStatusHook> getJobStatusHooks() { return this.jobStatusHooks; } + + public void setSupportsConcurrentExecutionAttempts( + Integer vertexId, boolean supportsConcurrentExecutionAttempts) { + final StreamNode streamNode = getStreamNode(vertexId); + if (streamNode != null) { + streamNode.setSupportsConcurrentExecutionAttempts(supportsConcurrentExecutionAttempts); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 68edc370b5d..6177c0fdbc1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -93,6 +93,8 @@ public class StreamNode { private @Nullable IntermediateDataSetID consumeClusterDatasetId; + private boolean supportsConcurrentExecutionAttempts = true; + @VisibleForTesting public StreamNode( Integer id, @@ -418,4 +420,13 @@ public class StreamNode { @Nullable IntermediateDataSetID consumeClusterDatasetId) { this.consumeClusterDatasetId = consumeClusterDatasetId; } + + public boolean isSupportsConcurrentExecutionAttempts() { + return supportsConcurrentExecutionAttempts; + } + + public void setSupportsConcurrentExecutionAttempts( + boolean supportsConcurrentExecutionAttempts) { + this.supportsConcurrentExecutionAttempts = supportsConcurrentExecutionAttempts; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 5161a2586bf..2e7b332cd9d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -240,6 +240,7 @@ public class StreamingJobGraphGenerator { setPhysicalEdges(); markContainsSourcesOrSinks(); + markSupportingConcurrentExecutionAttempts(); setSlotSharingAndCoLocation(); @@ -1410,6 +1411,31 @@ public class StreamingJobGraphGenerator { } } + private void markSupportingConcurrentExecutionAttempts() { + for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { + final JobVertex jobVertex = entry.getValue(); + final Set<Integer> vertexOperators = new HashSet<>(); + 
vertexOperators.add(entry.getKey()); + final Map<Integer, StreamConfig> vertexChainedConfigs = + chainedConfigs.get(entry.getKey()); + if (vertexChainedConfigs != null) { + vertexOperators.addAll(vertexChainedConfigs.keySet()); + } + + // disable supportConcurrentExecutionAttempts of the job vertex if any stream node in it + // does not support it + boolean supportConcurrentExecutionAttempts = true; + for (int nodeId : vertexOperators) { + final StreamNode streamNode = streamGraph.getStreamNode(nodeId); + if (!streamNode.isSupportsConcurrentExecutionAttempts()) { + supportConcurrentExecutionAttempts = false; + break; + } + } + jobVertex.setSupportsConcurrentExecutionAttempts(supportConcurrentExecutionAttempts); + } + } + private void setSlotSharingAndCoLocation() { setSlotSharing(); setCoLocation(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 23a481c4ff0..16f16084803 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -72,6 +72,7 @@ import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.PrintSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; @@ -132,6 +133,7 @@ import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.ar import static org.apache.flink.util.Preconditions.checkNotNull; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.InstanceOfAssertFactories.stream; /** Tests for {@link StreamingJobGraphGenerator}. */ @ExtendWith(TestLoggerExtension.class) @@ -1733,6 +1735,57 @@ class StreamingJobGraphGeneratorTest { .hasRootCauseMessage("This provider is not serializable."); } + @Test + void testSupportConcurrentExecutionAttempts() { + final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(new Configuration()); + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + + final DataStream<Integer> source = env.fromElements(1, 2, 3).name("source"); + // source -> (map1 -> map2) -> sink + source.rebalance() + .map(v -> v) + .name("map1") + .map(v -> v) + .name("map2") + .rebalance() + .sinkTo(new PrintSink<>()) + .name("sink"); + + final StreamGraph streamGraph = env.getStreamGraph(); + final List<StreamNode> streamNodes = + streamGraph.getStreamNodes().stream() + .sorted(Comparator.comparingInt(StreamNode::getId)) + .collect(Collectors.toList()); + + final StreamNode sourceNode = streamNodes.get(0); + final StreamNode map1Node = streamNodes.get(1); + final StreamNode map2Node = streamNodes.get(2); + final StreamNode sinkNode = streamNodes.get(3); + streamGraph.setSupportsConcurrentExecutionAttempts(sourceNode.getId(), true); + // map1 and map2 are chained + // map1 supports concurrent execution attempts, but map2 does not + streamGraph.setSupportsConcurrentExecutionAttempts(map1Node.getId(), true); + streamGraph.setSupportsConcurrentExecutionAttempts(map2Node.getId(), false); + streamGraph.setSupportsConcurrentExecutionAttempts(sinkNode.getId(), false); + + final JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + 
assertThat(jobGraph.getNumberOfVertices()).isEqualTo(3); + for (JobVertex jobVertex : jobGraph.getVertices()) { + if (jobVertex.getName().contains("source")) { + assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isTrue(); + } else if (jobVertex.getName().contains("map")) { + // a chained job vertex does not support concurrent execution attempts if any operator + // in the chain does not support them + assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse(); + } else if (jobVertex.getName().contains("sink")) { + assertThat(jobVertex.isSupportsConcurrentExecutionAttempts()).isFalse(); + } else { + Assertions.fail("Unexpected job vertex " + jobVertex.getName()); + } + } + } + private static class SerializationTestOperatorFactory extends AbstractStreamOperatorFactory<Integer> implements CoordinatedOperatorFactory<Integer> {
