zhuzhurk commented on code in PR #21672:
URL: https://github.com/apache/flink/pull/21672#discussion_r1071037676
##########
flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java:
##########
@@ -298,7 +298,11 @@ private void testLoadingIsDynamicGraphFromConfiguration(
configFromConfiguration.configure(
configuration, Thread.currentThread().getContextClassLoader());
- assertThat(configFromConfiguration.isDynamicGraph(),
is(expectIsDynamicGraph));
+ assertThat(
+ configFromConfiguration.getSchedulerType().isPresent()
+ && configFromConfiguration.getSchedulerType().get()
+ ==
JobManagerOptions.SchedulerType.AdaptiveBatch,
+ is(expectIsDynamicGraph));
Review Comment:
We should rework this test, by renaming it to
`testLoadingSchedulerTypeFromConfiguration` and check whether the expected
scheduler type is set.
##########
flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java:
##########
@@ -476,8 +477,8 @@ public ExecutionConfig setScheduler(SchedulerType
schedulerType) {
}
Review Comment:
Let's add a hotfix to remove this method like the `TODO` suggested. It can
be replaced in tests by using `ExecutionConfig#configure(...)`.
##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/AbstractTestBase.java:
##########
@@ -69,6 +70,7 @@ public abstract class AbstractTestBase extends TestLogger {
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+
.setScheduler(JobManagerOptions.SchedulerType.Default)
Review Comment:
This is not the correct way I think.
We should rather have the batch job tests to test adaptive batch scheduler
by default, instead of testing default scheduler. Because adaptive batch
scheduler will be used in production by default for batch jobs.
##########
flink-tests/src/test/java/org/apache/flink/test/scheduling/AdaptiveBatchSchedulerITCase.java:
##########
@@ -103,8 +103,6 @@ private void executeJob(Boolean isFineGrained) throws
Exception {
final Configuration configuration = new Configuration();
configuration.setString(RestOptions.BIND_PORT, "0");
configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
- configuration.set(
- JobManagerOptions.SCHEDULER,
JobManagerOptions.SchedulerType.AdaptiveBatch);
Review Comment:
We should ensure the IT cases of default scheduler still run with default
scheduler, e.g. PipelinedRegionSchedulingITCase.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1362,7 +1363,8 @@ void testRequestPartitionState() throws Exception {
// finish the producer task
jobMasterGateway
.updateTaskExecutionState(
- new TaskExecutionState(executionAttemptId,
ExecutionState.FINISHED))
+
SchedulerTestingUtils.createFinishedTaskExecutionState(
+ executionAttemptId))
Review Comment:
Why is this change(and the below one) needed?
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -139,6 +139,8 @@ public class StreamGraph implements Pipeline {
private final List<JobStatusHook> jobStatusHooks = new ArrayList<>();
+ private boolean enableDynamicGraph;
Review Comment:
I prefer to rename it as `dynamic`, and then use
`isDynamic()`/`setDynamic(boolean)` for the getter/setter.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java:
##########
@@ -89,6 +89,7 @@ public class JobGraph implements Serializable {
private JobType jobType = JobType.BATCH;
+ private boolean enableDataStream;
Review Comment:
I prefer to name it as `dynamic`, similar to another comment.
It should be set to true in `StreamingJobGraphGenerator` only if the
`StreamGraph` is dynamic.
Besides that, it's better to add one new line below this field.
##########
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java:
##########
@@ -205,11 +207,12 @@ public class ClusterOptions {
TextElement.code(PROCESS_WORKING_DIR_BASE.key()))
.build());
- public static JobManagerOptions.SchedulerType
getSchedulerType(Configuration configuration) {
+ public static Optional<JobManagerOptions.SchedulerType> getSchedulerType(
Review Comment:
This is a public interface by-law so that we should not change it.
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -220,6 +220,7 @@ private StreamingJobGraphGenerator(
private JobGraph createJobGraph() {
preValidate();
jobGraph.setJobType(streamGraph.getJobType());
+ jobGraph.enableDataStream(true);
Review Comment:
As mentioned in another comment, can be
```
jobGraph.setDynamic(streamGraph.isDynamic());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]