## What is the purpose of the change

`SpillingAdaptiveSpanningRecordDeserializer` has few consistency checks on 
how it is called or on whether serializers behave properly, e.g. that they read 
only as many bytes as are available/promised for that record. At least the 
following checks should be added:

1. Check that buffers have not been read from yet before adding them (this is 
an invariant `SpillingAdaptiveSpanningRecordDeserializer` relies on and, from 
what I can see, it is currently followed).
2. Check that after deserialization, we actually consumed `recordLength` bytes
   - If not, in the spanning deserializer, we currently simply skip the 
remaining bytes.
   - But in the non-spanning deserializer, we currently continue from the wrong 
offset.
3. Protect against `setNextBuffer()` being called before draining all available 
records
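
The three checks above can be sketched roughly as follows. This is a minimal, 
self-contained illustration and not the code in this PR: `CheckedRecordReader`, 
`readRecord` and the 4-byte length prefix are hypothetical simplifications, and 
a plain `java.nio.ByteBuffer` stands in for Flink's network buffers.

```java
import java.nio.ByteBuffer;
import java.util.function.Function;

/** Hypothetical, simplified stand-in; not Flink's actual deserializer. */
final class CheckedRecordReader {
    private ByteBuffer current;

    /** Checks 1 and 3: accept only untouched buffers, and only after the previous one is drained. */
    void setNextBuffer(ByteBuffer buffer) {
        if (buffer.position() != 0) {
            throw new IllegalStateException("Buffer has already been read from");
        }
        if (current != null && current.hasRemaining()) {
            throw new IllegalStateException("Previous buffer still contains unread records");
        }
        current = buffer;
    }

    /** Check 2: the reader must consume exactly the number of bytes promised by the length prefix. */
    <T> T readRecord(Function<ByteBuffer, T> reader) {
        int recordLength = current.getInt();   // assumed 4-byte length prefix
        int start = current.position();
        T value = reader.apply(current);
        int consumed = current.position() - start;
        if (consumed != recordLength) {
            throw new IllegalStateException(
                "Reader consumed " + consumed + " bytes, but the record has " + recordLength);
        }
        return value;
    }
}
```

In this sketch a reader that consumes too few or too many bytes fails 
immediately, rather than having the remainder skipped or the next record read 
from the wrong offset.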

## Brief change log

- add the aforementioned checks
- also add `[FLINK-9812][network][tests] fix SpanningRecordSerializationTest 
not using deserializer correctly`, which is needed so that the test does not 
trip the added checks
- fix not resetting all fields correctly in `SpanningWrapper` (to safeguard 
further usages / future changes)

This PR is based on #6693 

## Verifying this change

This change is already covered by existing tests for the successful 
de/serializations, e.g. via `SpanningRecordSerializationTest`.

This change added tests and can be verified as follows:

- added misbehaving serialization tests under `SpanningRecordSerializationTest`
- adapted and fixed `CustomSerializationITCase`, working with 
`ExpectedException` now (please also note that `testIncorrectSerializer4` is 
not actually verifying behaviour - previously it silently passed although no 
exception was thrown)
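
As an illustration of how a test can silently pass although no exception is 
thrown, consider the sketch below. It is an assumption about one common cause 
(a `try`/`catch` without a failure on the non-throwing path), not a copy of 
the old `CustomSerializationITCase` code; `SilentPassDemo`, `lenientCheck` and 
`strictCheck` are hypothetical names.

```java
/** Hypothetical helper showing why a lenient exception check can hide a missing failure. */
final class SilentPassDemo {

    /** Lenient pattern: reports success even if the body never throws. */
    static boolean lenientCheck(Runnable body) {
        try {
            body.run();
        } catch (RuntimeException expected) {
            // exception observed, nothing else to do
        }
        return true;
    }

    /** Strict pattern: reports success only if an exception of the expected type was thrown. */
    static boolean strictCheck(Runnable body, Class<? extends Throwable> expected) {
        try {
            body.run();
        } catch (Throwable t) {
            return expected.isInstance(t);
        }
        return false; // no exception at all counts as a failure
    }
}
```

The strict variant mirrors what an expected-exception rule enforces: a body 
that returns normally is treated as a test failure.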

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **yes** (network only!)
  - The runtime per-record code paths (performance sensitive): **yes**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **not applicable**


[ Full content available at: https://github.com/apache/flink/pull/6705 ]