wanglijie95 commented on code in PR #22686:
URL: https://github.com/apache/flink/pull/22686#discussion_r1228936755


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java:
##########
@@ -118,36 +118,44 @@ public static DefaultExecutionGraph buildGraph(
                 
PartitionGroupReleaseStrategyFactoryLoader.loadPartitionGroupReleaseStrategyFactory(
                         jobManagerConfig);
 
-        // create a new execution graph, if none exists so far
-        final DefaultExecutionGraph executionGraph;
+        final TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory;
         try {
-            executionGraph =
-                    new DefaultExecutionGraph(
-                            jobInformation,
-                            futureExecutor,
-                            ioExecutor,
-                            rpcTimeout,
-                            executionHistorySizeLimit,
-                            classLoader,
-                            blobWriter,
-                            partitionGroupReleaseStrategyFactory,
-                            shuffleMaster,
-                            partitionTracker,
+            taskDeploymentDescriptorFactory =
+                    new TaskDeploymentDescriptorFactory(
+                            BlobWriter.serializeAndTryOffload(jobInformation, 
jobId, blobWriter),
+                            jobId,
                             partitionLocationConstraint,
-                            executionDeploymentListener,
-                            executionStateUpdateListener,
-                            initializationTimestamp,
-                            vertexAttemptNumberStore,
-                            vertexParallelismStore,
-                            isDynamicGraph,
-                            executionJobVertexFactory,
-                            jobGraph.getJobStatusHooks(),
-                            markPartitionFinishedStrategy,
+                            blobWriter,
                             nonFinishedHybridPartitionShouldBeUnknown);
         } catch (IOException e) {
-            throw new JobException("Could not create the ExecutionGraph.", e);
+            throw new JobException("Could not create the 
TaskDeploymentDescriptorFactory.", e);
         }
 
+        // create a new execution graph, if none exists so far
+        final DefaultExecutionGraph executionGraph;
+        executionGraph =

Review Comment:
   These 2 lines can be merged into 1 (declare and assign `executionGraph` in a single statement).



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java:
##########
@@ -400,7 +385,7 @@ public DefaultExecutionGraph(
 
         this.markPartitionFinishedStrategy = markPartitionFinishedStrategy;
 
-        this.nonFinishedHybridPartitionShouldBeUnknown = 
nonFinishedHybridPartitionShouldBeUnknown;
+        this.taskDeploymentDescriptorFactory = taskDeploymentDescriptorFactory;

Review Comment:
   Maybe add a `checkNotNull` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to