gaoyunhaii opened a new pull request #16366:
URL: https://github.com/apache/flink/pull/16366


   ## What is the purpose of the change
   
   This PR fixes an issue where, with flushAlways enabled, the reader might 
never be notified of data emitted while the channel is blocked. 
   
   To fix this, we propose issuing one additional notification on resumption 
if data is available. Such data must have been emitted after the channel was 
blocked, because the event that caused the block has already been polled, so 
the notification is not a repeat (although a repeated notification would also 
be harmless).
   
   Two cases can make data available in the channel:
   1. There is only one unfinished buffer, and it has been flushed at least 
once.
   2. There is at least one finished buffer. 
   
   When resuming, the second case can be detected from the buffers in the 
queue, but the current implementation cannot tell whether the unfinished 
buffer has been flushed. To solve this, we decouple `flushRequested` from 
`isBlocked`: whenever a flush happens, `flushRequested` is set to `true` 
regardless of the value of `isBlocked`. This does not cause problems elsewhere, 
because `isBlocked` is still checked when deciding whether a notification is 
required and in `isAvailable()`. On resumption, available data from case 1 can 
then be detected via `flushRequested`. 
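
The decoupling described above can be illustrated with a minimal sketch. This is a simplified, hypothetical model and not the actual Flink subpartition code: the class `SubpartitionSketch` and its methods `emitAndFlush`, `addFinishedBuffer`, `block`, `resume`, and `notifications` are assumptions made for illustration. Only the field names `flushRequested` and `isBlocked` and the method name `isAvailable()` are taken from the description.

```java
/** Hypothetical, simplified model of a blockable channel; not Flink's real class. */
class SubpartitionSketch {
    private int finishedBuffers;          // number of finished buffers in the queue
    private boolean hasUnfinishedBuffer;  // whether an unfinished buffer holds data
    private boolean flushRequested;       // set on flush, independent of isBlocked
    private boolean isBlocked;            // true between a blocking event and resume
    private int notifications;            // counts "data available" notifications

    /** Writes a record into the unfinished buffer and flushes it (flushAlways). */
    void emitAndFlush() {
        hasUnfinishedBuffer = true;
        if (flushRequested) {
            return; // a flush is already pending, nothing more to record
        }
        // Decoupled: the flush is recorded even while the channel is blocked.
        flushRequested = true;
        if (!isBlocked) {
            notifications++; // isBlocked is still checked before notifying
        }
    }

    /** Adds a finished buffer to the queue (case 2 in the description). */
    void addFinishedBuffer() {
        finishedBuffers++;
        if (!isBlocked) {
            notifications++;
        }
    }

    /** Models the reader having polled the event that blocks the channel. */
    void block() {
        isBlocked = true;
    }

    /** Unblocks the channel and notifies once if data arrived while blocked. */
    void resume() {
        isBlocked = false;
        if (isAvailable()) {
            notifications++;
        }
    }

    /** Data is available only when unblocked and case 1 or case 2 holds. */
    boolean isAvailable() {
        if (isBlocked) {
            return false;
        }
        boolean flushedUnfinishedBuffer = hasUnfinishedBuffer && flushRequested; // case 1
        boolean hasFinishedBuffer = finishedBuffers > 0;                         // case 2
        return flushedUnfinishedBuffer || hasFinishedBuffer;
    }

    int notifications() {
        return notifications;
    }
}
```

In this model, a record emitted while blocked produces no notification at flush time, but `resume()` sees `flushRequested == true` and notifies once. Had `flushRequested` only been set when the channel was not blocked, `resume()` would have had no way to detect case 1.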
   
   ## Brief change log
   
   - 72a7a8aa4ab9a351214f467cb6c8f65d0e3a7384 modifies the logic according to 
the above proposal. 
   
   ## Verifying this change
   
   This change can be verified by the modified unit test.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **yes**, but 
it should not affect performance
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
