This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 851771035ed [FLINK-30875][runtime] Fix usages of legacy
AdaptiveBatchScheduler configuration
851771035ed is described below
commit 851771035ed964063a9bfec8a59b4669a58d8aa1
Author: JunRuiLee <[email protected]>
AuthorDate: Thu Feb 2 18:05:22 2023 +0800
[FLINK-30875][runtime] Fix usages of legacy AdaptiveBatchScheduler
configuration
This closes #21829.
---
flink-end-to-end-tests/test-scripts/test_tpcds.sh | 8 ++++----
.../DefaultVertexParallelismAndInputInfosDecider.java | 3 +--
.../DefaultVertexParallelismAndInputInfosDeciderTest.java | 3 +--
.../apache/flink/test/scheduling/SpeculativeSchedulerITCase.java | 2 +-
4 files changed, 7 insertions(+), 9 deletions(-)
diff --git a/flink-end-to-end-tests/test-scripts/test_tpcds.sh
b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
index 4065c927ac3..8797f7a2e0f 100755
--- a/flink-end-to-end-tests/test-scripts/test_tpcds.sh
+++ b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
@@ -66,10 +66,10 @@ function run_test() {
set_config_key "taskmanager.numberOfTaskSlots" "4"
elif [ "${scheduler}" == "AdaptiveBatch" ]; then
set_config_key "taskmanager.numberOfTaskSlots" "8"
- set_config_key "jobmanager.adaptive-batch-scheduler.max-parallelism"
"8"
- set_config_key
"jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task" "6m"
- set_config_key
"jobmanager.adaptive-batch-scheduler.speculative.enabled" "true"
- set_config_key
"jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration" "0s"
+ set_config_key
"execution.batch.adaptive.auto-parallelism.max-parallelism" "8"
+ set_config_key
"execution.batch.adaptive.auto-parallelism.avg-data-volume-per-task" "6m"
+ set_config_key "execution.batch.speculative.enabled" "true"
+ set_config_key "execution.batch.speculative.block-slow-node-duration"
"0s"
set_config_key "slow-task-detector.execution-time.baseline-ratio" "0.0"
set_config_key
"slow-task-detector.execution-time.baseline-lower-bound" "0s"
else
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
index 1e49ca17256..52defc9ce73 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IndexRange;
@@ -463,7 +462,7 @@ public class DefaultVertexParallelismAndInputInfosDecider
configuration.getInteger(
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM),
configuration.getInteger(
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM),
+
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM),
configuration.get(
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK),
configuration.get(
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
index fe9afebdca6..0737097c5bf 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.executiongraph.ExecutionVertexInputInfo;
import org.apache.flink.runtime.executiongraph.IndexRange;
@@ -439,7 +438,7 @@ class DefaultVertexParallelismAndInputInfosDeciderTest {
configuration.setInteger(
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM,
maxParallelism);
configuration.setInteger(
- JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM,
minParallelism);
+
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM,
minParallelism);
configuration.set(
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
new MemorySize(dataVolumePerTask));
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
index 4d8eed39bfc..fd493575bfc 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.java
@@ -268,7 +268,7 @@ class SpeculativeSchedulerITCase {
configuration.set(
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
MAX_PARALLELISM);
-
configuration.set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM,
1);
+
configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM,
1);
configuration.set(
BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM,
MAX_PARALLELISM);
configuration.set(