curcur opened a new pull request #13614:
URL: https://github.com/apache/flink/pull/13614
## What is the purpose of the change
A partial record arises when a record does not fit into one buffer and its
remaining part is written into the next buffer. Hence a partial record can
only appear at the beginning of a buffer. Partial record clean-up is needed
in approximate local recovery mode: if a record spans multiple buffers and
the first buffer(s) are lost due to a failure of the receiver task, the
remaining data of that record still in transit should be cleaned up.
`partialRecordLength` is the number of bytes to skip, counted from position
index 0 of the underlying MemorySegment, in order to start with a complete
record. Approximate local recovery uses `partialRecordLength` to find the
start position of a complete record in a BufferConsumer; this is the
so-called partial record clean-up.
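As a rough illustration of the definition above, the sketch below computes the partial record length for each buffer that a single record occupies. It is not the code from this PR: the class and method names are hypothetical, buffers are assumed to have a fixed size, and the record is assumed to start at the beginning of the first buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: illustrates how a record that spans
// several fixed-size buffers yields one partial record length per buffer.
public class PartialRecordLengthSketch {

    /**
     * Returns, for each buffer the record occupies, the number of leading bytes
     * that continue a record started in an earlier buffer.
     * Assumption: the record starts at position 0 of the first buffer.
     */
    public static List<Integer> partialLengths(int recordLength, int bufferSize) {
        if (recordLength < 0 || bufferSize <= 0) {
            throw new IllegalArgumentException("recordLength must be >= 0 and bufferSize > 0");
        }
        List<Integer> result = new ArrayList<>();
        int remaining = recordLength;
        boolean firstBuffer = true;
        while (remaining > 0) {
            int written = Math.min(remaining, bufferSize);
            // The first buffer starts with a complete record, so nothing is skipped there.
            // Every later buffer starts with `written` bytes of continuation data.
            result.add(firstBuffer ? 0 : written);
            remaining -= written;
            firstBuffer = false;
        }
        return result;
    }
}
```

Under these assumptions, a 10-byte record written into 4-byte buffers gives the lengths 0, 4 and 2: the second buffer consists entirely of continuation data, and the third buffer holds a complete record only after its first 2 bytes.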
## Brief change log
- API change to add `partialRecordLength` when creating a BufferConsumer
- Add `partialRecordLength` in `BufferWritingResultPartition` for
`emitRecord` and `broadcastRecord`
- Update the type of buffers in `PipelinedSubpartition`
from `PrioritizedDeque<BufferConsumer>` to
`PrioritizedDeque<BufferConsumerWithPartialRecordLength>`
- Implement partial record clean-up logic in
BufferConsumerWithPartialRecordLength
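The clean-up step in the last item can be pictured with the following minimal sketch. It does not reproduce the actual `BufferConsumerWithPartialRecordLength` class: `QueuedBuffer`, `cleanUp` and the plain `Deque` are hypothetical stand-ins, and the rule that a buffer consisting entirely of continuation bytes is dropped is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of partial record clean-up on a queue of buffers.
// None of these types are Flink classes.
public class PartialRecordCleanupSketch {

    /** A queued buffer paired with the number of leading bytes to skip. */
    public static final class QueuedBuffer {
        public final byte[] data;
        public final int partialRecordLength;
        public int readPosition = 0;

        public QueuedBuffer(byte[] data, int partialRecordLength) {
            this.data = data;
            this.partialRecordLength = partialRecordLength;
        }
    }

    /**
     * Drops buffers that hold only continuation bytes of a lost record, then
     * moves the read position of the first remaining buffer to the start of a
     * complete record. Returns that buffer, or null if the queue runs empty.
     */
    public static QueuedBuffer cleanUp(Deque<QueuedBuffer> queue) {
        while (!queue.isEmpty()) {
            QueuedBuffer head = queue.peekFirst();
            if (head.partialRecordLength < head.data.length) {
                // A complete record starts inside this buffer: skip the partial bytes.
                head.readPosition = head.partialRecordLength;
                return head;
            }
            // Assumption: the whole buffer is continuation data, so discard it.
            queue.pollFirst();
        }
        return null;
    }
}
```

For example, if the first buffer of a 10-byte record is lost and the queue still holds two 4-byte buffers with partial record lengths 4 and 2, this sketch discards the first queued buffer and positions the second one at offset 2.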
## Verifying this change
- Add unit tests that cover partial record length
- Run the existing tests to make sure the changes do not affect data
write/read/transition.
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with
`@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (yes)
- Anything that affects deployment or recovery: JobManager (and its
components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)