Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167859598
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ---
    @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, 
InterruptedException {
        private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
                RecordSerializer<T> serializer = serializers[targetChannel];
     
    -           synchronized (serializer) {
    -                   SerializationResult result = 
serializer.addRecord(record);
    -
    -                   while (result.isFullBuffer()) {
    -                           Buffer buffer = serializer.getCurrentBuffer();
    -
    -                           if (buffer != null) {
    -                                   numBytesOut.inc(buffer.getSizeUnsafe());
    -                                   writeAndClearBuffer(buffer, 
targetChannel, serializer);
    -
    -                                   // If this was a full record, we are 
done. Not breaking
    -                                   // out of the loop at this point will 
lead to another
    -                                   // buffer request before breaking out 
(that would not be
    -                                   // a problem per se, but it can lead to 
stalls in the
    -                                   // pipeline).
    -                                   if (result.isFullRecord()) {
    -                                           break;
    -                                   }
    -                           } else {
    -                                   BufferBuilder bufferBuilder =
    -                                           
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
    -                                   result = 
serializer.setNextBufferBuilder(bufferBuilder);
    +           SerializationResult result = serializer.addRecord(record);
    +
    +           while (result.isFullBuffer()) {
    +                   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
    +                           // If this was a full record, we are done. Not 
breaking
    +                           // out of the loop at this point will lead to 
another
    +                           // buffer request before breaking out (that 
would not be
    +                           // a problem per se, but it can lead to stalls 
in the
    +                           // pipeline).
    +                           if (result.isFullRecord()) {
    +                                   break;
                                }
                        }
    +                   BufferBuilder bufferBuilder = 
requestNewBufferBuilder(targetChannel);
    +
    +                   result = serializer.setNextBufferBuilder(bufferBuilder);
                }
    +           checkState(!serializer.hasSerializedData(), "All data should be 
written at once");
        }
     
    -   public void broadcastEvent(AbstractEvent event) throws IOException, 
InterruptedException {
    -           final Buffer eventBuffer = EventSerializer.toBuffer(event);
    -           try {
    +   public BufferConsumer broadcastEvent(AbstractEvent event) throws 
IOException, InterruptedException {
    --- End diff --
    
    This method no longer throws `InterruptedException`.


---

Reply via email to