StefanRRichter commented on a change in pull request #8826: 
[FLINK-12479][operators] Integrate StreamInputProcessor(s) with mailbox
URL: https://github.com/apache/flink/pull/8826#discussion_r299430997
 
 

 ##########
 File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -243,26 +249,15 @@ protected StreamTask(
         * @param context context object for collaborative interaction between the action and the stream task.
         * @throws Exception on any problems in the action.
         */
-       protected abstract void performDefaultAction(ActionContext context) throws Exception;
-
-       /**
-        * Runs the stream-tasks main processing loop.
-        */
-       private void run() throws Exception {
-               final ActionContext actionContext = new ActionContext();
-               while (true) {
-                       if (mailbox.hasMail()) {
-                               Optional<Runnable> maybeLetter;
-                               while ((maybeLetter = mailbox.tryTakeMail()).isPresent()) {
-                                       Runnable letter = maybeLetter.get();
-                                       if (letter == POISON_LETTER) {
-                                               return;
-                                       }
-                                       letter.run();
-                               }
+       protected void performDefaultAction(DefaultActionContext context) throws Exception {
+               if (!inputProcessor.processInput()) {
+                       if (inputProcessor.isFinished()) {
+                               context.allActionsCompleted();
+                       }
+                       else {
+                               SuspendedMailboxDefaultAction suspendedDefaultAction = context.suspendDefaultAction();
+                               inputProcessor.isAvailable().thenRun(suspendedDefaultAction::resume);
 
 Review comment:
   I suggest changing this line to
   `inputProcessor.isAvailable().thenRunAsync(suspendedDefaultAction::resume, getTaskMailboxExecutor());`.
   With plain `thenRun`, the resume callback runs in whichever thread completes
   the availability future. With `thenRunAsync` and the mailbox executor, the
   resume is guaranteed to run in the mailbox thread, and not in another thread
   that might be interrupted before it can deliver the resume message to the
   mailbox.
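
   To illustrate the threading difference, here is a minimal, self-contained
   sketch using only `java.util.concurrent`. It is not Flink code: the
   single-threaded executor named `mailbox-thread` is a hypothetical stand-in
   for `getTaskMailboxExecutor()`, the `available` future stands in for
   `inputProcessor.isAvailable()`, and `network-thread` stands in for whichever
   thread happens to complete that future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ResumeThreadDemo {

    /** Returns the name of the thread that ended up running the resume callback. */
    public static String resumeThread(boolean viaMailboxExecutor) throws Exception {
        // Hypothetical stand-in for the task's mailbox executor.
        ExecutorService mailbox =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox-thread"));
        // Hypothetical stand-in for inputProcessor.isAvailable().
        CompletableFuture<Void> available = new CompletableFuture<>();
        // Records which thread executed the callback.
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        Runnable resume = () -> ranOn.complete(Thread.currentThread().getName());

        // Register the callback while the future is still incomplete.
        if (viaMailboxExecutor) {
            available.thenRunAsync(resume, mailbox); // callback is submitted to the executor
        } else {
            available.thenRun(resume);               // callback runs in the completing thread
        }

        // Complete the future from a different thread, as an input gate might.
        Thread network = new Thread(() -> available.complete(null), "network-thread");
        network.start();
        network.join();

        try {
            return ranOn.get(5, TimeUnit.SECONDS);
        } finally {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("thenRun:      " + resumeThread(false)); // network-thread
        System.out.println("thenRunAsync: " + resumeThread(true));  // mailbox-thread
    }
}
```

   In the `thenRun` case the callback is executed by the completing thread, so
   if that thread is interrupted or torn down around completion, the resume may
   be affected. Passing an executor moves the callback onto a thread the task
   controls.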
