Tzu-Li (Gordon) Tai created FLINK-20189:
-------------------------------------------
Summary: Restored feedback events may be silently dropped if per
key-group header bytes were not fully read
Key: FLINK-20189
URL: https://issues.apache.org/jira/browse/FLINK-20189
Project: Flink
Issue Type: Task
Components: Stateful Functions
Affects Versions: statefun-2.2.1
Reporter: Tzu-Li (Gordon) Tai
Assignee: Tzu-Li (Gordon) Tai
Fix For: statefun-2.3.0, statefun-2.2.2
The attempt to read the per key-group header bytes here does not guarantee that
the header bytes are fully read:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLogger.java#L163
What could happen is the following:
* Suppose the input stream actually does contain the header bytes.
* Fewer than {{HEADER_BYTES.length}} bytes were read into the read buffer at
the code line referenced above.
* The {{if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES))}} check
would evaluate to true, because the partially filled byte array does not equal
the expected header bytes.
* We would mistakenly conclude that the header bytes are not in the input
stream and push back the bytes that were read, i.e. the header bytes would not
be skipped, and subsequent reads would see the header bytes first.
* Most importantly, since the header bytes are not skipped in this case, the
{{STATEFUN_VERSION}} (which is {{0}}) is incorrectly read by
{{KeyGroupStream.readFrom(...)}} as the number of feedback elements to read:
https://github.com/apache/flink-statefun/blob/master/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/KeyGroupStream.java#L57
* The end result in this scenario is that some checkpointed feedback events
would be silently dropped.
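The sequence above can be reproduced with a small standalone sketch. It is modeled on the quoted {{if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES))}} check, but everything around it is an assumption: the 8-byte header (a made-up magic number followed by a version of {{0}}), the payload layout (header, element count, elements), and the names {{skipHeaderNaive}}, {{OneByteAtATimeInputStream}}, {{payload}} and {{firstIntAfterSkip}} are hypothetical, not StateFun code. The wrapper stream returns at most one byte per bulk read, which the {{InputStream#read(byte[])}} contract permits. The sketch only shows the header being pushed back and read as data; it does not reproduce the exact downstream effect in {{KeyGroupStream}}, which depends on the real serialization format.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class NaiveHeaderSkip {

  // Hypothetical 8-byte header: made-up magic number followed by version 0.
  static final byte[] HEADER_BYTES = {0x2A, 0x5E, 0x11, 0x07, 0, 0, 0, 0};

  /** Returns at most one byte per bulk read, as InputStream#read(byte[]) permits. */
  static final class OneByteAtATimeInputStream extends FilterInputStream {
    OneByteAtATimeInputStream(InputStream in) {
      super(in);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return super.read(b, off, Math.min(len, 1));
    }
  }

  /** Flawed pattern: assumes a single read(byte[]) call fills the whole header buffer. */
  static InputStream skipHeaderNaive(InputStream in) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    PushbackInputStream input = new PushbackInputStream(in, header.length);
    int bytesRead = input.read(header); // may return fewer than header.length bytes
    if (bytesRead > 0 && !Arrays.equals(header, HEADER_BYTES)) {
      // A partially read header looks like "no header" and is pushed back.
      input.unread(header, 0, bytesRead);
    }
    return input;
  }

  /** Hypothetical payload: header, element count, then the elements. */
  static byte[] payload(int... elements) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.write(HEADER_BYTES);
    out.writeInt(elements.length);
    for (int e : elements) {
      out.writeInt(e);
    }
    out.flush();
    return bytes.toByteArray();
  }

  /** Reads the first int after the (supposedly skipped) header. */
  static int firstIntAfterSkip(InputStream raw) throws IOException {
    return new DataInputStream(skipHeaderNaive(raw)).readInt();
  }

  public static void main(String[] args) throws IOException {
    byte[] data = payload(10, 20, 30);
    // One bulk read returns all 8 header bytes, so the header is skipped.
    System.out.println(firstIntAfterSkip(new ByteArrayInputStream(data))); // prints 3
    // Only 1 header byte is read, so the header is pushed back and read as data.
    System.out.println(Integer.toHexString(firstIntAfterSkip(
        new OneByteAtATimeInputStream(new ByteArrayInputStream(data))))); // prints 2a5e1107
  }
}
```

With the well-behaved {{ByteArrayInputStream}}, the element count is the first value read after the header; with the short-reading wrapper, the first value read is the hypothetical magic number instead.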
Although it is hard to say how likely this is to happen in practice (it would
also depend on the actual {{InputStream}} implementation), the general contract
of {{InputStream#read(byte[])}} allows it to return fewer bytes than requested,
so this is definitely something that should be addressed.
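One way to respect that contract, sketched under the same hypothetical header and payload layout as an illustration, is to keep reading until the header buffer is full or the stream ends, and only then compare. {{tryReadFully}}, {{skipHeader}} and the other helpers are hypothetical names; this is not necessarily how the actual fix in StateFun is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

public class FullyReadHeaderSkip {

  // Hypothetical 8-byte header: made-up magic number followed by version 0.
  static final byte[] HEADER_BYTES = {0x2A, 0x5E, 0x11, 0x07, 0, 0, 0, 0};

  /** Returns at most one byte per bulk read, as InputStream#read(byte[]) permits. */
  static final class OneByteAtATimeInputStream extends FilterInputStream {
    OneByteAtATimeInputStream(InputStream in) {
      super(in);
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      return super.read(b, off, Math.min(len, 1));
    }
  }

  /** Reads until the buffer is full or the stream ends; returns the number of bytes read. */
  static int tryReadFully(InputStream in, byte[] buffer) throws IOException {
    int total = 0;
    while (total < buffer.length) {
      int n = in.read(buffer, total, buffer.length - total);
      if (n == -1) {
        break; // end of stream reached before the buffer was filled
      }
      total += n;
    }
    return total;
  }

  /** Skips the header only if fully present; otherwise pushes back exactly the bytes consumed. */
  static InputStream skipHeader(InputStream in) throws IOException {
    byte[] header = new byte[HEADER_BYTES.length];
    PushbackInputStream input = new PushbackInputStream(in, header.length);
    int bytesRead = tryReadFully(input, header);
    boolean hasHeader = bytesRead == header.length && Arrays.equals(header, HEADER_BYTES);
    if (!hasHeader && bytesRead > 0) {
      input.unread(header, 0, bytesRead);
    }
    return input;
  }

  /** Hypothetical payload: header, element count, then the elements. */
  static byte[] payload(int... elements) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.write(HEADER_BYTES);
    out.writeInt(elements.length);
    for (int e : elements) {
      out.writeInt(e);
    }
    out.flush();
    return bytes.toByteArray();
  }

  static int firstIntAfterSkip(InputStream raw) throws IOException {
    return new DataInputStream(skipHeader(raw)).readInt();
  }

  public static void main(String[] args) throws IOException {
    byte[] data = payload(10, 20, 30);
    // Even with one-byte reads, the full header is consumed and the count comes first.
    System.out.println(firstIntAfterSkip(
        new OneByteAtATimeInputStream(new ByteArrayInputStream(data)))); // prints 3
    // A 4-byte stream without the header is pushed back untouched.
    System.out.println(firstIntAfterSkip(
        new ByteArrayInputStream(new byte[] {0, 0, 0, 5}))); // prints 5
  }
}
```

The sketch also requires {{bytesRead == HEADER_BYTES.length}} before treating the buffer as a match. Otherwise a stream shorter than the header could be mistaken for one, because the unread tail of the buffer stays zero and the hypothetical header happens to end in zero bytes.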
--
This message was sent by Atlassian Jira
(v8.3.4#803005)