[ https://issues.apache.org/jira/browse/FLINK-20189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-20189:
-----------------------------------
    Labels: pull-request-available  (was: )

> Restored feedback events may be silently dropped if per key-group header 
> bytes were not fully read
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-20189
>                 URL: https://issues.apache.org/jira/browse/FLINK-20189
>             Project: Flink
>          Issue Type: Task
>          Components: Stateful Functions
>    Affects Versions: statefun-2.2.1
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: statefun-2.3.0, statefun-2.2.2
>
>
> The attempt to read the per key-group header bytes here does not guarantee 
> that the header bytes are fully read:
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L163
> What could happen is the following:
> * Say the input stream does contain the header bytes.
> * Fewer than {{HEADER_BYTES.length}} bytes are read into the read buffer 
> at the referenced code line.
> * The {{if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES))}} check 
> evaluates to true, because the partially filled byte array does not equal 
> the expected header bytes.
> * We would then mistakenly conclude that the header bytes are not in the 
> input stream and push the read bytes back, i.e. the header bytes are not 
> skipped, and subsequent reads would see the header bytes first.
> * Most importantly, since the header bytes are not skipped in this case, 
> the {{STATEFUN_VERSION}} (which is {{0}}) is incorrectly read by 
> {{KeyGroupStream.readFrom(...)}} as the number of feedback elements to read: 
> https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java#L57
> * The end result in this scenario is that some checkpointed feedback events 
> would be silently dropped.
> Although it is hard to say how likely this is to happen in practice, and it 
> would also depend on the actual {{InputStream}} implementation, the general 
> contract of {{InputStream#read(byte[])}} (which may return fewer bytes than 
> requested) means this is definitely something that should be addressed.
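
The failure described in the list above can be reproduced with a small Java sketch. It assumes a made-up 4-byte {{HEADER_BYTES}} value and a hypothetical {{ChunkedInputStream}} that returns at most a fixed number of bytes per bulk {{read}} call, which the {{InputStream}} contract allows. {{naiveSkipHeader}} only approximates the single-read check at the referenced line; it is not the actual StateFun code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class NaiveHeaderCheck {

  // Hypothetical header value for illustration only; not the real HEADER_BYTES constant.
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

  // Hypothetical stream that hands out at most maxChunk bytes per read(byte[], int, int)
  // call, which the InputStream contract explicitly permits.
  static final class ChunkedInputStream extends InputStream {
    private final InputStream delegate;
    private final int maxChunk;

    ChunkedInputStream(InputStream delegate, int maxChunk) {
      this.delegate = delegate;
      this.maxChunk = maxChunk;
    }

    @Override
    public int read() throws IOException {
      return delegate.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return delegate.read(b, off, Math.min(len, maxChunk));
    }
  }

  // Single-read check approximating the referenced line: one read(byte[]) call, then compare.
  // Returns true if the bytes were treated as the header, false if they were pushed back.
  static boolean naiveSkipHeader(PushbackInputStream input) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    int bytesRead = input.read(header); // may legally return fewer than header.length bytes
    if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES)) {
      input.unread(header, 0, bytesRead); // header wrongly assumed to be absent
      return false;
    }
    return bytesRead > 0;
  }

  // Writes the header followed by four zero bytes (standing in for a version of 0)
  // and runs the naive check over a stream that returns maxChunk bytes per read.
  static boolean detectsHeader(int maxChunk) {
    byte[] data = Arrays.copyOf(HEADER_BYTES, HEADER_BYTES.length + 4);
    InputStream chunked = new ChunkedInputStream(new ByteArrayInputStream(data), maxChunk);
    PushbackInputStream input = new PushbackInputStream(chunked, HEADER_BYTES.length);
    try {
      return naiveSkipHeader(input);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("64-byte reads: header detected = " + detectsHeader(64));
    System.out.println("2-byte reads:  header detected = " + detectsHeader(2));
  }
}
```

With 64-byte reads the check finds the header; with 2-byte reads the very same bytes are pushed back and treated as header-less, which is the first step of the scenario above.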

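One possible way to address this is to keep calling {{read}} until the header buffer is full or the stream ends, and only then compare and push back. The Java sketch below assumes a made-up 4-byte {{HEADER_BYTES}} value and a hypothetical {{ChunkedInputStream}} that returns at most a fixed number of bytes per bulk {{read}} call; {{skipHeaderFully}} is an illustrative name, and this is not necessarily how the actual fix is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

final class FullHeaderCheck {

  // Hypothetical header value for illustration only; not the real HEADER_BYTES constant.
  static final byte[] HEADER_BYTES = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

  // Hypothetical stream that hands out at most maxChunk bytes per bulk read call.
  static final class ChunkedInputStream extends InputStream {
    private final InputStream delegate;
    private final int maxChunk;

    ChunkedInputStream(InputStream delegate, int maxChunk) {
      this.delegate = delegate;
      this.maxChunk = maxChunk;
    }

    @Override
    public int read() throws IOException {
      return delegate.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return delegate.read(b, off, Math.min(len, maxChunk));
    }
  }

  // Keeps reading until the header buffer is full or the stream ends, and only then
  // compares. On a mismatch (or a truncated stream) exactly the consumed bytes are unread.
  static boolean skipHeaderFully(PushbackInputStream input) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    int total = 0;
    while (total < header.length) {
      int n = input.read(header, total, header.length - total);
      if (n < 0) {
        break; // end of stream before a complete header was read
      }
      total += n;
    }
    if (total == header.length && Arrays.equals(header, HEADER_BYTES)) {
      return true; // header fully read and consumed
    }
    if (total > 0) {
      input.unread(header, 0, total); // not a header: restore what was consumed
    }
    return false;
  }

  static byte[] withHeader(byte[] payload) {
    byte[] data = Arrays.copyOf(HEADER_BYTES, HEADER_BYTES.length + payload.length);
    System.arraycopy(payload, 0, data, HEADER_BYTES.length, payload.length);
    return data;
  }

  private static PushbackInputStream open(byte[] data, int maxChunk) {
    InputStream chunked = new ChunkedInputStream(new ByteArrayInputStream(data), maxChunk);
    return new PushbackInputStream(chunked, HEADER_BYTES.length);
  }

  // Whether the header is detected when data arrives maxChunk bytes per read.
  static boolean detectsHeader(byte[] data, int maxChunk) {
    try {
      return skipHeaderFully(open(data, maxChunk));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // The first byte a caller would read after the check (-1 at end of stream).
  static int nextByteAfterCheck(byte[] data, int maxChunk) {
    try {
      PushbackInputStream input = open(data, maxChunk);
      skipHeaderFully(input);
      return input.read();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] framed = withHeader(new byte[] {5, 6});
    byte[] unframed = {9, 0, 0, 7, 1, 2};
    System.out.println(detectsHeader(framed, 1) + " " + nextByteAfterCheck(framed, 1));
    System.out.println(detectsHeader(unframed, 1) + " " + nextByteAfterCheck(unframed, 1));
  }
}
```

Running {{main}} prints {{true 5}} (a header split over four single-byte reads is still consumed) followed by {{false 9}} (non-header bytes are restored intact). On Java 9+, {{InputStream#readNBytes(byte[], int, int)}} performs the same read-until-full-or-EOF loop; the explicit loop here also compiles on Java 8.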


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
