pnowojski commented on a change in pull request #10345: [FLINK-12484][runtime]
synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351848241
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -463,96 +468,96 @@ public final void invoke() throws Exception {
}
private void afterInvoke() throws Exception {
- LOG.debug("Finished task {}", getName());
-
- // make sure no further checkpoint and notification actions
happen.
- // we make sure that no other thread is currently in the locked
scope before
- // we close the operators by trying to acquire the checkpoint
scope lock
- // we also need to make sure that no triggers fire concurrently
with the close logic
- // at the same time, this makes sure that during any "regular"
exit where still
- synchronized (lock) {
- // this is part of the main logic, so if this fails,
the task is considered failed
- closeAllOperators();
-
- // make sure no new timers can come
- timerService.quiesce();
-
- // let mailbox execution reject all new letters from
this point
- mailboxProcessor.prepareClose();
-
- // only set the StreamTask to not running after all
operators have been closed!
- // See FLINK-7430
- isRunning = false;
+ LOG.debug("Finished task {}", getName());
+
+ // make sure no further checkpoint and notification
actions happen.
+ // we make sure that no other thread is currently in
the locked scope before
+ // we close the operators by trying to acquire the
checkpoint scope lock
+ // we also need to make sure that no triggers fire
concurrently with the close logic
+ // at the same time, this makes sure that during any
"regular" exit where still
+ executionDecorator.runThrowing(() -> {
+ // this is part of the main logic, so if this
fails, the task is considered failed
+ closeAllOperators();
+
+ // make sure no new timers can come
+ timerService.quiesce();
+
+ // let mailbox execution reject all new letters
from this point
+ mailboxProcessor.prepareClose();
+
+ // only set the StreamTask to not running after
all operators have been closed!
+ // See FLINK-7430
+ isRunning = false;
+ });
+ // processes the remaining mails; no new mails can be
enqueued
+ mailboxProcessor.drain();
+
+ // make sure all timers finish
+ timerService.awaitPendingAfterQuiesce();
+
+ LOG.debug("Closed operators for task {}", getName());
+
+ // make sure all buffered data is flushed
+ operatorChain.flushOutputs();
+
+ // make an attempt to dispose the operators such that
failures in the dispose call
+ // still let the computation fail
+ tryDisposeAllOperators();
+ disposed = true;
}
- // processes the remaining mails; no new mails can be enqueued
- mailboxProcessor.drain();
-
- // make sure all timers finish
- timerService.awaitPendingAfterQuiesce();
-
- LOG.debug("Closed operators for task {}", getName());
-
- // make sure all buffered data is flushed
- operatorChain.flushOutputs();
-
- // make an attempt to dispose the operators such that failures
in the dispose call
- // still let the computation fail
- tryDisposeAllOperators();
- disposed = true;
- }
private void cleanUpInvoke() throws Exception {
- // clean up everything we initialized
- isRunning = false;
+ // clean up everything we initialized
Review comment:
nit: unrelated whitespace changes? (Ditto in other places.)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services