## What is the purpose of the change `SpillingAdaptiveSpanningRecordDeserializer` doesn't have too many consistency checks for usage calls or serializers behaving properly, e.g. to read only as many bytes as available/promised for that record. At least these checks should be added:
1. Check that buffers have not been read from yet before adding them (this is an invariant `SpillingAdaptiveSpanningRecordDeserializer` works with and from what I can see, it is followed now. 2. Check that after deserialization, we actually consumed `recordLength` bytes - If not, in the spanning deserializer, we currently simply skip the remaining bytes. - But in the non-spanning deserializer, we currently continue from the wrong offset. 3. Protect against `setNextBuffer()` being called before draining all available records ## Brief change log - add the aforementioned checks - also add `[FLINK-9812][network][tests] fix SpanningRecordSerializationTest not using deserializer correctly` which was needed to not throw based on the added checks - fix not resetting all fields correctly in `SpanningWrapper` (to safeguard further usages / future changes) This PR is based on #6693 ## Verifying this change This change is already covered by existing tests for the successful de/serializations, e.g. via `SpanningRecordSerializationTest`. This change added tests and can be verified as follows: - added misbehaving serialization tests under `SpanningRecordSerializationTest` - adapted and fixed `CustomSerializationITCase`, working with `ExpectedException` now (please also note that `testIncorrectSerializer4` is not actually verifying behaviour - previously it silently passed although no exception was thrown) ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **yes** (network only!) - The runtime per-record code paths (performance sensitive): **yes** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** [ Full content available at: https://github.com/apache/flink/pull/6705 ] This message was relayed via gitbox.apache.org for [email protected]
