zhuzhurk commented on code in PR #21672:
URL: https://github.com/apache/flink/pull/21672#discussion_r1089682373


##########
flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java:
##########
@@ -282,23 +281,22 @@ public void 
testLoadingDefaultKryoSerializersFromConfiguration() {
     }
 
     @Test
-    public void testLoadingIsDynamicGraphFromConfiguration() {
-        testLoadingIsDynamicGraphFromConfiguration(
-                JobManagerOptions.SchedulerType.AdaptiveBatch, true);
-        
testLoadingIsDynamicGraphFromConfiguration(JobManagerOptions.SchedulerType.Default,
 false);
-        
testLoadingIsDynamicGraphFromConfiguration(JobManagerOptions.SchedulerType.Adaptive,
 false);
+    public void testLoadingSchedulerTypeFromConfiguration() {
+        
testLoadingSchedulerTypeFromConfiguration(JobManagerOptions.SchedulerType.AdaptiveBatch);
+        
testLoadingSchedulerTypeFromConfiguration(JobManagerOptions.SchedulerType.Default);
+        
testLoadingSchedulerTypeFromConfiguration(JobManagerOptions.SchedulerType.Adaptive);
     }
 
-    private void testLoadingIsDynamicGraphFromConfiguration(
-            JobManagerOptions.SchedulerType schedulerType, boolean 
expectIsDynamicGraph) {
+    private void testLoadingSchedulerTypeFromConfiguration(
+            JobManagerOptions.SchedulerType schedulerType) {
         Configuration configuration = new Configuration();
         configuration.set(JobManagerOptions.SCHEDULER, schedulerType);
 
         ExecutionConfig configFromConfiguration = new ExecutionConfig();
         configFromConfiguration.configure(
                 configuration, Thread.currentThread().getContextClassLoader());
 
-        assertThat(configFromConfiguration.isDynamicGraph(), 
is(expectIsDynamicGraph));
+        assertEquals(configFromConfiguration.getSchedulerType().get(), 
schedulerType);

Review Comment:
   We need to migrate this test class to JUnit5 since we are touching the 
assertion part.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java:
##########
@@ -223,6 +222,21 @@ public static DefaultSlotPoolServiceSchedulerFactory 
fromConfiguration(
                 slotPoolServiceFactory, schedulerNGFactory);
     }
 
+    private static JobManagerOptions.SchedulerType getSchedulerType(
+            Configuration configuration, JobType jobType, boolean isDynamic) {
+        if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
+                == SchedulerExecutionMode.REACTIVE) {
+            return JobManagerOptions.SchedulerType.Adaptive;
+        } else {
+            return configuration
+                    .getOptional(JobManagerOptions.SCHEDULER)
+                    .orElse(
+                            jobType == JobType.BATCH && isDynamic
+                                    ? 
JobManagerOptions.SchedulerType.AdaptiveBatch
+                                    : JobManagerOptions.SchedulerType.Default);

Review Comment:
   The branch of 
`System.getProperties().containsKey("flink.tests.enable-adaptive-scheduler")` 
is not covered.



##########
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java:
##########
@@ -205,11 +207,12 @@ public class ClusterOptions {
                                             
TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
                                     .build());
 
-    public static JobManagerOptions.SchedulerType 
getSchedulerType(Configuration configuration) {
+    public static Optional<JobManagerOptions.SchedulerType> getSchedulerType(

Review Comment:
   This change is not reverted.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/utils/BatchAbstractTestBase.java:
##########
@@ -47,6 +48,7 @@ public class BatchAbstractTestBase extends TestLogger {
     private static Configuration getConfiguration() {
         Configuration config = new Configuration();
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, 
MemorySize.parse("100m"));
+        config.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Default);

Review Comment:
   Why is it needed?



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java:
##########
@@ -69,6 +70,7 @@ public abstract class AbstractTestBase extends TestLogger {
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
                             .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            
.setScheduler(JobManagerOptions.SchedulerType.Default)

Review Comment:
   Is this change still needed?



##########
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/DagOptimizationTest.xml:
##########
@@ -273,7 +273,7 @@ Calc(select=[a AS a1, b, c AS c2])(reuse_id=[3])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a, b], where=[(a <= 10)])
    :     +- Reused(reference_id=[1])
-   +- Exchange(distribution=[hash[a2]], shuffle_mode=[BATCH])

Review Comment:
   What change leads to this difference?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java:
##########
@@ -147,7 +146,7 @@ public static DefaultSlotPoolServiceSchedulerFactory create(
     }
 
     public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
-            Configuration configuration, JobType jobType) {
+            Configuration configuration, JobType jobType, boolean isDynamic) {

Review Comment:
   I would prefer to name it `isDynamicGraph` here, because this logic is not in 
StreamGraph/JobGraph.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DynamicFilteringITCase.java:
##########
@@ -48,9 +48,7 @@ public class DynamicFilteringITCase extends BatchTestBase {
     private Catalog catalog;
 
     static Stream<Arguments> parameters() {
-        return Stream.of(
-                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING),
-                Arguments.of(BatchShuffleMode.ALL_EXCHANGES_PIPELINED));
+        return 
Stream.of(Arguments.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING));

Review Comment:
   Could we introduce a parameter pair (scheduler-type, exchange-type) that 
covers the combinations
   [(Default, ALL_EXCHANGES_PIPELINED), (Default, ALL_EXCHANGES_BLOCKING), 
(AdaptiveBatch, ALL_EXCHANGES_BLOCKING)]?
   Since the scheduler type also needs to be part of the cluster config, this 
could be done in a test base.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to