zhuzhurk commented on code in PR #21672:
URL: https://github.com/apache/flink/pull/21672#discussion_r1071037676


##########
flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java:
##########
@@ -298,7 +298,11 @@ private void testLoadingIsDynamicGraphFromConfiguration(
         configFromConfiguration.configure(
                 configuration, Thread.currentThread().getContextClassLoader());
 
-        assertThat(configFromConfiguration.isDynamicGraph(), is(expectIsDynamicGraph));
+        assertThat(
+                configFromConfiguration.getSchedulerType().isPresent()
+                        && configFromConfiguration.getSchedulerType().get()
+                                == JobManagerOptions.SchedulerType.AdaptiveBatch,
+                is(expectIsDynamicGraph));

Review Comment:
   We should rework this test: rename it to 
`testLoadingSchedulerTypeFromConfiguration` and check whether the expected 
scheduler type is set.
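
A minimal sketch of the kind of check meant here, not the actual test. It assumes the config exposes the scheduler type as an `Optional`, as in the diff above. `SchedulerTypeAssertionSketch`, its nested `SchedulerType` enum (a stand-in for `JobManagerOptions.SchedulerType`) and `matchesExpected` are hypothetical names used only to keep the example self-contained.

```java
import java.util.Optional;

public class SchedulerTypeAssertionSketch {

    /** Stand-in for JobManagerOptions.SchedulerType; lists only the constants seen in this review. */
    public enum SchedulerType {
        Default,
        AdaptiveBatch
    }

    /**
     * Hypothetical helper: compares the configured scheduler type with the expected one.
     * A null expectation means "no scheduler type should be set".
     */
    public static boolean matchesExpected(
            Optional<SchedulerType> configured, SchedulerType expectedOrNull) {
        // Optional#equals is true when both are empty or both hold equal values.
        return configured.equals(Optional.ofNullable(expectedOrNull));
    }

    public static void main(String[] args) {
        // Expected type is set and matches.
        System.out.println(
                matchesExpected(
                        Optional.of(SchedulerType.AdaptiveBatch), SchedulerType.AdaptiveBatch));
        // Nothing configured although a type was expected.
        System.out.println(matchesExpected(Optional.empty(), SchedulerType.AdaptiveBatch));
        // Nothing configured and nothing expected.
        System.out.println(matchesExpected(Optional.empty(), null));
    }
}
```

Comparing the whole `Optional` against the expected value lets one parameterized test cover every scheduler type, including the "not set" case, instead of collapsing the result into a single boolean for `AdaptiveBatch`.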



##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -476,8 +477,8 @@ public ExecutionConfig setScheduler(SchedulerType schedulerType) {
     }

Review Comment:
   Let's add a hotfix to remove this method, as the `TODO` suggests. It can 
be replaced in tests by using `ExecutionConfig#configure(...)`.



##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java:
##########
@@ -69,6 +70,7 @@ public abstract class AbstractTestBase extends TestLogger {
                     new MiniClusterResourceConfiguration.Builder()
                             .setNumberTaskManagers(1)
                             .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setScheduler(JobManagerOptions.SchedulerType.Default)

Review Comment:
   I don't think this is the correct approach.
   The batch job tests should exercise the adaptive batch scheduler by 
default rather than the default scheduler, because the adaptive batch 
scheduler will be used in production by default for batch jobs.



##########
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java:
##########
@@ -103,8 +103,6 @@ private void executeJob(Boolean isFineGrained) throws Exception {
         final Configuration configuration = new Configuration();
         configuration.setString(RestOptions.BIND_PORT, "0");
         configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
-        configuration.set(
-                JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);

Review Comment:
   We should ensure that the IT cases for the default scheduler still run with 
the default scheduler, e.g. `PipelinedRegionSchedulingITCase`.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1362,7 +1363,8 @@ void testRequestPartitionState() throws Exception {
             // finish the producer task
             jobMasterGateway
                     .updateTaskExecutionState(
-                            new TaskExecutionState(executionAttemptId, ExecutionState.FINISHED))
+                            SchedulerTestingUtils.createFinishedTaskExecutionState(
+                                    executionAttemptId))

Review Comment:
   Why is this change (and the one below) needed?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -139,6 +139,8 @@ public class StreamGraph implements Pipeline {
 
     private final List<JobStatusHook> jobStatusHooks = new ArrayList<>();
 
+    private boolean enableDynamicGraph;

Review Comment:
   I'd prefer to rename it to `dynamic`, and then use 
`isDynamic()`/`setDynamic(boolean)` for the getter/setter.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##########
@@ -89,6 +89,7 @@ public class JobGraph implements Serializable {
 
     private JobType jobType = JobType.BATCH;
 
+    private boolean enableDataStream;

Review Comment:
   I'd prefer to name it `dynamic`, as in the other comment.
   It should be set to true in `StreamingJobGraphGenerator` only if the 
`StreamGraph` is dynamic.
   
   Besides that, please add a blank line below this field.
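
A minimal sketch of the suggested naming and propagation, under stated assumptions: `StreamGraphStandIn` and `JobGraphStandIn` are hypothetical stand-ins for `StreamGraph` and `JobGraph`, and `createJobGraph` only mimics the single relevant step of `StreamingJobGraphGenerator`.

```java
public class DynamicFlagSketch {

    /** Hypothetical stand-in for StreamGraph, reduced to the flag under discussion. */
    public static class StreamGraphStandIn {
        private boolean dynamic;

        public boolean isDynamic() {
            return dynamic;
        }

        public void setDynamic(boolean dynamic) {
            this.dynamic = dynamic;
        }
    }

    /** Hypothetical stand-in for JobGraph, reduced to the flag under discussion. */
    public static class JobGraphStandIn {
        private boolean dynamic;

        public boolean isDynamic() {
            return dynamic;
        }

        public void setDynamic(boolean dynamic) {
            this.dynamic = dynamic;
        }
    }

    /** Copies the flag from the stream graph instead of hard-coding it to true. */
    public static JobGraphStandIn createJobGraph(StreamGraphStandIn streamGraph) {
        JobGraphStandIn jobGraph = new JobGraphStandIn();
        jobGraph.setDynamic(streamGraph.isDynamic());
        return jobGraph;
    }

    public static void main(String[] args) {
        StreamGraphStandIn streamGraph = new StreamGraphStandIn();
        streamGraph.setDynamic(true);
        System.out.println(createJobGraph(streamGraph).isDynamic());
    }
}
```

With this shape the job graph is dynamic only when the stream graph it was generated from is dynamic, which is the condition asked for above.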



##########
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java:
##########
@@ -205,11 +207,12 @@ public class ClusterOptions {
                                             TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
                                     .build());
 
-    public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
+    public static Optional<JobManagerOptions.SchedulerType> getSchedulerType(

Review Comment:
   This is formally a public interface, so we should not change it.



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -220,6 +220,7 @@ private StreamingJobGraphGenerator(
     private JobGraph createJobGraph() {
         preValidate();
         jobGraph.setJobType(streamGraph.getJobType());
+        jobGraph.enableDataStream(true);

Review Comment:
   As mentioned in another comment, this can be:
   ```
   jobGraph.setDynamic(streamGraph.isDynamic());
   ```


