pnowojski commented on a change in pull request #12714:
URL: https://github.com/apache/flink/pull/12714#discussion_r442749136



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -810,6 +817,7 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
                                        }
                                        // else fall through the loop and
                                }
+                               waitInvokableCancelCompletion();

Review comment:
       Why is it placed only in the `catch(...)` block? Don't we want to wait in the case of a clean shutdown or clean completion as well?
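A minimal sketch of the control flow this question points at, assuming a heavily simplified model of the task's run method: `CancelWaitSketch`, `run` and `waitForCancelCompletion` are hypothetical names, not Flink code. Placing the wait in a `finally` block makes it run on the failure path and on clean completion alike.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model; none of these names are Flink APIs.
class CancelWaitSketch {

    // Records which code paths reached the wait, so the behaviour is observable.
    static final List<String> waitedOn = new ArrayList<>();

    // Stand-in for the future returned by the invokable's cancel() call.
    static CompletableFuture<Void> cancelFuture = CompletableFuture.completedFuture(null);

    static void waitForCancelCompletion(String path) {
        cancelFuture.join(); // blocks until cancellation has finished
        waitedOn.add(path);
    }

    static void run(String path, boolean fail) {
        try {
            if (fail) {
                throw new IllegalStateException("invokable failed");
            }
            // normal completion of the invokable
        } catch (IllegalStateException e) {
            // failure handling / state transitions would go here
        } finally {
            // in finally, the wait covers both the failure path and clean completion
            waitForCancelCompletion(path);
        }
    }

    public static void main(String[] args) {
        run("clean", false);
        run("failure", true);
        System.out.println(waitedOn); // prints [clean, failure]
    }
}
```

A real version would presumably keep the `invokableHasBeenCanceled` check from the diff, so the wait stays a no-op when `cancel()` was never called.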

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -861,6 +869,43 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
                }
        }
 
+       private void waitInvokableCancelCompletion() throws java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException {
+               if (!invokableHasBeenCanceled.get()) {
+                       return;
+               }
+
+               // set deadline if taskCancellationTimeout is set to have time for cleanup (before being killed by TaskCancelerWatchDog) (best effort)
+               Optional<Deadline> deadline = Optional
+                       .of((long) (taskCancellationTimeout * .9))
+                       .filter(t -> t > 0)
+                       .map(t -> Deadline.fromNow(Duration.ofMillis((t))));
+
+               if (taskCancellationTimeout > 0 && !deadline.isPresent()) {
+                       LOG.debug("Skip waiting for invokable cancellation to have time for cleanup (taskCancellationTimeout is too small)");
+                       return;
+               }
+               LOG.debug("Waiting for invokable cancellation (deadline: {})", deadline);
+
+               // prevent interrupts by TaskInterrupter and other canceling threads (best effort)
+               invokable.setShouldInterruptOnCancel(false);

Review comment:
       This change would be perfectly fine without this line, right? If so, I would prefer to remove it; I think it would make the code cleaner and easier to understand.

##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -430,8 +430,8 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
                protected void cleanup() {}
 
                @Override
-               protected void cancelTask() throws Exception {
-                       throw new Exception("test exception");
+               protected CompletableFuture<Void> cancelTask() {
+                       throw new RuntimeException("test exception");

Review comment:
       nit: `ExpectedTestException`?
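A minimal sketch of what the nit suggests, assuming a dedicated exception type for deliberately injected failures. `ExpectedTestException` below is a local stand-in written for illustration; Flink's test utilities may already provide a class of that name, in which case that one should be reused. Catching the specific type lets a test tell the injected failure apart from an unrelated `RuntimeException`.

```java
import java.util.concurrent.CompletableFuture;

class FailingCancelSketch {

    // Same shape as the test override in the diff: declared to return a
    // future, but fails synchronously with the injected exception.
    static CompletableFuture<Void> cancelTask() {
        throw new ExpectedTestException();
    }

    // True only for the deliberately injected failure; any other
    // RuntimeException propagates and fails the test loudly.
    static boolean failsWithExpectedException() {
        try {
            cancelTask();
            return false;
        } catch (ExpectedTestException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(failsWithExpectedException()); // prints true
    }
}

// Hypothetical local stand-in for a dedicated test-only exception type.
class ExpectedTestException extends RuntimeException {
    ExpectedTestException() {
        super("Expected test exception");
    }
}
```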

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -657,20 +657,25 @@ protected void cleanUpInvoke() throws Exception {
        }
 
        @Override
-       public final void cancel() throws Exception {
+       public final CompletableFuture<Void> cancel() {
                isRunning = false;
                canceled = true;
 
               // the "cancel task" call must come first, but the cancelables must be
                // closed no matter what
-               try {
-                       cancelTask();
-               }
-               finally {
-                       mailboxProcessor.allActionsCompleted();
-                       cancelables.close();
+               return FutureUtils
+                       .completedVoidFuture()
+                       .thenRun(this::cancelTask)

Review comment:
       Why are you using this construct, chaining on `FutureUtils.completedVoidFuture()`? Why not
   ```
   this.cancelTask().whenComplete(...);
   ```
   plus, if needed, some exception handling in case `cancelTask()` throws synchronously?
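A minimal sketch of the suggested shape, assuming simplified stand-ins: `CancelSketch`, `cancelTask()` and `closeResources()` (standing in for `mailboxProcessor.allActionsCompleted()` plus `cancelables.close()`) are hypothetical. A synchronous throw from `cancelTask()` is converted into a failed future so the cleanup still runs, and `whenComplete` passes the original outcome through to the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of cancel(); not the actual StreamTask code.
class CancelSketch {

    final AtomicBoolean resourcesClosed = new AtomicBoolean(false);
    private final boolean throwSynchronously;

    CancelSketch(boolean throwSynchronously) {
        this.throwSynchronously = throwSynchronously;
    }

    // Stand-in for cancelTask(): may throw before it ever returns a future.
    CompletableFuture<Void> cancelTask() {
        if (throwSynchronously) {
            throw new IllegalStateException("cancelTask failed");
        }
        return CompletableFuture.completedFuture(null);
    }

    // Stand-in for mailboxProcessor.allActionsCompleted() + cancelables.close().
    void closeResources() {
        resourcesClosed.set(true);
    }

    CompletableFuture<Void> cancel() {
        CompletableFuture<Void> cancelFuture;
        try {
            cancelFuture = cancelTask();
        } catch (Throwable t) {
            // Turn a synchronous failure into a failed future (Java 8 compatible),
            // so the cleanup below still runs "no matter what".
            cancelFuture = new CompletableFuture<>();
            cancelFuture.completeExceptionally(t);
        }
        // Runs the cleanup on success and on failure; the returned future
        // keeps the original outcome of the cancellation.
        return cancelFuture.whenComplete((ignored, error) -> closeResources());
    }

    public static void main(String[] args) {
        CancelSketch failing = new CancelSketch(true);
        CompletableFuture<Void> result = failing.cancel();
        System.out.println(failing.resourcesClosed.get());     // prints true
        System.out.println(result.isCompletedExceptionally()); // prints true
    }
}
```

One further point in favour of this shape: as far as the visible part of the diff shows, `thenRun(this::cancelTask)` discards the future that `cancelTask()` returns, because a method reference adapted to `Runnable` drops its return value, so callers of `cancel()` would not wait for it.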

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -861,6 +869,43 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
                }
        }
 
+       private void waitInvokableCancelCompletion() throws java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException {
+               if (!invokableHasBeenCanceled.get()) {
+                       return;
+               }
+
+               // set deadline if taskCancellationTimeout is set to have time for cleanup (before being killed by TaskCancelerWatchDog) (best effort)
+               Optional<Deadline> deadline = Optional
+                       .of((long) (taskCancellationTimeout * .9))
+                       .filter(t -> t > 0)
+                       .map(t -> Deadline.fromNow(Duration.ofMillis((t))));

Review comment:
       What's the motivation behind this deadline? I think it would be perfectly fine (and safe) to simplify this and just wait on the future indefinitely, as this is the behaviour for network tasks, and was the behaviour for source tasks before we introduced the mailbox (in the past, the `SourceFunction#run()` loop was called from the task thread).
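A minimal sketch of the simpler variant, assuming stand-ins for the fields in the diff: `invokableHasBeenCanceled` mirrors the diff, while `invokableCancelFuture` is a hypothetical field holding the future returned by `cancel()`. No deadline is computed; the thread simply blocks until cancellation has finished. `join()` is used so the sketch needs no checked exceptions; it rethrows a failed cancellation as an unchecked `CompletionException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of the Task fields involved.
class IndefiniteWaitSketch {

    final AtomicBoolean invokableHasBeenCanceled = new AtomicBoolean(false);
    volatile CompletableFuture<Void> invokableCancelFuture = CompletableFuture.completedFuture(null);

    void waitInvokableCancelCompletion() {
        if (!invokableHasBeenCanceled.get()) {
            return; // cancel() was never called, nothing to wait for
        }
        // No deadline: block until the invokable's cancellation has finished.
        invokableCancelFuture.join();
    }

    public static void main(String[] args) {
        IndefiniteWaitSketch task = new IndefiniteWaitSketch();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        task.invokableCancelFuture = pending;
        task.invokableHasBeenCanceled.set(true);

        // Another thread finishes the cancellation; the wait returns afterwards.
        new Thread(() -> pending.complete(null)).start();
        task.waitInvokableCancelCompletion();
        System.out.println(pending.isDone()); // prints true
    }
}
```

Per the code comment in the diff, a cancellation that takes too long is still handled by the `TaskCancelerWatchDog`, so the indefinite wait keeps the network-task behaviour without giving up that backstop.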




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
