This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch release-1.18
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.18 by this push:
new 9802d240b34 [BP-1.18][FLINK-34274][runtime] Implicitly disable
resource wait timeout for AdaptiveSchedulerTest (#24399)
9802d240b34 is described below
commit 9802d240b341633f460fe15a89a263f12e540fe4
Author: Matthias Pohl <[email protected]>
AuthorDate: Wed Feb 28 15:23:37 2024 +0100
[BP-1.18][FLINK-34274][runtime] Implicitly disable resource wait timeout
for AdaptiveSchedulerTest (#24399)
---
.../adaptive/AdaptiveSchedulerBuilder.java | 7 +++++++
.../scheduler/adaptive/AdaptiveSchedulerTest.java | 21 +++++++++------------
2 files changed, 16 insertions(+), 12 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
index 6a7f855141e..c2ddef6417b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerBuilder.java
@@ -53,6 +53,7 @@ import javax.annotation.Nullable;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
/** Builder for {@link AdaptiveScheduler}. */
public class AdaptiveSchedulerBuilder {
@@ -118,6 +119,12 @@ public class AdaptiveSchedulerBuilder {
return this;
}
+ public AdaptiveSchedulerBuilder withConfigurationOverride(
+ Function<Configuration, Configuration> modifyFn) {
+ this.jobMasterConfiguration = modifyFn.apply(jobMasterConfiguration);
+ return this;
+ }
+
public AdaptiveSchedulerBuilder setUserCodeLoader(final ClassLoader
userCodeLoader) {
this.userCodeLoader = userCodeLoader;
return this;
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index be927fe81bf..98074b58d37 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -1218,9 +1218,12 @@ public class AdaptiveSchedulerTest {
JobResourceRequirements initialJobResourceRequirements =
createRequirementsWithEqualLowerAndUpperParallelism(PARALLELISM);
+ final Configuration config = getConfigurationWithNoTimeouts();
+ // enable the ResourceWaitTimeout to trigger the immediate failure
+ config.set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
Duration.ofMillis(1));
final AdaptiveScheduler scheduler =
prepareScheduler(jobGraph, declarativeSlotPool)
-
.setJobMasterConfiguration(getConfigurationWithNoResourceWaitTimeout())
+ .setJobMasterConfiguration(config)
.setJobResourceRequirements(initialJobResourceRequirements)
.build();
@@ -1252,6 +1255,7 @@ public class AdaptiveSchedulerTest {
final AdaptiveScheduler scheduler =
prepareScheduler(jobGraph, declarativeSlotPool)
+
.setJobMasterConfiguration(getConfigurationWithNoTimeouts())
.setJobResourceRequirements(initialJobResourceRequirements)
.build();
@@ -1261,14 +1265,6 @@ public class AdaptiveSchedulerTest {
startJobWithSlotsMatchingParallelism(
scheduler, declarativeSlotPool, taskManagerGateway,
availableSlots);
- // at this point we'd ideally check that the job is stuck in
WaitingForResources, but we
- // can't differentiate between waiting due to the minimum requirements
not being fulfilled
- // and the resource timeout not being elapsed
- // We just continue here, as the following tests validate that the
lower bound can prevent
- // a job from running:
- // -
#testInitialRequirementLowerBoundBeyondAvailableSlotsCausesImmediateFailure()
- // -
#testRequirementLowerBoundIncreaseBeyondCurrentParallelismAttemptsImmediateRescale()
-
// unlock job by decreasing the parallelism
JobResourceRequirements newJobResourceRequirements =
createRequirementsWithLowerAndUpperParallelism(availableSlots,
PARALLELISM);
@@ -1285,15 +1281,16 @@ public class AdaptiveSchedulerTest {
.setDeclarativeSlotPool(declarativeSlotPool);
}
- private static Configuration getConfigurationWithNoResourceWaitTimeout() {
+ private static Configuration getConfigurationWithNoTimeouts() {
return new Configuration()
- .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
Duration.ofMillis(1L));
+ .set(JobManagerOptions.RESOURCE_WAIT_TIMEOUT,
Duration.ofMillis(-1L))
+ .set(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT,
Duration.ofMillis(1L));
}
private AdaptiveSchedulerBuilder prepareSchedulerWithNoResourceWaitTimeout(
JobGraph jobGraph, DeclarativeSlotPool declarativeSlotPool) {
return prepareScheduler(jobGraph, declarativeSlotPool)
-
.setJobMasterConfiguration(getConfigurationWithNoResourceWaitTimeout());
+ .setJobMasterConfiguration(getConfigurationWithNoTimeouts());
}
private AdaptiveScheduler createSchedulerWithNoResourceWaitTimeout(