Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5104#discussion_r154024973
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 ---
    @@ -107,6 +110,7 @@ public SerializationResult addRecord(T record) throws 
IOException {
        @Override
        public SerializationResult setNextBuffer(Buffer buffer) throws 
IOException {
    --- End diff --
    
    Actually, whether we are the sole owner of the `buffer` now depends on the 
outside and this adds a pretty heavy implicit assumption. Why not make the 
`SpanningRecordSerializer` work with the `BufferProvider` of the 
`targetPartition` directly, e.g. either by adding it at construction time or as 
a parameter. We'd then probably have to rename the `setNextBuffer` method but 
can keep the logic and make sure we are the sole owner.


---

Reply via email to