This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new caa03223697 [FLINK-36335][runtime] Improving Method Reusability in 
StreamGraphGenerator with JobVertexBuildContext
caa03223697 is described below

commit caa032236979bdd53cdbf78ff05cceb6ec2f4d93
Author: noorall <[email protected]>
AuthorDate: Thu Sep 5 17:02:22 2024 +0800

    [FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator 
with JobVertexBuildContext
---
 .../api/graph/StreamingJobGraphGenerator.java      | 813 ++++++++++-----------
 .../api/graph/util/ChainedOperatorHashInfo.java    |  56 ++
 .../api/graph/util/ChainedSourceInfo.java          |  43 ++
 .../api/graph/util/JobVertexBuildContext.java      | 192 +++++
 .../api/graph/util/OperatorChainInfo.java          | 183 +++++
 .../streaming/api/graph/util/OperatorInfo.java     |  70 ++
 6 files changed, 932 insertions(+), 425 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6e45fbe505e..3908820db92 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -41,7 +41,6 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
 import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -69,6 +68,11 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
 import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.graph.util.ChainedOperatorHashInfo;
+import org.apache.flink.streaming.api.graph.util.ChainedSourceInfo;
+import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
+import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
+import org.apache.flink.streaming.api.graph.util.OperatorInfo;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
@@ -107,7 +111,6 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -120,6 +123,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -172,43 +176,16 @@ public class StreamingJobGraphGenerator {
     private final ClassLoader userClassloader;
     private final StreamGraph streamGraph;
 
-    private final Map<Integer, JobVertex> jobVertices;
     private final JobGraph jobGraph;
     private final Collection<Integer> builtVertices;
 
-    private final List<StreamEdge> physicalEdgesInOrder;
-
-    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
-
-    private final Map<Integer, StreamConfig> vertexConfigs;
-    private final Map<Integer, String> chainedNames;
-
-    private final Map<Integer, ResourceSpec> chainedMinResources;
-    private final Map<Integer, ResourceSpec> chainedPreferredResources;
-
-    private final Map<Integer, InputOutputFormatContainer> 
chainedInputOutputFormats;
-
     private final StreamGraphHasher defaultStreamGraphHasher;
     private final List<StreamGraphHasher> legacyStreamGraphHashers;
 
-    private boolean hasHybridResultPartition = false;
-
     private final Executor serializationExecutor;
 
-    // Futures for the serialization of operator coordinators
-    private final Map<
-                    JobVertexID,
-                    
List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
-            coordinatorSerializationFuturesPerJobVertex = new HashMap<>();
-
-    /** The {@link OperatorChainInfo}s, key is the start node id of the chain. 
*/
-    private final Map<Integer, OperatorChainInfo> chainInfos;
-
-    /**
-     * This is used to cache the non-chainable outputs, to set the 
non-chainable outputs config
-     * after all job vertices are created.
-     */
-    private final Map<Integer, List<StreamEdge>> opNonChainableOutputsCache;
+    // We save all the context needed to create the JobVertex in this 
structure.
+    private final JobVertexBuildContext jobVertexBuildContext;
 
     private StreamingJobGraphGenerator(
             ClassLoader userClassloader,
@@ -220,42 +197,36 @@ public class StreamingJobGraphGenerator {
         this.defaultStreamGraphHasher = new StreamGraphHasherV2();
         this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphUserHashHasher());
 
-        this.jobVertices = new LinkedHashMap<>();
         this.builtVertices = new HashSet<>();
-        this.chainedConfigs = new HashMap<>();
-        this.vertexConfigs = new HashMap<>();
-        this.chainedNames = new HashMap<>();
-        this.chainedMinResources = new HashMap<>();
-        this.chainedPreferredResources = new HashMap<>();
-        this.chainedInputOutputFormats = new HashMap<>();
-        this.physicalEdgesInOrder = new ArrayList<>();
         this.serializationExecutor = 
Preconditions.checkNotNull(serializationExecutor);
-        this.chainInfos = new HashMap<>();
-        this.opNonChainableOutputsCache = new LinkedHashMap<>();
-
         jobGraph = new JobGraph(jobID, streamGraph.getJobName());
-    }
-
-    private JobGraph createJobGraph() {
-        preValidate();
-        jobGraph.setJobType(streamGraph.getJobType());
-        jobGraph.setDynamic(streamGraph.isDynamic());
-
-        jobGraph.enableApproximateLocalRecovery(
-                
streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
 
         // Generate deterministic hashes for the nodes in order to identify 
them across
         // submission iff they didn't change.
-        Map<Integer, byte[]> hashes =
+        final Map<Integer, byte[]> hashes =
                 
defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
 
         // Generate legacy version hashes for backwards compatibility
-        List<Map<Integer, byte[]>> legacyHashes = new 
ArrayList<>(legacyStreamGraphHashers.size());
+        final List<Map<Integer, byte[]>> legacyHashes =
+                new ArrayList<>(legacyStreamGraphHashers.size());
         for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
             
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
         }
 
-        setChaining(hashes, legacyHashes);
+        this.jobVertexBuildContext =
+                new JobVertexBuildContext(
+                        streamGraph, new AtomicBoolean(false), hashes, 
legacyHashes);
+    }
+
+    private JobGraph createJobGraph() {
+        preValidate(streamGraph, userClassloader);
+        jobGraph.setJobType(streamGraph.getJobType());
+        jobGraph.setDynamic(streamGraph.isDynamic());
+
+        jobGraph.enableApproximateLocalRecovery(
+                
streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
+
+        setChaining();
 
         if (jobGraph.isDynamic()) {
             setVertexParallelismsForDynamicGraphIfNecessary();
@@ -266,25 +237,20 @@ public class StreamingJobGraphGenerator {
         // vertices and partition-reuse
         final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs =
                 new HashMap<>();
-        setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
+        setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs, 
jobVertexBuildContext);
         setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);
 
-        setPhysicalEdges();
+        setPhysicalEdges(jobVertexBuildContext);
 
-        markSupportingConcurrentExecutionAttempts();
+        markSupportingConcurrentExecutionAttempts(jobVertexBuildContext);
 
-        validateHybridShuffleExecuteInBatchMode();
+        validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);
 
-        setSlotSharingAndCoLocation();
+        setSlotSharingAndCoLocation(jobGraph, jobVertexBuildContext);
 
-        setManagedMemoryFraction(
-                Collections.unmodifiableMap(jobVertices),
-                Collections.unmodifiableMap(vertexConfigs),
-                Collections.unmodifiableMap(chainedConfigs),
-                id -> 
streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
-                id -> 
streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
+        setManagedMemoryFraction(jobVertexBuildContext);
 
-        configureCheckpointing();
+        configureCheckpointing(streamGraph, jobGraph);
 
         
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
 
@@ -309,31 +275,41 @@ public class StreamingJobGraphGenerator {
         }
         jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());
 
-        addVertexIndexPrefixInVertexName();
+        addVertexIndexPrefixInVertexName(jobVertexBuildContext, new 
AtomicInteger(0), jobGraph);
 
-        setVertexDescription();
+        setVertexDescription(jobVertexBuildContext);
 
         // Wait for the serialization of operator coordinators and stream 
config.
+        serializeOperatorCoordinatorsAndStreamConfig(
+                jobGraph, serializationExecutor, jobVertexBuildContext);
+
+        if (!streamGraph.getJobStatusHooks().isEmpty()) {
+            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
+        }
+
+        return jobGraph;
+    }
+
+    public static void serializeOperatorCoordinatorsAndStreamConfig(
+            JobGraph jobGraph,
+            Executor serializationExecutor,
+            JobVertexBuildContext jobVertexBuildContext) {
         try {
             FutureUtils.combineAll(
-                            vertexConfigs.values().stream()
+                            
jobVertexBuildContext.getOperatorInfos().values().stream()
                                     .map(
-                                            config ->
-                                                    
config.triggerSerializationAndReturnFuture(
-                                                            
serializationExecutor))
+                                            operatorInfo ->
+                                                    operatorInfo
+                                                            .getVertexConfig()
+                                                            
.triggerSerializationAndReturnFuture(
+                                                                    
serializationExecutor))
                                     .collect(Collectors.toList()))
                     .get();
 
-            waitForSerializationFuturesAndUpdateJobVertices();
+            waitForSerializationFuturesAndUpdateJobVertices(jobGraph, 
jobVertexBuildContext);
         } catch (Exception e) {
             throw new FlinkRuntimeException("Error in serialization.", e);
         }
-
-        if (!streamGraph.getJobStatusHooks().isEmpty()) {
-            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
-        }
-
-        return jobGraph;
     }
 
     /**
@@ -347,12 +323,16 @@ public class StreamingJobGraphGenerator {
         return new 
OperatorID(StreamGraphHasherV2.generateUserSpecifiedHash(operatorUid));
     }
 
-    private void waitForSerializationFuturesAndUpdateJobVertices()
+    private static void waitForSerializationFuturesAndUpdateJobVertices(
+            JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext)
             throws ExecutionException, InterruptedException {
         for (Map.Entry<
                         JobVertexID,
                         
List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
-                futuresPerJobVertex : 
coordinatorSerializationFuturesPerJobVertex.entrySet()) {
+                futuresPerJobVertex :
+                        jobVertexBuildContext
+                                
.getCoordinatorSerializationFuturesPerJobVertex()
+                                .entrySet()) {
             final JobVertexID jobVertexId = futuresPerJobVertex.getKey();
             final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
 
@@ -366,32 +346,46 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private void addVertexIndexPrefixInVertexName() {
-        if (!streamGraph.isVertexNameIncludeIndexPrefix()) {
+    public static void addVertexIndexPrefixInVertexName(
+            JobVertexBuildContext jobVertexBuildContext,
+            AtomicInteger vertexIndexId,
+            JobGraph jobGraph) {
+        if 
(!jobVertexBuildContext.getStreamGraph().isVertexNameIncludeIndexPrefix()) {
             return;
         }
-        final AtomicInteger vertexIndexId = new AtomicInteger(0);
+        Set<JobVertexID> jobVertexIds =
+                jobVertexBuildContext.getJobVerticesInOrder().values().stream()
+                        .map(JobVertex::getID)
+                        .collect(Collectors.toSet());
+        // JobVertexBuildContext only contains incrementally generated 
jobVertex instances. The
+        // setName method needs to set the names based on the topological 
order of all jobVertices,
+        // so it relies on the job graph to compute a difference set.
         jobGraph.getVerticesSortedTopologicallyFromSources()
                 .forEach(
-                        vertex ->
+                        vertex -> {
+                            if (jobVertexIds.contains(vertex.getID())) {
                                 vertex.setName(
                                         String.format(
                                                 "[vertex-%d]%s",
-                                                
vertexIndexId.getAndIncrement(),
-                                                vertex.getName())));
+                                                
vertexIndexId.getAndIncrement(), vertex.getName()));
+                            }
+                        });
     }
 
-    private void setVertexDescription() {
+    public static void setVertexDescription(JobVertexBuildContext 
jobVertexBuildContext) {
+        final Map<Integer, JobVertex> jobVertices = 
jobVertexBuildContext.getJobVerticesInOrder();
+        final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         for (Map.Entry<Integer, JobVertex> headOpAndJobVertex : 
jobVertices.entrySet()) {
             Integer headOpId = headOpAndJobVertex.getKey();
             JobVertex vertex = headOpAndJobVertex.getValue();
             StringBuilder builder = new StringBuilder();
             switch (streamGraph.getVertexDescriptionMode()) {
                 case CASCADING:
-                    buildCascadingDescription(builder, headOpId, headOpId);
+                    buildCascadingDescription(builder, headOpId, headOpId, 
jobVertexBuildContext);
                     break;
                 case TREE:
-                    buildTreeDescription(builder, headOpId, headOpId, "", 
true);
+                    buildTreeDescription(
+                            builder, headOpId, headOpId, "", true, 
jobVertexBuildContext);
                     break;
                 default:
                     throw new IllegalArgumentException(
@@ -403,11 +397,16 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private void buildCascadingDescription(StringBuilder builder, int 
headOpId, int currentOpId) {
-        StreamNode node = streamGraph.getStreamNode(currentOpId);
-        builder.append(getDescriptionWithChainedSourcesInfo(node));
+    private static void buildCascadingDescription(
+            StringBuilder builder,
+            int headOpId,
+            int currentOpId,
+            JobVertexBuildContext jobVertexBuildContext) {
+        StreamNode node = 
jobVertexBuildContext.getStreamGraph().getStreamNode(currentOpId);
+        builder.append(getDescriptionWithChainedSourcesInfo(node, 
jobVertexBuildContext));
 
-        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        LinkedList<Integer> chainedOutput =
+                getChainedOutputNodes(headOpId, node, jobVertexBuildContext);
         if (chainedOutput.isEmpty()) {
             return;
         }
@@ -419,7 +418,7 @@ public class StreamingJobGraphGenerator {
         }
         while (true) {
             Integer outputId = chainedOutput.pollFirst();
-            buildCascadingDescription(builder, headOpId, outputId);
+            buildCascadingDescription(builder, headOpId, outputId, 
jobVertexBuildContext);
             if (chainedOutput.isEmpty()) {
                 break;
             }
@@ -430,8 +429,11 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode 
node) {
+    private static LinkedList<Integer> getChainedOutputNodes(
+            int headOpId, StreamNode node, JobVertexBuildContext 
jobVertexBuildContext) {
         LinkedList<Integer> chainedOutput = new LinkedList<>();
+        Map<Integer, Map<Integer, StreamConfig>> chainedConfigs =
+                jobVertexBuildContext.getChainedConfigs();
         if (chainedConfigs.containsKey(headOpId)) {
             for (StreamEdge edge : node.getOutEdges()) {
                 int targetId = edge.getTargetId();
@@ -443,8 +445,13 @@ public class StreamingJobGraphGenerator {
         return chainedOutput;
     }
 
-    private void buildTreeDescription(
-            StringBuilder builder, int headOpId, int currentOpId, String 
prefix, boolean isLast) {
+    private static void buildTreeDescription(
+            StringBuilder builder,
+            int headOpId,
+            int currentOpId,
+            String prefix,
+            boolean isLast,
+            JobVertexBuildContext jobVertexBuildContext) {
         // Replace the '-' in prefix of current node with ' ', keep ':'
         // HeadNode
         // :- Node1
@@ -465,21 +472,33 @@ public class StreamingJobGraphGenerator {
             }
         }
 
-        StreamNode node = streamGraph.getStreamNode(currentOpId);
+        StreamNode node = 
jobVertexBuildContext.getStreamGraph().getStreamNode(currentOpId);
         builder.append(currentNodePrefix);
-        builder.append(getDescriptionWithChainedSourcesInfo(node));
+        builder.append(getDescriptionWithChainedSourcesInfo(node, 
jobVertexBuildContext));
         builder.append("\n");
 
-        LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId, 
node);
+        LinkedList<Integer> chainedOutput =
+                getChainedOutputNodes(headOpId, node, jobVertexBuildContext);
         while (!chainedOutput.isEmpty()) {
             Integer outputId = chainedOutput.pollFirst();
-            buildTreeDescription(builder, headOpId, outputId, childPrefix, 
chainedOutput.isEmpty());
+            buildTreeDescription(
+                    builder,
+                    headOpId,
+                    outputId,
+                    childPrefix,
+                    chainedOutput.isEmpty(),
+                    jobVertexBuildContext);
         }
     }
 
-    private String getDescriptionWithChainedSourcesInfo(StreamNode node) {
+    private static String getDescriptionWithChainedSourcesInfo(
+            StreamNode node, JobVertexBuildContext jobVertexBuildContext) {
 
         List<StreamNode> chainedSources;
+        final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs =
+                jobVertexBuildContext.getChainedConfigs();
+        final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+
         if (!chainedConfigs.containsKey(node.getId())) {
             // node is not head operator of a vertex
             chainedSources = Collections.emptyList();
@@ -507,7 +526,7 @@ public class StreamingJobGraphGenerator {
     }
 
     @SuppressWarnings("deprecation")
-    private void preValidate() {
+    public static void preValidate(StreamGraph streamGraph, ClassLoader 
userClassloader) {
         CheckpointConfig checkpointConfig = streamGraph.getCheckpointConfig();
 
         if (checkpointConfig.isCheckpointingEnabled()) {
@@ -522,7 +541,8 @@ public class StreamingJobGraphGenerator {
             }
             if (checkpointConfig.isUnalignedCheckpointsEnabled()
                     && !checkpointConfig.isForceUnalignedCheckpoints()
-                    && 
streamGraph.getStreamNodes().stream().anyMatch(this::hasCustomPartitioner)) {
+                    && streamGraph.getStreamNodes().stream()
+                            
.anyMatch(StreamingJobGraphGenerator::hasCustomPartitioner)) {
                 throw new UnsupportedOperationException(
                         "Unaligned checkpoints are currently not supported for 
custom partitioners, "
                                 + "as rescaling is not guaranteed to work 
correctly."
@@ -551,16 +571,16 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private boolean hasCustomPartitioner(StreamNode node) {
+    private static boolean hasCustomPartitioner(StreamNode node) {
         return node.getOutEdges().stream()
                 .anyMatch(edge -> edge.getPartitioner() instanceof 
CustomPartitionerWrapper);
     }
 
-    private void setPhysicalEdges() {
+    public static void setPhysicalEdges(JobVertexBuildContext 
jobVertexBuildContext) {
         Map<Integer, List<StreamEdge>> physicalInEdgesInOrder =
                 new HashMap<Integer, List<StreamEdge>>();
 
-        for (StreamEdge edge : physicalEdgesInOrder) {
+        for (StreamEdge edge : 
jobVertexBuildContext.getPhysicalEdgesInOrder()) {
             int target = edge.getTargetId();
 
             List<StreamEdge> inEdges =
@@ -573,12 +593,14 @@ public class StreamingJobGraphGenerator {
             int vertex = inEdges.getKey();
             List<StreamEdge> edgeList = inEdges.getValue();
 
-            vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
+            jobVertexBuildContext
+                    .getOperatorInfo(vertex)
+                    .getVertexConfig()
+                    .setInPhysicalEdges(edgeList);
         }
     }
 
-    private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
-            final Map<Integer, byte[]> hashes, final List<Map<Integer, 
byte[]>> legacyHashes) {
+    private Map<Integer, OperatorChainInfo> 
buildChainedInputsAndGetHeadInputs() {
 
         final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
         final Map<Integer, OperatorChainInfo> chainEntryPoints = new 
HashMap<>();
@@ -598,14 +620,22 @@ public class StreamingJobGraphGenerator {
 
                 if (targetChainingStrategy == 
ChainingStrategy.HEAD_WITH_SOURCES
                         && isChainableInput(sourceOutEdge, streamGraph)) {
-                    final OperatorID opId = new 
OperatorID(hashes.get(sourceNodeId));
+                    final OperatorInfo operatorInfo =
+                            
jobVertexBuildContext.createAndGetOperatorInfo(sourceNodeId);
+                    final OperatorID opId =
+                            new 
OperatorID(jobVertexBuildContext.getHash(sourceNodeId));
                     final StreamConfig.SourceInputConfig inputConfig =
                             new StreamConfig.SourceInputConfig(sourceOutEdge);
                     final StreamConfig operatorConfig = new StreamConfig(new 
Configuration());
-                    setOperatorConfig(sourceNodeId, operatorConfig, 
Collections.emptyMap());
-                    setOperatorChainedOutputsConfig(operatorConfig, 
Collections.emptyList());
+                    setOperatorConfig(
+                            sourceNodeId,
+                            operatorConfig,
+                            Collections.emptyMap(),
+                            jobVertexBuildContext);
+                    setOperatorChainedOutputsConfig(
+                            operatorConfig, Collections.emptyList(), 
jobVertexBuildContext);
                     // we cache the non-chainable outputs here, and set the 
non-chained config later
-                    opNonChainableOutputsCache.put(sourceNodeId, 
Collections.emptyList());
+                    
operatorInfo.addNonChainableOutputs(Collections.emptyList());
 
                     operatorConfig.setChainIndex(0); // sources are always 
first
                     operatorConfig.setOperatorID(opId);
@@ -624,8 +654,6 @@ public class StreamingJobGraphGenerator {
                                     (k) ->
                                             new OperatorChainInfo(
                                                     
sourceOutEdge.getTargetId(),
-                                                    hashes,
-                                                    legacyHashes,
                                                     chainedSources,
                                                     streamGraph));
                     chainInfo.addCoordinatorProvider(coord);
@@ -635,9 +663,7 @@ public class StreamingJobGraphGenerator {
             }
 
             chainEntryPoints.put(
-                    sourceNodeId,
-                    new OperatorChainInfo(
-                            sourceNodeId, hashes, legacyHashes, 
chainedSources, streamGraph));
+                    sourceNodeId, new OperatorChainInfo(sourceNodeId, 
chainedSources, streamGraph));
         }
 
         return chainEntryPoints;
@@ -648,11 +674,11 @@ public class StreamingJobGraphGenerator {
      *
      * <p>This will recursively create all {@link JobVertex} instances.
      */
-    private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, 
byte[]>> legacyHashes) {
+    private void setChaining() {
         // we separate out the sources that run as inputs to another operator 
(chained inputs)
         // from the sources that needs to run as the main (head) operator.
         final Map<Integer, OperatorChainInfo> chainEntryPoints =
-                buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
+                buildChainedInputsAndGetHeadInputs();
         final Collection<OperatorChainInfo> initialEntryPoints =
                 chainEntryPoints.entrySet().stream()
                         .sorted(Comparator.comparing(Map.Entry::getKey))
@@ -732,64 +758,76 @@ public class StreamingJobGraphGenerator {
                         chainEntryPoints);
             }
 
-            chainedNames.put(
+            chainInfo.addChainedName(
                     currentNodeId,
                     createChainedName(
                             currentNodeId,
                             chainableOutputs,
-                            
Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
-            chainedMinResources.put(
-                    currentNodeId, createChainedMinResources(currentNodeId, 
chainableOutputs));
-            chainedPreferredResources.put(
+                            
Optional.ofNullable(chainEntryPoints.get(currentNodeId)),
+                            chainInfo.getChainedNames(),
+                            jobVertexBuildContext));
+
+            chainInfo.addChainedMinResources(
                     currentNodeId,
-                    createChainedPreferredResources(currentNodeId, 
chainableOutputs));
+                    createChainedMinResources(
+                            currentNodeId, chainableOutputs, chainInfo, 
jobVertexBuildContext));
+
+            chainInfo.addChainedPreferredResources(
+                    currentNodeId,
+                    createChainedPreferredResources(
+                            currentNodeId, chainableOutputs, chainInfo, 
jobVertexBuildContext));
 
             OperatorID currentOperatorId =
                     chainInfo.addNodeToChain(
                             currentNodeId,
-                            
streamGraph.getStreamNode(currentNodeId).getOperatorName());
+                            
streamGraph.getStreamNode(currentNodeId).getOperatorName(),
+                            jobVertexBuildContext);
 
             if (currentNode.getInputFormat() != null) {
-                getOrCreateFormatContainer(startNodeId)
+                chainInfo
+                        .getOrCreateFormatContainer()
                         .addInputFormat(currentOperatorId, 
currentNode.getInputFormat());
             }
 
             if (currentNode.getOutputFormat() != null) {
-                getOrCreateFormatContainer(startNodeId)
+                chainInfo
+                        .getOrCreateFormatContainer()
                         .addOutputFormat(currentOperatorId, 
currentNode.getOutputFormat());
             }
+            OperatorInfo operatorInfo =
+                    
jobVertexBuildContext.createAndGetOperatorInfo(currentNodeId);
 
             StreamConfig config =
                     currentNodeId.equals(startNodeId)
                             ? createJobVertex(startNodeId, chainInfo)
                             : new StreamConfig(new Configuration());
 
-            tryConvertPartitionerForDynamicGraph(chainableOutputs, 
nonChainableOutputs);
+            tryConvertPartitionerForDynamicGraph(
+                    chainableOutputs, nonChainableOutputs, 
jobVertexBuildContext);
             config.setAttribute(currentNodeAttribute);
-            setOperatorConfig(currentNodeId, config, 
chainInfo.getChainedSources());
-
-            setOperatorChainedOutputsConfig(config, chainableOutputs);
+            setOperatorConfig(
+                    currentNodeId, config, chainInfo.getChainedSources(), 
jobVertexBuildContext);
 
+            setOperatorChainedOutputsConfig(config, chainableOutputs, 
jobVertexBuildContext);
             // we cache the non-chainable outputs here, and set the 
non-chained config later
-            opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);
+            operatorInfo.addNonChainableOutputs(nonChainableOutputs);
 
             if (currentNodeId.equals(startNodeId)) {
                 chainInfo.setTransitiveOutEdges(transitiveOutEdges);
-                chainInfos.put(startNodeId, chainInfo);
+                jobVertexBuildContext.addChainInfo(startNodeId, chainInfo);
 
                 config.setChainStart();
                 config.setChainIndex(chainIndex);
                 
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
-                
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
-
+                config.setTransitiveChainedTaskConfigs(
+                        
jobVertexBuildContext.getChainedConfigs().get(startNodeId));
             } else {
-                chainedConfigs.computeIfAbsent(
-                        startNodeId, k -> new HashMap<Integer, 
StreamConfig>());
-
                 config.setChainIndex(chainIndex);
                 StreamNode node = streamGraph.getStreamNode(currentNodeId);
                 config.setOperatorName(node.getOperatorName());
-                chainedConfigs.get(startNodeId).put(currentNodeId, config);
+                jobVertexBuildContext
+                        .getOrCreateChainedConfig(startNodeId)
+                        .put(currentNodeId, config);
             }
 
             config.setOperatorID(currentOperatorId);
@@ -814,6 +852,7 @@ public class StreamingJobGraphGenerator {
      * parallelism at early stage if possible, to avoid invalid partition 
reuse.
      */
     private void setVertexParallelismsForDynamicGraphIfNecessary() {
+        final Map<Integer, JobVertex> jobVertices = 
jobVertexBuildContext.getJobVerticesInOrder();
         // Note that the jobVertices are reverse topological order
         final List<JobVertex> topologicalOrderVertices =
                 
IterableUtils.toStream(jobVertices.values()).collect(Collectors.toList());
@@ -822,7 +861,8 @@ public class StreamingJobGraphGenerator {
         // reset parallelism for job vertices whose parallelism is not 
configured
         jobVertices.forEach(
                 (startNodeId, jobVertex) -> {
-                    final OperatorChainInfo chainInfo = 
chainInfos.get(startNodeId);
+                    final OperatorChainInfo chainInfo =
+                            jobVertexBuildContext.getChainInfo(startNodeId);
                     if (!jobVertex.isParallelismConfigured()
                             && streamGraph.isAutoParallelismEnabled()) {
                         
jobVertex.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
@@ -840,7 +880,8 @@ public class StreamingJobGraphGenerator {
         jobVertices.forEach(
                 (startNodeId, jobVertex) -> {
                     Set<JobVertex> forwardConsumers =
-                            
chainInfos.get(startNodeId).getTransitiveOutEdges().stream()
+                            
jobVertexBuildContext.getChainInfo(startNodeId).getTransitiveOutEdges()
+                                    .stream()
                                     .filter(
                                             edge ->
                                                     edge.getPartitioner()
@@ -877,8 +918,8 @@ public class StreamingJobGraphGenerator {
                     if (forwardGroup != null && 
forwardGroup.isParallelismDecided()) {
                         
jobVertex.setParallelism(forwardGroup.getParallelism());
                         jobVertex.setParallelismConfigured(true);
-                        chainInfos
-                                .get(startNodeId)
+                        jobVertexBuildContext
+                                .getChainInfo(startNodeId)
                                 .getAllChainedNodes()
                                 .forEach(
                                         streamNode ->
@@ -889,8 +930,8 @@ public class StreamingJobGraphGenerator {
                     // set max parallelism for vertices in forward group
                     if (forwardGroup != null && 
forwardGroup.isMaxParallelismDecided()) {
                         
jobVertex.setMaxParallelism(forwardGroup.getMaxParallelism());
-                        chainInfos
-                                .get(startNodeId)
+                        jobVertexBuildContext
+                                .getChainInfo(startNodeId)
                                 .getAllChainedNodes()
                                 .forEach(
                                         streamNode ->
@@ -900,7 +941,8 @@ public class StreamingJobGraphGenerator {
                 });
     }
 
-    private void checkAndReplaceReusableHybridPartitionType(NonChainedOutput 
reusableOutput) {
+    private static void checkAndReplaceReusableHybridPartitionType(
+            NonChainedOutput reusableOutput) {
         if (reusableOutput.getPartitionType() == 
ResultPartitionType.HYBRID_SELECTIVE) {
             // for can be reused hybrid output, it can be optimized to always 
use full
             // spilling strategy to significantly reduce shuffle data writing 
cost.
@@ -913,21 +955,19 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private InputOutputFormatContainer getOrCreateFormatContainer(Integer 
startNodeId) {
-        return chainedInputOutputFormats.computeIfAbsent(
-                startNodeId,
-                k ->
-                        new InputOutputFormatContainer(
-                                
Thread.currentThread().getContextClassLoader()));
-    }
-
-    private String createChainedName(
+    public static String createChainedName(
             Integer vertexID,
             List<StreamEdge> chainedOutputs,
-            Optional<OperatorChainInfo> operatorChainInfo) {
+            Optional<OperatorChainInfo> operatorChainInfo,
+            Map<Integer, String> chainedNames,
+            JobVertexBuildContext jobVertexBuildContext) {
+        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         List<ChainedSourceInfo> chainedSourceInfos =
                 operatorChainInfo
-                        .map(chainInfo -> 
getChainedSourcesByVertexId(vertexID, chainInfo))
+                        .map(
+                                chainInfo ->
+                                        getChainedSourcesByVertexId(
+                                                vertexID, chainInfo, 
streamGraph))
                         .orElse(Collections.emptyList());
         final String operatorName =
                 nameWithChainedSourcesInfo(
@@ -945,31 +985,44 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private List<ChainedSourceInfo> getChainedSourcesByVertexId(
-            Integer vertexId, OperatorChainInfo chainInfo) {
+    private static List<ChainedSourceInfo> getChainedSourcesByVertexId(
+            Integer vertexId, OperatorChainInfo chainInfo, StreamGraph 
streamGraph) {
         return streamGraph.getStreamNode(vertexId).getInEdges().stream()
                 .map(inEdge -> 
chainInfo.getChainedSources().get(inEdge.getSourceId()))
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
     }
 
-    private ResourceSpec createChainedMinResources(
-            Integer vertexID, List<StreamEdge> chainedOutputs) {
-        ResourceSpec minResources = 
streamGraph.getStreamNode(vertexID).getMinResources();
+    public static ResourceSpec createChainedMinResources(
+            Integer vertexID,
+            List<StreamEdge> chainedOutputs,
+            OperatorChainInfo operatorChainInfo,
+            JobVertexBuildContext jobVertexBuildContext) {
+        ResourceSpec minResources =
+                
jobVertexBuildContext.getStreamGraph().getStreamNode(vertexID).getMinResources();
         for (StreamEdge chainable : chainedOutputs) {
-            minResources = 
minResources.merge(chainedMinResources.get(chainable.getTargetId()));
+            minResources =
+                    minResources.merge(
+                            
operatorChainInfo.getChainedMinResources(chainable.getTargetId()));
         }
         return minResources;
     }
 
-    private ResourceSpec createChainedPreferredResources(
-            Integer vertexID, List<StreamEdge> chainedOutputs) {
+    public static ResourceSpec createChainedPreferredResources(
+            Integer vertexID,
+            List<StreamEdge> chainedOutputs,
+            OperatorChainInfo operatorChainInfo,
+            JobVertexBuildContext jobVertexBuildContext) {
         ResourceSpec preferredResources =
-                streamGraph.getStreamNode(vertexID).getPreferredResources();
+                jobVertexBuildContext
+                        .getStreamGraph()
+                        .getStreamNode(vertexID)
+                        .getPreferredResources();
         for (StreamEdge chainable : chainedOutputs) {
             preferredResources =
                     preferredResources.merge(
-                            
chainedPreferredResources.get(chainable.getTargetId()));
+                            operatorChainInfo.getChainedPreferredResources(
+                                    chainable.getTargetId()));
         }
         return preferredResources;
     }
@@ -979,7 +1032,7 @@ public class StreamingJobGraphGenerator {
         JobVertex jobVertex;
         StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
 
-        byte[] hash = chainInfo.getHash(streamNodeId);
+        byte[] hash = jobVertexBuildContext.getHash(streamNodeId);
 
         if (hash == null) {
             throw new IllegalStateException(
@@ -995,28 +1048,29 @@ public class StreamingJobGraphGenerator {
         if (chainedOperators != null) {
             for (ChainedOperatorHashInfo chainedOperator : chainedOperators) {
                 OperatorID userDefinedOperatorID =
-                        chainedOperator.getUserDefinedOperatorID() == null
+                        chainedOperator.getUserDefinedOperatorId() == null
                                 ? null
-                                : new 
OperatorID(chainedOperator.getUserDefinedOperatorID());
+                                : new 
OperatorID(chainedOperator.getUserDefinedOperatorId());
                 operatorIDPairs.add(
                         OperatorIDPair.of(
-                                new 
OperatorID(chainedOperator.getGeneratedOperatorID()),
+                                new 
OperatorID(chainedOperator.getGeneratedOperatorId()),
                                 userDefinedOperatorID,
                                 
chainedOperator.getStreamNode().getOperatorName(),
                                 
chainedOperator.getStreamNode().getTransformationUID()));
             }
         }
 
-        if (chainedInputOutputFormats.containsKey(streamNodeId)) {
+        if (chainInfo.hasFormatContainer()) {
             jobVertex =
                     new InputOutputFormatVertex(
-                            chainedNames.get(streamNodeId), jobVertexId, 
operatorIDPairs);
-
-            chainedInputOutputFormats
-                    .get(streamNodeId)
+                            chainInfo.getChainedName(streamNodeId), 
jobVertexId, operatorIDPairs);
+            chainInfo
+                    .getOrCreateFormatContainer()
                     .write(new TaskConfig(jobVertex.getConfiguration()));
         } else {
-            jobVertex = new JobVertex(chainedNames.get(streamNodeId), 
jobVertexId, operatorIDPairs);
+            jobVertex =
+                    new JobVertex(
+                            chainInfo.getChainedName(streamNodeId), 
jobVertexId, operatorIDPairs);
         }
 
         if (streamNode.getConsumeClusterDatasetId() != null) {
@@ -1036,18 +1090,20 @@ public class StreamingJobGraphGenerator {
                                     throw new FlinkRuntimeException(
                                             String.format(
                                                     "Coordinator Provider for 
node %s is not serializable.",
-                                                    
chainedNames.get(streamNodeId)),
+                                                    
chainInfo.getChainedName(streamNodeId)),
                                             e);
                                 }
                             },
                             serializationExecutor));
         }
         if (!serializationFutures.isEmpty()) {
-            coordinatorSerializationFuturesPerJobVertex.put(jobVertexId, 
serializationFutures);
+            jobVertexBuildContext.putCoordinatorSerializationFutures(
+                    jobVertexId, serializationFutures);
         }
 
         jobVertex.setResources(
-                chainedMinResources.get(streamNodeId), 
chainedPreferredResources.get(streamNodeId));
+                chainInfo.getChainedMinResources(streamNodeId),
+                chainInfo.getChainedPreferredResources(streamNodeId));
 
         jobVertex.setInvokableClass(streamNode.getJobVertexClass());
 
@@ -1065,7 +1121,7 @@ public class StreamingJobGraphGenerator {
             LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
         }
 
-        jobVertices.put(streamNodeId, jobVertex);
+        jobVertexBuildContext.addJobVertex(streamNodeId, jobVertex);
         builtVertices.add(streamNodeId);
         jobGraph.addVertex(jobVertex);
 
@@ -1076,9 +1132,13 @@ public class StreamingJobGraphGenerator {
         return new StreamConfig(jobVertex.getConfiguration());
     }
 
-    private void setOperatorConfig(
-            Integer vertexId, StreamConfig config, Map<Integer, 
ChainedSourceInfo> chainedSources) {
-
+    public static void setOperatorConfig(
+            Integer vertexId,
+            StreamConfig config,
+            Map<Integer, ChainedSourceInfo> chainedSources,
+            JobVertexBuildContext jobVertexBuildContext) {
+        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+        OperatorInfo operatorInfo = 
jobVertexBuildContext.getOperatorInfo(vertexId);
         StreamNode vertex = streamGraph.getStreamNode(vertexId);
 
         config.setVertexID(vertexId);
@@ -1106,8 +1166,8 @@ public class StreamingJobGraphGenerator {
                             "Trying to union a chained source with another 
input.");
                 }
                 inputConfigs[inputIndex] = chainedSource.getInputConfig();
-                chainedConfigs
-                        .computeIfAbsent(vertexId, (key) -> new HashMap<>())
+                jobVertexBuildContext
+                        .getOrCreateChainedConfig(vertexId)
                         .put(inEdge.getSourceId(), 
chainedSource.getOperatorConfig());
             } else {
                 // network input. null if we move to a new input, non-null if 
this is a further edge
@@ -1172,11 +1232,13 @@ public class StreamingJobGraphGenerator {
             config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexId));
         }
 
-        vertexConfigs.put(vertexId, config);
+        operatorInfo.setVertexConfig(config);
     }
 
-    private void setOperatorChainedOutputsConfig(
-            StreamConfig config, List<StreamEdge> chainableOutputs) {
+    public static void setOperatorChainedOutputsConfig(
+            StreamConfig config,
+            List<StreamEdge> chainableOutputs,
+            JobVertexBuildContext jobVertexBuildContext) {
         // iterate edges, find sideOutput edges create and save serializers 
for each outputTag type
         for (StreamEdge edge : chainableOutputs) {
             if (edge.getOutputTag() != null) {
@@ -1185,17 +1247,21 @@ public class StreamingJobGraphGenerator {
                         edge.getOutputTag()
                                 .getTypeInfo()
                                 .createSerializer(
-                                        
streamGraph.getExecutionConfig().getSerializerConfig()));
+                                        jobVertexBuildContext
+                                                .getStreamGraph()
+                                                .getExecutionConfig()
+                                                .getSerializerConfig()));
             }
         }
         config.setChainedOutputs(chainableOutputs);
     }
 
-    private void setOperatorNonChainedOutputsConfig(
+    private static void setOperatorNonChainedOutputsConfig(
             Integer vertexId,
             StreamConfig config,
             List<StreamEdge> nonChainableOutputs,
-            Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
+            Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge,
+            JobVertexBuildContext jobVertexBuildContext) {
         // iterate edges, find sideOutput edges create and save serializers 
for each outputTag type
         for (StreamEdge edge : nonChainableOutputs) {
             if (edge.getOutputTag() != null) {
@@ -1204,12 +1270,19 @@ public class StreamingJobGraphGenerator {
                         edge.getOutputTag()
                                 .getTypeInfo()
                                 .createSerializer(
-                                        
streamGraph.getExecutionConfig().getSerializerConfig()));
+                                        jobVertexBuildContext
+                                                .getStreamGraph()
+                                                .getExecutionConfig()
+                                                .getSerializerConfig()));
             }
         }
 
         List<NonChainedOutput> deduplicatedOutputs =
-                mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, 
outputsConsumedByEdge);
+                mayReuseNonChainedOutputs(
+                        vertexId,
+                        nonChainableOutputs,
+                        outputsConsumedByEdge,
+                        jobVertexBuildContext);
         config.setNumberOfOutputs(deduplicatedOutputs.size());
         config.setOperatorNonChainedOutputs(deduplicatedOutputs);
     }
@@ -1224,52 +1297,68 @@ public class StreamingJobGraphGenerator {
         for (StreamEdge edge : transitiveOutEdges) {
             NonChainedOutput output = 
opIntermediateOutputs.get(edge.getSourceId()).get(edge);
             transitiveOutputs.add(output);
-            connect(startNodeId, edge, output);
+            connect(
+                    startNodeId,
+                    edge,
+                    output,
+                    jobVertexBuildContext.getJobVerticesInOrder(),
+                    jobVertexBuildContext);
         }
 
         config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
     }
 
-    private void setAllOperatorNonChainedOutputsConfigs(
-            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs) {
+    public static void setAllOperatorNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs,
+            JobVertexBuildContext jobVertexBuildContext) {
         // set non chainable output config
-        opNonChainableOutputsCache.forEach(
-                (vertexId, nonChainableOutputs) -> {
-                    Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
-                            opIntermediateOutputs.computeIfAbsent(
-                                    vertexId, ignored -> new HashMap<>());
-                    setOperatorNonChainedOutputsConfig(
-                            vertexId,
-                            vertexConfigs.get(vertexId),
-                            nonChainableOutputs,
-                            outputsConsumedByEdge);
-                });
+        jobVertexBuildContext
+                .getOperatorInfos()
+                .forEach(
+                        (vertexId, operatorInfo) -> {
+                            Map<StreamEdge, NonChainedOutput> 
outputsConsumedByEdge =
+                                    opIntermediateOutputs.computeIfAbsent(
+                                            vertexId, ignored -> new 
HashMap<>());
+                            setOperatorNonChainedOutputsConfig(
+                                    vertexId,
+                                    operatorInfo.getVertexConfig(),
+                                    operatorInfo.getNonChainableOutputs(),
+                                    outputsConsumedByEdge,
+                                    jobVertexBuildContext);
+                        });
     }
 
     private void setAllVertexNonChainedOutputsConfigs(
             final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs) {
-        jobVertices
+        jobVertexBuildContext
+                .getJobVerticesInOrder()
                 .keySet()
                 .forEach(
                         startNodeId ->
                                 setVertexNonChainedOutputsConfig(
                                         startNodeId,
-                                        vertexConfigs.get(startNodeId),
-                                        
chainInfos.get(startNodeId).getTransitiveOutEdges(),
+                                        jobVertexBuildContext
+                                                .getOperatorInfo(startNodeId)
+                                                .getVertexConfig(),
+                                        jobVertexBuildContext
+                                                .getChainInfo(startNodeId)
+                                                .getTransitiveOutEdges(),
                                         opIntermediateOutputs));
     }
 
-    private List<NonChainedOutput> mayReuseNonChainedOutputs(
+    private static List<NonChainedOutput> mayReuseNonChainedOutputs(
             int vertexId,
             List<StreamEdge> consumerEdges,
-            Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
+            Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge,
+            JobVertexBuildContext jobVertexBuildContext) {
         if (consumerEdges.isEmpty()) {
             return new ArrayList<>();
         }
         List<NonChainedOutput> outputs = new ArrayList<>(consumerEdges.size());
         for (StreamEdge consumerEdge : consumerEdges) {
             checkState(vertexId == consumerEdge.getSourceId(), "Vertex id must 
be the same.");
-            ResultPartitionType partitionType = 
getResultPartitionType(consumerEdge);
+            ResultPartitionType partitionType =
+                    getResultPartitionType(consumerEdge, 
jobVertexBuildContext);
             IntermediateDataSetID dataSetId = new IntermediateDataSetID();
 
             boolean isPersistentDataSet =
@@ -1280,7 +1369,7 @@ public class StreamingJobGraphGenerator {
             }
 
             if (partitionType.isHybridResultPartition()) {
-                hasHybridResultPartition = true;
+                jobVertexBuildContext.setHasHybridResultPartition(true);
                 if (consumerEdge.getPartitioner().isBroadcast()
                         && partitionType == 
ResultPartitionType.HYBRID_SELECTIVE) {
                     // for broadcast result partition, it can be optimized to 
always use full
@@ -1300,18 +1389,20 @@ public class StreamingJobGraphGenerator {
                     consumerEdge,
                     isPersistentDataSet,
                     dataSetId,
-                    partitionType);
+                    partitionType,
+                    jobVertexBuildContext.getStreamGraph());
         }
         return outputs;
     }
 
-    private void createOrReuseOutput(
+    private static void createOrReuseOutput(
             List<NonChainedOutput> outputs,
             Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge,
             StreamEdge consumerEdge,
             boolean isPersistentDataSet,
             IntermediateDataSetID dataSetId,
-            ResultPartitionType partitionType) {
+            ResultPartitionType partitionType,
+            StreamGraph streamGraph) {
         int consumerParallelism =
                 
streamGraph.getStreamNode(consumerEdge.getTargetId()).getParallelism();
         int consumerMaxParallelism =
@@ -1360,23 +1451,25 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private boolean isPartitionTypeCanBeReuse(ResultPartitionType 
partitionType) {
+    private static boolean isPartitionTypeCanBeReuse(ResultPartitionType 
partitionType) {
         // for non-hybrid partition, partition reuse only works when its 
re-consumable.
         // for hybrid selective partition, it still has the opportunity to be 
converted to
         // hybrid full partition to support partition reuse.
         return partitionType.isReconsumable() || 
partitionType.isHybridResultPartition();
     }
 
-    private boolean allHybridOrSameReconsumablePartitionType(
+    private static boolean allHybridOrSameReconsumablePartitionType(
             ResultPartitionType partitionType1, ResultPartitionType 
partitionType2) {
         return (partitionType1.isReconsumable() && partitionType1 == 
partitionType2)
                 || (partitionType1.isHybridResultPartition()
                         && partitionType2.isHybridResultPartition());
     }
 
-    private void tryConvertPartitionerForDynamicGraph(
-            List<StreamEdge> chainableOutputs, List<StreamEdge> 
nonChainableOutputs) {
-
+    public static void tryConvertPartitionerForDynamicGraph(
+            List<StreamEdge> chainableOutputs,
+            List<StreamEdge> nonChainableOutputs,
+            JobVertexBuildContext jobVertexBuildContext) {
+        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         for (StreamEdge edge : chainableOutputs) {
             StreamPartitioner<?> partitioner = edge.getPartitioner();
             if (partitioner instanceof ForwardForConsecutiveHashPartitioner
@@ -1407,7 +1500,7 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private CheckpointingMode getCheckpointingMode(CheckpointConfig 
checkpointConfig) {
+    private static CheckpointingMode getCheckpointingMode(CheckpointConfig 
checkpointConfig) {
         CheckpointingMode checkpointingMode = 
checkpointConfig.getCheckpointingConsistencyMode();
 
         checkArgument(
@@ -1425,9 +1518,14 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private void connect(Integer headOfChain, StreamEdge edge, 
NonChainedOutput output) {
+    public static void connect(
+            Integer headOfChain,
+            StreamEdge edge,
+            NonChainedOutput output,
+            Map<Integer, JobVertex> jobVertices,
+            JobVertexBuildContext jobVertexBuildContext) {
 
-        physicalEdgesInOrder.add(edge);
+        jobVertexBuildContext.addPhysicalEdgesInOrder(edge);
 
         Integer downStreamVertexID = edge.getTargetId();
 
@@ -1477,13 +1575,13 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private boolean isPersistentIntermediateDataset(
+    private static boolean isPersistentIntermediateDataset(
             ResultPartitionType resultPartitionType, StreamEdge edge) {
         return 
resultPartitionType.isBlockingOrBlockingPersistentResultPartition()
                 && edge.getIntermediateDatasetIdToProduce() != null;
     }
 
-    private void checkBufferTimeout(ResultPartitionType type, StreamEdge edge) 
{
+    private static void checkBufferTimeout(ResultPartitionType type, 
StreamEdge edge) {
         long bufferTimeout = edge.getBufferTimeout();
         if (!type.canBePipelinedConsumed()
                 && bufferTimeout != 
ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) {
@@ -1496,7 +1594,8 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private ResultPartitionType getResultPartitionType(StreamEdge edge) {
+    private static ResultPartitionType getResultPartitionType(
+            StreamEdge edge, JobVertexBuildContext jobVertexBuildContext) {
         switch (edge.getExchangeMode()) {
             case PIPELINED:
                 return ResultPartitionType.PIPELINED_BOUNDED;
@@ -1507,14 +1606,16 @@ public class StreamingJobGraphGenerator {
             case HYBRID_SELECTIVE:
                 return ResultPartitionType.HYBRID_SELECTIVE;
             case UNDEFINED:
-                return determineUndefinedResultPartitionType(edge);
+                return determineUndefinedResultPartitionType(edge, 
jobVertexBuildContext);
             default:
                 throw new UnsupportedOperationException(
                         "Data exchange mode " + edge.getExchangeMode() + " is 
not supported yet.");
         }
     }
 
-    private ResultPartitionType 
determineUndefinedResultPartitionType(StreamEdge edge) {
+    public static ResultPartitionType determineUndefinedResultPartitionType(
+            StreamEdge edge, JobVertexBuildContext jobVertexBuildContext) {
+        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         Attribute sourceNodeAttribute =
                 streamGraph.getStreamNode(edge.getSourceId()).getAttribute();
         if (sourceNodeAttribute.isNoOutputUntilEndOfInput()) {
@@ -1559,7 +1660,7 @@ public class StreamingJobGraphGenerator {
         return downStreamVertex.getInEdges().size() == 1 && 
isChainableInput(edge, streamGraph);
     }
 
-    private static boolean isChainableInput(StreamEdge edge, StreamGraph 
streamGraph) {
+    public static boolean isChainableInput(StreamEdge edge, StreamGraph 
streamGraph) {
         StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
         StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
@@ -1683,13 +1784,17 @@ public class StreamingJobGraphGenerator {
         return streamGraph.getHeadOperatorForNodeFromCache(upStreamVertex);
     }
 
-    private void markSupportingConcurrentExecutionAttempts() {
+    public static void markSupportingConcurrentExecutionAttempts(
+            JobVertexBuildContext jobVertexBuildContext) {
+        final Map<Integer, JobVertex> jobVertices = 
jobVertexBuildContext.getJobVerticesInOrder();
+        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+
         for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
             final JobVertex jobVertex = entry.getValue();
             final Set<Integer> vertexOperators = new HashSet<>();
             vertexOperators.add(entry.getKey());
             final Map<Integer, StreamConfig> vertexChainedConfigs =
-                    chainedConfigs.get(entry.getKey());
+                    
jobVertexBuildContext.getChainedConfigs().get(entry.getKey());
             if (vertexChainedConfigs != null) {
                 vertexOperators.addAll(vertexChainedConfigs.keySet());
             }
@@ -1708,15 +1813,19 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private void setSlotSharingAndCoLocation() {
-        setSlotSharing();
-        setCoLocation();
+    public static void setSlotSharingAndCoLocation(
+            JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
+        setSlotSharing(jobGraph, jobVertexBuildContext);
+        setCoLocation(jobVertexBuildContext);
     }
 
-    private void setSlotSharing() {
+    private static void setSlotSharing(
+            JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
+        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         final Map<String, SlotSharingGroup> specifiedSlotSharingGroups = new 
HashMap<>();
         final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups 
=
-                buildVertexRegionSlotSharingGroups();
+                buildVertexRegionSlotSharingGroups(jobGraph, 
jobVertexBuildContext);
+        final Map<Integer, JobVertex> jobVertices = 
jobVertexBuildContext.getJobVerticesInOrder();
 
         for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
 
@@ -1733,7 +1842,7 @@ public class StreamingJobGraphGenerator {
                         
checkNotNull(vertexRegionSlotSharingGroups.get(vertex.getID()));
             } else {
                 checkState(
-                        !hasHybridResultPartition,
+                        !jobVertexBuildContext.hasHybridResultPartition(),
                         "hybrid shuffle mode currently does not support 
setting non-default slot sharing group.");
 
                 effectiveSlotSharingGroup =
@@ -1752,10 +1861,11 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private void validateHybridShuffleExecuteInBatchMode() {
-        if (hasHybridResultPartition) {
+    public static void validateHybridShuffleExecuteInBatchMode(
+            JobVertexBuildContext jobVertexBuildContext) {
+        if (jobVertexBuildContext.hasHybridResultPartition()) {
             checkState(
-                    jobGraph.getJobType() == JobType.BATCH,
+                    jobVertexBuildContext.getStreamGraph().getJobType() == 
JobType.BATCH,
                     "hybrid shuffle mode only supports batch job, please set 
%s to %s",
                     ExecutionOptions.RUNTIME_MODE.key(),
                     RuntimeExecutionMode.BATCH.name());
@@ -1767,7 +1877,9 @@ public class StreamingJobGraphGenerator {
      * StreamGraph#isAllVerticesInSameSlotSharingGroupByDefault()} returns 
true, all regions will be
      * in the same slot sharing group.
      */
-    private Map<JobVertexID, SlotSharingGroup> 
buildVertexRegionSlotSharingGroups() {
+    private static Map<JobVertexID, SlotSharingGroup> 
buildVertexRegionSlotSharingGroups(
+            JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
+        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups 
= new HashMap<>();
         final SlotSharingGroup defaultSlotSharingGroup = new 
SlotSharingGroup();
         streamGraph
@@ -1799,9 +1911,11 @@ public class StreamingJobGraphGenerator {
         return vertexRegionSlotSharingGroups;
     }
 
-    private void setCoLocation() {
+    private static void setCoLocation(JobVertexBuildContext 
jobVertexBuildContext) {
         final Map<String, Tuple2<SlotSharingGroup, CoLocationGroupImpl>> 
coLocationGroups =
                 new HashMap<>();
+        final Map<Integer, JobVertex> jobVertices = 
jobVertexBuildContext.getJobVerticesInOrder();
+        final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
 
         for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
 
@@ -1833,14 +1947,9 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private static void setManagedMemoryFraction(
-            final Map<Integer, JobVertex> jobVertices,
-            final Map<Integer, StreamConfig> operatorConfigs,
-            final Map<Integer, Map<Integer, StreamConfig>> 
vertexChainedConfigs,
-            final java.util.function.Function<Integer, 
Map<ManagedMemoryUseCase, Integer>>
-                    operatorScopeManagedMemoryUseCaseWeightsRetriever,
-            final java.util.function.Function<Integer, 
Set<ManagedMemoryUseCase>>
-                    slotScopeManagedMemoryUseCasesRetriever) {
+    public static void setManagedMemoryFraction(JobVertexBuildContext 
jobVertexBuildContext) {
+
+        final Map<Integer, JobVertex> jobVertices = 
jobVertexBuildContext.getJobVerticesInOrder();
 
         // all slot sharing groups in this job
         final Set<SlotSharingGroup> slotSharingGroups =
@@ -1868,7 +1977,8 @@ public class StreamingJobGraphGenerator {
             final Set<Integer> operatorIds = new HashSet<>();
             operatorIds.add(headOperatorId);
             operatorIds.addAll(
-                    vertexChainedConfigs
+                    jobVertexBuildContext
+                            .getChainedConfigs()
                             .getOrDefault(headOperatorId, 
Collections.emptyMap())
                             .keySet());
             vertexOperators.put(jobVertex.getID(), operatorIds);
@@ -1876,13 +1986,7 @@ public class StreamingJobGraphGenerator {
 
         for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
             setManagedMemoryFractionForSlotSharingGroup(
-                    slotSharingGroup,
-                    vertexHeadOperators,
-                    vertexOperators,
-                    operatorConfigs,
-                    vertexChainedConfigs,
-                    operatorScopeManagedMemoryUseCaseWeightsRetriever,
-                    slotScopeManagedMemoryUseCasesRetriever);
+                    slotSharingGroup, vertexHeadOperators, vertexOperators, 
jobVertexBuildContext);
         }
     }
 
@@ -1890,13 +1994,10 @@ public class StreamingJobGraphGenerator {
             final SlotSharingGroup slotSharingGroup,
             final Map<JobVertexID, Integer> vertexHeadOperators,
             final Map<JobVertexID, Set<Integer>> vertexOperators,
-            final Map<Integer, StreamConfig> operatorConfigs,
-            final Map<Integer, Map<Integer, StreamConfig>> 
vertexChainedConfigs,
-            final java.util.function.Function<Integer, 
Map<ManagedMemoryUseCase, Integer>>
-                    operatorScopeManagedMemoryUseCaseWeightsRetriever,
-            final java.util.function.Function<Integer, 
Set<ManagedMemoryUseCase>>
-                    slotScopeManagedMemoryUseCasesRetriever) {
-
+            final JobVertexBuildContext jobVertexBuildContext) {
+        final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+        final Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs =
+                jobVertexBuildContext.getChainedConfigs();
         final Set<Integer> groupOperatorIds =
                 slotSharingGroup.getJobVertexIds().stream()
                         .flatMap((vid) -> vertexOperators.get(vid).stream())
@@ -1906,7 +2007,8 @@ public class StreamingJobGraphGenerator {
                 groupOperatorIds.stream()
                         .flatMap(
                                 (oid) ->
-                                        
operatorScopeManagedMemoryUseCaseWeightsRetriever.apply(oid)
+                                        streamGraph.getStreamNode(oid)
+                                                
.getManagedMemoryOperatorScopeUseCaseWeights()
                                                 .entrySet().stream())
                         .collect(
                                 Collectors.groupingBy(
@@ -1917,16 +2019,22 @@ public class StreamingJobGraphGenerator {
                 groupOperatorIds.stream()
                         .flatMap(
                                 (oid) ->
-                                        
slotScopeManagedMemoryUseCasesRetriever.apply(oid).stream())
+                                        streamGraph.getStreamNode(oid)
+                                                
.getManagedMemorySlotScopeUseCases().stream())
                         .collect(Collectors.toSet());
 
         for (JobVertexID jobVertexID : slotSharingGroup.getJobVertexIds()) {
             for (int operatorNodeId : vertexOperators.get(jobVertexID)) {
-                final StreamConfig operatorConfig = 
operatorConfigs.get(operatorNodeId);
+                final StreamConfig operatorConfig =
+                        
jobVertexBuildContext.getOperatorInfo(operatorNodeId).getVertexConfig();
                 final Map<ManagedMemoryUseCase, Integer> 
operatorScopeUseCaseWeights =
-                        
operatorScopeManagedMemoryUseCaseWeightsRetriever.apply(operatorNodeId);
+                        streamGraph
+                                .getStreamNode(operatorNodeId)
+                                .getManagedMemoryOperatorScopeUseCaseWeights();
                 final Set<ManagedMemoryUseCase> slotScopeUseCases =
-                        
slotScopeManagedMemoryUseCasesRetriever.apply(operatorNodeId);
+                        streamGraph
+                                .getStreamNode(operatorNodeId)
+                                .getManagedMemorySlotScopeUseCases();
                 setManagedMemoryFractionForOperator(
                         operatorScopeUseCaseWeights,
                         slotScopeUseCases,
@@ -1937,7 +2045,8 @@ public class StreamingJobGraphGenerator {
 
             // need to refresh the chained task configs because they are 
serialized
             final int headOperatorNodeId = 
vertexHeadOperators.get(jobVertexID);
-            final StreamConfig vertexConfig = 
operatorConfigs.get(headOperatorNodeId);
+            final StreamConfig vertexConfig =
+                    
jobVertexBuildContext.getOperatorInfo(headOperatorNodeId).getVertexConfig();
             vertexConfig.setTransitiveChainedTaskConfigs(
                     vertexChainedConfigs.get(headOperatorNodeId));
         }
@@ -1970,7 +2079,7 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    private void configureCheckpointing() {
+    public static void configureCheckpointing(StreamGraph streamGraph, 
JobGraph jobGraph) {
         CheckpointConfig cfg = streamGraph.getCheckpointConfig();
 
         long interval = cfg.getCheckpointInterval();
@@ -2108,150 +2217,4 @@ public class StreamingJobGraphGenerator {
                                                         .getOperatorName())
                                 .collect(Collectors.joining(", ")));
     }
-
-    /**
-     * A private class to help maintain the information of an operator chain 
during the recursive
-     * call in {@link #createChain(Integer, int, OperatorChainInfo, Map)}.
-     */
-    private static class OperatorChainInfo {
-        private final Integer startNodeId;
-        private final Map<Integer, byte[]> hashes;
-        private final List<Map<Integer, byte[]>> legacyHashes;
-        private final Map<Integer, List<ChainedOperatorHashInfo>> 
chainedOperatorHashes;
-        private final Map<Integer, ChainedSourceInfo> chainedSources;
-        private final List<OperatorCoordinator.Provider> coordinatorProviders;
-        private final StreamGraph streamGraph;
-        private final List<StreamNode> chainedNodes;
-        private final List<StreamEdge> transitiveOutEdges;
-
-        private OperatorChainInfo(
-                int startNodeId,
-                Map<Integer, byte[]> hashes,
-                List<Map<Integer, byte[]>> legacyHashes,
-                Map<Integer, ChainedSourceInfo> chainedSources,
-                StreamGraph streamGraph) {
-            this.startNodeId = startNodeId;
-            this.hashes = hashes;
-            this.legacyHashes = legacyHashes;
-            this.chainedOperatorHashes = new HashMap<>();
-            this.coordinatorProviders = new ArrayList<>();
-            this.chainedSources = chainedSources;
-            this.streamGraph = streamGraph;
-            this.chainedNodes = new ArrayList<>();
-            this.transitiveOutEdges = new ArrayList<>();
-        }
-
-        byte[] getHash(Integer streamNodeId) {
-            return hashes.get(streamNodeId);
-        }
-
-        private Integer getStartNodeId() {
-            return startNodeId;
-        }
-
-        private List<ChainedOperatorHashInfo> getChainedOperatorHashes(int 
startNodeId) {
-            return chainedOperatorHashes.get(startNodeId);
-        }
-
-        void addCoordinatorProvider(OperatorCoordinator.Provider coordinator) {
-            coordinatorProviders.add(coordinator);
-        }
-
-        private List<OperatorCoordinator.Provider> getCoordinatorProviders() {
-            return coordinatorProviders;
-        }
-
-        Map<Integer, ChainedSourceInfo> getChainedSources() {
-            return chainedSources;
-        }
-
-        private OperatorID addNodeToChain(int currentNodeId, String 
operatorName) {
-            recordChainedNode(currentNodeId);
-            StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
-
-            List<ChainedOperatorHashInfo> operatorHashes =
-                    chainedOperatorHashes.computeIfAbsent(startNodeId, k -> 
new ArrayList<>());
-
-            byte[] primaryHashBytes = hashes.get(currentNodeId);
-
-            for (Map<Integer, byte[]> legacyHash : legacyHashes) {
-                operatorHashes.add(
-                        new ChainedOperatorHashInfo(
-                                primaryHashBytes, 
legacyHash.get(currentNodeId), streamNode));
-            }
-
-            streamNode
-                    .getCoordinatorProvider(operatorName, new 
OperatorID(getHash(currentNodeId)))
-                    .map(coordinatorProviders::add);
-
-            return new OperatorID(primaryHashBytes);
-        }
-
-        private void setTransitiveOutEdges(final List<StreamEdge> 
transitiveOutEdges) {
-            this.transitiveOutEdges.addAll(transitiveOutEdges);
-        }
-
-        private List<StreamEdge> getTransitiveOutEdges() {
-            return transitiveOutEdges;
-        }
-
-        private void recordChainedNode(int currentNodeId) {
-            StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
-            chainedNodes.add(streamNode);
-        }
-
-        private OperatorChainInfo newChain(Integer startNodeId) {
-            return new OperatorChainInfo(
-                    startNodeId, hashes, legacyHashes, chainedSources, 
streamGraph);
-        }
-
-        private List<StreamNode> getAllChainedNodes() {
-            return chainedNodes;
-        }
-    }
-
-    private static final class ChainedOperatorHashInfo {
-        private final byte[] generatedOperatorID;
-        private final byte[] userDefinedOperatorID;
-        private final StreamNode streamNode;
-
-        ChainedOperatorHashInfo(
-                final byte[] generatedOperatorID,
-                final byte[] userDefinedOperatorID,
-                final StreamNode streamNode) {
-            this.generatedOperatorID = generatedOperatorID;
-            this.userDefinedOperatorID = userDefinedOperatorID;
-            this.streamNode = streamNode;
-        }
-
-        public byte[] getGeneratedOperatorID() {
-            return generatedOperatorID;
-        }
-
-        public byte[] getUserDefinedOperatorID() {
-            return userDefinedOperatorID;
-        }
-
-        public StreamNode getStreamNode() {
-            return streamNode;
-        }
-    }
-
-    private static final class ChainedSourceInfo {
-        private final StreamConfig operatorConfig;
-        private final StreamConfig.SourceInputConfig inputConfig;
-
-        ChainedSourceInfo(StreamConfig operatorConfig, 
StreamConfig.SourceInputConfig inputConfig) {
-            this.operatorConfig = operatorConfig;
-            this.inputConfig = inputConfig;
-        }
-
-        public StreamConfig getOperatorConfig() {
-            return operatorConfig;
-        }
-
-        public StreamConfig.SourceInputConfig getInputConfig() {
-            return inputConfig;
-        }
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedOperatorHashInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedOperatorHashInfo.java
new file mode 100644
index 00000000000..57848680f6a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedOperatorHashInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Helper class that maintains the operator hash info recorded while building an operator chain. */
+@Internal
+public final class ChainedOperatorHashInfo {
+    private final byte[] generatedOperatorId;
+    private final byte[] userDefinedOperatorId;
+    private final StreamNode streamNode;
+
+    ChainedOperatorHashInfo(
+            final byte[] generatedOperatorId,
+            @Nullable final byte[] userDefinedOperatorId,
+            final StreamNode streamNode) {
+        this.generatedOperatorId = checkNotNull(generatedOperatorId);
+        this.userDefinedOperatorId = userDefinedOperatorId;
+        this.streamNode = checkNotNull(streamNode);
+    }
+
+    public byte[] getGeneratedOperatorId() {
+        return generatedOperatorId;
+    }
+
+    @Nullable
+    public byte[] getUserDefinedOperatorId() {
+        return userDefinedOperatorId;
+    }
+
+    public StreamNode getStreamNode() {
+        return streamNode;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedSourceInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedSourceInfo.java
new file mode 100644
index 00000000000..af3afb22fc0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedSourceInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/** Helper class that maintains the chained source info of an operator chain. */
+@Internal
+public final class ChainedSourceInfo {
+    private final StreamConfig operatorConfig;
+    private final StreamConfig.SourceInputConfig inputConfig;
+
+    public ChainedSourceInfo(
+            StreamConfig operatorConfig, StreamConfig.SourceInputConfig 
inputConfig) {
+        this.operatorConfig = operatorConfig;
+        this.inputConfig = inputConfig;
+    }
+
+    public StreamConfig getOperatorConfig() {
+        return operatorConfig;
+    }
+
+    public StreamConfig.SourceInputConfig getInputConfig() {
+        return inputConfig;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
new file mode 100644
index 00000000000..d6ef01424e3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Helper class that encapsulates all the information and configuration required during the
+ * construction of job vertices.
+ */
+@Internal
+public class JobVertexBuildContext {
+
+    private final StreamGraph streamGraph;
+
+    /**
+     * The {@link OperatorChainInfo}s, key is the start node id of the chain. 
It should be ordered,
+     * as the implementation of the incremental generator relies on its order 
to create the
+     * JobVertex.
+     */
+    private final Map<Integer, OperatorChainInfo> chainInfosInOrder;
+
+    /** The {@link OperatorInfo}s, key is the id of the stream node. */
+    private final Map<Integer, OperatorInfo> operatorInfos;
+
+    // This map's key represents the starting node id of each chain. Note that 
this includes not
+    // only the usual head node of the chain but also the ids of chain sources 
which are used by
+    // multi-input.
+    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
+
+    // The created JobVertices, keyed by the start node id of each chain. The map preserves
+    // the order in which the JobVertices are created, which some functions depend on.
+    private final Map<Integer, JobVertex> jobVerticesInOrder;
+
+    // Futures for the serialization of operator coordinators.
+    private final Map<
+                    JobVertexID,
+                    
List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
+            coordinatorSerializationFuturesPerJobVertex;
+
+    // The order of the StreamEdges connecting to other vertices should be consistent with
+    // the order in which the corresponding JobEdges were created.
+    private final List<StreamEdge> physicalEdgesInOrder;
+
+    // We use an AtomicBoolean to track whether a HybridResultPartition exists during the
+    // incremental JobGraph generation process introduced by AdaptiveGraphManager. Changes to
+    // this flag must be visible globally across the generation steps, so a shared mutable
+    // holder (AtomicBoolean) is used instead of a primitive boolean field.
+    private final AtomicBoolean hasHybridResultPartition;
+
+    private final Map<Integer, byte[]> hashes;
+
+    private final List<Map<Integer, byte[]>> legacyHashes;
+
+    public JobVertexBuildContext(
+            StreamGraph streamGraph,
+            AtomicBoolean hasHybridResultPartition,
+            Map<Integer, byte[]> hashes,
+            List<Map<Integer, byte[]>> legacyHashes) {
+        this.streamGraph = streamGraph;
+        this.hashes = hashes;
+        this.legacyHashes = legacyHashes;
+        this.chainInfosInOrder = new LinkedHashMap<>();
+        this.jobVerticesInOrder = new LinkedHashMap<>();
+        this.physicalEdgesInOrder = new ArrayList<>();
+        this.hasHybridResultPartition = hasHybridResultPartition;
+        this.coordinatorSerializationFuturesPerJobVertex = new HashMap<>();
+        this.chainedConfigs = new HashMap<>();
+        this.operatorInfos = new HashMap<>();
+    }
+
+    public void addChainInfo(Integer startNodeId, OperatorChainInfo chainInfo) 
{
+        chainInfosInOrder.put(startNodeId, chainInfo);
+    }
+
+    public OperatorChainInfo getChainInfo(Integer startNodeId) {
+        return chainInfosInOrder.get(startNodeId);
+    }
+
+    public Map<Integer, OperatorChainInfo> getChainInfosInOrder() {
+        return chainInfosInOrder;
+    }
+
+    public OperatorInfo getOperatorInfo(Integer nodeId) {
+        return operatorInfos.get(nodeId);
+    }
+
+    public OperatorInfo createAndGetOperatorInfo(Integer nodeId) {
+        OperatorInfo operatorInfo = new OperatorInfo();
+        operatorInfos.put(nodeId, operatorInfo);
+        return operatorInfo;
+    }
+
+    public Map<Integer, OperatorInfo> getOperatorInfos() {
+        return operatorInfos;
+    }
+
+    public StreamGraph getStreamGraph() {
+        return streamGraph;
+    }
+
+    public boolean hasHybridResultPartition() {
+        return hasHybridResultPartition.get();
+    }
+
+    public void setHasHybridResultPartition(boolean hasHybridResultPartition) {
+        this.hasHybridResultPartition.set(hasHybridResultPartition);
+    }
+
+    public void addPhysicalEdgesInOrder(StreamEdge edge) {
+        physicalEdgesInOrder.add(edge);
+    }
+
+    public List<StreamEdge> getPhysicalEdgesInOrder() {
+        return physicalEdgesInOrder;
+    }
+
+    public void addJobVertex(Integer startNodeId, JobVertex jobVertex) {
+        jobVerticesInOrder.put(startNodeId, jobVertex);
+    }
+
+    public Map<Integer, JobVertex> getJobVerticesInOrder() {
+        return jobVerticesInOrder;
+    }
+
+    public JobVertex getJobVertex(Integer startNodeId) {
+        return jobVerticesInOrder.get(startNodeId);
+    }
+
+    public void putCoordinatorSerializationFutures(
+            JobVertexID vertexId,
+            
List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>
+                    serializationFutures) {
+        coordinatorSerializationFuturesPerJobVertex.put(vertexId, 
serializationFutures);
+    }
+
+    public Map<JobVertexID, 
List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
+            getCoordinatorSerializationFuturesPerJobVertex() {
+        return coordinatorSerializationFuturesPerJobVertex;
+    }
+
+    public Map<Integer, Map<Integer, StreamConfig>> getChainedConfigs() {
+        return chainedConfigs;
+    }
+
+    public Map<Integer, StreamConfig> getOrCreateChainedConfig(Integer 
streamNodeId) {
+        return chainedConfigs.computeIfAbsent(streamNodeId, key -> new 
HashMap<>());
+    }
+
+    public byte[] getHash(Integer streamNodeId) {
+        return hashes.get(streamNodeId);
+    }
+
+    public List<byte[]> getLegacyHashes(Integer streamNodeId) {
+        List<byte[]> hashes = new ArrayList<>();
+        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
+            hashes.add(legacyHash.get(streamNodeId));
+        }
+        return hashes;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
new file mode 100644
index 00000000000..8e776b746a1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Helper class that maintains the information of an operator chain. */
+@Internal
+public class OperatorChainInfo {
+    private final Integer startNodeId;
+    private final Map<Integer, List<ChainedOperatorHashInfo>> 
chainedOperatorHashes;
+    private final Map<Integer, ChainedSourceInfo> chainedSources;
+    private final Map<Integer, ResourceSpec> chainedMinResources;
+    private final Map<Integer, ResourceSpec> chainedPreferredResources;
+    private final Map<Integer, String> chainedNames;
+    private final List<OperatorCoordinator.Provider> coordinatorProviders;
+    private final StreamGraph streamGraph;
+    private final List<StreamNode> chainedNodes;
+    private final List<StreamEdge> transitiveOutEdges;
+    private final List<StreamEdge> transitiveInEdges;
+
+    private InputOutputFormatContainer inputOutputFormatContainer = null;
+
+    public OperatorChainInfo(
+            int startNodeId,
+            Map<Integer, ChainedSourceInfo> chainedSources,
+            StreamGraph streamGraph) {
+        this.startNodeId = startNodeId;
+        this.chainedOperatorHashes = new HashMap<>();
+        this.coordinatorProviders = new ArrayList<>();
+        this.chainedSources = chainedSources;
+        this.chainedMinResources = new HashMap<>();
+        this.chainedPreferredResources = new HashMap<>();
+        this.chainedNames = new HashMap<>();
+        this.streamGraph = streamGraph;
+        this.chainedNodes = new ArrayList<>();
+        this.transitiveOutEdges = new ArrayList<>();
+        this.transitiveInEdges = new ArrayList<>();
+    }
+
+    public Integer getStartNodeId() {
+        return startNodeId;
+    }
+
+    public List<ChainedOperatorHashInfo> getChainedOperatorHashes(int 
startNodeId) {
+        return chainedOperatorHashes.get(startNodeId);
+    }
+
+    public void addCoordinatorProvider(OperatorCoordinator.Provider 
coordinator) {
+        coordinatorProviders.add(coordinator);
+    }
+
+    public List<OperatorCoordinator.Provider> getCoordinatorProviders() {
+        return coordinatorProviders;
+    }
+
+    public Map<Integer, ChainedSourceInfo> getChainedSources() {
+        return chainedSources;
+    }
+
+    public OperatorID addNodeToChain(
+            int currentNodeId, String operatorName, JobVertexBuildContext 
jobVertexBuildContext) {
+        recordChainedNode(currentNodeId);
+        StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
+
+        List<ChainedOperatorHashInfo> operatorHashes =
+                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new 
ArrayList<>());
+
+        byte[] primaryHashBytes = jobVertexBuildContext.getHash(currentNodeId);
+
+        for (byte[] legacyHash : 
jobVertexBuildContext.getLegacyHashes(currentNodeId)) {
+            operatorHashes.add(
+                    new ChainedOperatorHashInfo(primaryHashBytes, legacyHash, 
streamNode));
+        }
+
+        streamNode
+                .getCoordinatorProvider(operatorName, new 
OperatorID(primaryHashBytes))
+                .map(coordinatorProviders::add);
+
+        return new OperatorID(primaryHashBytes);
+    }
+
+    public void setTransitiveOutEdges(final List<StreamEdge> 
transitiveOutEdges) {
+        this.transitiveOutEdges.addAll(transitiveOutEdges);
+    }
+
+    public List<StreamEdge> getTransitiveOutEdges() {
+        return transitiveOutEdges;
+    }
+
+    public void recordChainedNode(int currentNodeId) {
+        StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
+        chainedNodes.add(streamNode);
+    }
+
+    public OperatorChainInfo newChain(Integer startNodeId) {
+        return new OperatorChainInfo(startNodeId, chainedSources, streamGraph);
+    }
+
+    public List<StreamNode> getAllChainedNodes() {
+        return chainedNodes;
+    }
+
+    public boolean hasFormatContainer() {
+        return inputOutputFormatContainer != null;
+    }
+
+    public InputOutputFormatContainer getOrCreateFormatContainer() {
+        if (inputOutputFormatContainer == null) {
+            inputOutputFormatContainer =
+                    new 
InputOutputFormatContainer(Thread.currentThread().getContextClassLoader());
+        }
+        return inputOutputFormatContainer;
+    }
+
+    public void addChainedSource(Integer sourceNodeId, ChainedSourceInfo 
chainedSourceInfo) {
+        chainedSources.put(sourceNodeId, chainedSourceInfo);
+    }
+
+    public void addChainedMinResources(Integer sourceNodeId, ResourceSpec 
resourceSpec) {
+        chainedMinResources.put(sourceNodeId, resourceSpec);
+    }
+
+    public ResourceSpec getChainedMinResources(Integer sourceNodeId) {
+        return chainedMinResources.get(sourceNodeId);
+    }
+
+    public void addChainedPreferredResources(Integer sourceNodeId, 
ResourceSpec resourceSpec) {
+        chainedPreferredResources.put(sourceNodeId, resourceSpec);
+    }
+
+    public ResourceSpec getChainedPreferredResources(Integer sourceNodeId) {
+        return chainedPreferredResources.get(sourceNodeId);
+    }
+
+    public String getChainedName(Integer streamNodeId) {
+        return chainedNames.get(streamNodeId);
+    }
+
+    public Map<Integer, String> getChainedNames() {
+        return chainedNames;
+    }
+
+    public void addChainedName(Integer streamNodeId, String chainedName) {
+        this.chainedNames.put(streamNodeId, chainedName);
+    }
+
+    public void addTransitiveInEdge(StreamEdge streamEdge) {
+        transitiveInEdges.add(streamEdge);
+    }
+
+    public List<StreamEdge> getTransitiveInEdges() {
+        return transitiveInEdges;
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
new file mode 100644
index 00000000000..f79ca90a3cf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Helper class to help maintain the information of an operator. */
+@Internal
+public class OperatorInfo {
+
+    private StreamConfig vertexConfig;
+
+    // This is used to cache the chainable outputs, to set the chainable 
outputs config after
+    // all job vertices are created.
+    private final List<StreamEdge> chainableOutputs;
+
+    // This is used to cache the non-chainable outputs, to set the 
non-chainable outputs config
+    // after all job vertices are created.
+    private final List<StreamEdge> nonChainableOutputs;
+
+    public OperatorInfo() {
+        this.chainableOutputs = new ArrayList<>();
+        this.nonChainableOutputs = new ArrayList<>();
+    }
+
+    public List<StreamEdge> getChainableOutputs() {
+        return chainableOutputs;
+    }
+
+    public void addChainableOutputs(List<StreamEdge> chainableOutputs) {
+        this.chainableOutputs.addAll(chainableOutputs);
+    }
+
+    public List<StreamEdge> getNonChainableOutputs() {
+        return nonChainableOutputs;
+    }
+
+    public void addNonChainableOutputs(List<StreamEdge> nonChainableOutEdges) {
+        this.nonChainableOutputs.addAll(nonChainableOutEdges);
+    }
+
+    public StreamConfig getVertexConfig() {
+        return vertexConfig;
+    }
+
+    public void setVertexConfig(StreamConfig vertexConfig) {
+        this.vertexConfig = vertexConfig;
+    }
+}

Reply via email to