XComp commented on a change in pull request #18536:
URL: https://github.com/apache/flink/pull/18536#discussion_r796627322



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -162,34 +162,98 @@ public Dispatcher(
             DispatcherBootstrapFactory dispatcherBootstrapFactory,
             DispatcherServices dispatcherServices)
             throws Exception {
+        this(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                recoveredDirtyJobs,
+                dispatcherBootstrapFactory,
+                dispatcherServices,
+                new JobManagerRunnerRegistry(16));
+    }
+
+    private Dispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> globallyTerminatedJobs,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            DispatcherServices dispatcherServices,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry)
+            throws Exception {
+        this(
+                rpcService,
+                fencingToken,
+                recoveredJobs,
+                globallyTerminatedJobs,
+                dispatcherServices.getConfiguration(),
+                dispatcherServices.getHighAvailabilityServices(),
+                dispatcherServices.getResourceManagerGatewayRetriever(),
+                dispatcherServices.getHeartbeatServices(),
+                dispatcherServices.getBlobServer(),
+                dispatcherServices.getFatalErrorHandler(),
+                dispatcherServices.getJobGraphWriter(),
+                dispatcherServices.getJobResultStore(),
+                dispatcherServices.getJobManagerMetricGroup(),
+                dispatcherServices.getMetricQueryServiceAddress(),
+                dispatcherServices.getIoExecutor(),
+                dispatcherServices.getHistoryServerArchivist(),
+                dispatcherServices.getArchivedExecutionGraphStore(),
+                dispatcherServices.getJobManagerRunnerFactory(),
+                dispatcherBootstrapFactory,
+                dispatcherServices.getOperationCaches(),
+                jobManagerRunnerRegistry);
+    }
+
+    private Dispatcher(
+            RpcService rpcService,
+            DispatcherId fencingToken,
+            Collection<JobGraph> recoveredJobs,
+            Collection<JobResult> recoveredDirtyJobs,
+            Configuration configuration,
+            HighAvailabilityServices highAvailabilityServices,
+            GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+            HeartbeatServices heartbeatServices,
+            BlobServer blobServer,
+            FatalErrorHandler fatalErrorHandler,
+            JobGraphWriter jobGraphWriter,
+            JobResultStore jobResultStore,
+            JobManagerMetricGroup jobManagerMetricGroup,
+            @Nullable String metricServiceQueryAddress,
+            Executor ioExecutor,
+            HistoryServerArchivist historyServerArchivist,
+            ExecutionGraphInfoStore executionGraphInfoStore,
+            JobManagerRunnerFactory jobManagerRunnerFactory,
+            DispatcherBootstrapFactory dispatcherBootstrapFactory,
+            DispatcherOperationCaches dispatcherOperationCaches,
+            JobManagerRunnerRegistry jobManagerRunnerRegistry)
+            throws Exception {
         super(rpcService, RpcServiceUtils.createRandomName(DISPATCHER_NAME), 
fencingToken);
-        checkNotNull(dispatcherServices);
         assertRecoveredJobsAndDirtyJobResults(recoveredJobs, 
recoveredDirtyJobs);
 
-        this.configuration = dispatcherServices.getConfiguration();
-        this.highAvailabilityServices = 
dispatcherServices.getHighAvailabilityServices();
-        this.resourceManagerGatewayRetriever =
-                dispatcherServices.getResourceManagerGatewayRetriever();
-        this.heartbeatServices = dispatcherServices.getHeartbeatServices();
-        this.blobServer = dispatcherServices.getBlobServer();
-        this.fatalErrorHandler = dispatcherServices.getFatalErrorHandler();
-        this.jobGraphWriter = dispatcherServices.getJobGraphWriter();
-        this.jobResultStore = dispatcherServices.getJobResultStore();
-        this.jobManagerMetricGroup = 
dispatcherServices.getJobManagerMetricGroup();
-        this.metricServiceQueryAddress = 
dispatcherServices.getMetricQueryServiceAddress();
-        this.ioExecutor = dispatcherServices.getIoExecutor();
+        this.configuration = checkNotNull(configuration);
+        this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
+        this.resourceManagerGatewayRetriever = 
checkNotNull(resourceManagerGatewayRetriever);
+        this.heartbeatServices = checkNotNull(heartbeatServices);
+        this.blobServer = checkNotNull(blobServer);
+        this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
+        this.jobGraphWriter = checkNotNull(jobGraphWriter);
+        this.jobResultStore = checkNotNull(jobResultStore);
+        this.jobManagerMetricGroup = checkNotNull(jobManagerMetricGroup);
+        this.metricServiceQueryAddress = metricServiceQueryAddress;
+        this.ioExecutor = checkNotNull(ioExecutor);

Review comment:
       I considered the null checks as missing. That's why I added it. This 
change got reverted.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to