[ https://issues.apache.org/jira/browse/FLINK-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15728924#comment-15728924 ]

ASF GitHub Bot commented on FLINK-5059:
---------------------------------------

Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2805#discussion_r91306180
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
    @@ -131,35 +132,30 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
        }
     
        public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
    -           for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
    -                   RecordSerializer<T> serializer = serializers[targetChannel];
    -
    -                   synchronized (serializer) {
    -                           Buffer buffer = serializer.getCurrentBuffer();
    -                           if (buffer != null) {
    -                                   writeAndClearBuffer(buffer, targetChannel, serializer);
    -                           } else if (serializer.hasData()) {
    -                                   throw new IllegalStateException("No buffer, but serializer has buffered data.");
    -                           }
    -
    -                           targetPartition.writeEvent(event, targetChannel);
    -                   }
    -           }
    -   }
    +           final Buffer eventBuffer = EventSerializer.toBuffer(event);
    +           try {
    +                   for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
    +                           RecordSerializer<T> serializer = serializers[targetChannel];
     
    -   public void sendEndOfSuperstep() throws IOException, InterruptedException {
    -           for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
    -                   RecordSerializer<T> serializer = serializers[targetChannel];
    +                           synchronized (serializer) {
    +                                   Buffer buffer = serializer.getCurrentBuffer();
    +                                   if (buffer != null) {
    +                                           writeAndClearBuffer(buffer, targetChannel, serializer);
    +                                   } else if (serializer.hasData()) {
    +                                           // sanity check
    +                                           throw new IllegalStateException("No buffer, but serializer has buffered data.");
    +                                   }
     
    +                                   // retain the buffer so that it can be recycled by each channel of targetPartition
    +                                   eventBuffer.retain();
    --- End diff --
    
    done, thanks
    
    I opened [FLINK-5277] for the ResultPartition#add test: https://issues.apache.org/jira/browse/FLINK-5277
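
For context on the eventBuffer.retain() line in the diff: Flink's network Buffer is reference counted, and each consumer that will recycle the buffer later must hold its own reference. Below is a minimal stand-alone sketch of that pattern, using a hypothetical SharedBuffer class rather than Flink's actual Buffer:

    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical stand-in for a reference-counted network buffer.
    class SharedBuffer {
        // the creator starts out holding one reference
        private final AtomicInteger refCount = new AtomicInteger(1);

        // take one additional reference per consumer
        void retain() {
            refCount.incrementAndGet();
        }

        // each holder releases its reference; the memory is freed exactly once,
        // when the last reference is dropped
        void recycle() {
            if (refCount.decrementAndGet() == 0) {
                // return the underlying memory segment to its pool (omitted in this sketch)
            }
        }
    }

In the diff, broadcastEvent retains the shared event buffer once per channel, so each channel of targetPartition can recycle it independently without freeing memory another channel still needs.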


> only serialise events once in RecordWriter#broadcastEvent
> ---------------------------------------------------------
>
>                 Key: FLINK-5059
>                 URL: https://issues.apache.org/jira/browse/FLINK-5059
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEvent 
> serialises the event once per target channel. Instead, it could serialise the 
> event only once, reuse the serialised form for every channel, and thus save 
> both serialisation work and buffer memory.
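
A condensed sketch of the proposed approach, pieced together from the diff in this thread: EventSerializer.toBuffer and Buffer#retain/#recycle appear there, while the targetPartition.add(buffer, channel) call is an assumption modelled on the ResultPartition#add method mentioned in the review. The per-channel flushing of partially serialised records (the synchronized block in the diff) is omitted for brevity.

    public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
        // serialise the event a single time instead of once per target channel
        final Buffer eventBuffer = EventSerializer.toBuffer(event);
        try {
            for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
                // one reference per channel, so every channel can recycle independently
                eventBuffer.retain();
                targetPartition.add(eventBuffer, targetChannel); // hypothetical writer call
            }
        } finally {
            // drop the writer's own initial reference once all channels hold theirs
            eventBuffer.recycle();
        }
    }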



