This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ff3cfb498e1 [FLINK-35766][streaming] Fix hang issue in JobGraph
generation when StreamGraph contains many YieldingOperatorFactory.
ff3cfb498e1 is described below
commit ff3cfb498e1987365320dc77fb0ec40d1573ee25
Author: xincheng.ljr <[email protected]>
AuthorDate: Mon Aug 12 18:15:31 2024 +0800
[FLINK-35766][streaming] Fix hang issue in JobGraph generation when
StreamGraph contains many YieldingOperatorFactory.
---
.../flink/streaming/api/graph/StreamGraph.java | 11 +++++++++
.../api/graph/StreamingJobGraphGenerator.java | 20 ++++++++++++----
.../api/graph/StreamingJobGraphGeneratorTest.java | 28 ++++++++++++++++++++++
3 files changed, 54 insertions(+), 5 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 2e5788cd1ec..fc794022bf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -137,6 +137,9 @@ public class StreamGraph implements Pipeline {
private boolean autoParallelismEnabled;
+ private final transient Map<StreamNode, StreamOperatorFactory<?>> nodeToHeadOperatorCache =
+ new HashMap<>();
+
public StreamGraph(
Configuration jobConfiguration,
ExecutionConfig executionConfig,
@@ -176,6 +179,14 @@ public class StreamGraph implements Pipeline {
return checkpointConfig;
}
+ void cacheHeadOperatorForNode(StreamNode node, StreamOperatorFactory<?> headOperator) {
+ nodeToHeadOperatorCache.put(node, headOperator);
+ }
+
+ StreamOperatorFactory<?> getHeadOperatorForNodeFromCache(StreamNode node) {
+ return nodeToHeadOperatorCache.get(node);
+ }
+
public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
this.savepointRestoreSettings = savepointRestoreSettings;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index cbe6513db5e..7b0a6cf90b4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1652,12 +1652,22 @@ public class StreamingJobGraphGenerator {
/** Backtraces the head of an operator chain. */
private static StreamOperatorFactory<?> getHeadOperator(
StreamNode upStreamVertex, StreamGraph streamGraph) {
- if (upStreamVertex.getInEdges().size() == 1
- && isChainable(upStreamVertex.getInEdges().get(0), streamGraph)) {
- return getHeadOperator(
- streamGraph.getSourceVertex(upStreamVertex.getInEdges().get(0)), streamGraph);
+ if (streamGraph.getHeadOperatorForNodeFromCache(upStreamVertex) == null) {
+ if (upStreamVertex.getInEdges().size() == 1
+ && isChainable(upStreamVertex.getInEdges().get(0), streamGraph)) {
+ StreamOperatorFactory<?> headOperator =
+ getHeadOperator(
+ streamGraph.getSourceVertex(upStreamVertex.getInEdges().get(0)),
+ streamGraph);
+ streamGraph.cacheHeadOperatorForNode(upStreamVertex, headOperator);
+ } else {
+ Preconditions.checkNotNull(upStreamVertex.getOperatorFactory());
+ streamGraph.cacheHeadOperatorForNode(
+ upStreamVertex, upStreamVertex.getOperatorFactory());
+ }
}
- return Preconditions.checkNotNull(upStreamVertex.getOperatorFactory());
+
+ return streamGraph.getHeadOperatorForNodeFromCache(upStreamVertex);
}
private void markSupportingConcurrentExecutionAttempts() {
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index a1eb1d4de09..34085974363 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -139,6 +139,7 @@ import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -150,6 +151,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -1146,6 +1148,32 @@ class StreamingJobGraphGeneratorTest {
.isFalse();
}
+ @Test
+ void testJobGraphGenerationWithManyYieldingOperatorsDoesNotHang() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+ SingleOutputStreamOperator<Long> operator =
+ env.fromSource(
+ new NumberSequenceSource(0, 10),
+ WatermarkStrategy.noWatermarks(),
+ "input")
+ .map((x) -> x);
+
+ // add 40 YieldingOperatorFactory
+ for (int i = 0; i < 40; i++) {
+ operator =
+ operator.transform(
+ "test",
+ BasicTypeInfo.LONG_TYPE_INFO,
+ new YieldingTestOperatorFactory<>());
+ }
+
+ operator.sinkTo(new DiscardingSink<>());
+
+ assertThat(CompletableFuture.runAsync(() -> env.getStreamGraph().getJobGraph()))
+ .succeedsWithin(Duration.ofMinutes(1));
+ }
+
@Test
void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.createLocalEnvironment(1);