dawidwys commented on a change in pull request #15789:
URL: https://github.com/apache/flink/pull/15789#discussion_r627167049
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/program/PerJobMiniClusterFactoryTest.java
##########
@@ -191,14 +193,16 @@ public void invoke() throws Exception {
lock.wait();
}
}
+ terminationFuture.complete(null);
Review comment:
The logic around `terminationFuture` is unnecessarily complicated, in my
opinion. I'd suggest rewriting it as follows:
```
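import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public abstract class CancelableInvokable extends AbstractInvokable {

    private volatile boolean canceled;

    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    protected CancelableInvokable(Environment environment) {
        super(environment);
    }

    @Override
    public final void invoke() throws Exception {
        try {
            doInvoke();
        } finally {
            // complete even if doInvoke() throws, so callers of cancel() never wait forever
            terminationFuture.complete(null);
        }
    }

    protected abstract void doInvoke() throws Exception;

    @Override
    public final Future<Void> cancel() {
        synchronized (this) {
            canceled = true;
            // wake up the thread blocked in waitUntilCancelled()
            notifyAll();
        }
        return terminationFuture;
    }

    protected void waitUntilCancelled() throws InterruptedException {
        synchronized (this) {
            while (!canceled) {
                wait();
            }
        }
    }
}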
```
Then this class can be:
```
/** Invokable which waits until it is cancelled. */
public static class MyCancellableInvokable extends CancelableInvokable {

    public MyCancellableInvokable(Environment environment) {
        super(environment);
    }

    @Override
    public void doInvoke() throws Exception {
        waitUntilCancelled();
    }
}
```
Similarly, `OperatorEventSendingInvokable` could become:
```
/** Test invokable that fails when receiving an operator event. */
public static final class OperatorEventSendingInvokable extends CancelableInvokable {

    public OperatorEventSendingInvokable(Environment environment) {
        super(environment);
    }

    @Override
    public void doInvoke() throws Exception {
        getEnvironment()
                .getOperatorCoordinatorEventGateway()
                .sendOperatorEventToCoordinator(
                        new OperatorID(), new SerializedValue<>(new TestOperatorEvent()));
        waitUntilCancelled();
    }
}
```
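A quick way to sanity-check this suggestion outside Flink is a self-contained model of the same pattern. `SimpleCancelableInvokable`, `BlockingInvokable` and `CancelDemo` are hypothetical names, and `Environment`/`AbstractInvokable` are left out on purpose. One detail the model makes explicit: for `waitUntilCancelled()` to return, `cancel()` has to set the flag and call `notifyAll()` while holding the same monitor; setting `canceled = true` alone would leave the waiting thread blocked in `wait()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical, Flink-free model of the suggested CancelableInvokable. */
abstract class SimpleCancelableInvokable {
    private volatile boolean canceled;
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    /** Runs the task body and completes the termination future however it exits. */
    public final void invoke() throws Exception {
        try {
            doInvoke();
        } finally {
            terminationFuture.complete(null);
        }
    }

    protected abstract void doInvoke() throws Exception;

    /** Sets the flag and wakes up waiters under the same monitor, then returns the future. */
    public final Future<Void> cancel() {
        synchronized (this) {
            canceled = true;
            notifyAll();
        }
        return terminationFuture;
    }

    protected void waitUntilCancelled() throws InterruptedException {
        synchronized (this) {
            while (!canceled) {
                wait();
            }
        }
    }
}

/** Hypothetical task body that only blocks until it is cancelled. */
class BlockingInvokable extends SimpleCancelableInvokable {
    @Override
    protected void doInvoke() throws InterruptedException {
        waitUntilCancelled();
    }
}

final class CancelDemo {
    /** Runs the invokable on its own thread, cancels it, and reports whether it terminated in time. */
    static boolean runAndCancel(long timeoutMillis) {
        BlockingInvokable invokable = new BlockingInvokable();
        Thread executor = new Thread(() -> {
            try {
                invokable.invoke();
            } catch (Exception e) {
                // ignored here: the termination future is completed in finally anyway
            }
        });
        executor.start();
        try {
            invokable.cancel().get(timeoutMillis, TimeUnit.MILLISECONDS);
            executor.join(timeoutMillis);
            return !executor.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }
}
```

Because the flag is checked in a loop, the model also behaves correctly if `cancel()` runs before the executor thread reaches `wait()`.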
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
##########
@@ -155,5 +155,6 @@ public void run() throws Exception {
incrementIterationCounter();
}
}
+ terminationCompleted();
Review comment:
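Should `terminationCompleted()` be called in a `finally` block, so that it also runs when the loop above throws?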
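A minimal sketch of the `finally` variant, using a hypothetical `IterationTaskSketch` in place of `IterationTailTask` (the superstep loop and the `failAt` parameter are invented for illustration). Here `terminationCompleted()` simply completes a `CompletableFuture`, which is an assumption about what the PR's method does.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of an iteration task whose run() loop may fail part-way. */
class IterationTaskSketch {
    final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
    int completedSupersteps;

    /** Stand-in for the PR's terminationCompleted(); assumed to complete a future. */
    void terminationCompleted() {
        terminationFuture.complete(null);
    }

    /** Runs the given number of supersteps; failAt < 0 means no superstep fails. */
    void run(int supersteps, int failAt) {
        try {
            for (int i = 0; i < supersteps; i++) {
                if (i == failAt) {
                    throw new IllegalStateException("superstep " + i + " failed");
                }
                completedSupersteps++;
            }
        } finally {
            // Runs on normal exit and on failure alike, so waiters are never left hanging.
            terminationCompleted();
        }
    }
}
```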
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
##########
@@ -2508,4 +2538,92 @@ public void awaitRunning() throws InterruptedException {
runningLatch.await();
}
}
+
+ /**
+ * A {@link StreamTask} that registers a single timer that waits for a cancellation and then
+ * emits some data. The assumption is that output remains available until {@link
+ * AbstractInvokable#getTerminationFuture()} is completed (even after cancellation). Public
+ * access to allow reflection in {@link Task}.
+ */
+ public static class StreamTaskWithBlockingTimer extends StreamTask {
+ static volatile CompletableFuture<Void> timerStarted;
+ static volatile CompletableFuture<Void> timerFinished;
+ static volatile CompletableFuture<Void> invokableCancelled;
+
+ public static void reset() {
+ timerStarted = new CompletableFuture<>();
+ timerFinished = new CompletableFuture<>();
+ invokableCancelled = new CompletableFuture<>();
+ }
+
+ // public access to allow reflection in Task
+ public StreamTaskWithBlockingTimer(Environment env) throws Exception {
+ super(env);
+ super.inputProcessor = getInputProcessor();
+ getCancelables().registerCloseable(() -> invokableCancelled.complete(null));
Review comment:
How about moving `invokableCancelled.complete(null)` into `cancelTask`? My concern is that this relies on an additional contract: that the cancelables are closed in the `cancel` method. (Honestly, I am not sure that is correct either, since we close them a second time in `cleanUpInvoke`, but that is a different story.)
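A minimal sketch of that alternative, assuming `StreamTask.cancel()` calls an overridable `cancelTask()` hook first and closes the cancelables afterwards no matter what. `MiniStreamTask` and `TaskWithBlockingTimerSketch` are hypothetical, stripped-down stand-ins (no `Environment`, no mailbox, no checked exceptions); the point is only that the test signal then depends on the hook being called, not on when or where cancelables are closed.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical, heavily simplified model of the order of operations in StreamTask.cancel(). */
abstract class MiniStreamTask {
    private volatile boolean canceled;
    private volatile boolean cancelablesClosed;

    public final void cancel() {
        canceled = true;
        try {
            // assumed: the subclass hook runs first ...
            cancelTask();
        } finally {
            // ... and the cancelables are closed no matter what
            cancelablesClosed = true;
        }
    }

    /** Hook for subclasses; does nothing by default. */
    protected void cancelTask() {}

    boolean isCanceled() {
        return canceled;
    }

    boolean areCancelablesClosed() {
        return cancelablesClosed;
    }
}

/** Test task that signals cancellation from the hook rather than from a registered closeable. */
class TaskWithBlockingTimerSketch extends MiniStreamTask {
    final CompletableFuture<Void> invokableCancelled = new CompletableFuture<>();

    @Override
    protected void cancelTask() {
        invokableCancelled.complete(null);
    }
}
```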
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
##########
@@ -104,9 +105,12 @@ public AbstractInvokable(Environment environment) {
* execution failure. It can be overwritten to respond to shut down the user code properly.
*
* @throws Exception thrown if any exception occurs during the execution of the user code
+ * @return a future that is completed when this {@link AbstractInvokable} is fully terminated.
+ * Note that it may never complete if the termination fails.
Review comment:
Shouldn't we instead require that the future completes even when termination fails?
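One way to make that a hard guarantee, sketched with a hypothetical `InvokableWithTermination` base class: wrap the task body so that the termination future is completed on success and completed exceptionally on failure, and the original error is rethrown. Whether a failed termination should complete the future normally or exceptionally is a design choice; this sketch assumes exceptional completion so that callers can tell the two cases apart.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical base class whose termination future completes on every exit path. */
abstract class InvokableWithTermination {
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    public final void invoke() throws Exception {
        try {
            doInvoke();
            terminationFuture.complete(null);
        } catch (Throwable t) {
            // Failure still counts as termination: waiters see the cause instead of hanging.
            terminationFuture.completeExceptionally(t);
            throw t; // precise rethrow: only Exception or unchecked types can reach here
        }
    }

    protected abstract void doInvoke() throws Exception;

    CompletableFuture<Void> terminationFuture() {
        return terminationFuture;
    }
}
```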
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
##########
@@ -141,6 +141,7 @@ public void run() throws Exception {
incrementIterationCounter();
}
}
+ terminationCompleted();
Review comment:
Shouldn't `terminationCompleted()` be in a `finally` block here?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1711,6 +1727,9 @@ public void run() {
}
}
+ if (cancelThread.isAlive()) {
Review comment:
Why do we need this check? Is it for the case where an invalid implementation of the `cancel` method never returns?
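If the answer is yes, the check amounts to a bounded wait on the canceller thread, roughly as in the following sketch. `CancelThreadSketch.cancelWithDeadline` is a hypothetical helper, not the code in `Task`: it runs a cancel action on its own thread, joins with a timeout, and interrupts the thread if it is still alive afterwards.

```java
/** Hypothetical helper illustrating a bounded wait on a canceller thread. */
final class CancelThreadSketch {
    /**
     * Runs cancelAction on its own thread and waits up to timeoutMillis for it.
     * Returns true if the thread was still alive afterwards (a cancel() that never
     * returns), in which case it is interrupted.
     */
    static boolean cancelWithDeadline(Runnable cancelAction, long timeoutMillis) {
        Thread cancelThread = new Thread(cancelAction, "canceler-sketch");
        cancelThread.setDaemon(true); // a stuck cancel() must not keep the JVM alive
        cancelThread.start();
        try {
            cancelThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        if (cancelThread.isAlive()) {
            cancelThread.interrupt();
            return true;
        }
        return false;
    }
}
```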
##########
File path:
flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
##########
@@ -34,7 +34,7 @@ object Tasks {
getEnvironment.getInputGate(0),
classOf[IntValue],
getEnvironment.getTaskManagerInfo.getTmpDirectories)
-
Review comment:
unrelated changes
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1573,7 +1577,19 @@ public void run() {
// the user-defined cancel method may throw errors.
// we need to continue despite that
try {
- invokable.cancel();
+ Future<Void> cancellationFuture = invokable.cancel();
+ // Wait for any active actions to complete (e.g. timers, mailbox actions)
+ // Before that, interrupt to notify them about cancellation
+ if (invokable.shouldInterruptOnCancel()) {
+ executer.interrupt();
+ }
+ try {
+ cancellationFuture.get(
+ /* wait with timeout to still release the resources if taskCancellationTimeout is not configured */
+ 180000L, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | TimeoutException | InterruptedException e) {
Review comment:
Is the additional try-catch block necessary? I think we can reuse the
`catch (Throwable t)` block.
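A sketch of what reusing the outer handler could look like, assuming every failure (an exception from `cancel()` itself, `ExecutionException`, `TimeoutException`, `InterruptedException`) should be treated the same way. `CancellableSketch` and `SingleCatchSketch.cancelAndWait` are hypothetical stand-ins; the single `catch (Throwable t)` plays the role of the existing block that lets cancellation continue despite errors.

```java
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the invokable being cancelled. */
interface CancellableSketch {
    Future<Void> cancel() throws Exception;
}

final class SingleCatchSketch {
    /**
     * Cancels and waits for termination. Every failure, whether thrown by cancel() itself
     * or by get(), reaches the same catch block. Returns the failure, or null if
     * cancellation completed cleanly.
     */
    static Throwable cancelAndWait(CancellableSketch invokable, long timeoutMillis) {
        try {
            Future<Void> cancellationFuture = invokable.cancel();
            cancellationFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return null;
        } catch (Throwable t) {
            // Stand-in for the existing catch (Throwable t) block: record and carry on.
            if (t instanceof InterruptedException) {
                // keep the interrupt status visible to the caller
                Thread.currentThread().interrupt();
            }
            return t;
        }
    }
}
```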
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -1573,7 +1577,19 @@ public void run() {
// the user-defined cancel method may throw errors.
// we need to continue despite that
try {
- invokable.cancel();
+ Future<Void> cancellationFuture = invokable.cancel();
+ // Wait for any active actions to complete (e.g. timers, mailbox actions)
+ // Before that, interrupt to notify them about cancellation
+ if (invokable.shouldInterruptOnCancel()) {
+ executer.interrupt();
+ }
+ try {
+ cancellationFuture.get(
+ /* wait with timeout to still release the resources if taskCancellationTimeout is not configured */
+ 180000L, TimeUnit.MILLISECONDS);
Review comment:
I don't like using a magic number here. Could we, e.g., reuse the value of `TASK_CANCELLATION_TIMEOUT`? If I understand the code correctly, this would mean we don't need to interrupt the canceller thread from the CancelerWatchdog.
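A minimal sketch of waiting on the configured value instead of the literal. `CancellationTimeoutSketch.awaitCancellation` is hypothetical; in `Task` the value would presumably come from `TaskManagerOptions.TASK_CANCELLATION_TIMEOUT` (`task.cancellation.timeout`, whose default is 180000 ms, the same as the hard-coded number, and where 0 deactivates the watchdog). Treating a non-positive value as "wait without a bound" is an assumption of this sketch, not something the PR decides.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CancellationTimeoutSketch {
    /**
     * Waits for the cancellation future using the configured task cancellation timeout
     * (in milliseconds) instead of a literal. A non-positive value is treated as
     * "no timeout configured" and waits without a bound (an assumption of this sketch).
     * Returns true if cancellation completed successfully within the allowed time.
     */
    static boolean awaitCancellation(Future<Void> cancellationFuture, long taskCancellationTimeoutMillis) {
        try {
            if (taskCancellationTimeoutMillis > 0) {
                cancellationFuture.get(taskCancellationTimeoutMillis, TimeUnit.MILLISECONDS);
            } else {
                cancellationFuture.get();
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }
}
```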