Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/5104#discussion_r154024973
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
---
@@ -107,6 +110,7 @@ public SerializationResult addRecord(T record) throws
IOException {
@Override
public SerializationResult setNextBuffer(Buffer buffer) throws
IOException {
--- End diff --
Actually, whether we are the sole owner of the `buffer` now depends on the
outside and this adds a pretty heavy implicit assumption. Why not make the
`SpanningRecordSerializer` work with the `BufferProvider` of the
`targetPartition` directly, e.g. either by adding it at construction time or as
a parameter. We'd then probably have to rename the `setNextBuffer` method but
can keep the logic and make sure we are the sole owner.
---