This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a1059cea6e696fc1c0bcbb37e63db61d8163619c Author: Chesnay Schepler <[email protected]> AuthorDate: Tue Jun 14 13:49:19 2022 +0200 [hotfix][tests] Always respond to task cancellation requests The previous cancel setup doesn't make sense if you aren't aware that one of the tests forces a restart. We now immediately respond to any task cancellation request instead. --- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index e454f2f136a..ee3f211807e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -1043,7 +1043,6 @@ public class AdaptiveSchedulerTest extends TestLogger { final JobGraph jobGraph = createJobGraph(); setupJobGraph.accept(jobGraph); RunFailedJobListener listener = new RunFailedJobListener(); - List<ExecutionAttemptID> cancelledTasks = new ArrayList<>(); final CompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); @@ -1070,7 +1069,16 @@ public class AdaptiveSchedulerTest extends TestLogger { final SubmissionBufferingTaskManagerGateway taskManagerGateway = new SubmissionBufferingTaskManagerGateway(PARALLELISM); - taskManagerGateway.setCancelConsumer(cancelledTasks::add); + taskManagerGateway.setCancelConsumer( + attemptId -> + singleThreadMainThreadExecutor.execute( + () -> + scheduler.updateTaskExecutionState( + new TaskExecutionStateTransition( + new TaskExecutionState( + attemptId, + ExecutionState.CANCELED, + null))))); singleThreadMainThreadExecutor.execute( () -> { @@ -1105,16 +1113,6 @@ public class AdaptiveSchedulerTest extends TestLogger { 
singleThreadMainThreadExecutor); runTestLogicFuture.get(); - Consumer<ExecutionAttemptID> canceller = - attemptId -> - scheduler.updateTaskExecutionState( - new TaskExecutionStateTransition( - new TaskExecutionState( - attemptId, ExecutionState.CANCELED, null))); - CompletableFuture<Void> cancelFuture = - CompletableFuture.runAsync( - () -> cancelledTasks.forEach(canceller), singleThreadMainThreadExecutor); - cancelFuture.get(); listener.waitForTerminal(); return scheduler.requestJob().getExceptionHistory();
