curcur commented on a change in pull request #13501:
URL: https://github.com/apache/flink/pull/13501#discussion_r498891296



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
##########
@@ -96,15 +98,43 @@ public boolean isFinished() {
                return writerPosition.isFinished();
        }
 
+       public boolean startOfDataBuffer() {
+               return buffer.getDataType() == DATA_BUFFER && currentReaderPosition == 0;
+       }
+
        /**
+        * BufferConsumer skips the buffer header before building buffer.
         * @return sliced {@link Buffer} containing the not yet consumed data. Returned {@link Buffer} shares the reference
         * counter with the parent {@link BufferConsumer} - in order to recycle memory both of them must be recycled/closed.
         */
        public Buffer build() {
                writerPosition.update();
                int cachedWriterPosition = writerPosition.getCached();
-               Buffer slice = buffer.readOnlySlice(currentReaderPosition, cachedWriterPosition - currentReaderPosition);
+
+               Buffer slice = null;
+
+               // data buffer && starting from the beginning of the buffer
+               if (startOfDataBuffer()) {
+                       // either do not have any data, or at least have 4 bytes (header + data)
+                       checkState(
+                               (cachedWriterPosition - currentReaderPosition > BUFFER_BUILDER_HEADER_SIZE)
+                                       || (currentReaderPosition == cachedWriterPosition)
+                       );
+
+                       // remove the header
+                       if (cachedWriterPosition - currentReaderPosition > BUFFER_BUILDER_HEADER_SIZE) {
+                               slice = buffer.readOnlySlice(
+                                       currentReaderPosition + BUFFER_BUILDER_HEADER_SIZE,
+                                       cachedWriterPosition - currentReaderPosition - BUFFER_BUILDER_HEADER_SIZE);
+                       }
+               }

Review comment:
       Isn't `SpillingAdaptiveSpanningRecordDeserializer#setNextBuffer` on the receiver (downstream) side?
   
   If we strip the header here, we do it on the sending side, which means the additional partial-record information is never sent over the network.
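   To make the sending-side variant concrete, here is a minimal, self-contained sketch of the slicing that `build()` performs in this change. It is a simplified model, not Flink's API: a plain `java.nio.ByteBuffer` stands in for Flink's `Buffer`, the header is assumed to be 4 bytes, and `HeaderSkippingConsumer`, `HEADER_SIZE`, `commit` and `asString` are hypothetical names. Reference counting and recycling are deliberately left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical, simplified model of the header skipping in BufferConsumer#build().
 * Not Flink's API: a plain ByteBuffer stands in for Flink's Buffer, and the
 * 4-byte HEADER_SIZE is an assumed stand-in for BUFFER_BUILDER_HEADER_SIZE.
 */
public class HeaderSkippingConsumer {
    static final int HEADER_SIZE = 4; // assumption, not taken from Flink

    private final ByteBuffer memory;  // memory shared with the (hypothetical) writer
    private final boolean dataBuffer; // stands in for getDataType() == DATA_BUFFER
    private int writerPosition;       // bytes the writer has committed so far
    private int readerPosition;       // bytes already returned by build()

    HeaderSkippingConsumer(ByteBuffer memory, boolean dataBuffer) {
        this.memory = memory;
        this.dataBuffer = dataBuffer;
    }

    /** Hypothetical stand-in for writerPosition.update(): publish the writer's progress. */
    void commit(int newWriterPosition) {
        this.writerPosition = newWriterPosition;
    }

    boolean startOfDataBuffer() {
        return dataBuffer && readerPosition == 0;
    }

    /** Read-only view of the not-yet-consumed bytes; the header is dropped from the first slice. */
    ByteBuffer build() {
        int from = readerPosition;
        int available = writerPosition - readerPosition;
        if (startOfDataBuffer() && available > 0) {
            // Mirrors the checkState in the diff: either no data at all, or more than just the header.
            if (available <= HEADER_SIZE) {
                throw new IllegalStateException("data buffer holds a header but no payload");
            }
            from += HEADER_SIZE; // skip the header on the sending side
        }
        ByteBuffer view = memory.duplicate(); // shared content, independent position/limit
        view.limit(writerPosition);
        view.position(from);
        readerPosition = writerPosition;      // everything up to the writer is now consumed
        return view.slice().asReadOnlyBuffer();
    }

    static String asString(ByteBuffer slice) {
        byte[] bytes = new byte[slice.remaining()];
        slice.get(bytes);
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        ByteBuffer memory = ByteBuffer.allocate(32);
        memory.putInt(7); // hypothetical 4-byte header, e.g. a partial-record length
        memory.put("hello".getBytes(StandardCharsets.US_ASCII));
        HeaderSkippingConsumer consumer = new HeaderSkippingConsumer(memory, true);
        consumer.commit(memory.position());
        System.out.println(asString(consumer.build())); // prints "hello"

        memory.put(" world".getBytes(StandardCharsets.US_ASCII));
        consumer.commit(memory.position());
        System.out.println(asString(consumer.build())); // prints " world"
    }
}
```

   In this sketch only the payload bytes ever leave `build()`, so anything downstream (such as `setNextBuffer`) could not read the header. Note also that an empty first build leaves the reader at position 0, so the header is still skipped once data arrives.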




----------------------------------------------------------------
This is an automated message from the Apache Git Service.