This is an automated email from the ASF dual-hosted git repository.
RocMarshal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new aa52739877b [FLINK-39903][tests] Fix race in RescaleTimelineITCase.testRescaleTerminatedByResourceRequirementsUpdated (#28379)
aa52739877b is described below
commit aa52739877badaca3f9afd6efbec975c66476197
Author: Martijn Visser <[email protected]>
AuthorDate: Sat Jun 13 09:11:50 2026 +0200
[FLINK-39903][tests] Fix race in RescaleTimelineITCase.testRescaleTerminatedByResourceRequirementsUpdated (#28379)

The test issues two sequential updateJobResourceRequirements RPCs and asserts
that the in-progress rescale started by the first update is terminated by the
second update with terminal reason RESOURCE_REQUIREMENTS_UPDATED. For that to
hold, the second update must be processed while the first rescale is still in
progress.

AdaptiveScheduler#recordRescaleForNewResourceRequirements sets
RESOURCE_REQUIREMENTS_UPDATED via RescaleTimeline#updateRescale, which is a
no-op once the current rescale is already terminated
(DefaultRescaleTimeline#isIdling).
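The no-op behavior can be illustrated with a minimal, Flink-free sketch. `SketchRescaleTimeline`, `Reason`, `startRescale`, and `terminate` are hypothetical stand-ins, not the real `RescaleTimeline` or `DefaultRescaleTimeline` API; the sketch assumes only what the message states: once the current rescale is terminated, a later attempt to set a terminal reason is silently ignored.

```java
import java.util.Optional;

// Hypothetical model of the behavior described above; not Flink's RescaleTimeline API.
public class RescaleTimelineSketch {

    enum Reason {
        RESOURCE_REQUIREMENTS_UPDATED,
        NO_RESOURCES_OR_PARALLELISMS_CHANGE
    }

    static final class SketchRescaleTimeline {
        private boolean inProgress;
        private Reason terminalReason;

        // Records a new in-progress rescale.
        void startRescale() {
            inProgress = true;
            terminalReason = null;
        }

        // Stand-in for the isIdling check: true once no rescale is in progress.
        boolean isIdling() {
            return !inProgress;
        }

        // Terminates the in-progress rescale; a no-op if it was already terminated.
        void terminate(Reason reason) {
            if (isIdling()) {
                return; // late update is ignored, so the first recorded reason sticks
            }
            inProgress = false;
            terminalReason = reason;
        }

        Optional<Reason> terminalReason() {
            return Optional.ofNullable(terminalReason);
        }
    }

    public static void main(String[] args) {
        SketchRescaleTimeline timeline = new SketchRescaleTimeline();
        timeline.startRescale();
        // The Idling phase terminates the rescale first...
        timeline.terminate(Reason.NO_RESOURCES_OR_PARALLELISMS_CHANGE);
        // ...so the second update's reason is dropped.
        timeline.terminate(Reason.RESOURCE_REQUIREMENTS_UPDATED);
        System.out.println(timeline.terminalReason().get()); // prints NO_RESOURCES_OR_PARALLELISMS_CHANGE
    }
}
```

The guard in `terminate` is the whole point: whichever caller reaches an in-progress rescale first decides its terminal reason.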

With the short cooldown (100 ms) and resource-stabilization (50 ms) timeouts
that the parameterized configuration shares with the other cases, the
DefaultStateTransitionManager re-enters its Idling phase and the Idling
constructor terminates the in-progress rescale with
NO_RESOURCES_OR_PARALLELISMS_CHANGE (the upper bound exceeds the available
slots, so no parallelism change is possible). On a slow machine this happens
before the second update RPC is processed, so the second update finds the
rescale already terminated and its RESOURCE_REQUIREMENTS_UPDATED setter is
ignored, producing the flaky assertion failure.

The failure stems from a test-side timing assumption, not a product bug:
re-entering Idling and recording NO_RESOURCES_OR_PARALLELISMS_CHANGE is the
correct behavior for an in-progress rescale that cannot change parallelism.
That termination is driven by wall-clock timers that start the moment the
first rescale is recorded, so no amount of waiting between the two updates can
guarantee that the second update wins the race; waiting only consumes the same
budget. Widening the cooldown and stabilization timeouts for this case is
therefore the only deterministic test-side fix: it keeps the in-progress
rescale alive far longer than the single synchronous RPC round trip between
the two updates. 60 s is an intentionally generous bound (the suite already
uses second-scale guards for slow CI) and costs nothing, because the test
completes as soon as the second update lands.
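The "waiting only consumes the same budget" argument is plain arithmetic. In this hypothetical sketch `deadlineMillis` again abstracts the timer-driven termination point (how cooldown and stabilization combine is not modelled), and the 400 ms round trip is an arbitrary slow-CI value rather than a measurement.

```java
// Hypothetical arithmetic behind "waiting only consumes the same budget"; not Flink code.
public class RescaleBudgetSketch {

    /**
     * Time left on the termination deadline when the second update is processed. The
     * deliberate wait and the RPC round trip both count against the same deadline, because
     * the timers start when the first rescale is recorded. A negative result means the
     * rescale was already terminated.
     */
    static long marginMillis(long deadlineMillis, long waitMillis, long rpcRoundTripMillis) {
        return deadlineMillis - (waitMillis + rpcRoundTripMillis);
    }

    public static void main(String[] args) {
        long slowRoundTrip = 400; // arbitrary slow-CI value, not a measurement
        for (long wait : new long[] {0, 50, 100}) {
            // Every extra millisecond of waiting lowers the margin by one millisecond.
            System.out.println(
                    "short deadline, wait=" + wait + " ms -> margin="
                            + marginMillis(150, wait, slowRoundTrip) + " ms");
        }
        System.out.println(
                "widened deadline -> margin=" + marginMillis(60_000, 0, slowRoundTrip) + " ms");
    }
}
```

With the short deadline the margin only gets worse as the wait grows; raising the deadline is the one knob that adds slack.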

To avoid running two clusters, rebuild the shared fixture cluster in place
with the widened configuration instead of starting a dedicated one on top of
the @BeforeEach cluster, so only one cluster is ever running and the
@AfterEach teardown still applies. The disabled-history parameter is skipped
up front, mirroring the sibling cases, so it does not pay for a cluster
rebuild it never asserts on. The shared parameterized configuration relied
upon by the other cases is left untouched.

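The in-place rebuild pattern can be modelled without Flink or JUnit. `FakeCluster`, `beforeEach`, `afterEach`, and the counters are hypothetical stand-ins for `MiniClusterResource` and the JUnit lifecycle; the early `return` models the `assumeThat(...).isTrue()` skip, which in JUnit aborts the test rather than returning normally.

```java
// Hypothetical sketch of rebuilding a per-test fixture in place; FakeCluster is a stand-in
// for MiniClusterResource, not a Flink class.
public class FixtureRebuildSketch {

    int running;      // clusters currently started
    int maxRunning;   // high-water mark of concurrently running clusters
    int startedCount; // total clusters ever started

    /** Stand-in for a cluster resource with before()/after() lifecycle hooks. */
    final class FakeCluster {
        final String config;
        private boolean started;

        FakeCluster(String config) {
            this.config = config;
        }

        void before() {
            started = true;
            running++;
            startedCount++;
            maxRunning = Math.max(maxRunning, running);
        }

        void after() {
            if (started) {
                started = false;
                running--;
            }
        }
    }

    // Shared field read by both the test and the teardown, like miniClusterResource.
    FakeCluster cluster;

    void beforeEach() {
        cluster = new FakeCluster("shared");
        cluster.before();
    }

    void testWithWidenedTimeouts(boolean historyEnabled) {
        if (!historyEnabled) {
            // Models assumeThat(...).isTrue(): bail out before paying for a rebuild.
            return;
        }
        cluster.after();                      // stop the @BeforeEach cluster first...
        cluster = new FakeCluster("widened"); // ...then replace the shared field in place
        cluster.before();
    }

    void afterEach() {
        // Tears down whichever cluster the field points to, including a rebuilt one.
        cluster.after();
    }

    public static void main(String[] args) {
        FixtureRebuildSketch sketch = new FixtureRebuildSketch();
        sketch.beforeEach();
        sketch.testWithWidenedTimeouts(true);
        sketch.afterEach();
        System.out.println("max running=" + sketch.maxRunning + ", still running=" + sketch.running);
    }
}
```

Because the teardown dereferences the field rather than a captured instance, reassigning it is enough for the rebuilt cluster to be cleaned up.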
Generated-by: Claude Opus 4.8 (1M context)
---
.../adaptive/timeline/RescaleTimelineITCase.java | 43 ++++++++++++++++++++++
1 file changed, 43 insertions(+)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
index 2a04d302428..11711208453 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
@@ -392,6 +392,49 @@ class RescaleTimelineITCase {
     @TestTemplate
     void testRescaleTerminatedByResourceRequirementsUpdated() throws Exception {
+        // This case only asserts on the recorded rescale history; skip the disabled-history
+        // parameter before the cluster rebuild below so it does not pay for an unused cluster.
+        assumeThat(enabledRescaleHistory(configuration)).isTrue();
+
+        // This test asserts that the second resource-requirements update terminates the in-progress
+        // rescale started by the first update with terminal reason RESOURCE_REQUIREMENTS_UPDATED.
+        // For that to happen the second update must be processed while the first rescale is still
+        // in-progress: AdaptiveScheduler#recordRescaleForNewResourceRequirements only sets the
+        // RESOURCE_REQUIREMENTS_UPDATED reason via RescaleTimeline#updateRescale, which is a no-op
+        // once the current rescale is already terminated (DefaultRescaleTimeline#isIdling).
+        //
+        // The upper bound exceeds the available slots, so the first rescale cannot change the
+        // parallelism. With the short cooldown/stabilization timeouts shared by the other cases the
+        // DefaultStateTransitionManager re-enters its Idling phase and the Idling constructor
+        // terminates that rescale with NO_RESOURCES_OR_PARALLELISMS_CHANGE (a legitimate terminal
+        // reason for an in-progress rescale that cannot change parallelism). That termination is
+        // driven by wall-clock timers that start the moment the first rescale is recorded, so no
+        // amount of waiting between the two updates can guarantee the second update wins the race;
+        // waiting only consumes the same budget. Widening cooldown and stabilization for this case
+        // is therefore the only deterministic test-side fix: it keeps the in-progress rescale alive
+        // far longer than the single synchronous RPC round trip between the two updates.
+        //
+        // Rebuild the shared fixture cluster in place rather than starting a second one on top of
+        // it, so only one cluster is ever running and the @AfterEach teardown still applies. 60s is
+        // an intentionally generous bound (the suite already uses second-scale guards for slow CI);
+        // the test completes as soon as the second update lands, so a large value has no cost.
+        miniClusterResource.after();
+        final Configuration testConfiguration = new Configuration(configuration);
+        testConfiguration.set(
+                JobManagerOptions.SCHEDULER_EXECUTING_COOLDOWN_AFTER_RESCALING,
+                Duration.ofSeconds(60));
+        testConfiguration.set(
+                JobManagerOptions.SCHEDULER_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT,
+                Duration.ofSeconds(60));
+        miniClusterResource =
+                new MiniClusterResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(testConfiguration)
+                                .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                                .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                                .build());
+        miniClusterResource.before();
+
         final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
         final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
         miniCluster.submitJob(jobGraph).join();