zhuzhurk commented on code in PR #25498:
URL: https://github.com/apache/flink/pull/25498#discussion_r1797644232


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java:
##########
@@ -176,6 +187,11 @@ public static JobManagerSharedServices fromConfiguration(
                 Executors.newFixedThreadPool(
                         jobManagerIoPoolSize, new ExecutorThreadFactory("jobmanager-io"));
 
+        ExecutorService serializationExecutor =
+                Executors.newFixedThreadPool(
+                        numberCPUCores,
+                        new ExecutorThreadFactory("flink-operator-serialization-io"));

Review Comment:
   What's this change for? Is it needed in this PR?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -515,20 +515,21 @@ private void stopDispatcherServices() throws Exception {
     // ------------------------------------------------------
 
     @Override
-    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout) {
-        final JobID jobID = jobGraph.getJobID();
+    public CompletableFuture<Acknowledge> submitJob(ExecutionPlan executionPlan, Duration timeout) {
+        final JobID jobID = executionPlan.getJobID();
         try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
-            log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID);
+            log.info(
+                    "Received ExecutionPlan submission '{}' ({}).", executionPlan.getName(), jobID);
         }
         return isInGloballyTerminalState(jobID)
                 .thenComposeAsync(
                         isTerminated -> {
                             if (isTerminated) {
                                 log.warn(
-                                        "Ignoring JobGraph submission '{}' ({}) because the job already "
+                                        "Ignoring ExecutionPlan submission '{}' ({}) because the job already "

Review Comment:
   ExecutionPlan -> job



##########
flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterITCase.java:
##########
@@ -60,7 +60,7 @@ public void testRejectionOfEmptyJobGraphs() throws Exception {
             miniCluster.getMiniCluster().submitJob(jobGraph).get();
             fail("Expect failure");
         } catch (Throwable t) {
-            assertThat(t, containsMessage("The given job is empty"));
+            assertThat(t, containsMessage("The given execution plan is empty"));

Review Comment:
   execution plan -> job



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##########
@@ -515,20 +515,21 @@ private void stopDispatcherServices() throws Exception {
     // ------------------------------------------------------
 
     @Override
-    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Duration timeout) {
-        final JobID jobID = jobGraph.getJobID();
+    public CompletableFuture<Acknowledge> submitJob(ExecutionPlan executionPlan, Duration timeout) {
+        final JobID jobID = executionPlan.getJobID();
         try (MdcCloseable ignored = MdcUtils.withContext(MdcUtils.asContextData(jobID))) {
-            log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID);
+            log.info(
+                    "Received ExecutionPlan submission '{}' ({}).", executionPlan.getName(), jobID);

Review Comment:
   ExecutionPlan -> job
   
   I think it's more common and user-friendly.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobMasterServiceLeadershipRunnerFactory.java:
##########
@@ -64,19 +64,19 @@ public JobManagerRunner createJobManagerRunner(
             long initializationTimestamp)
             throws Exception {
 
-        checkArgument(!jobGraph.isEmpty(), "The given job is empty");
+        checkArgument(!executionPlan.isEmpty(), "The given execution plan is empty");

Review Comment:
   execution plan -> job
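
   A minimal sketch of the suggestion, using stand-in types: the parameter can be renamed to `executionPlan` while the user-facing message keeps saying "job". The `ExecutionPlan` interface and the `checkArgument` helper below are hypothetical simplifications for illustration only, not the actual Flink classes.

```java
public class EmptyJobCheckSketch {

    /** Hypothetical stand-in for the internal type; only isEmpty() matters here. */
    interface ExecutionPlan {
        boolean isEmpty();
    }

    /** Local helper following the usual precondition idiom (an assumption, not Flink's class). */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /** The argument is an ExecutionPlan, but the error message still refers to a "job". */
    static void validate(ExecutionPlan executionPlan) {
        checkArgument(!executionPlan.isEmpty(), "The given job is empty");
    }

    public static void main(String[] args) {
        ExecutionPlan empty = () -> true;
        try {
            validate(empty);
        } catch (IllegalArgumentException e) {
            // The message shown to the user does not mention the internal class name.
            System.out.println(e.getMessage());
        }
    }
}
```

   With this wording the existing assertion in `JobMasterITCase` would not need to change.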



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobSubmitHeaders.java:
##########
@@ -26,6 +27,8 @@
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 /** These headers define the protocol for submitting a job to a flink cluster. */
+@Documentation.ExcludeFromDocumentation(
+        "This API is not exposed to the users, as ExecutionPlan is used only internally.")

Review Comment:
   -> "This API is for Flink client only and should not be exposed to users, as it relies on internal classes."


