zhuzhurk commented on code in PR #21787:
URL: https://github.com/apache/flink/pull/21787#discussion_r1091455797
##########
flink-core/src/main/java/org/apache/flink/configuration/BatchExecutionOptions.java:
##########
@@ -32,4 +37,106 @@ public class BatchExecutionOptions {
                     .defaultValue(true)
                     .withDescription(
                             "If true, Flink will automatically decide the parallelism of operators in batch jobs.");
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Integer> ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM =
+            key("execution.batch.adaptive.auto-parallelism.min-parallelism")
+                    .intType()
+                    .defaultValue(1)
+                    .withDeprecatedKeys("jobmanager.adaptive-batch-scheduler.min-parallelism")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The lower bound of allowed parallelism to set adaptively if %s has been set to %s",
+                                            code(SCHEDULER.key()),
+                                            code(
+                                                    JobManagerOptions.SchedulerType.AdaptiveBatch
+                                                            .name()))
+                                    .build());
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Integer> ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM =
+            key("execution.batch.adaptive.auto-parallelism.max-parallelism")
+                    .intType()
+                    .defaultValue(128)
+                    .withDeprecatedKeys("jobmanager.adaptive-batch-scheduler.max-parallelism")
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "The upper bound of allowed parallelism to set adaptively if %s has been set to %s",
+                                            code(SCHEDULER.key()),
+                                            code(
+                                                    JobManagerOptions.SchedulerType.AdaptiveBatch
+                                                            .name()))
+                                    .build());
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<MemorySize>
+            ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK =
+                    key("execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task")
+                            .memoryType()
+                            .defaultValue(MemorySize.ofMebiBytes(16))
+                            .withDeprecatedKeys(
+                                    "jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task")
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "The average size of data volume to expect each task instance to process if %s has been set to %s. "
+                                                            + "Note that when data skew occurs or the decided parallelism reaches the %s (due to too much data), "
+                                                            + "the data actually processed by some tasks may far exceed this value.",
+                                                    code(SCHEDULER.key()),
+                                                    code(
+                                                            JobManagerOptions.SchedulerType
+                                                                    .AdaptiveBatch.name()),
+                                                    code(
+                                                            ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM
+                                                                    .key()))
+                                            .build());
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Integer>
+            ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM =
+                    key("execution.batch.adaptive.auto-parallelism.default-source-parallelism")
+                            .intType()
+                            .defaultValue(1)
+                            .withDeprecatedKeys(
+                                    "jobmanager.adaptive-batch-scheduler.default-source-parallelism")
+                            .withDescription(
+                                    Description.builder()
+                                            .text(
+                                                    "The default parallelism of source vertices if %s has been set to %s",
+                                                    code(SCHEDULER.key()),
+                                                    code(
+                                                            JobManagerOptions.SchedulerType.AdaptiveBatch
+                                                                    .name()))
+                                            .build());
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Boolean> SPECULATIVE_ENABLED =
+            key("execution.batch.adaptive.speculative.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDeprecatedKeys("jobmanager.adaptive-batch-scheduler.speculative.enabled")
+                    .withDescription("Controls whether to enable speculative execution.");
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Integer> SPECULATIVE_MAX_CONCURRENT_EXECUTIONS =
+            key("execution.batch.speculative.max-concurrent-executions")
+                    .intType()
+                    .defaultValue(2)
+                    .withDeprecatedKeys(
+                            "jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions")
+                    .withDescription(
+                            "Controls the maximum number of execution attempts of each operator "
+                                    + "that can execute concurrently, including the original one "
+                                    + "and speculative ones.");
+
+    @Documentation.Section({Documentation.Sections.EXPERT_SCHEDULING})
+    public static final ConfigOption<Duration> BLOCK_SLOW_NODE_DURATION =
+            key("execution.batch.speculative.block-slow-node-duration")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(1))
+                    .withDeprecatedKeys(
+                            "jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration")
+                    .withDescription(
+                            "Controls how long a detected slow node should be blocked for.");
 }
Review Comment:
A private constructor should be added to prevent instantiation. `RestartStrategyOptions` can be used as a reference.
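
For illustration, a minimal sketch of what this could look like, assuming `RestartStrategyOptions` follows the common Flink pattern of a private no-arg constructor in a pure options-holder class:

```java
/** Configuration options for the batch job execution. */
public class BatchExecutionOptions {

    // ... ConfigOption definitions shown in the hunk above ...

    /** Not intended to be instantiated. */
    private BatchExecutionOptions() {}
}
```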
##########
flink-core/src/main/java/org/apache/flink/configuration/BatchExecutionOptions.java:
##########
@@ -19,8 +19,13 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.docs.Documentation;
+import org.apache.flink.configuration.description.Description;
+
+import java.time.Duration;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.JobManagerOptions.SCHEDULER;
+import static org.apache.flink.configuration.description.TextElement.code;
 
 /** Configuration options for the batch job execution. */
 public class BatchExecutionOptions {
Review Comment:
This class should be marked with `@PublicEvolving`.
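
A sketch of the suggested change, assuming the standard `org.apache.flink.annotation.PublicEvolving` annotation from flink-annotations:

```java
import org.apache.flink.annotation.PublicEvolving;

/** Configuration options for the batch job execution. */
@PublicEvolving
public class BatchExecutionOptions {
    // ...
}
```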
##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -602,20 +604,18 @@ public enum SchedulerType {
                                     code(SchedulerType.AdaptiveBatch.name()))
                             .build());
 
-    @Documentation.Section({
-        Documentation.Sections.EXPERT_SCHEDULING,
-        Documentation.Sections.ALL_JOB_MANAGER
-    })
+    /** @deprecated Use {@link BatchExecutionOptions#SPECULATIVE_ENABLED}. */
+    @Documentation.ExcludeFromDocumentation("Hidden for deprecated")
+    @Deprecated
Review Comment:
It's better to place `@Deprecated` first among the annotations.
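
For illustration, the suggested ordering would look roughly like this (the option declaration itself is not shown in the hunk and is elided here):

```java
/** @deprecated Use {@link BatchExecutionOptions#SPECULATIVE_ENABLED}. */
@Deprecated
@Documentation.ExcludeFromDocumentation("Hidden for deprecated")
// ... the deprecated ConfigOption declaration that follows in JobManagerOptions ...
```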