pnowojski commented on a change in pull request #9564: [FLINK-12481][FLINK-12482][FLINK-12958] Streaming runtime: integrate mailbox for timer triggers, checkpoints and AsyncWaitOperator
URL: https://github.com/apache/flink/pull/9564#discussion_r319467301
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -1368,13 +1369,19 @@ private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
 
        private class TimerInvocationContext implements SystemProcessingTimeService.ScheduledCallbackExecutionContext {
                @Override
-               public void invoke(ProcessingTimeCallback callback, long timestamp) {
-                       synchronized (getCheckpointLock()) {
-                               try {
-                                       callback.onProcessingTime(timestamp);
-                               } catch (Throwable t) {
-                                       handleAsyncException("Caught exception while processing timer.", new TimerException(t));
-                               }
+               public void invoke(ProcessingTimeCallback callback, long timestamp) throws InterruptedException {
+                       try {
+                               mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {
+                                       synchronized (getCheckpointLock()) {
+                                               try {
+                                                       callback.onProcessingTime(timestamp);
+                                               } catch (Throwable t) {
+                                                       handleAsyncException("Caught exception while processing timer.", new TimerException(t));
+                                               }
+                                       }
+                               });
+                       } catch (Throwable t) {
 
 Review comment:
   Unfortunately not. I think this is the uppermost frame in the stack trace, so if we do not catch the exception here, it is silently ignored. That is what made the deadlock I was debugging last week so hard to find.
   
   However, maybe this deserves a comment here explaining the construct? Something like:
   
   ```
   // Inner try catch handles all errors during the execution of the action in the mailbox.
   // Outer try catch handles errors that could happen during enqueuing the action.
   ```
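
To make the two layers concrete, here is a minimal standalone sketch of the same pattern. It is not the actual `StreamTask` code: a plain `java.util.concurrent.Executor` stands in for the mailbox executor, and the `reported` list and the `TimerInvocationSketch` class are hypothetical stand-ins for `handleAsyncException` and `TimerInvocationContext`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

final class TimerInvocationSketch {

    // Hypothetical stand-in for handleAsyncException(...): records every reported failure.
    static final List<String> reported = new CopyOnWriteArrayList<>();

    // Assumption: `executor` plays the role of the mailbox executor.
    static void invoke(Executor executor, Runnable callback) {
        try {
            executor.execute(() -> {
                try {
                    callback.run();
                } catch (Throwable t) {
                    // Inner catch: the action was enqueued and failed while running.
                    reported.add("execution: " + t.getMessage());
                }
            });
        } catch (Throwable t) {
            // Outer catch: the action could not be enqueued at all.
            // Without it, this failure would vanish in the calling timer thread.
            reported.add("enqueue: " + t.getMessage());
        }
    }

    public static void main(String[] args) {
        // Runs the action immediately, like a mailbox that accepts and processes it.
        Executor open = Runnable::run;
        // Rejects every action, like a mailbox that is already closed.
        Executor closed = r -> {
            throw new RejectedExecutionException("mailbox closed");
        };

        invoke(open, () -> {
            throw new IllegalStateException("timer failed");
        });
        invoke(closed, () -> { });

        System.out.println(reported);
    }
}
```

The first call is reported by the inner handler and the second by the outer one, which is the case that would otherwise go unnoticed.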

