Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167528361
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
    @@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics) {
        }
     
        /**
    -    * Writes the buffer to the {@link ResultPartitionWriter} and removes the
    -    * buffer from the serializer state.
    +    * Marks the current {@link BufferBuilder} as finished and clears the state for next one.
         *
    -    * <p><b>Needs to be synchronized on the serializer!</b>
    +    * @return true if some data were written
         */
    -   private void writeAndClearBuffer(
    -                   Buffer buffer,
    +   private boolean tryFinishCurrentBufferBuilder(
                        int targetChannel,
                        RecordSerializer<T> serializer) throws IOException {
     
    -           try {
    -                   targetPartition.writeBuffer(buffer, targetChannel);
    -           }
    -           finally {
    -                   serializer.clearCurrentBuffer();
    +           if (!bufferBuilders[targetChannel].isPresent()) {
    +                   return false;
                }
    +           BufferBuilder bufferBuilder = bufferBuilders[targetChannel].get();
    +           bufferBuilders[targetChannel] = Optional.empty();
    +
    +           numBytesOut.inc(bufferBuilder.getWrittenBytes());
    +           bufferBuilder.finish();
    --- End diff --
    
    You could combine these two calls into `numBytesOut.inc(bufferBuilder.finish())`. Or, if the return value is not meant to be used this way, maybe `finish()` should not have a return value at all?
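
To make the two options concrete, here is a minimal sketch. It assumes that `finish()` returns the number of bytes written, which is what the combined call would rely on. `SketchBufferBuilder` and `SketchCounter` are hypothetical stand-ins written for this illustration, not Flink's actual classes.

```java
public class FinishSketch {

    /** Hypothetical stand-in for a metrics counter; not Flink's counter type. */
    static final class SketchCounter {
        private long count;

        void inc(long n) {
            count += n;
        }

        long getCount() {
            return count;
        }
    }

    /**
     * Hypothetical stand-in for BufferBuilder.
     * Assumption: finish() returns the number of bytes written so far.
     */
    static final class SketchBufferBuilder {
        private int writtenBytes;
        private boolean finished;

        void append(int numBytes) {
            if (finished) {
                throw new IllegalStateException("builder is already finished");
            }
            writtenBytes += numBytes;
        }

        int getWrittenBytes() {
            return writtenBytes;
        }

        int finish() {
            finished = true;
            return writtenBytes;
        }

        boolean isFinished() {
            return finished;
        }
    }

    /** Shape used in the diff: read the byte count first, then finish. */
    static void finishSeparately(SketchBufferBuilder builder, SketchCounter numBytesOut) {
        numBytesOut.inc(builder.getWrittenBytes());
        builder.finish();
    }

    /** Suggested shape: finish() reports the byte count directly. */
    static void finishCombined(SketchBufferBuilder builder, SketchCounter numBytesOut) {
        numBytesOut.inc(builder.finish());
    }

    public static void main(String[] args) {
        SketchCounter separate = new SketchCounter();
        SketchBufferBuilder first = new SketchBufferBuilder();
        first.append(100);
        first.append(28);
        finishSeparately(first, separate);

        SketchCounter combined = new SketchCounter();
        SketchBufferBuilder second = new SketchBufferBuilder();
        second.append(100);
        second.append(28);
        finishCombined(second, combined);

        // Both variants account for the same number of bytes.
        System.out.println(separate.getCount() + " " + combined.getCount());
    }
}
```

If the combined form is not wanted, the alternative is to make `finish()` return `void` and keep `getWrittenBytes()` as the only way to read the count, so there is a single source for that number.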

