This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a1059cea6e696fc1c0bcbb37e63db61d8163619c
Author: Chesnay Schepler <[email protected]>
AuthorDate: Tue Jun 14 13:49:19 2022 +0200

    [hotfix][tests] Always respond to task cancellation requests
    
    The previous cancel setup doesn't make sense if you aren't aware that one
    of the tests forces a restart.
    We now immediately respond to any task cancellation request instead.
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index e454f2f136a..ee3f211807e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -1043,7 +1043,6 @@ public class AdaptiveSchedulerTest extends TestLogger {
         final JobGraph jobGraph = createJobGraph();
         setupJobGraph.accept(jobGraph);
         RunFailedJobListener listener = new RunFailedJobListener();
-        List<ExecutionAttemptID> cancelledTasks = new ArrayList<>();
 
         final CompletedCheckpointStore completedCheckpointStore =
                 new StandaloneCompletedCheckpointStore(1);
@@ -1070,7 +1069,16 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 new SubmissionBufferingTaskManagerGateway(PARALLELISM);
-        taskManagerGateway.setCancelConsumer(cancelledTasks::add);
+        taskManagerGateway.setCancelConsumer(
+                attemptId ->
+                        singleThreadMainThreadExecutor.execute(
+                                () ->
+                                        scheduler.updateTaskExecutionState(
+                                                new TaskExecutionStateTransition(
+                                                        new TaskExecutionState(
+                                                                attemptId,
+                                                                ExecutionState.CANCELED,
+                                                                null)))));
 
         singleThreadMainThreadExecutor.execute(
                 () -> {
@@ -1105,16 +1113,6 @@ public class AdaptiveSchedulerTest extends TestLogger {
                         singleThreadMainThreadExecutor);
         runTestLogicFuture.get();
 
-        Consumer<ExecutionAttemptID> canceller =
-                attemptId ->
-                        scheduler.updateTaskExecutionState(
-                                new TaskExecutionStateTransition(
-                                        new TaskExecutionState(
-                                                attemptId, ExecutionState.CANCELED, null)));
-        CompletableFuture<Void> cancelFuture =
-                CompletableFuture.runAsync(
-                        () -> cancelledTasks.forEach(canceller), singleThreadMainThreadExecutor);
-        cancelFuture.get();
         listener.waitForTerminal();
 
         return scheduler.requestJob().getExceptionHistory();

Reply via email to