Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5104#discussion_r154024973

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java ---
    @@ -107,6 +110,7 @@ public SerializationResult addRecord(T record) throws IOException {

     	@Override
     	public SerializationResult setNextBuffer(Buffer buffer) throws IOException {
    --- End diff --

    Actually, whether we are the sole owner of the `buffer` now depends on the caller, which adds a pretty heavy implicit assumption. Why not make the `SpanningRecordSerializer` work with the `BufferProvider` of the `targetPartition` directly, either by passing it at construction time or as a method parameter? We'd then probably have to rename the `setNextBuffer` method, but we can keep the logic and make sure we are the sole owner.
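
A minimal sketch of what this suggestion could look like, assuming the provider is passed at construction time. Everything here is hypothetical and only illustrates the ownership idea: `SketchBufferProvider` is a stand-in for Flink's `BufferProvider` (not the real interface), `java.nio.ByteBuffer` stands in for Flink's `Buffer`, and `OwningSerializerSketch` is not the actual `SpanningRecordSerializer`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's BufferProvider; not the real interface.
interface SketchBufferProvider {
    // Assumed to return an empty buffer with non-zero capacity.
    ByteBuffer requestBuffer();
}

// Hypothetical serializer that requests its own buffers, so no outside caller
// holds a reference to a buffer while the serializer is still filling it.
final class OwningSerializerSketch {
    private final SketchBufferProvider provider;
    private final List<ByteBuffer> finished = new ArrayList<>();
    private ByteBuffer current; // partially filled buffer, owned by this object

    OwningSerializerSketch(SketchBufferProvider provider) {
        this.provider = provider;
    }

    // Copies the record bytes, spanning as many buffers as needed.
    void addRecord(byte[] record) {
        int offset = 0;
        while (offset < record.length) {
            if (current == null) {
                current = provider.requestBuffer();
            }
            int n = Math.min(current.remaining(), record.length - offset);
            current.put(record, offset, n);
            offset += n;
            if (!current.hasRemaining()) {
                current.flip(); // make the full buffer readable
                finished.add(current);
                current = null;
            }
        }
    }

    // Hands over full buffers; ownership passes to the caller only here.
    List<ByteBuffer> pollFinishedBuffers() {
        List<ByteBuffer> out = new ArrayList<>(finished);
        finished.clear();
        return out;
    }
}
```

With this shape there is no `setNextBuffer`-style method at all: the caller never injects a buffer, so the sole-owner property holds by construction instead of by convention.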
---