This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 80c748f7b7f3058a197e984208d0f02eb25bc2a9
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Fri Dec 27 10:44:06 2024 +0800

    [FLINK-36850][streaming-java] Support IntermediateDataSetID generated from source JobVertexID and consumer stream node id.
---
 .../runtime/jobgraph/IntermediateDataSetID.java    |  4 ++
 .../api/graph/StreamingJobGraphGenerator.java      | 66 +++++++++++++++-------
 2 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java
index a79a17dbef6..051220653e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java
@@ -35,6 +35,10 @@ public class IntermediateDataSetID extends AbstractID implements ResultID {
         super();
     }
 
+    public IntermediateDataSetID(JobVertexID sourceId, int edgeId) {
+        super(sourceId.getUpperPart() + edgeId, sourceId.getLowerPart() + edgeId);
+    }
+
     /**
      * Creates a new intermediate data set ID with the bytes of the given ID.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 05a4ebbb3a3..fa032de5592 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1296,7 +1296,8 @@ public class StreamingJobGraphGenerator {
     }
 
     private static void setOperatorNonChainedOutputsConfig(
-            Integer vertexId,
+            JobVertexID jobVertexId,
+            Integer streamNodeId,
             StreamConfig config,
             List<StreamEdge> nonChainableOutputs,
             Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge,
@@ -1318,7 +1319,8 @@ public class StreamingJobGraphGenerator {
 
         List<NonChainedOutput> deduplicatedOutputs =
                 mayReuseNonChainedOutputs(
-                        vertexId,
+                        jobVertexId,
+                        streamNodeId,
                         nonChainableOutputs,
                         outputsConsumedByEdge,
                         jobVertexBuildContext);
@@ -1352,22 +1354,41 @@ public class StreamingJobGraphGenerator {
             final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs,
             JobVertexBuildContext jobVertexBuildContext) {
         // set non chainable output config
-        for (OperatorChainInfo chainInfo : jobVertexBuildContext.getChainInfosInOrder().values()) {
-            chainInfo
-                    .getOperatorInfos()
-                    .forEach(
-                            (vertexId, operatorInfo) -> {
-                                Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
-                                        opIntermediateOutputs.computeIfAbsent(
-                                                vertexId, ignored -> new HashMap<>());
-                                setOperatorNonChainedOutputsConfig(
-                                        vertexId,
-                                        operatorInfo.getVertexConfig(),
-                                        operatorInfo.getNonChainableOutputs(),
-                                        outputsConsumedByEdge,
-                                        jobVertexBuildContext);
-                            });
-        }
+        jobVertexBuildContext
+                .getChainInfosInOrder()
+                .forEach(
+                        (startNodeId, chainInfo) -> {
+                            JobVertexID jobVertexId =
+                                    jobVertexBuildContext.getJobVertex(startNodeId).getID();
+
+                            setOperatorNonChainedOutputsConfigs(
+                                    opIntermediateOutputs,
+                                    jobVertexBuildContext,
+                                    chainInfo,
+                                    jobVertexId);
+                        });
+    }
+
+    private static void setOperatorNonChainedOutputsConfigs(
+            Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs,
+            JobVertexBuildContext jobVertexBuildContext,
+            OperatorChainInfo chainInfo,
+            JobVertexID jobVertexId) {
+        chainInfo
+                .getOperatorInfos()
+                .forEach(
+                        (streamNodeId, operatorInfo) -> {
+                            Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
+                                    opIntermediateOutputs.computeIfAbsent(
+                                            streamNodeId, ignored -> new HashMap<>());
+                            setOperatorNonChainedOutputsConfig(
+                                    jobVertexId,
+                                    streamNodeId,
+                                    operatorInfo.getVertexConfig(),
+                                    operatorInfo.getNonChainableOutputs(),
+                                    outputsConsumedByEdge,
+                                    jobVertexBuildContext);
+                        });
     }
 
     private void setAllVertexNonChainedOutputsConfigs(
@@ -1390,7 +1411,8 @@ public class StreamingJobGraphGenerator {
     }
 
     private static List<NonChainedOutput> mayReuseNonChainedOutputs(
-            int vertexId,
+            JobVertexID jobVertexId,
+            int streamNodeId,
             List<StreamEdge> consumerEdges,
             Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge,
             JobVertexBuildContext jobVertexBuildContext) {
@@ -1399,10 +1421,12 @@ public class StreamingJobGraphGenerator {
         }
         List<NonChainedOutput> outputs = new ArrayList<>(consumerEdges.size());
         for (StreamEdge consumerEdge : consumerEdges) {
-            checkState(vertexId == consumerEdge.getSourceId(), "Vertex id must be the same.");
+            checkState(
+                    streamNodeId == consumerEdge.getSourceId(), "stream node id must be the same.");
             ResultPartitionType partitionType =
                     getResultPartitionType(consumerEdge, jobVertexBuildContext);
-            IntermediateDataSetID dataSetId = new IntermediateDataSetID();
+            IntermediateDataSetID dataSetId =
+                    new IntermediateDataSetID(jobVertexId, consumerEdge.getEdgeId().hashCode());
 
             boolean isPersistentDataSet =
                     isPersistentIntermediateDataset(partitionType, consumerEdge);
