tzulitai opened a new pull request #175:
URL: https://github.com/apache/flink-statefun/pull/175


   This PR fixes the per key-group `Header.skipHeaderSilently` operation so 
that it fully fills the header-bytes read buffer before comparing it with the 
expected header bytes.
   
   Since `InputStream.read(byte[])` by contract only guarantees reading at 
least 1 byte (unless the end of the stream is reached) and may read fewer 
bytes than requested (i.e. fewer than the read buffer length), it was possible 
before this PR that the read buffer wasn't fully filled. This resulted in an 
invalid header-bytes comparison, and in the header bytes being incorrectly 
pushed back into the remaining stream.
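   To illustrate the short-read behaviour with JDK classes only: the sketch below uses `SequenceInputStream` over two chunks as a stand-in for a real per key-group state stream, and the 4-byte `HEADER` is made up for the example (it is not StateFun's actual header). A single `read(byte[])` stops at the chunk boundary, so the buffer is only partially filled when it is compared.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;

public class ShortReadDemo {
    // Hypothetical header bytes, for illustration only.
    static final byte[] HEADER = {0x01, 0x02, 0x03, 0x04};

    /** Performs a single read() into a header-sized buffer and returns how many bytes it filled. */
    static int singleRead(byte[] buf) throws IOException {
        // The header arrives in two chunks; SequenceInputStream reads only from
        // the current chunk within one read call, so it cannot cross the boundary.
        InputStream in = new SequenceInputStream(
                new ByteArrayInputStream(Arrays.copyOfRange(HEADER, 0, 2)),
                new ByteArrayInputStream(Arrays.copyOfRange(HEADER, 2, 4)));
        return in.read(buf); // allowed to return fewer than buf.length bytes
    }

    public static void main(String[] args) throws IOException {
        byte[] buf = new byte[HEADER.length];
        int n = singleRead(buf);
        // A naive comparison now sees a partially filled buffer.
        System.out.println("read " + n + " of " + buf.length
                + " bytes, header matches: " + Arrays.equals(buf, HEADER));
    }
}
```

Running it prints `read 2 of 4 bytes, header matches: false`, even though the full header is present in the stream.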
   
   Ultimately, this means that in this case `Header.skipHeaderSilently` does 
not actually skip the header bytes, and subsequent reads on the per key-group 
streams (to restore the feedback events) would attempt to read a corrupt 
stream.
   
   The fix is to do a `readFully` operation instead of a simple read when 
filling the header bytes read buffer.
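   A minimal sketch of the fixed logic, assuming the header is a fixed-length byte sequence and that non-matching bytes are pushed back via `java.io.PushbackInputStream`. `HeaderSketch` and `HEADER` are hypothetical, and the private helper only mirrors what a `tryReadFully`-style utility does (loop until the buffer is full or the stream ends); it is not Flink's or StateFun's actual code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.Arrays;

public final class HeaderSketch {
    // Hypothetical magic header; the real header bytes in StateFun may differ.
    static final byte[] HEADER = {(byte) 0xCA, (byte) 0xFE, (byte) 0xBA, (byte) 0xBE};

    /** Returns a stream positioned after the header if present, else at the original start. */
    public static InputStream skipHeaderSilently(InputStream in) throws IOException {
        PushbackInputStream pushback = new PushbackInputStream(in, HEADER.length);
        byte[] buf = new byte[HEADER.length];
        int read = tryReadFully(pushback, buf);
        if (read == HEADER.length && Arrays.equals(buf, HEADER)) {
            return pushback; // full header read and matched: it is now skipped
        }
        if (read > 0) {
            pushback.unread(buf, 0, read); // no header: restore the consumed bytes
        }
        return pushback;
    }

    /** Reads until buf is full or the stream ends; returns the number of bytes read. */
    private static int tryReadFully(InputStream in, byte[] buf) throws IOException {
        int total = 0;
        while (total != buf.length) {
            int n = in.read(buf, total, buf.length - total);
            if (n == -1) {
                break; // end of stream before the buffer was filled
            }
            total += n;
        }
        return total;
    }
}
```

Replacing `tryReadFully` with a single `pushback.read(buf)` reintroduces the bug: on a short read the comparison fails and the partial header is pushed back into the stream.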
   
   ## Change log
   
   - b6256df Backports the `tryReadFully` utility from Flink. This backport can 
be reverted once we upgrade to Flink 1.12.0 / 1.11.3. Note that in our 
backport, we introduced a new `RandomReadLengthByteArrayInputStream` utility 
that attempts to mimic the behaviour of "real-life" input streams in our tests, 
where each read can return fewer than the desired number of bytes.
   
   - e246c1c Uses `tryReadFully` in `Header.skipHeaderSilently`.
   
   - 099015c Extends the existing tests in `UnboundedFeedbackLoggerTest` to use 
the new `RandomReadLengthByteArrayInputStream`. This new version of the test 
fails without the fix.
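   For reference, one way such a test stream could be written is sketched below. This is not the code from b6256df: the class name `RandomReadLengthStreamSketch` and the `seed` constructor parameter (added here for reproducible test runs) are hypothetical. It caps every read at a random length between 1 and the requested length.

```java
import java.io.ByteArrayInputStream;
import java.util.Random;

public class RandomReadLengthStreamSketch extends ByteArrayInputStream {
    private final Random random;

    public RandomReadLengthStreamSketch(byte[] buf, long seed) {
        super(buf);
        this.random = new Random(seed);
    }

    @Override
    public synchronized int read(byte[] b, int off, int len) {
        if (len <= 0) {
            return super.read(b, off, len);
        }
        // Request a random length in [1, len] to mimic short reads from real streams.
        int randomLen = 1 + random.nextInt(len);
        return super.read(b, off, randomLen);
    }
}
```

Because `InputStream.read(byte[])` delegates to `read(byte[], int, int)`, code under test that relies on a single `read(byte[])` filling its buffer will typically see partial reads with this stream.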

