This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ff3cfb498e1 [FLINK-35766][streaming] Fix hang issue in JobGraph generation when StreamGraph contains many YieldingOperatorFactory.
ff3cfb498e1 is described below

commit ff3cfb498e1987365320dc77fb0ec40d1573ee25
Author: xincheng.ljr <[email protected]>
AuthorDate: Mon Aug 12 18:15:31 2024 +0800

    [FLINK-35766][streaming] Fix hang issue in JobGraph generation when StreamGraph contains many YieldingOperatorFactory.
---
 .../flink/streaming/api/graph/StreamGraph.java     | 11 +++++++++
 .../api/graph/StreamingJobGraphGenerator.java      | 20 ++++++++++++----
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 28 ++++++++++++++++++++++
 3 files changed, 54 insertions(+), 5 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 2e5788cd1ec..fc794022bf8 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -137,6 +137,9 @@ public class StreamGraph implements Pipeline {
 
     private boolean autoParallelismEnabled;
 
+    private final transient Map<StreamNode, StreamOperatorFactory<?>> 
nodeToHeadOperatorCache =
+            new HashMap<>();
+
     public StreamGraph(
             Configuration jobConfiguration,
             ExecutionConfig executionConfig,
@@ -176,6 +179,14 @@ public class StreamGraph implements Pipeline {
         return checkpointConfig;
     }
 
+    void cacheHeadOperatorForNode(StreamNode node, StreamOperatorFactory<?> 
headOperator) {
+        nodeToHeadOperatorCache.put(node, headOperator);
+    }
+
+    StreamOperatorFactory<?> getHeadOperatorForNodeFromCache(StreamNode node) {
+        return nodeToHeadOperatorCache.get(node);
+    }
+
     public void setSavepointRestoreSettings(SavepointRestoreSettings 
savepointRestoreSettings) {
         this.savepointRestoreSettings = savepointRestoreSettings;
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index cbe6513db5e..7b0a6cf90b4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1652,12 +1652,22 @@ public class StreamingJobGraphGenerator {
     /** Backtraces the head of an operator chain. */
     private static StreamOperatorFactory<?> getHeadOperator(
             StreamNode upStreamVertex, StreamGraph streamGraph) {
-        if (upStreamVertex.getInEdges().size() == 1
-                && isChainable(upStreamVertex.getInEdges().get(0), 
streamGraph)) {
-            return getHeadOperator(
-                    
streamGraph.getSourceVertex(upStreamVertex.getInEdges().get(0)), streamGraph);
+        if (streamGraph.getHeadOperatorForNodeFromCache(upStreamVertex) == 
null) {
+            if (upStreamVertex.getInEdges().size() == 1
+                    && isChainable(upStreamVertex.getInEdges().get(0), 
streamGraph)) {
+                StreamOperatorFactory<?> headOperator =
+                        getHeadOperator(
+                                
streamGraph.getSourceVertex(upStreamVertex.getInEdges().get(0)),
+                                streamGraph);
+                streamGraph.cacheHeadOperatorForNode(upStreamVertex, 
headOperator);
+            } else {
+                
Preconditions.checkNotNull(upStreamVertex.getOperatorFactory());
+                streamGraph.cacheHeadOperatorForNode(
+                        upStreamVertex, upStreamVertex.getOperatorFactory());
+            }
         }
-        return Preconditions.checkNotNull(upStreamVertex.getOperatorFactory());
+
+        return streamGraph.getHeadOperatorForNodeFromCache(upStreamVertex);
     }
 
     private void markSupportingConcurrentExecutionAttempts() {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index a1eb1d4de09..34085974363 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -139,6 +139,7 @@ import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.lang.reflect.Method;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -150,6 +151,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -1146,6 +1148,32 @@ class StreamingJobGraphGeneratorTest {
                 .isFalse();
     }
 
+    @Test
+    void testJobGraphGenerationWithManyYieldingOperatorsDoesNotHang() {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(1);
+
+        SingleOutputStreamOperator<Long> operator =
+                env.fromSource(
+                                new NumberSequenceSource(0, 10),
+                                WatermarkStrategy.noWatermarks(),
+                                "input")
+                        .map((x) -> x);
+
+        // add 40 YieldingOperatorFactory
+        for (int i = 0; i < 40; i++) {
+            operator =
+                    operator.transform(
+                            "test",
+                            BasicTypeInfo.LONG_TYPE_INFO,
+                            new YieldingTestOperatorFactory<>());
+        }
+
+        operator.sinkTo(new DiscardingSink<>());
+
+        assertThat(CompletableFuture.runAsync(() -> 
env.getStreamGraph().getJobGraph()))
+                .succeedsWithin(Duration.ofMinutes(1));
+    }
+
     @Test
     void testYieldingOperatorChainableToTaskNotChainedToLegacySource() {
         StreamExecutionEnvironment chainEnv = 
StreamExecutionEnvironment.createLocalEnvironment(1);

Reply via email to