[ 
https://issues.apache.org/jira/browse/FLINK-8581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16362516#comment-16362516
 ] 

ASF GitHub Bot commented on FLINK-8581:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167546706
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
    @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
        private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
                RecordSerializer<T> serializer = serializers[targetChannel];
     
    -           synchronized (serializer) {
    -                   SerializationResult result = serializer.addRecord(record);
    -
    -                   while (result.isFullBuffer()) {
    -                           Buffer buffer = serializer.getCurrentBuffer();
    -
    -                           if (buffer != null) {
    -                                   numBytesOut.inc(buffer.getSizeUnsafe());
    -                                   writeAndClearBuffer(buffer, targetChannel, serializer);
    -
    -                                   // If this was a full record, we are done. Not breaking
    -                                   // out of the loop at this point will lead to another
    -                                   // buffer request before breaking out (that would not be
    -                                   // a problem per se, but it can lead to stalls in the
    -                                   // pipeline).
    -                                   if (result.isFullRecord()) {
    -                                           break;
    -                                   }
    -                           } else {
    -                                   BufferBuilder bufferBuilder =
    -                                           targetPartition.getBufferProvider().requestBufferBuilderBlocking();
    -                                   result = serializer.setNextBufferBuilder(bufferBuilder);
    +           SerializationResult result = serializer.addRecord(record);
    +
    +           while (result.isFullBuffer()) {
    --- End diff --
    
    I wonder whether this loop could be simplified to
    ```
                while (!result.isFullRecord()) {
                        tryFinishCurrentBufferBuilder(targetChannel, serializer);
                        BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
                        result = serializer.setNextBufferBuilder(bufferBuilder);
                }
    ```
    
    This would introduce a minor change in behaviour in cases where the end of
    the record falls exactly on the end of a buffer. With the change, the buffer
    is only finished when the next record arrives, not on the spot. However, this
    should not be a problem: for almost every record other than those corner
    cases, the current buffer is already left unfinished after the record has
    been written, so the code should already handle that outcome well.
    With this change, `tryFinishCurrentBufferBuilder` also no longer requires a
    return value.
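
To make the corner case concrete, here is a minimal, self-contained sketch that compares the two loop shapes on a toy model. It is not Flink code: `ToySerializer`, `Result`, and `finishedAfterEachRecord` are hypothetical stand-ins that only count bytes, and the "original" branch is a simplified reading of the removed lines in the diff above. It assumes positive record lengths and a fixed buffer size.

```java
import java.util.Arrays;

// Toy model only: none of these types are Flink classes.
class BufferFinishSketch {

    /** Hypothetical stand-in for the three outcomes a serializer can report. */
    enum Result {
        FULL_RECORD, FULL_RECORD_FULL_BUFFER, PARTIAL_RECORD_FULL_BUFFER;

        boolean isFullRecord() { return this != PARTIAL_RECORD_FULL_BUFFER; }
        boolean isFullBuffer() { return this != FULL_RECORD; }
    }

    /** Hypothetical serializer that tracks byte counts instead of real data. */
    static final class ToySerializer {
        final int bufferSize;
        int pending;        // bytes of the current record not yet copied
        int free;           // free bytes left in the current buffer
        boolean hasBuffer;  // true while an unfinished buffer is attached
        int finished;       // number of buffers finished so far

        ToySerializer(int bufferSize) { this.bufferSize = bufferSize; }

        Result addRecord(int length) { pending = length; return copy(); }

        Result setNextBuffer() { hasBuffer = true; free = bufferSize; return copy(); }

        // Finishes the attached buffer, if any; a no-op otherwise.
        void finishCurrentBuffer() {
            if (hasBuffer) { finished++; hasBuffer = false; free = 0; }
        }

        // Copies as much of the pending record as fits and reports the outcome.
        private Result copy() {
            int n = Math.min(pending, free);
            pending -= n;
            free -= n;
            if (pending > 0) return Result.PARTIAL_RECORD_FULL_BUFFER;
            return free == 0 ? Result.FULL_RECORD_FULL_BUFFER : Result.FULL_RECORD;
        }
    }

    /** Returns the number of finished buffers observed after each record. */
    static int[] finishedAfterEachRecord(int bufferSize, int[] recordLengths, boolean simplified) {
        ToySerializer s = new ToySerializer(bufferSize);
        int[] counts = new int[recordLengths.length];
        for (int i = 0; i < recordLengths.length; i++) {
            Result result = s.addRecord(recordLengths[i]);
            if (simplified) {
                // Loop shape suggested in the comment: only loop while the record is incomplete.
                while (!result.isFullRecord()) {
                    s.finishCurrentBuffer();
                    result = s.setNextBuffer();
                }
            } else {
                // Simplified reading of the removed code: finish a full buffer on the spot.
                while (result.isFullBuffer()) {
                    if (s.hasBuffer) {
                        s.finishCurrentBuffer();
                        if (result.isFullRecord()) break;
                    } else {
                        result = s.setNextBuffer();
                    }
                }
            }
            counts[i] = s.finished;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] lengths = {4, 2}; // the first record ends exactly at the end of a 4-byte buffer
        System.out.println(Arrays.toString(finishedAfterEachRecord(4, lengths, false))); // [1, 1]
        System.out.println(Arrays.toString(finishedAfterEachRecord(4, lengths, true)));  // [0, 1]
    }
}
```

In this toy run, both variants end up with the same number of finished buffers; the simplified loop just defers finishing the exactly-full buffer until the second record is added. Whether that delay matters for the real `RecordWriter` depends on flushing behaviour that this sketch does not model.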


> Improve performance for low latency network
> -------------------------------------------
>
>                 Key: FLINK-8581
>                 URL: https://issues.apache.org/jira/browse/FLINK-8581
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Major
>




