pnowojski commented on a change in pull request #12714:
URL: https://github.com/apache/flink/pull/12714#discussion_r442749136
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -810,6 +817,7 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
}
// else fall through the loop and
}
+ waitInvokableCancelCompletion();
Review comment:
Why is it placed only in the `catch(...)` block? Don't we also want to wait
in case of a clean shutdown or a clean completion?
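A minimal sketch of the placement the question points at, assuming the wait is moved into a `finally` block so that it runs on clean completion, clean shutdown and failure alike. `doRun`, `waitInvokableCancelCompletion` and `cancelFuture` below are simplified, hypothetical stand-ins, not the actual `Task` code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WaitOnAllPathsSketch {
    // Records which steps ran, so the effect of `finally` is visible.
    static final List<String> events = new ArrayList<>();

    // Hypothetical stand-in for the future returned by the invokable's cancel().
    static CompletableFuture<Void> cancelFuture = CompletableFuture.completedFuture(null);

    // Hypothetical stand-in for waitInvokableCancelCompletion().
    static void waitInvokableCancelCompletion() {
        cancelFuture.join();
        events.add("waited");
    }

    // Simplified model of the task body: the invokable either finishes or throws.
    static void doRun(boolean invokableThrows) {
        try {
            events.add("invoke");
            if (invokableThrows) {
                throw new IllegalStateException("invokable failed");
            }
            events.add("finished");
        } catch (IllegalStateException e) {
            events.add("failed");
        } finally {
            // Reached on clean completion and on failure alike.
            waitInvokableCancelCompletion();
        }
    }

    public static void main(String[] args) {
        doRun(false);
        doRun(true);
        System.out.println(events); // prints "[invoke, finished, waited, invoke, failed, waited]"
    }
}
```

One trade-off to keep in mind: an exception thrown from inside `finally` replaces any exception that is already propagating, so a real version would need to handle that case.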
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -861,6 +869,43 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
}
}
+ private void waitInvokableCancelCompletion() throws java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException {
+ if (!invokableHasBeenCanceled.get()) {
+ return;
+ }
+
+ // set deadline if taskCancellationTimeout is set to have time for cleanup (before being killing by TaskCancelerWatchDog) (best effort)
+ Optional<Deadline> deadline = Optional
+ .of((long) (taskCancellationTimeout * .9))
+ .filter(t -> t > 0)
+ .map(t -> Deadline.fromNow(Duration.ofMillis((t))));
+
+ if (taskCancellationTimeout > 0 && !deadline.isPresent()) {
+ LOG.debug("Skip waiting for invokable cancellation to have time for cleanup (taskCancellationTimeout is too small)");
+ return;
+ }
+ LOG.debug("Waiting for invokable cancellation (deadline: {})", deadline);
+
+ // prevent interrupts by TaskInterrupter and other canceling threads (best effort)
+ invokable.setShouldInterruptOnCancel(false);
Review comment:
This change would be perfectly fine without this line, right? In that
case I would prefer to remove it; I think it would make the code cleaner and
easier to understand.
##########
File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -430,8 +430,8 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
protected void cleanup() {}
@Override
- protected void cancelTask() throws Exception {
- throw new Exception("test exception");
+ protected CompletableFuture<Void> cancelTask() {
+ throw new RuntimeException("test exception");
Review comment:
nit: could this throw `ExpectedTestException` instead?
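A sketch of what the nit suggests, assuming a dedicated exception type for injected failures so the test can tell them apart from genuine errors. The `ExpectedTestException` class below is a local, hypothetical stand-in for the class the comment refers to; its message and constructor are assumptions.

```java
import java.util.concurrent.CompletableFuture;

public class ExpectedExceptionSketch {

    // Local stand-in for the `ExpectedTestException` mentioned in the review.
    static final class ExpectedTestException extends RuntimeException {
        ExpectedTestException() {
            super("Expected test exception");
        }
    }

    // Mirrors the test override: cancelTask() fails before returning a future.
    static CompletableFuture<Void> cancelTask() {
        throw new ExpectedTestException();
    }

    // Returns true if the injected failure was observed; any other exception propagates.
    static boolean cancelFailsWithExpectedException() {
        try {
            cancelTask();
            return false;
        } catch (ExpectedTestException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelFailsWithExpectedException()); // prints "true"
    }
}
```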
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -657,20 +657,25 @@ protected void cleanUpInvoke() throws Exception {
}
@Override
- public final void cancel() throws Exception {
+ public final CompletableFuture<Void> cancel() {
isRunning = false;
canceled = true;
// the "cancel task" call must come first, but the cancelables must be
// closed no matter what
- try {
- cancelTask();
- }
- finally {
- mailboxProcessor.allActionsCompleted();
- cancelables.close();
+ return FutureUtils
+ .completedVoidFuture()
+ .thenRun(this::cancelTask)
Review comment:
Why are you using this construct, starting from
`FutureUtils.completedVoidFuture()`? Why not
```
this.cancelTask().whenComplete(...);
```
plus, possibly, some exception handling in case `cancelTask()` throws.
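A small, self-contained illustration of why the two constructs differ, assuming `cancelTask()` returns a future that completes asynchronously. `thenRun(Runnable)` accepts a method reference like `this::cancelTask` because the returned value is simply discarded, so anything chained after `thenRun` does not wait for that future; chaining on the returned future does. The try/catch covers the case where `cancelTask()` throws before returning a future, as the test override above does. `cancelViaThenRun`, `cancelViaWhenComplete` and the `Supplier`/`Runnable` parameters are hypothetical stand-ins for `StreamTask#cancel()` and its cleanup steps.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class CancelChainSketch {

    // Construct from the diff: thenRun(Runnable) invokes cancelTask but discards the
    // future it returns, so the resulting future completes without waiting for it.
    static CompletableFuture<Void> cancelViaThenRun(Supplier<CompletableFuture<Void>> cancelTask,
                                                    Runnable cleanup) {
        return CompletableFuture.completedFuture((Void) null)
                .thenRun(cancelTask::get)
                .whenComplete((ignored, error) -> cleanup.run());
    }

    // Suggested construct: chain cleanup onto the future returned by cancelTask(),
    // and still run cleanup if cancelTask() throws before returning a future.
    static CompletableFuture<Void> cancelViaWhenComplete(Supplier<CompletableFuture<Void>> cancelTask,
                                                         Runnable cleanup) {
        CompletableFuture<Void> cancelFuture;
        try {
            cancelFuture = cancelTask.get();
        } catch (RuntimeException e) {
            CompletableFuture<Void> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            cancelFuture = failed;
        }
        // whenComplete runs cleanup and keeps the original result or failure.
        return cancelFuture.whenComplete((ignored, error) -> cleanup.run());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> slowCancel = new CompletableFuture<>();
        CompletableFuture<Void> a = cancelViaThenRun(() -> slowCancel, () -> {});
        CompletableFuture<Void> b = cancelViaWhenComplete(() -> slowCancel, () -> {});
        System.out.println(a.isDone() + " " + b.isDone()); // prints "true false"
        slowCancel.complete(null);
        System.out.println(b.isDone()); // prints "true"
    }
}
```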
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -861,6 +869,43 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
}
}
+ private void waitInvokableCancelCompletion() throws java.util.concurrent.ExecutionException, java.util.concurrent.TimeoutException {
+ if (!invokableHasBeenCanceled.get()) {
+ return;
+ }
+
+ // set deadline if taskCancellationTimeout is set to have time for cleanup (before being killing by TaskCancelerWatchDog) (best effort)
+ Optional<Deadline> deadline = Optional
+ .of((long) (taskCancellationTimeout * .9))
+ .filter(t -> t > 0)
+ .map(t -> Deadline.fromNow(Duration.ofMillis((t))));
Review comment:
What's the motivation behind this deadline? I think it should be
perfectly fine/safe to simplify this and just wait on the future indefinitely,
as this is the behaviour for network tasks, and it was the behaviour for source
tasks before we introduced the mailbox (in the past, the `SourceFunction#run()`
loop was called from the task thread).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.