Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r168724735
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ---
    @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, 
InterruptedException {
        private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
                RecordSerializer<T> serializer = serializers[targetChannel];
     
    -           synchronized (serializer) {
    -                   SerializationResult result = 
serializer.addRecord(record);
    -
    -                   while (result.isFullBuffer()) {
    -                           Buffer buffer = serializer.getCurrentBuffer();
    -
    -                           if (buffer != null) {
    -                                   numBytesOut.inc(buffer.getSizeUnsafe());
    -                                   writeAndClearBuffer(buffer, 
targetChannel, serializer);
    -
    -                                   // If this was a full record, we are 
done. Not breaking
    -                                   // out of the loop at this point will 
lead to another
    -                                   // buffer request before breaking out 
(that would not be
    -                                   // a problem per se, but it can lead to 
stalls in the
    -                                   // pipeline).
    -                                   if (result.isFullRecord()) {
    -                                           break;
    -                                   }
    -                           } else {
    -                                   BufferBuilder bufferBuilder =
    -                                           
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
    -                                   result = 
serializer.setNextBufferBuilder(bufferBuilder);
    +           SerializationResult result = serializer.addRecord(record);
    +
    +           while (result.isFullBuffer()) {
    +                   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
    +                           // If this was a full record, we are done. Not 
breaking
    +                           // out of the loop at this point will lead to 
another
    +                           // buffer request before breaking out (that 
would not be
    +                           // a problem per se, but it can lead to stalls 
in the
    +                           // pipeline).
    +                           if (result.isFullRecord()) {
    +                                   break;
                                }
                        }
    +                   BufferBuilder bufferBuilder = 
requestNewBufferBuilder(targetChannel);
    +
    +                   result = serializer.setNextBufferBuilder(bufferBuilder);
                }
    +           checkState(!serializer.hasSerializedData(), "All data should be 
written at once");
        }
     
    -   public void broadcastEvent(AbstractEvent event) throws IOException, 
InterruptedException {
    -           final Buffer eventBuffer = EventSerializer.toBuffer(event);
    -           try {
    +   public BufferConsumer broadcastEvent(AbstractEvent event) throws 
IOException, InterruptedException {
    --- End diff --
    
    Good question, there could be a package private method that returns the 
buffer, and the public method uses this method but does not return the buffer. 
But questionable if that is really better, because we also would need to ensure 
that the the public goes through the private method etc.


---

Reply via email to