liuml07 commented on code in PR #27091:
URL: https://github.com/apache/flink/pull/27091#discussion_r2482211991
##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java:
##########
@@ -1733,24 +1711,47 @@ void testJobFailureWhenTaskExecutorHeartbeatTimeout()
throws Exception {
final TestingHeartbeatServices testingHeartbeatService =
new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout);
- runJobFailureWhenTaskExecutorTerminatesTest(
- testingHeartbeatService,
- (localTaskManagerLocation, jobMasterGateway) ->
- testingHeartbeatService.triggerHeartbeatTimeout(
- jmResourceId, localTaskManagerLocation.getResourceID()),
- jobEvents);
- assertThat(
- jobEvents.stream()
- .filter(
- event ->
- Events.JobStatusChangeEvent.name()
- .equals(event.getName()))
- .map(Event::getAttributes)
- .map(x -> x.get("newJobStatus")))
- .containsExactly(
- JobStatus.RUNNING.toString(),
- JobStatus.FAILING.toString(),
- JobStatus.FAILED.toString());
+ final SchedulerType schedulerType =
+ runJobFailureWhenTaskExecutorTerminatesTest(
+ testingHeartbeatService,
+ (localTaskManagerLocation, jobMasterGateway) ->
+ testingHeartbeatService.triggerHeartbeatTimeout(
+ jmResourceId, localTaskManagerLocation.getResourceID()),
+ jobEvents);
+
+ assertJobStatusTransitions(schedulerType, jobEvents);
+ }
+
+ /**
+ * Asserts that job status transitions are as expected based on the scheduler type.
+ * DefaultScheduler does not emit CREATED state, while AdaptiveScheduler and
+ * AdaptiveBatchScheduler do.
+ */
+ private static void assertJobStatusTransitions(
Review Comment:
Yes, I agree with @noorall and @zhuzhurk that the scheduler used in tests is
not static; it can be set when the tests are run. Specifically, it is set via the
`flink.tests.enable-adaptive-scheduler` JVM system property (or by running the
tests with the Maven profile `enable-adaptive-scheduler`). The
`slotPoolServiceSchedulerFactory` creates schedulers dynamically by checking this
property. With this PR, this test makes different assertions depending on the
scheduler in use.
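
To make the property-driven selection concrete, here is a minimal sketch. The class, enum, and method names are hypothetical and not Flink's code, and treating the property as a boolean value is an assumption (the real check may, for example, only test whether the property is present):

```java
import java.util.Properties;

// Hypothetical sketch: choose a scheduler kind from a JVM system property.
public class SchedulerSelectionSketch {

    // Illustrative enum, not Flink's own scheduler type.
    enum SchedulerKind { DEFAULT, ADAPTIVE }

    // Property name taken from the comment above.
    static final String PROPERTY = "flink.tests.enable-adaptive-scheduler";

    // Assumption: the property is parsed as a boolean; absent means DEFAULT.
    static SchedulerKind select(Properties props) {
        return Boolean.parseBoolean(props.getProperty(PROPERTY))
                ? SchedulerKind.ADAPTIVE
                : SchedulerKind.DEFAULT;
    }

    public static void main(String[] args) {
        // Run with -Dflink.tests.enable-adaptive-scheduler=true to flip the result.
        System.out.println(select(System.getProperties()));
    }
}
```

Passing the `Properties` object in, rather than reading `System.getProperties()` inside `select`, keeps the sketch easy to exercise without mutating global JVM state.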
By default, this test runs with the DEFAULT scheduler, so the `else` branch of
this assertion method is taken. When the test runs with either
`flink.tests.enable-adaptive-scheduler=true` or the Maven profile
`enable-adaptive-scheduler`, the ADAPTIVE scheduler is used and the `if` branch
is taken. One can enable it locally for debugging, and one workflow in the
nightly CI enables it; see
[here](https://github.com/apache/flink/blob/master/.github/workflows/nightly.yml#L69).
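
The branching can be sketched roughly as follows. All names are hypothetical. The DEFAULT sequence mirrors the assertion removed in the diff above, while the ADAPTIVE sequence is an assumption that simply prepends CREATED, following the Javadoc in the diff; the exact list asserted in the PR may differ:

```java
import java.util.List;

// Hypothetical sketch of scheduler-dependent job status assertions.
public class JobStatusTransitionSketch {

    // Illustrative enum, not Flink's own scheduler type.
    enum SchedulerKind { DEFAULT, ADAPTIVE }

    static List<String> expectedTransitions(SchedulerKind kind) {
        if (kind == SchedulerKind.ADAPTIVE) {
            // Assumption: adaptive schedulers additionally report CREATED first.
            return List.of("CREATED", "RUNNING", "FAILING", "FAILED");
        } else {
            // Same sequence as the assertion removed in the diff.
            return List.of("RUNNING", "FAILING", "FAILED");
        }
    }

    // Throws AssertionError if the observed statuses differ from the expected ones.
    static void assertTransitions(SchedulerKind kind, List<String> observed) {
        List<String> expected = expectedTransitions(kind);
        if (!expected.equals(observed)) {
            throw new AssertionError("expected " + expected + " but got " + observed);
        }
    }

    public static void main(String[] args) {
        assertTransitions(SchedulerKind.DEFAULT, List.of("RUNNING", "FAILING", "FAILED"));
        System.out.println("transitions match");
    }
}
```

Keeping the expected sequences in one helper means a test only has to pass along the scheduler kind it ran with, which is the shape `assertJobStatusTransitions(schedulerType, jobEvents)` has in the diff.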
I guess the Maven profile `enable-adaptive-scheduler` was added so that
existing tests do not need to explicitly enumerate the scheduler types, as long
as each test is either agnostic to or aware of the scheduler type.