zhuzhurk commented on code in PR #21787:
URL: https://github.com/apache/flink/pull/21787#discussion_r1091455797


##########
flink-core/src/main/java/org/apache/flink/configuration/BatchExecutionOptions.java:
##########
@@ -32,4 +37,106 @@ public class BatchExecutionOptions {
                     .defaultValue(true)
                     .withDescription(
                             "If true, Flink will automatically decide the 
parallelism of operators in batch jobs.");
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Integer> 
ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM =
+            key("execution.batch.adaptive.auto-parallelism.min-parallelism")
+                    .intType()
+                    .defaultValue(1)
+                    
.withDeprecatedKeys("jobmanager.adaptive-batch-scheduler.min-parallelism")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The lower bound of allowed 
parallelism to set adaptively if %s has been set to %s",
+                                            code(SCHEDULER.key()),
+                                            code(
+                                                    
JobManagerOptions.SchedulerType.AdaptiveBatch
+                                                            .name()))
+                                    .build());
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Integer> 
ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM =
+            key("execution.batch.adaptive.auto-parallelism.max-parallelism")
+                    .intType()
+                    .defaultValue(128)
+                    
.withDeprecatedKeys("jobmanager.adaptive-batch-scheduler.max-parallelism")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The upper bound of allowed 
parallelism to set adaptively if %s has been set to %s",
+                                            code(SCHEDULER.key()),
+                                            code(
+                                                    
JobManagerOptions.SchedulerType.AdaptiveBatch
+                                                            .name()))
+                                    .build());
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<MemorySize>
+            ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK =
+                    
key("execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task")
+                            .memoryType()
+                            .defaultValue(MemorySize.ofMebiBytes(16))
+                            .withDeprecatedKeys(
+                                    
"jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task")
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "The average size of data 
volume to expect each task instance to process if %s has been set to %s. "
+                                                            + "Note that when 
data skew occurs or the decided parallelism reaches the %s (due to too much 
data), "
+                                                            + "the data 
actually processed by some tasks may far exceed this value.",
+                                                    code(SCHEDULER.key()),
+                                                    code(
+                                                            
JobManagerOptions.SchedulerType
+                                                                    
.AdaptiveBatch.name()),
+                                                    code(
+                                                            
ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM
+                                                                    .key()))
+                                            .build());
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Integer> 
ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM =
+            
key("execution.batch.adaptive.auto-parallelism.default-source-parallelism")
+                    .intType()
+                    .defaultValue(1)
+                    .withDeprecatedKeys(
+                            
"jobmanager.adaptive-batch-scheduler.default-source-parallelism")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The default parallelism of source 
vertices if %s has been set to %s",
+                                            code(SCHEDULER.key()),
+                                            code(
+                                                    
JobManagerOptions.SchedulerType.AdaptiveBatch
+                                                            .name()))
+                                    .build());
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Boolean> SPECULATIVE_ENABLED =
+            key("execution.batch.adaptive.speculative.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    
.withDeprecatedKeys("jobmanager.adaptive-batch-scheduler.speculative.enabled")
+                    .withDescription("Controls whether to enable speculative 
execution.");
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Integer> 
SPECULATIVE_MAX_CONCURRENT_EXECUTIONS =
+            key("execution.batch.speculative.max-concurrent-executions")
+                    .intType()
+                    .defaultValue(2)
+                    .withDeprecatedKeys(
+                            
"jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions")
+                    .withDescription(
+                            "Controls the maximum number of execution attempts 
of each operator "
+                                    + "that can execute concurrently, 
including the original one "
+                                    + "and speculative ones.");
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Duration> BLOCK_SLOW_NODE_DURATION =
+            key("execution.batch.speculative.block-slow-node-duration")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDeprecatedKeys(
+                            
"jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration")
+                    .withDescription(
+                            "Controls how long an detected slow node should be 
blocked for.");
 }

Review Comment:
   A private constructor should be added. See 
`RestartStrategyOptions` for a reference.



##########
flink-core/src/main/java/org/apache/flink/configuration/BatchExecutionOptions.java:
##########
@@ -19,8 +19,13 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER;
+import static org.apache.flink.configuration.description.TextElement.code;
 
 /** Configuration options for the batch job execution. */
 public class BatchExecutionOptions {

Review Comment:
   This class should be marked with `@PublicEvolving`.



##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -602,20 +604,18 @@ public enum SchedulerType {
                                             
code(SchedulerType.AdaptiveBatch.name()))
                                     .build());
 
-    @Documentation.Section({
-        Documentation.Sections.EXPERT_SCHEDULING,
-        Documentation.Sections.ALL_JOB_MANAGER
-    })
+    /** @deprecated Use {@link BatchExecutionOptions#SPECULATIVE_ENABLED}. */
+    @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
+    @Deprecated

Review Comment:
   It's better to place `@Deprecated` first among the annotations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to