pnowojski commented on a change in pull request #9564:
[FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox
for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319467301
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -1368,13 +1369,19 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 	private class TimerInvocationContext implements SystemProcessingTimeService.ScheduledCallbackExecutionContext {
 		@Override
-		public void invoke(ProcessingTimeCallback callback, long timestamp) {
-			synchronized (getCheckpointLock()) {
-				try {
-					callback.onProcessingTime(timestamp);
-				} catch (Throwable t) {
-					handleAsyncException("Caught exception while processing timer.", new TimerException(t));
-				}
+		public void invoke(ProcessingTimeCallback callback, long timestamp) throws InterruptedException {
+			try {
+				mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {
+					synchronized (getCheckpointLock()) {
+						try {
+							callback.onProcessingTime(timestamp);
+						} catch (Throwable t) {
+							handleAsyncException("Caught exception while processing timer.", new TimerException(t));
+						}
+					}
+				});
+			} catch (Throwable t) {
Review comment:
Unfortunately not. I think this is the uppermost layer in the stack trace, so
if we do not catch the exception here, it would be silently ignored. That is
what made the deadlock I was debugging last week so hard to find.
However, maybe this construct deserves a comment here explaining it? Something like:
```
// Inner try/catch handles all errors during the execution of the action in the mailbox.
// Outer try/catch handles errors that could happen while enqueuing the action.
```
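To illustrate the distinction, here is a minimal, self-contained sketch. It is not the Flink code: a plain `java.util.concurrent` single-thread executor stands in for the mailbox executor, and the class `TwoLevelCatchSketch`, the `reported` list and the `demo()` helper are hypothetical names used only for this example. The inner try/catch sees a failure thrown while the action runs; the outer one sees a failure thrown while the action is being enqueued (here, a rejected submission after shutdown).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class TwoLevelCatchSketch {

    // Records which layer reported which failure (hypothetical stand-in for handleAsyncException).
    static final List<String> reported = Collections.synchronizedList(new ArrayList<>());

    // Mirrors the shape of invoke(...) in the diff, with a plain executor instead of the mailbox.
    static void invoke(ExecutorService executor, Runnable callback) {
        try {
            executor.execute(() -> {
                try {
                    // Runs later, on the executor thread.
                    callback.run();
                } catch (Throwable t) {
                    // Inner catch: errors during the execution of the action.
                    reported.add("inner: " + t.getMessage());
                }
            });
        } catch (Throwable t) {
            // Outer catch: errors while enqueuing the action on the calling thread.
            reported.add("outer: " + t.getClass().getSimpleName());
        }
    }

    static List<String> demo() {
        reported.clear();
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // Case 1: the action itself fails, so only the inner catch can observe it.
        invoke(executor, () -> { throw new IllegalStateException("timer failed"); });

        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // Case 2: enqueuing fails because the executor no longer accepts work.
        invoke(executor, () -> { });

        return new ArrayList<>(reported);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Without the outer catch, the second failure would propagate to whatever thread called `invoke`, and if nothing above it in the stack reports the exception, it is lost.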