pnowojski commented on a change in pull request #10345: [FLINK-12484][runtime] 
synchronize all mailbox actions
URL: https://github.com/apache/flink/pull/10345#discussion_r351848241
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -463,96 +468,96 @@ public final void invoke() throws Exception {
        }
 
        private void afterInvoke() throws Exception {
-               LOG.debug("Finished task {}", getName());
-
-               // make sure no further checkpoint and notification actions 
happen.
-               // we make sure that no other thread is currently in the locked 
scope before
-               // we close the operators by trying to acquire the checkpoint 
scope lock
-               // we also need to make sure that no triggers fire concurrently 
with the close logic
-               // at the same time, this makes sure that during any "regular" 
exit where still
-               synchronized (lock) {
-                       // this is part of the main logic, so if this fails, 
the task is considered failed
-                       closeAllOperators();
-
-                       // make sure no new timers can come
-                       timerService.quiesce();
-
-                       // let mailbox execution reject all new letters from 
this point
-                       mailboxProcessor.prepareClose();
-
-                       // only set the StreamTask to not running after all 
operators have been closed!
-                       // See FLINK-7430
-                       isRunning = false;
+                       LOG.debug("Finished task {}", getName());
+
+                       // make sure no further checkpoint and notification 
actions happen.
+                       // we make sure that no other thread is currently in 
the locked scope before
+                       // we close the operators by trying to acquire the 
checkpoint scope lock
+                       // we also need to make sure that no triggers fire 
concurrently with the close logic
+                       // at the same time, this makes sure that during any 
"regular" exit where still
+                       executionDecorator.runThrowing(() -> {
+                               // this is part of the main logic, so if this 
fails, the task is considered failed
+                               closeAllOperators();
+
+                               // make sure no new timers can come
+                               timerService.quiesce();
+
+                               // let mailbox execution reject all new letters 
from this point
+                               mailboxProcessor.prepareClose();
+
+                               // only set the StreamTask to not running after 
all operators have been closed!
+                               // See FLINK-7430
+                               isRunning = false;
+                       });
+                       // processes the remaining mails; no new mails can be 
enqueued
+                       mailboxProcessor.drain();
+
+                       // make sure all timers finish
+                       timerService.awaitPendingAfterQuiesce();
+
+                       LOG.debug("Closed operators for task {}", getName());
+
+                       // make sure all buffered data is flushed
+                       operatorChain.flushOutputs();
+
+                       // make an attempt to dispose the operators such that 
failures in the dispose call
+                       // still let the computation fail
+                       tryDisposeAllOperators();
+                       disposed = true;
                }
-               // processes the remaining mails; no new mails can be enqueued
-               mailboxProcessor.drain();
-
-               // make sure all timers finish
-               timerService.awaitPendingAfterQuiesce();
-
-               LOG.debug("Closed operators for task {}", getName());
-
-               // make sure all buffered data is flushed
-               operatorChain.flushOutputs();
-
-               // make an attempt to dispose the operators such that failures 
in the dispose call
-               // still let the computation fail
-               tryDisposeAllOperators();
-               disposed = true;
-       }
 
        private void cleanUpInvoke() throws Exception {
-               // clean up everything we initialized
-               isRunning = false;
+                       // clean up everything we initialized
 
 Review comment:
   nit: unrelated white space changes? (and ditto in other places)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to