akalash commented on pull request #17796:
URL: https://github.com/apache/flink/pull/17796#issuecomment-976609683


   @pnowojski, as I understand it, the problem unfolds like this:
   
   1. `AsyncWaitOperator#processElement` registers a timeout for the entry.
   2. `endInput` is called; it waits for in-flight inputs but not for the registered timeout.
   3. `finish` is called.
   4. The mailbox is suspended.
   5. The timer fires and puts a mail into the mailbox (`AsyncWaitOperator.ResultHandler#processInMailbox`).
   6. All timers are closed.
   7. The mailbox is marked as closed, which forbids adding new mails.
   8. `drain` is called and executes the mail from step 5, which recursively tries to add a new mail to the mailbox; this fails because the mailbox is already closed.
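   To make steps 5 to 8 concrete, here is a minimal single-threaded Java sketch. `ToyMailbox` and `ShutdownOrder` are hypothetical names; this is a simplified model, not Flink's `TaskMailbox` API, and it ignores threading and the suspended state.

   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;

   // Hypothetical, simplified mailbox; NOT Flink's TaskMailbox API.
   class ToyMailbox {
       private final Queue<Runnable> mails = new ArrayDeque<>();
       private boolean closed = false;

       void put(Runnable mail) {
           if (closed) {
               // Mirrors the failure in step 8: adding a mail after close.
               throw new IllegalStateException("Mailbox is closed");
           }
           mails.add(mail);
       }

       void close() {
           closed = true;
       }

       // Runs queued mails until the queue is empty; a mail may enqueue more mails.
       void drain() {
           Runnable mail;
           while ((mail = mails.poll()) != null) {
               mail.run();
           }
       }
   }

   class ShutdownOrder {
       // Replays the current order: the timer mail is queued, the mailbox is
       // closed, and only then is it drained.
       static String currentOrder() {
           ToyMailbox mailbox = new ToyMailbox();
           // Step 5: the timer fires and enqueues a mail that, when run,
           // tries to enqueue a follow-up mail (stand-in for processInMailbox).
           mailbox.put(() -> mailbox.put(() -> {}));
           // Step 7: the mailbox is marked as closed.
           mailbox.close();
           try {
               // Step 8: draining runs the timer mail, whose put() now fails.
               mailbox.drain();
               return "ok";
           } catch (IllegalStateException e) {
               return e.getMessage();
           }
       }
   }
   ```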
   
   In my opinion, we cannot be sure that no new data will arrive until we close every potential source of data, such as timers. So in this PR, I first close all timers, which should guarantee that no new external data can arrive; then I drain the mailbox; and only then do I mark the mailbox as closed.
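   The ordering described above can be modelled with similar toy types. `ToyMailbox`, `ToyTimerService` and `ProposedShutdown` are hypothetical illustrations, not Flink classes; the sketch only shows that when timers are closed first and the mailbox is drained before it is closed, a mail that enqueues a follow-up mail no longer fails.

   ```java
   import java.util.ArrayDeque;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Queue;

   // Hypothetical, simplified mailbox; NOT Flink's TaskMailbox API.
   class ToyMailbox {
       private final Queue<Runnable> mails = new ArrayDeque<>();
       private boolean closed = false;

       void put(Runnable mail) {
           if (closed) {
               throw new IllegalStateException("Mailbox is closed");
           }
           mails.add(mail);
       }

       void drain() {
           Runnable mail;
           while ((mail = mails.poll()) != null) {
               mail.run();
           }
       }

       void close() {
           closed = true;
       }
   }

   // Hypothetical timer service: after close(), pending timers are dropped
   // and new registrations are ignored, so it can no longer produce mails.
   class ToyTimerService {
       private final List<Runnable> pending = new ArrayList<>();
       private boolean closed = false;

       void register(Runnable callback) {
           if (!closed) {
               pending.add(callback);
           }
       }

       // Fires every pending timer (stand-in for processing time advancing).
       void fireAll() {
           List<Runnable> due = new ArrayList<>(pending);
           pending.clear();
           due.forEach(Runnable::run);
       }

       void close() {
           closed = true;
           pending.clear();
       }
   }

   class ProposedShutdown {
       // Returns how many mails were executed during shutdown.
       static int run() {
           ToyMailbox mailbox = new ToyMailbox();
           ToyTimerService timers = new ToyTimerService();
           int[] processed = {0};
           // The registered timeout fires and enqueues a mail, which in turn
           // enqueues a follow-up mail when it runs.
           timers.register(() -> mailbox.put(() -> {
               processed[0]++;
               mailbox.put(() -> processed[0]++);
           }));
           timers.fireAll();
           // Proposed order: 1) close timers, 2) drain, 3) close the mailbox.
           timers.close();
           mailbox.drain();
           mailbox.close();
           return processed[0];
       }
   }
   ```

   Because the mailbox is still open while it is drained, the follow-up mail is accepted and executed in the same drain loop; closing the timers first is what keeps the drain from racing against new external mails.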
   
   It is difficult to say much about the `finish` contract: it does guarantee that no new records can arrive from the input channels, but it doesn't guarantee that nothing arrives from other sources, such as the timer service. If we think this is incorrect, we should either rewrite `AsyncWaitOperator#waitInFlightInputsFinished` so that it also waits for the timer service (which is not easy), or ignore all timer service events after `endInput` has been received. For example, we could rewrite `AsyncWaitOperator.ResultHandler#timerTriggered` as:
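   ```
   private void timerTriggered() throws Exception {
       // Skip the user timeout if the result has already completed or if
       // endInput has been received (`endInput` is a proposed flag).
       if (!completed.get() && !endInput.get()) {
           userFunction.timeout(inputRecord.getValue(), this);
       }
   }
   ```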
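   A rough sketch of how the proposed `endInput` check would behave, assuming a single `AtomicBoolean` shared between the operator and all of its result handlers. `ToyResultHandler` and `EndInputDemo` are hypothetical; the counter stands in for `userFunction.timeout(...)`, and in the real operator the flag would have to be set from `endInput()`, which this sketch only simulates.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical, stripped-down stand-in for AsyncWaitOperator.ResultHandler.
   // The shared `endInput` flag is the proposed addition, not an existing field.
   class ToyResultHandler {
       private final AtomicBoolean completed = new AtomicBoolean(false);
       private final AtomicBoolean endInput;
       private final AtomicInteger timeoutCalls;

       ToyResultHandler(AtomicBoolean endInput, AtomicInteger timeoutCalls) {
           this.endInput = endInput;
           this.timeoutCalls = timeoutCalls;
       }

       void complete() {
           completed.set(true);
       }

       // Called when the (toy) timer fires; ignored once endInput was seen.
       void timerTriggered() {
           if (!completed.get() && !endInput.get()) {
               timeoutCalls.incrementAndGet(); // stands in for userFunction.timeout(...)
           }
       }
   }

   class EndInputDemo {
       // Returns {timeout calls before endInput, timeout calls after endInput}.
       static int[] run() {
           AtomicBoolean endInput = new AtomicBoolean(false);
           AtomicInteger timeoutCalls = new AtomicInteger();
           ToyResultHandler first = new ToyResultHandler(endInput, timeoutCalls);
           ToyResultHandler second = new ToyResultHandler(endInput, timeoutCalls);
           first.timerTriggered(); // before endInput: the user timeout runs
           int before = timeoutCalls.get();
           endInput.set(true); // simulate the operator receiving endInput
           second.timerTriggered(); // after endInput: the timer is ignored
           return new int[] {before, timeoutCalls.get()};
       }
   }
   ```

   Under this sketch's assumptions, a timer that fires after `endInput` becomes a no-op, so an in-flight entry would have to be completed by its async result rather than by the timeout.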
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
