featzhang commented on code in PR #27719:
URL: https://github.com/apache/flink/pull/27719#discussion_r2870181324
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -67,17 +67,23 @@ class ExecutionGraphRestartTest {
private static final int NUM_TASKS = 31;
@RegisterExtension
- static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
- TestingUtils.defaultExecutorExtension();
+ static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
+ TestingUtils.jmAsyncThreadExecutorExtension();
- private static final ComponentMainThreadExecutor mainThreadExecutor =
- ComponentMainThreadExecutorServiceAdapter.forMainThread();
+ @RegisterExtension
+ static final TestExecutorExtension<ScheduledExecutorService> JM_MAIN_THREAD_EXECUTOR_EXTENSION =
+ TestingUtils.jmMainThreadExecutorExtension();
+
+ private ComponentMainThreadExecutor mainThreadExecutor;
private ManuallyTriggeredScheduledExecutor taskRestartExecutor;
@BeforeEach
void setUp() {
taskRestartExecutor = new ManuallyTriggeredScheduledExecutor();
+ mainThreadExecutor =
Review Comment:
It might be cleaner to use the main-thread executor abstraction directly
instead of wrapping `JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor()` via
`forSingleThreadExecutor`.
Please confirm that this preserves strict main-thread semantics and does not
introduce an extra execution layer.
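To make "strict main-thread semantics" concrete, here is a minimal plain-Java sketch of what such an executor is expected to guarantee: every task runs on the one worker thread of a single-threaded `ScheduledExecutorService`, `execute` hands the task straight to that service with no extra queue or thread in between, and code can check that it is on that thread. `MainThreadExecutorSketch` and its methods are hypothetical names for illustration only; this is not Flink's `ComponentMainThreadExecutor` or `ComponentMainThreadExecutorServiceAdapter`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical main-thread executor sketch (not Flink's ComponentMainThreadExecutor).
 * All tasks run on the single worker thread of the wrapped ScheduledExecutorService.
 */
final class MainThreadExecutorSketch implements Executor {
    private final ScheduledExecutorService delegate;
    private final Thread mainThread;

    MainThreadExecutorSketch(ScheduledExecutorService singleThreadedDelegate) {
        this.delegate = singleThreadedDelegate;
        // Run a probe task once to learn which thread the delegate uses.
        AtomicReference<Thread> probe = new AtomicReference<>();
        try {
            delegate.submit(() -> probe.set(Thread.currentThread())).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while probing main thread", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Probe task failed", e);
        }
        this.mainThread = probe.get();
    }

    @Override
    public void execute(Runnable command) {
        // Hand the task straight to the delegate: no extra queue or thread in between.
        delegate.execute(command);
    }

    boolean isMainThread() {
        return Thread.currentThread() == mainThread;
    }

    void assertRunningInMainThread() {
        if (!isMainThread()) {
            throw new IllegalStateException(
                    "Expected to run in " + mainThread.getName()
                            + " but was " + Thread.currentThread().getName());
        }
    }
}
```

A thread check like this only holds if the delegate really is single-threaded (for example one created with `Executors.newSingleThreadScheduledExecutor()`); with a multi-threaded pool, tasks could land on different workers.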
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -349,43 +382,49 @@ void testFailExecutionAfterCancel() throws Exception {
new DefaultSchedulerBuilder(
createJobGraphToCancel(),
mainThreadExecutor,
- EXECUTOR_RESOURCE.getExecutor())
+ EXECUTOR_EXTENSION.getExecutor())
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
.setRestartBackoffTimeStrategy(
new TestRestartBackoffTimeStrategy(false, Long.MAX_VALUE))
.setDelayExecutor(taskRestartExecutor)
.build();
- ExecutionGraph eg = scheduler.getExecutionGraph();
+ mainThreadExecutor.execute(
+ () -> {
+ ExecutionGraph eg = scheduler.getExecutionGraph();
- startScheduling(scheduler);
+ startScheduling(scheduler);
- offerSlots(slotPool, 1);
+ offerSlots(slotPool, 1);
- // Fail right after cancel (for example with concurrent slot release)
- scheduler.cancel();
+ // Fail right after cancel (for example with concurrent slot release)
+ scheduler.cancel();
- for (ExecutionVertex v : eg.getAllExecutionVertices()) {
- v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
- }
+ for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+ v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+ }
- FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
- .eventuallySucceeds()
- .isEqualTo(JobStatus.CANCELED);
+ FlinkAssertions.assertThatFuture(eg.getTerminationFuture())
+ .eventuallySucceeds()
+ .isEqualTo(JobStatus.CANCELED);
Review Comment:
As a structural improvement, it might be cleaner to enforce main-thread
confinement within the scheduler setup itself rather than wrapping every test
case body in `mainThreadExecutor.execute`.
This would reduce boilerplate and improve readability.
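One way to move the confinement out of each test body, sketched under assumptions: a small static helper that runs a test body on the main-thread executor and blocks until it finishes, rethrowing any failure so the calling test fails. `MainThreadTestSupport`, `runInMainThread` and `ThrowingRunnable` are hypothetical names, and the sketch assumes access to the underlying `ExecutorService` (such as the one returned by `JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor()`) rather than to Flink's `ComponentMainThreadExecutor`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical test helper; names are illustrative, not existing Flink utilities. */
final class MainThreadTestSupport {

    /** A test body that may throw checked exceptions, like most JUnit test code. */
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    private MainThreadTestSupport() {}

    /**
     * Runs {@code body} on {@code mainThread} and blocks until it finishes.
     * Any exception or AssertionError thrown by the body is rethrown to the caller,
     * so a failed assertion inside the body fails the calling test.
     */
    static void runInMainThread(ExecutorService mainThread, ThrowingRunnable body) {
        Future<?> future = mainThread.submit(() -> {
            body.run();
            return null; // Callable form lets body.run() throw checked exceptions
        });
        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for main thread", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Error) {
                throw (Error) cause; // includes AssertionError from AssertJ/JUnit
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        }
    }
}
```

A test would call `runInMainThread(JM_MAIN_THREAD_EXECUTOR_EXTENSION.getExecutor(), () -> { ... })` from the JUnit thread. It must not be called from a task that is already running on that single thread, since the thread would then block waiting on itself.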
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -105,20 +111,24 @@ void testCancelAllPendingRequestWhileCanceling() throws Exception {
JobGraph graph = JobGraphTestUtils.streamingJobGraph(sender);
SchedulerBase scheduler =
new DefaultSchedulerBuilder(
- graph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
+ graph, mainThreadExecutor, EXECUTOR_EXTENSION.getExecutor())
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
.build();
- ExecutionGraph executionGraph = scheduler.getExecutionGraph();
- startScheduling(scheduler);
- offerSlots(slotPool, NUM_TASKS);
+ mainThreadExecutor.execute(
Review Comment:
Wrapping the entire test logic in `mainThreadExecutor.execute(...)` improves
thread confinement, but the test does not wait for the submitted task to
complete. If the executor runs the task asynchronously on another thread, the
test method can return before the assertions inside the lambda run, and an
assertion failure thrown there may not fail the test at all.
Would it be safer to block on a submitted future (e.g. `submit(...).get()`) to
ensure deterministic execution?
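The difference can be shown in plain Java, without anything Flink-specific. The sketch throws an `AssertionError` inside a task and checks whether the calling thread sees it with fire-and-forget `execute`, with `submit(...).get()`, and with `CompletableFuture.runAsync(task, executor).join()`; the last form also works for an executor type that only exposes `execute`. `ExecuteVersusBlocking` and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/** Compares whether a failure inside a task reaches the calling (test) thread. */
final class ExecuteVersusBlocking {
    private ExecuteVersusBlocking() {}

    private static final Runnable FAILING_TASK =
            () -> {
                throw new AssertionError("assertion inside task");
            };

    /** Fire-and-forget: with an async executor, the caller never sees the failure. */
    static boolean failureSeenWithExecute(Executor executor) {
        try {
            executor.execute(FAILING_TASK);
            return false;
        } catch (AssertionError e) {
            return true; // only possible if the executor ran the task in the caller
        }
    }

    /** submit(...).get() waits for the task and wraps its failure in ExecutionException. */
    static boolean failureSeenWithSubmitGet(ScheduledExecutorService executor) {
        try {
            executor.submit(FAILING_TASK).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof AssertionError;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Works with any Executor, including one without submit(): join() rethrows. */
    static boolean failureSeenWithRunAsyncJoin(Executor executor) {
        try {
            CompletableFuture.runAsync(FAILING_TASK, executor).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof AssertionError;
        }
    }
}
```

With a `ScheduledExecutorService`, `execute` is documented as equivalent to scheduling with zero delay, so the failure is captured in a future nobody reads and nothing is reported. With a direct (same-thread) executor such as `Runnable::run`, the failure does reach the caller, which is why the answer depends on whether the main-thread executor is asynchronous.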
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -159,38 +173,43 @@ void testCancelWhileRestarting() throws Exception {
new DefaultSchedulerBuilder(
createJobGraph(),
mainThreadExecutor,
- EXECUTOR_RESOURCE.getExecutor())
+ EXECUTOR_EXTENSION.getExecutor())
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
.setRestartBackoffTimeStrategy(
new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
.setDelayExecutor(taskRestartExecutor)
.build();
- ExecutionGraph executionGraph = scheduler.getExecutionGraph();
- startScheduling(scheduler);
+ mainThreadExecutor.execute(
+ () -> {
+ ExecutionGraph executionGraph = scheduler.getExecutionGraph();
- final ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
+ startScheduling(scheduler);
- // Release the TaskManager and wait for the job to restart
- slotPool.releaseTaskManager(taskManagerResourceId, new Exception("Test Exception"));
- assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
+ final ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
- // Canceling needs to abort the restart
- scheduler.cancel();
+ // Release the TaskManager and wait for the job to restart
+ slotPool.releaseTaskManager(
+ taskManagerResourceId, new Exception("Test Exception"));
+ assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
- assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+ // Canceling needs to abort the restart
+ scheduler.cancel();
- taskRestartExecutor.triggerScheduledTasks();
+ assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
- assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
- for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
- assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
- }
+ taskRestartExecutor.triggerScheduledTasks();
+
+ assertThat(executionGraph.getState()).isEqualTo(JobStatus.CANCELED);
+ for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
+ assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
+ }
+ });
}
}
- private static ResourceID offerSlots(SlotPool slotPool, int numSlots) {
+ private ResourceID offerSlots(SlotPool slotPool, int numSlots) {
Review Comment:
The method `offerSlots` is now non-static because it uses the
`mainThreadExecutor` instance field.
Please confirm that this change does not affect other test utility patterns
or assumptions.
##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java:
##########
@@ -159,38 +173,43 @@ void testCancelWhileRestarting() throws Exception {
new DefaultSchedulerBuilder(
createJobGraph(),
mainThreadExecutor,
- EXECUTOR_RESOURCE.getExecutor())
+ EXECUTOR_EXTENSION.getExecutor())
.setExecutionSlotAllocatorFactory(
createExecutionSlotAllocatorFactory(slotPool))
.setRestartBackoffTimeStrategy(
new TestRestartBackoffTimeStrategy(true, Long.MAX_VALUE))
.setDelayExecutor(taskRestartExecutor)
.build();
- ExecutionGraph executionGraph = scheduler.getExecutionGraph();
- startScheduling(scheduler);
+ mainThreadExecutor.execute(
+ () -> {
+ ExecutionGraph executionGraph = scheduler.getExecutionGraph();
- final ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
+ startScheduling(scheduler);
- // Release the TaskManager and wait for the job to restart
- slotPool.releaseTaskManager(taskManagerResourceId, new Exception("Test Exception"));
- assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
+ final ResourceID taskManagerResourceId = offerSlots(slotPool, NUM_TASKS);
- // Canceling needs to abort the restart
- scheduler.cancel();
+ // Release the TaskManager and wait for the job to restart
+ slotPool.releaseTaskManager(
+ taskManagerResourceId, new Exception("Test Exception"));
+ assertThat(executionGraph.getState()).isEqualTo(JobStatus.RESTARTING);
Review Comment:
Some state transitions (e.g. to RESTARTING) may be triggered asynchronously.
It may be safer to assert the state with a future-based assertion such as
`assertThatFuture(executionGraph.getTerminationFuture())`, or with an
eventually-style assertion, to avoid timing sensitivity.
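An eventually-style check can be sketched as a small polling loop: evaluate a condition repeatedly until it holds or a deadline passes, then fail with an `AssertionError`. `Eventually.waitUntil` is a hypothetical helper, not an existing Flink or AssertJ API; Flink's own test utilities or a library such as Awaitility may offer equivalents. If the polled state is main-thread confined, the condition itself should read it through the main-thread executor rather than directly from the test thread.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper for eventually-style assertions. */
final class Eventually {
    private Eventually() {}

    /**
     * Re-evaluates {@code condition} every {@code pollInterval} until it is true.
     * Throws AssertionError if it is still false once {@code timeout} has elapsed.
     */
    static void waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Compare via subtraction so the check stays correct if nanoTime wraps.
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError("Condition not met within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("Interrupted while waiting for condition", e);
            }
        }
    }
}
```

For a terminal state such as CANCELED, waiting on the termination future avoids polling altogether; a loop like this is mainly useful for intermediate states such as RESTARTING, which that future does not signal.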
--
This is an automated message from the Apache Git Service.