zhuzhurk commented on code in PR #21814:
URL: https://github.com/apache/flink/pull/21814#discussion_r1095634172


##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactoryTest.java:
##########
@@ -45,7 +46,7 @@
 public class DefaultSlotPoolServiceSchedulerFactoryTest {
 
     @Test
-    public void testFallsBackToDefaultSchedulerIfBatchJob() {
+    public void testFallsBackToDefaultSchedulerIfAdaptiveSchedulerInBatchJob() 
{

Review Comment:
   -> testFallsBackToAdaptiveBatchSchedulerIfBatchJob



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java:
##########
@@ -224,20 +210,52 @@ public static DefaultSlotPoolServiceSchedulerFactory 
fromConfiguration(
 
     private static JobManagerOptions.SchedulerType getSchedulerType(
             Configuration configuration, JobType jobType, boolean 
isDynamicGraph) {
-        if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
-                == SchedulerExecutionMode.REACTIVE) {
-            return JobManagerOptions.SchedulerType.Adaptive;
+        JobManagerOptions.SchedulerType schedulerType;
+        if (jobType == JobType.BATCH) {
+            if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
+                            == SchedulerExecutionMode.REACTIVE
+                    || configuration.get(JobManagerOptions.SCHEDULER)
+                            == JobManagerOptions.SchedulerType.Adaptive) {
+                LOG.info(
+                        "Adaptive Scheduler configured, but Batch job 
detected. Changing scheduler type to 'AdaptiveBatch'.");
+                // overwrite
+                schedulerType = JobManagerOptions.SchedulerType.AdaptiveBatch;
+            } else {
+                schedulerType =
+                        configuration
+                                .getOptional(JobManagerOptions.SCHEDULER)
+                                .orElse(
+                                        isDynamicGraph
+                                                ? 
JobManagerOptions.SchedulerType.AdaptiveBatch
+                                                : 
JobManagerOptions.SchedulerType.Default);
+            }

Review Comment:
   ```suggestion
               
                   schedulerType =
                           configuration
                                   .getOptional(JobManagerOptions.SCHEDULER)
                                   .orElse(
                                           isDynamicGraph
                                                   ? 
JobManagerOptions.SchedulerType.AdaptiveBatch
                                                   : 
JobManagerOptions.SchedulerType.Default);
                   
                   if (schedulerType = 
JobManagerOptions.SchedulerType.Adaptive) {
                       LOG.info(
                               "Adaptive Scheduler configured, but Batch job 
detected. Changing scheduler type to 'AdaptiveBatch'.");
                       schedulerType = 
JobManagerOptions.SchedulerType.AdaptiveBatch;
                   }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java:
##########
@@ -224,20 +210,52 @@ public static DefaultSlotPoolServiceSchedulerFactory 
fromConfiguration(
 
     private static JobManagerOptions.SchedulerType getSchedulerType(
             Configuration configuration, JobType jobType, boolean 
isDynamicGraph) {
-        if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
-                == SchedulerExecutionMode.REACTIVE) {
-            return JobManagerOptions.SchedulerType.Adaptive;
+        JobManagerOptions.SchedulerType schedulerType;
+        if (jobType == JobType.BATCH) {
+            if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
+                            == SchedulerExecutionMode.REACTIVE
+                    || configuration.get(JobManagerOptions.SCHEDULER)
+                            == JobManagerOptions.SchedulerType.Adaptive) {
+                LOG.info(
+                        "Adaptive Scheduler configured, but Batch job 
detected. Changing scheduler type to 'AdaptiveBatch'.");
+                // overwrite
+                schedulerType = JobManagerOptions.SchedulerType.AdaptiveBatch;
+            } else {
+                schedulerType =
+                        configuration
+                                .getOptional(JobManagerOptions.SCHEDULER)
+                                .orElse(
+                                        isDynamicGraph
+                                                ? 
JobManagerOptions.SchedulerType.AdaptiveBatch
+                                                : 
JobManagerOptions.SchedulerType.Default);
+            }

Review Comment:
   I think it's not needed to print that log if reactive mode is enabled for a 
batch job.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to