tzulitai opened a new pull request #175:
URL: https://github.com/apache/flink-statefun/pull/175
This PR fixes the per key-group `Header.skipHeaderSilently` operation so that
it completely fills the header read buffer before comparing it with the
expected header bytes.
Since `InputStream.read(byte[])` by contract only guarantees reading at
least 1 byte and may read fewer bytes than requested (i.e. the read buffer
length), prior to this PR it was possible that the read buffer wasn't fully
filled. This would result in an invalid header bytes comparison, and in the
bytes read so far being incorrectly pushed back into the remaining stream.
As a result, `Header.skipHeaderSilently` doesn't actually skip the header
bytes in this case, and the subsequent reads on the per key-group streams (to
restore the feedback events) would attempt to read a corrupt stream.
The fix is to do a `readFully` operation instead of a simple read when
filling the header bytes read buffer.
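The difference between a single read and a read-fully loop can be sketched as follows. This is only an illustration under stated assumptions, not the backported Flink utility: the class `ReadFullySketch`, the `tryReadFully` signature shown here, and the `OneByteAtATimeStream` helper are hypothetical names chosen for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch; not the actual Flink / StateFun code.
class ReadFullySketch {

    /** Reads until buf is full or the stream ends; returns the number of bytes read. */
    static int tryReadFully(InputStream in, byte[] buf) throws IOException {
        int total = 0;
        while (total < buf.length) {
            int n = in.read(buf, total, buf.length - total);
            if (n == -1) {
                break; // end of stream reached before the buffer was filled
            }
            total += n;
        }
        return total;
    }

    /** Illustrative stream that hands out at most one byte per bulk read. */
    static final class OneByteAtATimeStream extends ByteArrayInputStream {
        OneByteAtATimeStream(byte[] data) {
            super(data);
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) {
            return super.read(b, off, Math.min(len, 1));
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] header = {1, 2, 3, 4};

        // A single read may legally return after one byte.
        byte[] single = new byte[header.length];
        int n1 = new OneByteAtATimeStream(header).read(single);

        // The loop keeps reading until all four bytes have arrived.
        byte[] full = new byte[header.length];
        int n2 = tryReadFully(new OneByteAtATimeStream(header), full);

        System.out.println("single read: " + n1 + " byte(s)");
        System.out.println("read fully:  " + n2 + " byte(s)");
    }
}
```

With a partially filled buffer, a comparison against the expected header bytes fails even though the stream does start with the header, which is the failure mode described above.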
## Change log
- b6256df Backports the `tryReadFully` utility from Flink. This backport can
be reverted once we upgrade to Flink 1.12.0 / 1.11.3. Note that in our
backport, we introduced a new `RandomReadLengthByteArrayInputStream` utility to
attempt to mimic the behaviour of "real-life" input streams in our tests, where
each read can return fewer than the desired number of bytes.
- e246c1c Use `tryReadFully` in `Header.skipHeaderSilently`.
- 099015c Extend the existing tests in `UnboundedFeedbackLoggerTest` to use
the new `RandomReadLengthByteArrayInputStream`. This new version of the test
would fail without applying the fix.
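
The testing idea in the change log, a stream whose reads return fewer bytes than requested, can be sketched as below. This is not the `RandomReadLengthByteArrayInputStream` added in this PR, whose implementation is not shown here: `ShortReadSketch`, `ShortReadStream`, and `startsWithHeader` are hypothetical names, and the standard `DataInputStream.readFully` stands in for the backported `tryReadFully`.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;

// Hypothetical sketch; not the test utility from this PR.
class ShortReadSketch {

    /** In-memory stream whose bulk reads return at most a random count in [1, len]. */
    static final class ShortReadStream extends ByteArrayInputStream {
        private final Random random;

        ShortReadStream(byte[] data, long seed) {
            super(data);
            this.random = new Random(seed);
        }

        @Override
        public synchronized int read(byte[] b, int off, int len) {
            if (len <= 0) {
                return super.read(b, off, len);
            }
            // Ask the parent for no more than a random share of the request.
            return super.read(b, off, 1 + random.nextInt(len));
        }
    }

    /** Returns true if the stream starts with the expected header, read fully. */
    static boolean startsWithHeader(byte[] data, byte[] header, long seed) throws IOException {
        byte[] buf = new byte[header.length];
        // readFully loops until buf is full, or throws EOFException if it cannot be.
        new DataInputStream(new ShortReadStream(data, seed)).readFully(buf);
        return Arrays.equals(buf, header);
    }

    public static void main(String[] args) throws IOException {
        byte[] header = {10, 20, 30, 40, 50, 60, 70, 80};
        byte[] data = Arrays.copyOf(header, 32); // header followed by zero padding
        for (long seed = 0; seed < 100; seed++) {
            if (!startsWithHeader(data, header, seed)) {
                throw new AssertionError("header mismatch for seed " + seed);
            }
        }
        System.out.println("header matched for all seeds");
    }
}
```

Seeding the random generator keeps such a test reproducible while still exercising many different read lengths.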