This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 80c748f7b7f3058a197e984208d0f02eb25bc2a9 Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Fri Dec 27 10:44:06 2024 +0800 [FLINK-36850][streaming-java] Support IntermediateDataSetID generated from source JobVertexID and consumer stream node id. --- .../runtime/jobgraph/IntermediateDataSetID.java | 4 ++ .../api/graph/StreamingJobGraphGenerator.java | 66 +++++++++++++++------- 2 files changed, 49 insertions(+), 21 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java index a79a17dbef6..051220653e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSetID.java @@ -35,6 +35,10 @@ public class IntermediateDataSetID extends AbstractID implements ResultID { super(); } + public IntermediateDataSetID(JobVertexID sourceId, int edgeId) { + super(sourceId.getUpperPart() + edgeId, sourceId.getLowerPart() + edgeId); + } + /** * Creates a new intermediate data set ID with the bytes of the given ID. * diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 05a4ebbb3a3..fa032de5592 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1296,7 +1296,8 @@ public class StreamingJobGraphGenerator { } private static void setOperatorNonChainedOutputsConfig( - Integer vertexId, + JobVertexID jobVertexId, + Integer streamNodeId, StreamConfig config, List<StreamEdge> nonChainableOutputs, Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge, @@ -1318,7 +1319,8 @@ public class StreamingJobGraphGenerator { List<NonChainedOutput> deduplicatedOutputs = mayReuseNonChainedOutputs( - vertexId, + jobVertexId, + streamNodeId, nonChainableOutputs, outputsConsumedByEdge, jobVertexBuildContext); @@ -1352,22 +1354,41 @@ public class StreamingJobGraphGenerator { final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs, JobVertexBuildContext jobVertexBuildContext) { // set non chainable output config - for (OperatorChainInfo chainInfo : jobVertexBuildContext.getChainInfosInOrder().values()) { - chainInfo - .getOperatorInfos() - .forEach( - (vertexId, operatorInfo) -> { - Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge = - opIntermediateOutputs.computeIfAbsent( - vertexId, ignored -> new HashMap<>()); - setOperatorNonChainedOutputsConfig( - vertexId, - operatorInfo.getVertexConfig(), - operatorInfo.getNonChainableOutputs(), - outputsConsumedByEdge, - jobVertexBuildContext); - }); - } + jobVertexBuildContext + .getChainInfosInOrder() + .forEach( + (startNodeId, chainInfo) -> { + JobVertexID jobVertexId = + jobVertexBuildContext.getJobVertex(startNodeId).getID(); + + setOperatorNonChainedOutputsConfigs( + opIntermediateOutputs, + jobVertexBuildContext, + chainInfo, + jobVertexId); + }); + } + + private static void setOperatorNonChainedOutputsConfigs( + Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs, + JobVertexBuildContext jobVertexBuildContext, + OperatorChainInfo chainInfo, + JobVertexID jobVertexId) { + chainInfo + .getOperatorInfos() + .forEach( + (streamNodeId, operatorInfo) -> { + Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge = + opIntermediateOutputs.computeIfAbsent( + streamNodeId, ignored -> new HashMap<>()); + setOperatorNonChainedOutputsConfig( + jobVertexId, + streamNodeId, + operatorInfo.getVertexConfig(), + operatorInfo.getNonChainableOutputs(), + outputsConsumedByEdge, + jobVertexBuildContext); + }); } private void setAllVertexNonChainedOutputsConfigs( @@ -1390,7 +1411,8 @@ public class StreamingJobGraphGenerator { } private static List<NonChainedOutput> mayReuseNonChainedOutputs( - int vertexId, + JobVertexID jobVertexId, + int streamNodeId, List<StreamEdge> consumerEdges, Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge, JobVertexBuildContext jobVertexBuildContext) { @@ -1399,10 +1421,12 @@ public class StreamingJobGraphGenerator { } List<NonChainedOutput> outputs = new ArrayList<>(consumerEdges.size()); for (StreamEdge consumerEdge : consumerEdges) { - checkState(vertexId == consumerEdge.getSourceId(), "Vertex id must be the same."); + checkState( + streamNodeId == consumerEdge.getSourceId(), "stream node id must be the same."); ResultPartitionType partitionType = getResultPartitionType(consumerEdge, jobVertexBuildContext); - IntermediateDataSetID dataSetId = new IntermediateDataSetID(); + IntermediateDataSetID dataSetId = + new IntermediateDataSetID(jobVertexId, consumerEdge.getEdgeId().hashCode()); boolean isPersistentDataSet = isPersistentIntermediateDataset(partitionType, consumerEdge);