This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5ff8d5de62e81cdeecca91f1617f0e05ecc8c6bc
Author: noorall <[email protected]>
AuthorDate: Wed Nov 6 13:05:57 2024 +0800

    [FLINK-36066][runtime] Further Improve Method Reusability in 
StreamingJobGraphGenerator
---
 .../apache/flink/runtime/jobgraph/JobGraph.java    |   3 +-
 .../api/graph/StreamingJobGraphGenerator.java      | 456 +++++++++++++--------
 .../api/graph/util/JobVertexBuildContext.java      |  38 +-
 .../api/graph/util/OperatorChainInfo.java          |  43 +-
 .../streaming/api/graph/util/OperatorInfo.java     |  23 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  |  16 +-
 6 files changed, 349 insertions(+), 230 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index 4d57bcd1d4f..9b862553252 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -310,7 +310,8 @@ public class JobGraph implements ExecutionPlan {
         setSerializedExecutionConfig(new SerializedValue<>(executionConfig));
     }
 
-    void setSerializedExecutionConfig(SerializedValue<ExecutionConfig> 
serializedExecutionConfig) {
+    public void setSerializedExecutionConfig(
+            SerializedValue<ExecutionConfig> serializedExecutionConfig) {
         this.serializedExecutionConfig =
                 checkNotNull(
                         serializedExecutionConfig,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 29b0f1bd219..6476cfdfb52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -112,7 +112,9 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -162,7 +164,6 @@ public class StreamingJobGraphGenerator {
     private final StreamGraph streamGraph;
 
     private final JobGraph jobGraph;
-    private final Collection<Integer> builtVertices;
 
     private final StreamGraphHasher defaultStreamGraphHasher;
     private final List<StreamGraphHasher> legacyStreamGraphHashers;
@@ -181,10 +182,8 @@ public class StreamingJobGraphGenerator {
         this.streamGraph = streamGraph;
         this.defaultStreamGraphHasher = new StreamGraphHasherV2();
         this.legacyStreamGraphHashers = Arrays.asList(new 
StreamGraphUserHashHasher());
-
-        this.builtVertices = new HashSet<>();
         this.serializationExecutor = 
Preconditions.checkNotNull(serializationExecutor);
-        jobGraph = new JobGraph(jobID, streamGraph.getJobName());
+        jobGraph = createAndInitializeJobGraph(streamGraph, jobID);
 
         // Generate deterministic hashes for the nodes in order to identify 
them across
         // submission iff they didn't change.
@@ -200,16 +199,16 @@ public class StreamingJobGraphGenerator {
 
         this.jobVertexBuildContext =
                 new JobVertexBuildContext(
-                        streamGraph, new AtomicBoolean(false), hashes, 
legacyHashes);
+                        jobGraph,
+                        streamGraph,
+                        new AtomicBoolean(false),
+                        hashes,
+                        legacyHashes,
+                        new SlotSharingGroup());
     }
 
     private JobGraph createJobGraph() {
         preValidate(streamGraph, userClassloader);
-        jobGraph.setJobType(streamGraph.getJobType());
-        jobGraph.setDynamic(streamGraph.isDynamic());
-
-        jobGraph.enableApproximateLocalRecovery(
-                
streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
 
         setChaining();
 
@@ -231,69 +230,49 @@ public class StreamingJobGraphGenerator {
 
         validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);
 
-        setSlotSharingAndCoLocation(jobGraph, jobVertexBuildContext);
+        setSlotSharingAndCoLocation(jobVertexBuildContext);
 
         setManagedMemoryFraction(jobVertexBuildContext);
 
-        configureCheckpointing(streamGraph, jobGraph);
-
-        
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
-
-        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
-                streamGraph.getUserArtifacts().entrySet()) {
-            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
-        }
-
-        streamGraph.getUserJarBlobKeys().forEach(jobGraph::addUserJarBlobKey);
-        jobGraph.setClasspaths(streamGraph.getClasspath());
-
-        // set the ExecutionConfig last when it has been finalized
-        try {
-            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
-        } catch (IOException e) {
-            throw new IllegalConfigurationException(
-                    "Could not serialize the ExecutionConfig."
-                            + "This indicates that non-serializable types 
(like custom serializers) were registered");
-        }
-        jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());
-
-        addVertexIndexPrefixInVertexName(jobVertexBuildContext, new 
AtomicInteger(0), jobGraph);
+        addVertexIndexPrefixInVertexName(jobVertexBuildContext, new 
AtomicInteger(0));
 
         setVertexDescription(jobVertexBuildContext);
 
         // Wait for the serialization of operator coordinators and stream 
config.
-        serializeOperatorCoordinatorsAndStreamConfig(
-                jobGraph, serializationExecutor, jobVertexBuildContext);
-
-        if (!streamGraph.getJobStatusHooks().isEmpty()) {
-            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
-        }
+        serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor, 
jobVertexBuildContext);
 
         return jobGraph;
     }
 
     public static void serializeOperatorCoordinatorsAndStreamConfig(
-            JobGraph jobGraph,
-            Executor serializationExecutor,
-            JobVertexBuildContext jobVertexBuildContext) {
+            Executor serializationExecutor, JobVertexBuildContext 
jobVertexBuildContext) {
         try {
             FutureUtils.combineAll(
-                            
jobVertexBuildContext.getOperatorInfos().values().stream()
-                                    .map(
-                                            operatorInfo ->
-                                                    operatorInfo
-                                                            .getVertexConfig()
-                                                            
.triggerSerializationAndReturnFuture(
-                                                                    
serializationExecutor))
+                            
jobVertexBuildContext.getChainInfosInOrder().values().stream()
+                                    .flatMap(
+                                            chainInfo ->
+                                                    
serializationOperatorConfigs(
+                                                            chainInfo, 
serializationExecutor))
                                     .collect(Collectors.toList()))
                     .get();
 
-            waitForSerializationFuturesAndUpdateJobVertices(jobGraph, 
jobVertexBuildContext);
+            
waitForSerializationFuturesAndUpdateJobVertices(jobVertexBuildContext);
         } catch (Exception e) {
             throw new FlinkRuntimeException("Error in serialization.", e);
         }
     }
 
+    private static Stream<CompletableFuture<StreamConfig>> 
serializationOperatorConfigs(
+            OperatorChainInfo chainInfo, Executor serializationExecutor) {
+        return chainInfo.getOperatorInfos().values().stream()
+                .map(
+                        operatorInfo ->
+                                operatorInfo
+                                        .getVertexConfig()
+                                        .triggerSerializationAndReturnFuture(
+                                                serializationExecutor));
+    }
+
     /**
      * Creates an instance of {@link OperatorID} based on the provided 
operator unique identifier
      * (UID).
@@ -306,7 +285,7 @@ public class StreamingJobGraphGenerator {
     }
 
     private static void waitForSerializationFuturesAndUpdateJobVertices(
-            JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext)
+            JobVertexBuildContext jobVertexBuildContext)
             throws ExecutionException, InterruptedException {
         for (Map.Entry<
                         JobVertexID,
@@ -316,7 +295,8 @@ public class StreamingJobGraphGenerator {
                                 
.getCoordinatorSerializationFuturesPerJobVertex()
                                 .entrySet()) {
             final JobVertexID jobVertexId = futuresPerJobVertex.getKey();
-            final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+            final JobVertex jobVertex =
+                    
jobVertexBuildContext.getJobGraph().findVertexByID(jobVertexId);
 
             Preconditions.checkState(
                     jobVertex != null,
@@ -329,9 +309,7 @@ public class StreamingJobGraphGenerator {
     }
 
     public static void addVertexIndexPrefixInVertexName(
-            JobVertexBuildContext jobVertexBuildContext,
-            AtomicInteger vertexIndexId,
-            JobGraph jobGraph) {
+            JobVertexBuildContext jobVertexBuildContext, AtomicInteger 
vertexIndexId) {
         if 
(!jobVertexBuildContext.getStreamGraph().isVertexNameIncludeIndexPrefix()) {
             return;
         }
@@ -342,7 +320,9 @@ public class StreamingJobGraphGenerator {
         // JobVertexBuildContext only contains incrementally generated 
jobVertex instances. The
         // setName method needs to set the names based on the topological 
order of all jobVertices,
         // so it relies on the job graph to compute a difference set.
-        jobGraph.getVerticesSortedTopologicallyFromSources()
+        jobVertexBuildContext
+                .getJobGraph()
+                .getVerticesSortedTopologicallyFromSources()
                 .forEach(
                         vertex -> {
                             if (jobVertexIds.contains(vertex.getID())) {
@@ -576,6 +556,7 @@ public class StreamingJobGraphGenerator {
             List<StreamEdge> edgeList = inEdges.getValue();
 
             jobVertexBuildContext
+                    .getChainInfo(vertex)
                     .getOperatorInfo(vertex)
                     .getVertexConfig()
                     .setInPhysicalEdges(edgeList);
@@ -583,69 +564,15 @@ public class StreamingJobGraphGenerator {
     }
 
     private Map<Integer, OperatorChainInfo> 
buildChainedInputsAndGetHeadInputs() {
-
-        final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
         final Map<Integer, OperatorChainInfo> chainEntryPoints = new 
HashMap<>();
 
         for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
             final StreamNode sourceNode = 
streamGraph.getStreamNode(sourceNodeId);
-
-            if (sourceNode.getOperatorFactory() != null
-                    && sourceNode.getOperatorFactory() instanceof 
SourceOperatorFactory
-                    && sourceNode.getOutEdges().size() == 1) {
-                // as long as only NAry ops support this chaining, we need to 
skip the other parts
-                final StreamEdge sourceOutEdge = 
sourceNode.getOutEdges().get(0);
-                final StreamNode target = 
streamGraph.getStreamNode(sourceOutEdge.getTargetId());
-                final ChainingStrategy targetChainingStrategy =
-                        Preconditions.checkNotNull(target.getOperatorFactory())
-                                .getChainingStrategy();
-
-                if (targetChainingStrategy == 
ChainingStrategy.HEAD_WITH_SOURCES
-                        && isChainableInput(sourceOutEdge, streamGraph)) {
-                    final OperatorInfo operatorInfo =
-                            
jobVertexBuildContext.createAndGetOperatorInfo(sourceNodeId);
-                    final OperatorID opId =
-                            new 
OperatorID(jobVertexBuildContext.getHash(sourceNodeId));
-                    final StreamConfig.SourceInputConfig inputConfig =
-                            new StreamConfig.SourceInputConfig(sourceOutEdge);
-                    final StreamConfig operatorConfig = new StreamConfig(new 
Configuration());
-                    setOperatorConfig(
-                            sourceNodeId,
-                            operatorConfig,
-                            Collections.emptyMap(),
-                            jobVertexBuildContext);
-                    setOperatorChainedOutputsConfig(
-                            operatorConfig, Collections.emptyList(), 
jobVertexBuildContext);
-                    // we cache the non-chainable outputs here, and set the 
non-chained config later
-                    
operatorInfo.addNonChainableOutputs(Collections.emptyList());
-
-                    operatorConfig.setChainIndex(0); // sources are always 
first
-                    operatorConfig.setOperatorID(opId);
-                    
operatorConfig.setOperatorName(sourceNode.getOperatorName());
-                    chainedSources.put(
-                            sourceNodeId, new 
ChainedSourceInfo(operatorConfig, inputConfig));
-
-                    final SourceOperatorFactory<?> sourceOpFact =
-                            (SourceOperatorFactory<?>) 
sourceNode.getOperatorFactory();
-                    final OperatorCoordinator.Provider coord =
-                            
sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);
-
-                    final OperatorChainInfo chainInfo =
-                            chainEntryPoints.computeIfAbsent(
-                                    sourceOutEdge.getTargetId(),
-                                    (k) ->
-                                            new OperatorChainInfo(
-                                                    
sourceOutEdge.getTargetId(),
-                                                    chainedSources,
-                                                    streamGraph));
-                    chainInfo.addCoordinatorProvider(coord);
-                    chainInfo.recordChainedNode(sourceNodeId);
-                    continue;
-                }
+            if (isChainableSource(sourceNode, streamGraph)) {
+                createSourceChainInfo(sourceNode, chainEntryPoints, 
jobVertexBuildContext);
+            } else {
+                chainEntryPoints.put(sourceNodeId, new 
OperatorChainInfo(sourceNodeId));
             }
-
-            chainEntryPoints.put(
-                    sourceNodeId, new OperatorChainInfo(sourceNodeId, 
chainedSources, streamGraph));
         }
 
         return chainEntryPoints;
@@ -673,18 +600,33 @@ public class StreamingJobGraphGenerator {
                     info.getStartNodeId(),
                     1, // operators start at position 1 because 0 is for 
chained source inputs
                     info,
-                    chainEntryPoints);
+                    chainEntryPoints,
+                    true,
+                    serializationExecutor,
+                    jobVertexBuildContext,
+                    null);
         }
     }
 
-    private List<StreamEdge> createChain(
+    public static List<StreamEdge> createChain(
             final Integer currentNodeId,
             final int chainIndex,
             final OperatorChainInfo chainInfo,
-            final Map<Integer, OperatorChainInfo> chainEntryPoints) {
+            final Map<Integer, OperatorChainInfo> chainEntryPoints,
+            final boolean canCreateNewChain,
+            final Executor serializationExecutor,
+            final JobVertexBuildContext jobVertexBuildContext,
+            final @Nullable Consumer<Integer> visitedStreamNodeConsumer) {
 
         Integer startNodeId = chainInfo.getStartNodeId();
-        if (!builtVertices.contains(startNodeId)) {
+        if 
(!jobVertexBuildContext.getJobVerticesInOrder().containsKey(startNodeId)) {
+            StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+
+            // Adaptive graph generator needs to subscribe the visited stream 
node id to
+            // generate hashes for it.
+            if (visitedStreamNodeConsumer != null) {
+                visitedStreamNodeConsumer.accept(currentNodeId);
+            }
 
             List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
 
@@ -721,7 +663,11 @@ public class StreamingJobGraphGenerator {
                                 chainable.getTargetId(),
                                 chainIndex + 1,
                                 chainInfo,
-                                chainEntryPoints));
+                                chainEntryPoints,
+                                canCreateNewChain,
+                                serializationExecutor,
+                                jobVertexBuildContext,
+                                visitedStreamNodeConsumer));
                 // Mark upstream nodes in the same chain as outputBlocking
                 if (targetNodeAttribute != null
                         && targetNodeAttribute.isNoOutputUntilEndOfInput()) {
@@ -731,13 +677,24 @@ public class StreamingJobGraphGenerator {
 
             for (StreamEdge nonChainable : nonChainableOutputs) {
                 transitiveOutEdges.add(nonChainable);
-                createChain(
-                        nonChainable.getTargetId(),
-                        1, // operators start at position 1 because 0 is for 
chained source inputs
-                        chainEntryPoints.computeIfAbsent(
-                                nonChainable.getTargetId(),
-                                (k) -> 
chainInfo.newChain(nonChainable.getTargetId())),
-                        chainEntryPoints);
+                // Used to control whether a new chain can be created, this 
value is true in the
+                // full graph generation algorithm and false in the 
progressive generation
+                // algorithm. In the future, this variable can be a boolean 
type function to adapt
+                // to more adaptive scenarios.
+                if (canCreateNewChain) {
+                    createChain(
+                            nonChainable.getTargetId(),
+                            1, // operators start at position 1 because 0 is 
for chained source
+                            // inputs
+                            chainEntryPoints.computeIfAbsent(
+                                    nonChainable.getTargetId(),
+                                    (k) -> 
chainInfo.newChain(nonChainable.getTargetId())),
+                            chainEntryPoints,
+                            canCreateNewChain,
+                            serializationExecutor,
+                            jobVertexBuildContext,
+                            visitedStreamNodeConsumer);
+                }
             }
 
             chainInfo.addChainedName(
@@ -777,18 +734,25 @@ public class StreamingJobGraphGenerator {
                         .addOutputFormat(currentOperatorId, 
currentNode.getOutputFormat());
             }
             OperatorInfo operatorInfo =
-                    
jobVertexBuildContext.createAndGetOperatorInfo(currentNodeId);
+                    chainInfo.createAndGetOperatorInfo(currentNodeId, 
currentOperatorId);
 
-            StreamConfig config =
-                    currentNodeId.equals(startNodeId)
-                            ? createJobVertex(startNodeId, chainInfo)
-                            : new StreamConfig(new Configuration());
+            StreamConfig config;
+            if (currentNodeId.equals(startNodeId)) {
+                JobVertex jobVertex = 
jobVertexBuildContext.getJobVertex(startNodeId);
+                if (jobVertex == null) {
+                    jobVertex =
+                            createJobVertex(
+                                    chainInfo, serializationExecutor, 
jobVertexBuildContext);
+                }
+                config = new StreamConfig(jobVertex.getConfiguration());
+            } else {
+                config = new StreamConfig(new Configuration());
+            }
 
             tryConvertPartitionerForDynamicGraph(
                     chainableOutputs, nonChainableOutputs, 
jobVertexBuildContext);
             config.setAttribute(currentNodeAttribute);
-            setOperatorConfig(
-                    currentNodeId, config, chainInfo.getChainedSources(), 
jobVertexBuildContext);
+            setOperatorConfig(currentNodeId, config, chainInfo, 
jobVertexBuildContext);
 
             setOperatorChainedOutputsConfig(config, chainableOutputs, 
jobVertexBuildContext);
             // we cache the non-chainable outputs here, and set the 
non-chained config later
@@ -923,6 +887,83 @@ public class StreamingJobGraphGenerator {
                 });
     }
 
+    public static JobGraph createAndInitializeJobGraph(
+            StreamGraph streamGraph, @Nullable JobID jobId) {
+        JobGraph jobGraph = new JobGraph(jobId, streamGraph.getJobName());
+        jobGraph.setJobType(streamGraph.getJobType());
+        jobGraph.setDynamic(streamGraph.isDynamic());
+
+        jobGraph.enableApproximateLocalRecovery(
+                
streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
+
+        jobGraph.setSnapshotSettings(streamGraph.getCheckpointingSettings());
+        
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
+        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
+                streamGraph.getUserArtifacts().entrySet()) {
+            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
+        }
+
+        streamGraph.getUserJarBlobKeys().forEach(jobGraph::addUserJarBlobKey);
+        jobGraph.setClasspaths(streamGraph.getClasspath());
+
+        ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
+        if (executionConfig == null) {
+            
jobGraph.setSerializedExecutionConfig(streamGraph.getSerializedExecutionConfig());
+        } else {
+            try {
+                jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
+            } catch (IOException e) {
+                throw new IllegalConfigurationException(
+                        "Could not serialize the ExecutionConfig."
+                                + "This indicates that non-serializable types 
(like custom serializers) were registered");
+            }
+        }
+
+        jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());
+
+        if (!streamGraph.getJobStatusHooks().isEmpty()) {
+            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
+        }
+
+        return jobGraph;
+    }
+
+    public static void createSourceChainInfo(
+            StreamNode sourceNode,
+            Map<Integer, OperatorChainInfo> chainEntryPoints,
+            JobVertexBuildContext jobVertexBuildContext) {
+        Integer sourceNodeId = sourceNode.getId();
+        StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
+
+        final OperatorChainInfo chainInfo =
+                chainEntryPoints.computeIfAbsent(
+                        sourceOutEdge.getTargetId(),
+                        (k) -> new 
OperatorChainInfo(sourceOutEdge.getTargetId()));
+        final OperatorID opId = new 
OperatorID(jobVertexBuildContext.getHash(sourceNodeId));
+        final OperatorInfo operatorInfo = 
chainInfo.createAndGetOperatorInfo(sourceNodeId, opId);
+        final StreamConfig.SourceInputConfig inputConfig =
+                new StreamConfig.SourceInputConfig(sourceOutEdge);
+        final StreamConfig operatorConfig = new StreamConfig(new 
Configuration());
+        setOperatorConfig(sourceNodeId, operatorConfig, chainInfo, 
jobVertexBuildContext);
+        setOperatorChainedOutputsConfig(
+                operatorConfig, Collections.emptyList(), 
jobVertexBuildContext);
+        // we cache the non-chainable outputs here, and set the non-chained 
config later
+        operatorInfo.addNonChainableOutputs(Collections.emptyList());
+
+        operatorConfig.setChainIndex(0); // sources are always first
+        operatorConfig.setOperatorID(opId);
+        operatorConfig.setOperatorName(sourceNode.getOperatorName());
+
+        final SourceOperatorFactory<?> sourceOpFact =
+                (SourceOperatorFactory<?>)
+                        
Preconditions.checkNotNull(sourceNode.getOperatorFactory());
+        final OperatorCoordinator.Provider coord =
+                
sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);
+
+        chainInfo.addChainedSource(sourceNode, new 
ChainedSourceInfo(operatorConfig, inputConfig));
+        chainInfo.addCoordinatorProvider(coord);
+    }
+
     private static void checkAndReplaceReusableHybridPartitionType(
             NonChainedOutput reusableOutput) {
         if (reusableOutput.getPartitionType() == 
ResultPartitionType.HYBRID_SELECTIVE) {
@@ -1009,10 +1050,15 @@ public class StreamingJobGraphGenerator {
         return preferredResources;
     }
 
-    private StreamConfig createJobVertex(Integer streamNodeId, 
OperatorChainInfo chainInfo) {
+    public static JobVertex createJobVertex(
+            OperatorChainInfo chainInfo,
+            Executor serializationExecutor,
+            JobVertexBuildContext jobVertexBuildContext) {
+
+        Integer streamNodeId = chainInfo.getStartNodeId();
 
         JobVertex jobVertex;
-        StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
+        StreamNode streamNode = 
jobVertexBuildContext.getStreamGraph().getStreamNode(streamNodeId);
 
         byte[] hash = jobVertexBuildContext.getHash(streamNodeId);
 
@@ -1104,23 +1150,23 @@ public class StreamingJobGraphGenerator {
         }
 
         jobVertexBuildContext.addJobVertex(streamNodeId, jobVertex);
-        builtVertices.add(streamNodeId);
-        jobGraph.addVertex(jobVertex);
+        jobVertexBuildContext.getJobGraph().addVertex(jobVertex);
 
         jobVertex.setParallelismConfigured(
                 chainInfo.getAllChainedNodes().stream()
                         .anyMatch(StreamNode::isParallelismConfigured));
 
-        return new StreamConfig(jobVertex.getConfiguration());
+        return jobVertex;
     }
 
     public static void setOperatorConfig(
             Integer vertexId,
             StreamConfig config,
-            Map<Integer, ChainedSourceInfo> chainedSources,
+            OperatorChainInfo chainInfo,
             JobVertexBuildContext jobVertexBuildContext) {
         StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
-        OperatorInfo operatorInfo = 
jobVertexBuildContext.getOperatorInfo(vertexId);
+        OperatorInfo operatorInfo = chainInfo.getOperatorInfo(vertexId);
+        Map<Integer, ChainedSourceInfo> chainedSources = 
chainInfo.getChainedSources();
         StreamNode vertex = streamGraph.getStreamNode(vertexId);
 
         config.setVertexID(vertexId);
@@ -1301,20 +1347,22 @@ public class StreamingJobGraphGenerator {
             final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs,
             JobVertexBuildContext jobVertexBuildContext) {
         // set non chainable output config
-        jobVertexBuildContext
-                .getOperatorInfos()
-                .forEach(
-                        (vertexId, operatorInfo) -> {
-                            Map<StreamEdge, NonChainedOutput> 
outputsConsumedByEdge =
-                                    opIntermediateOutputs.computeIfAbsent(
-                                            vertexId, ignored -> new 
HashMap<>());
-                            setOperatorNonChainedOutputsConfig(
-                                    vertexId,
-                                    operatorInfo.getVertexConfig(),
-                                    operatorInfo.getNonChainableOutputs(),
-                                    outputsConsumedByEdge,
-                                    jobVertexBuildContext);
-                        });
+        for (OperatorChainInfo chainInfo : 
jobVertexBuildContext.getChainInfosInOrder().values()) {
+            chainInfo
+                    .getOperatorInfos()
+                    .forEach(
+                            (vertexId, operatorInfo) -> {
+                                Map<StreamEdge, NonChainedOutput> 
outputsConsumedByEdge =
+                                        opIntermediateOutputs.computeIfAbsent(
+                                                vertexId, ignored -> new 
HashMap<>());
+                                setOperatorNonChainedOutputsConfig(
+                                        vertexId,
+                                        operatorInfo.getVertexConfig(),
+                                        operatorInfo.getNonChainableOutputs(),
+                                        outputsConsumedByEdge,
+                                        jobVertexBuildContext);
+                            });
+        }
     }
 
     private void setAllVertexNonChainedOutputsConfigs(
@@ -1327,6 +1375,7 @@ public class StreamingJobGraphGenerator {
                                 setVertexNonChainedOutputsConfig(
                                         startNodeId,
                                         jobVertexBuildContext
+                                                .getChainInfo(startNodeId)
                                                 .getOperatorInfo(startNodeId)
                                                 .getVertexConfig(),
                                         jobVertexBuildContext
@@ -1626,18 +1675,43 @@ public class StreamingJobGraphGenerator {
     }
 
     public static boolean isChainable(StreamEdge edge, StreamGraph 
streamGraph) {
+        return isChainable(edge, streamGraph, false);
+    }
+
+    public static boolean isChainable(
+            StreamEdge edge, StreamGraph streamGraph, boolean 
allowChainWithDefaultParallelism) {
         StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
-        return downStreamVertex.getInEdges().size() == 1 && 
isChainableInput(edge, streamGraph);
+        return downStreamVertex.getInEdges().size() == 1
+                && isChainableInput(edge, streamGraph, 
allowChainWithDefaultParallelism);
     }
 
-    public static boolean isChainableInput(StreamEdge edge, StreamGraph 
streamGraph) {
+    public static boolean isChainableSource(StreamNode streamNode, StreamGraph 
streamGraph) {
+        if (streamNode.getOperatorFactory() == null
+                || !(streamNode.getOperatorFactory() instanceof 
SourceOperatorFactory)
+                || streamNode.getOutEdges().size() != 1) {
+            return false;
+        }
+        final StreamEdge sourceOutEdge = streamNode.getOutEdges().get(0);
+        final StreamNode target = 
streamGraph.getStreamNode(sourceOutEdge.getTargetId());
+        final ChainingStrategy targetChainingStrategy =
+                
Preconditions.checkNotNull(target.getOperatorFactory()).getChainingStrategy();
+        return targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES
+                && isChainableInput(sourceOutEdge, streamGraph, false);
+    }
+
+    private static boolean isChainableInput(
+            StreamEdge edge, StreamGraph streamGraph, boolean 
allowChainWithDefaultParallelism) {
         StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
         StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
 
         if (!(streamGraph.isChainingEnabled()
                 && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
-                && areOperatorsChainable(upStreamVertex, downStreamVertex, 
streamGraph)
+                && areOperatorsChainable(
+                        upStreamVertex,
+                        downStreamVertex,
+                        streamGraph,
+                        allowChainWithDefaultParallelism)
                 && arePartitionerAndExchangeModeChainable(
                         edge.getPartitioner(), edge.getExchangeMode(), 
streamGraph.isDynamic()))) {
 
@@ -1673,7 +1747,10 @@ public class StreamingJobGraphGenerator {
 
     @VisibleForTesting
     static boolean areOperatorsChainable(
-            StreamNode upStreamVertex, StreamNode downStreamVertex, 
StreamGraph streamGraph) {
+            StreamNode upStreamVertex,
+            StreamNode downStreamVertex,
+            StreamGraph streamGraph,
+            boolean allowChainWithDefaultParallelism) {
         StreamOperatorFactory<?> upStreamOperator = 
upStreamVertex.getOperatorFactory();
         StreamOperatorFactory<?> downStreamOperator = 
downStreamVertex.getOperatorFactory();
         if (downStreamOperator == null || upStreamOperator == null) {
@@ -1723,8 +1800,18 @@ public class StreamingJobGraphGenerator {
                         "Unknown chaining strategy: " + 
downStreamOperator.getChainingStrategy());
         }
 
-        // Only vertices with the same parallelism can be chained.
-        isChainable &= upStreamVertex.getParallelism() == 
downStreamVertex.getParallelism();
+        // When allowChainWithDefaultParallelism is true, any vertex with 
default parallelism can
+        // be chained, otherwise only vertices with the same parallelism can 
be chained.
+        if (allowChainWithDefaultParallelism) {
+            isChainable &=
+                    (upStreamVertex.getParallelism() == 
downStreamVertex.getParallelism()
+                            || upStreamVertex.getParallelism()
+                                    == ExecutionConfig.PARALLELISM_DEFAULT
+                            || downStreamVertex.getParallelism()
+                                    == ExecutionConfig.PARALLELISM_DEFAULT);
+        } else {
+            isChainable &= upStreamVertex.getParallelism() == 
downStreamVertex.getParallelism();
+        }
 
         if 
(!streamGraph.isChainingOfOperatorsWithDifferentMaxParallelismEnabled()) {
             isChainable &=
@@ -1784,18 +1871,16 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    public static void setSlotSharingAndCoLocation(
-            JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
-        setSlotSharing(jobGraph, jobVertexBuildContext);
+    public static void setSlotSharingAndCoLocation(JobVertexBuildContext 
jobVertexBuildContext) {
+        setSlotSharing(jobVertexBuildContext);
         setCoLocation(jobVertexBuildContext);
     }
 
-    private static void setSlotSharing(
-            JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
+    private static void setSlotSharing(JobVertexBuildContext 
jobVertexBuildContext) {
         StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         final Map<String, SlotSharingGroup> specifiedSlotSharingGroups = new 
HashMap<>();
         final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups 
=
-                buildVertexRegionSlotSharingGroups(jobGraph, 
jobVertexBuildContext);
+                buildVertexRegionSlotSharingGroups(jobVertexBuildContext);
         final Map<Integer, JobVertex> jobVertices = 
jobVertexBuildContext.getJobVerticesInOrder();
 
         for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
@@ -1849,10 +1934,11 @@ public class StreamingJobGraphGenerator {
      * in the same slot sharing group.
      */
     private static Map<JobVertexID, SlotSharingGroup> 
buildVertexRegionSlotSharingGroups(
-            JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
+            JobVertexBuildContext jobVertexBuildContext) {
         StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups 
= new HashMap<>();
-        final SlotSharingGroup defaultSlotSharingGroup = new 
SlotSharingGroup();
+        final SlotSharingGroup defaultSlotSharingGroup =
+                jobVertexBuildContext.getDefaultSlotSharingGroup();
         streamGraph
                 
.getSlotSharingGroupResource(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP)
                 .ifPresent(defaultSlotSharingGroup::setResourceProfile);
@@ -1861,7 +1947,8 @@ public class StreamingJobGraphGenerator {
                 streamGraph.isAllVerticesInSameSlotSharingGroupByDefault();
 
         final Iterable<DefaultLogicalPipelinedRegion> regions =
-                
DefaultLogicalTopology.fromJobGraph(jobGraph).getAllPipelinedRegions();
+                
DefaultLogicalTopology.fromJobGraph(jobVertexBuildContext.getJobGraph())
+                        .getAllPipelinedRegions();
         for (DefaultLogicalPipelinedRegion region : regions) {
             final SlotSharingGroup regionSlotSharingGroup;
             if (allRegionsInSameSlotSharingGroup) {
@@ -1969,8 +2056,17 @@ public class StreamingJobGraphGenerator {
         final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         final Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs =
                 jobVertexBuildContext.getChainedConfigs();
-        final Set<Integer> groupOperatorIds =
+        // In the progressive job graph generation algorithm, if the user 
specified the
+        // SlotSharingGroupResource or the 
AllVerticesInSameSlotSharingGroupByDefault is set to
+        // true, job vertices generated in different phases may be assigned to 
the same
+        // slotSharingGroup. Therefore, we need to retain only the job vertices 
that belong to the
+        // current phase.
+        final Set<JobVertexID> jobVertexIds =
                 slotSharingGroup.getJobVertexIds().stream()
+                        .filter(vertexOperators::containsKey)
+                        .collect(Collectors.toSet());
+        final Set<Integer> groupOperatorIds =
+                jobVertexIds.stream()
                         .flatMap((vid) -> vertexOperators.get(vid).stream())
                         .collect(Collectors.toSet());
 
@@ -1994,10 +2090,14 @@ public class StreamingJobGraphGenerator {
                                                 
.getManagedMemorySlotScopeUseCases().stream())
                         .collect(Collectors.toSet());
 
-        for (JobVertexID jobVertexID : slotSharingGroup.getJobVertexIds()) {
+        for (JobVertexID jobVertexID : jobVertexIds) {
+            final int headOperatorNodeId = 
vertexHeadOperators.get(jobVertexID);
             for (int operatorNodeId : vertexOperators.get(jobVertexID)) {
                 final StreamConfig operatorConfig =
-                        
jobVertexBuildContext.getOperatorInfo(operatorNodeId).getVertexConfig();
+                        jobVertexBuildContext
+                                .getChainInfo(headOperatorNodeId)
+                                .getOperatorInfo(operatorNodeId)
+                                .getVertexConfig();
                 final Map<ManagedMemoryUseCase, Integer> 
operatorScopeUseCaseWeights =
                         streamGraph
                                 .getStreamNode(operatorNodeId)
@@ -2015,9 +2115,11 @@ public class StreamingJobGraphGenerator {
             }
 
             // need to refresh the chained task configs because they are 
serialized
-            final int headOperatorNodeId = 
vertexHeadOperators.get(jobVertexID);
             final StreamConfig vertexConfig =
-                    
jobVertexBuildContext.getOperatorInfo(headOperatorNodeId).getVertexConfig();
+                    jobVertexBuildContext
+                            .getChainInfo(headOperatorNodeId)
+                            .getOperatorInfo(headOperatorNodeId)
+                            .getVertexConfig();
             vertexConfig.setTransitiveChainedTaskConfigs(
                     vertexChainedConfigs.get(headOperatorNodeId));
         }
@@ -2050,10 +2152,6 @@ public class StreamingJobGraphGenerator {
         }
     }
 
-    public static void configureCheckpointing(StreamGraph streamGraph, 
JobGraph jobGraph) {
-        jobGraph.setSnapshotSettings(streamGraph.getCheckpointingSettings());
-    }
-
     private static String nameWithChainedSourcesInfo(
             String operatorName, Collection<ChainedSourceInfo> 
chainedSourceInfos) {
         return chainedSourceInfos.isEmpty()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
index d6ef01424e3..a4bd1f507f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
@@ -19,8 +19,10 @@
 package org.apache.flink.streaming.api.graph.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -42,6 +44,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 @Internal
 public class JobVertexBuildContext {
 
+    private final JobGraph jobGraph;
+
     private final StreamGraph streamGraph;
 
     /**
@@ -51,9 +55,6 @@ public class JobVertexBuildContext {
      */
     private final Map<Integer, OperatorChainInfo> chainInfosInOrder;
 
-    /** The {@link OperatorInfo}s, key is the id of the stream node. */
-    private final Map<Integer, OperatorInfo> operatorInfos;
-
     // This map's key represents the starting node id of each chain. Note that 
this includes not
     // only the usual head node of the chain but also the ids of chain sources 
which are used by
     // multi-input.
@@ -83,11 +84,16 @@ public class JobVertexBuildContext {
 
     private final List<Map<Integer, byte[]>> legacyHashes;
 
+    private final SlotSharingGroup defaultSlotSharingGroup;
+
     public JobVertexBuildContext(
+            JobGraph jobGraph,
             StreamGraph streamGraph,
             AtomicBoolean hasHybridResultPartition,
             Map<Integer, byte[]> hashes,
-            List<Map<Integer, byte[]>> legacyHashes) {
+            List<Map<Integer, byte[]>> legacyHashes,
+            SlotSharingGroup defaultSlotSharingGroup) {
+        this.jobGraph = jobGraph;
         this.streamGraph = streamGraph;
         this.hashes = hashes;
         this.legacyHashes = legacyHashes;
@@ -97,7 +103,7 @@ public class JobVertexBuildContext {
         this.hasHybridResultPartition = hasHybridResultPartition;
         this.coordinatorSerializationFuturesPerJobVertex = new HashMap<>();
         this.chainedConfigs = new HashMap<>();
-        this.operatorInfos = new HashMap<>();
+        this.defaultSlotSharingGroup = defaultSlotSharingGroup;
     }
 
     public void addChainInfo(Integer startNodeId, OperatorChainInfo chainInfo) 
{
@@ -112,20 +118,6 @@ public class JobVertexBuildContext {
         return chainInfosInOrder;
     }
 
-    public OperatorInfo getOperatorInfo(Integer nodeId) {
-        return operatorInfos.get(nodeId);
-    }
-
-    public OperatorInfo createAndGetOperatorInfo(Integer nodeId) {
-        OperatorInfo operatorInfo = new OperatorInfo();
-        operatorInfos.put(nodeId, operatorInfo);
-        return operatorInfo;
-    }
-
-    public Map<Integer, OperatorInfo> getOperatorInfos() {
-        return operatorInfos;
-    }
-
     public StreamGraph getStreamGraph() {
         return streamGraph;
     }
@@ -189,4 +181,12 @@ public class JobVertexBuildContext {
         }
         return hashes;
     }
+
+    public JobGraph getJobGraph() {
+        return jobGraph;
+    }
+
+    public SlotSharingGroup getDefaultSlotSharingGroup() {
+        return defaultSlotSharingGroup;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
index 8e776b746a1..45c5173c20d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.graph.StreamNode;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -41,29 +42,29 @@ public class OperatorChainInfo {
     private final Map<Integer, ResourceSpec> chainedMinResources;
     private final Map<Integer, ResourceSpec> chainedPreferredResources;
     private final Map<Integer, String> chainedNames;
+
+    /** The {@link OperatorInfo}s, key is the id of the stream node. */
+    private final Map<Integer, OperatorInfo> chainedOperatorInfos;
+
     private final List<OperatorCoordinator.Provider> coordinatorProviders;
-    private final StreamGraph streamGraph;
     private final List<StreamNode> chainedNodes;
     private final List<StreamEdge> transitiveOutEdges;
     private final List<StreamEdge> transitiveInEdges;
 
     private InputOutputFormatContainer inputOutputFormatContainer = null;
 
-    public OperatorChainInfo(
-            int startNodeId,
-            Map<Integer, ChainedSourceInfo> chainedSources,
-            StreamGraph streamGraph) {
+    public OperatorChainInfo(int startNodeId) {
         this.startNodeId = startNodeId;
         this.chainedOperatorHashes = new HashMap<>();
         this.coordinatorProviders = new ArrayList<>();
-        this.chainedSources = chainedSources;
+        this.chainedSources = new HashMap<>();
         this.chainedMinResources = new HashMap<>();
         this.chainedPreferredResources = new HashMap<>();
         this.chainedNames = new HashMap<>();
-        this.streamGraph = streamGraph;
         this.chainedNodes = new ArrayList<>();
         this.transitiveOutEdges = new ArrayList<>();
         this.transitiveInEdges = new ArrayList<>();
+        this.chainedOperatorInfos = new HashMap<>();
     }
 
     public Integer getStartNodeId() {
@@ -88,9 +89,11 @@ public class OperatorChainInfo {
 
     public OperatorID addNodeToChain(
             int currentNodeId, String operatorName, JobVertexBuildContext 
jobVertexBuildContext) {
-        recordChainedNode(currentNodeId);
+        StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
         StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
 
+        recordChainedNode(streamNode);
+
         List<ChainedOperatorHashInfo> operatorHashes =
                 chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new 
ArrayList<>());
 
@@ -116,13 +119,12 @@ public class OperatorChainInfo {
         return transitiveOutEdges;
     }
 
-    public void recordChainedNode(int currentNodeId) {
-        StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
+    public void recordChainedNode(StreamNode streamNode) {
         chainedNodes.add(streamNode);
     }
 
     public OperatorChainInfo newChain(Integer startNodeId) {
-        return new OperatorChainInfo(startNodeId, chainedSources, streamGraph);
+        return new OperatorChainInfo(startNodeId);
     }
 
     public List<StreamNode> getAllChainedNodes() {
@@ -141,8 +143,9 @@ public class OperatorChainInfo {
         return inputOutputFormatContainer;
     }
 
-    public void addChainedSource(Integer sourceNodeId, ChainedSourceInfo 
chainedSourceInfo) {
-        chainedSources.put(sourceNodeId, chainedSourceInfo);
+    public void addChainedSource(StreamNode sourceNode, ChainedSourceInfo 
chainedSourceInfo) {
+        recordChainedNode(sourceNode);
+        chainedSources.put(sourceNode.getId(), chainedSourceInfo);
     }
 
     public void addChainedMinResources(Integer sourceNodeId, ResourceSpec 
resourceSpec) {
@@ -180,4 +183,18 @@ public class OperatorChainInfo {
     public List<StreamEdge> getTransitiveInEdges() {
         return transitiveInEdges;
     }
+
+    public OperatorInfo getOperatorInfo(Integer nodeId) {
+        return chainedOperatorInfos.get(nodeId);
+    }
+
+    public OperatorInfo createAndGetOperatorInfo(Integer nodeId, OperatorID 
operatorId) {
+        OperatorInfo operatorInfo = new OperatorInfo(operatorId);
+        chainedOperatorInfos.put(nodeId, operatorInfo);
+        return operatorInfo;
+    }
+
+    public Map<Integer, OperatorInfo> getOperatorInfos() {
+        return Collections.unmodifiableMap(chainedOperatorInfos);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
index f79ca90a3cf..0b29a2a9245 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.graph.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 
@@ -29,29 +30,19 @@ import java.util.List;
 @Internal
 public class OperatorInfo {
 
-    private StreamConfig vertexConfig;
+    private final OperatorID operatorId;
 
-    // This is used to cache the chainable outputs, to set the chainable 
outputs config after
-    // all job vertices are created.
-    private final List<StreamEdge> chainableOutputs;
+    private StreamConfig vertexConfig;
 
     // This is used to cache the non-chainable outputs, to set the 
non-chainable outputs config
     // after all job vertices are created.
     private final List<StreamEdge> nonChainableOutputs;
 
-    public OperatorInfo() {
-        this.chainableOutputs = new ArrayList<>();
+    public OperatorInfo(OperatorID operatorId) {
+        this.operatorId = operatorId;
         this.nonChainableOutputs = new ArrayList<>();
     }
 
-    public List<StreamEdge> getChainableOutputs() {
-        return chainableOutputs;
-    }
-
-    public void addChainableOutputs(List<StreamEdge> chainableOutputs) {
-        this.chainableOutputs.addAll(chainableOutputs);
-    }
-
     public List<StreamEdge> getNonChainableOutputs() {
         return nonChainableOutputs;
     }
@@ -67,4 +58,8 @@ public class OperatorInfo {
     public void setVertexConfig(StreamConfig vertexConfig) {
         this.vertexConfig = vertexConfig;
     }
+
+    public OperatorID getOperatorId() {
+        return operatorId;
+    }
 }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index aaf963a6bd2..ce3de3c5e46 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -1010,9 +1010,13 @@ class StreamingJobGraphGeneratorTest {
                 streamGraph.getStreamNodes().stream()
                         .sorted(Comparator.comparingInt(StreamNode::getId))
                         .collect(Collectors.toList());
-        assertThat(areOperatorsChainable(streamNodes.get(0), 
streamNodes.get(1), streamGraph))
+        assertThat(
+                        areOperatorsChainable(
+                                streamNodes.get(0), streamNodes.get(1), 
streamGraph, false))
                 .isTrue();
-        assertThat(areOperatorsChainable(streamNodes.get(1), 
streamNodes.get(2), streamGraph))
+        assertThat(
+                        areOperatorsChainable(
+                                streamNodes.get(1), streamNodes.get(2), 
streamGraph, false))
                 .isFalse();
     }
 
@@ -1058,9 +1062,13 @@ class StreamingJobGraphGeneratorTest {
                 streamGraph.getStreamNodes().stream()
                         .sorted(Comparator.comparingInt(StreamNode::getId))
                         .collect(Collectors.toList());
-        assertThat(areOperatorsChainable(streamNodes.get(0), 
streamNodes.get(1), streamGraph))
+        assertThat(
+                        areOperatorsChainable(
+                                streamNodes.get(0), streamNodes.get(1), 
streamGraph, false))
                 .isFalse();
-        assertThat(areOperatorsChainable(streamNodes.get(1), 
streamNodes.get(2), streamGraph))
+        assertThat(
+                        areOperatorsChainable(
+                                streamNodes.get(1), streamNodes.get(2), 
streamGraph, false))
                 .isTrue();
     }
 

Reply via email to