[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362523#comment-16362523
 ] 

ASF GitHub Bot commented on FLINK-8581:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167588455
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 ---
    @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, 
InterruptedException {
        private void sendToTarget(T record, int targetChannel) throws 
IOException, InterruptedException {
                RecordSerializer<T> serializer = serializers[targetChannel];
     
    -           synchronized (serializer) {
    -                   SerializationResult result = 
serializer.addRecord(record);
    -
    -                   while (result.isFullBuffer()) {
    -                           Buffer buffer = serializer.getCurrentBuffer();
    -
    -                           if (buffer != null) {
    -                                   numBytesOut.inc(buffer.getSizeUnsafe());
    -                                   writeAndClearBuffer(buffer, 
targetChannel, serializer);
    -
    -                                   // If this was a full record, we are 
done. Not breaking
    -                                   // out of the loop at this point will 
lead to another
    -                                   // buffer request before breaking out 
(that would not be
    -                                   // a problem per se, but it can lead to 
stalls in the
    -                                   // pipeline).
    -                                   if (result.isFullRecord()) {
    -                                           break;
    -                                   }
    -                           } else {
    -                                   BufferBuilder bufferBuilder =
    -                                           
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
    -                                   result = 
serializer.setNextBufferBuilder(bufferBuilder);
    +           SerializationResult result = serializer.addRecord(record);
    +
    +           while (result.isFullBuffer()) {
    +                   if (tryFinishCurrentBufferBuilder(targetChannel, 
serializer)) {
    +                           // If this was a full record, we are done. Not 
breaking
    +                           // out of the loop at this point will lead to 
another
    +                           // buffer request before breaking out (that 
would not be
    +                           // a problem per se, but it can lead to stalls 
in the
    +                           // pipeline).
    +                           if (result.isFullRecord()) {
    +                                   break;
                                }
                        }
    +                   BufferBuilder bufferBuilder = 
requestNewBufferBuilder(targetChannel);
    +
    +                   result = serializer.setNextBufferBuilder(bufferBuilder);
                }
    +           checkState(!serializer.hasSerializedData(), "All data should be 
written at once");
        }
     
    -   public void broadcastEvent(AbstractEvent event) throws IOException, 
InterruptedException {
    -           final Buffer eventBuffer = EventSerializer.toBuffer(event);
    -           try {
    +   public BufferConsumer broadcastEvent(AbstractEvent event) throws 
IOException, InterruptedException {
    --- End diff --
    
    I think this method does not truly require a return value. The return value 
is only used in one test, and I found it confusing that it is first closed and 
then returned.


> Improve performance for low latency network
> -------------------------------------------
>
>                 Key: FLINK-8581
>                 URL: https://issues.apache.org/jira/browse/FLINK-8581
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to