xintongsong commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1043991638
##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -605,6 +605,17 @@ public enum SchedulerType {
code(SchedulerType.AdaptiveBatch.name()))
.build());
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Boolean> ONLY_CONSUME_FINISHED_PARTITION =
+
key("jobmanager.adaptive-batch-scheduler.only-consume-finished-partition")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Controls whether the scheduler only allows
downstream task consume finished partition.");
Review Comment:
I think this is probably a configuration of hybrid shuffle, not adaptive
batch scheduler.
- This config option can also be supported by other schedulers. We just
decide to only support adaptive batch scheduler atm.
- Pipelined shuffle always consumes unfinished partitions, and blocking
shuffle always consumes finished partitions. This config option only makes
sense for hybrid shuffle.
I'd suggest the config key
`taskmanager.network.hybrid-shuffle.only-consume-finished-partition` and move
it to `NettyShuffleEnvironmentOptions`.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/InternalExecutionGraphAccessor.java:
##########
@@ -120,4 +120,6 @@ void notifySchedulerNgAboutInternalTaskFailure(
/** Get the shuffle descriptors of the cluster partitions ordered by
partition number. */
List<ShuffleDescriptor> getClusterPartitionShuffleDescriptors(
IntermediateDataSetID intermediateResultPartition);
+
+ boolean isOnlyConsumeFinishedPartition();
Review Comment:
This should not be a property of execution graph. I think it's rather a
property of result partition. We can read the configuration in
`StreamingJobGraphGenerator`, and pass it all the way to the
`IntermediateResultPartition`.
##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -605,6 +605,17 @@ public enum SchedulerType {
code(SchedulerType.AdaptiveBatch.name()))
.build());
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Boolean> ONLY_CONSUME_FINISHED_PARTITION =
+
key("jobmanager.adaptive-batch-scheduler.only-consume-finished-partition")
+ .booleanType()
+ .defaultValue(false)
Review Comment:
I'd suggest `noDefaultValue()`. If not explicitly specified by users, it
should behaves differently according to the conditions. E.g., `true` if
speculative is enabled, `false` otherwise.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]