[ https://issues.apache.org/jira/browse/FLINK-5059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15719998#comment-15719998 ]

ASF GitHub Bot commented on FLINK-5059:
---------------------------------------

Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2805#discussion_r90777958
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java ---
    @@ -131,35 +132,30 @@ private void sendToTarget(T record, int targetChannel) throws IOException, Inter
        }
     
        public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
    -           for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
    -                   RecordSerializer<T> serializer = serializers[targetChannel];
    -
    -                   synchronized (serializer) {
    -                           Buffer buffer = serializer.getCurrentBuffer();
    -                           if (buffer != null) {
    -                                   writeAndClearBuffer(buffer, targetChannel, serializer);
    -                           } else if (serializer.hasData()) {
    -                                   throw new IllegalStateException("No buffer, but serializer has buffered data.");
    -                           }
    -
    -                           targetPartition.writeEvent(event, targetChannel);
    -                   }
    -           }
    -   }
    +           final Buffer eventBuffer = EventSerializer.toBuffer(event);
    +           try {
    +                   for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
    +                           RecordSerializer<T> serializer = serializers[targetChannel];
     
    -   public void sendEndOfSuperstep() throws IOException, InterruptedException {
    -           for (int targetChannel = 0; targetChannel < numChannels; targetChannel++) {
    -                   RecordSerializer<T> serializer = serializers[targetChannel];
    +                           synchronized (serializer) {
    +                                   Buffer buffer = serializer.getCurrentBuffer();
    +                                   if (buffer != null) {
    +                                           writeAndClearBuffer(buffer, targetChannel, serializer);
    +                                   } else if (serializer.hasData()) {
    +                                           // sanity check
    +                                           throw new IllegalStateException("No buffer, but serializer has buffered data.");
    +                                   }
     
    +                                   // retain the buffer so that it can be recycled by each channel of targetPartition
    +                                   eventBuffer.retain();
    --- End diff --
    
    It would be good to add a special `RecordWriterTest` that ensures that the reference counting logic works.
    
    ```java
    @Test
    public void testBroadcastEventBufferReferenceCounting() throws Exception {
      Buffer buffer = EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
    
      // Partial mocking of static method...
      PowerMockito.stub(PowerMockito.method(EventSerializer.class, "toBuffer")).toReturn(buffer);
    
      @SuppressWarnings("unchecked")
      ArrayDeque<BufferOrEvent>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
    
      ResultPartitionWriter partition = createCollectingPartitionWriter(queues, new TestInfiniteBufferProvider());
      RecordWriter<?> writer = new RecordWriter<>(partition);
    
      writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
    
      // Verify added to all queues
      assertEquals(1, queues[0].size());
      assertEquals(1, queues[1].size());
    
      assertTrue(buffer.isRecycled());
    }
    ```
    
    You have to adjust `createCollectingPartitionWriter` to correctly recycle event buffers and replace the `PrepareForTest` class annotation with `@PrepareForTest({ResultPartitionWriter.class, EventSerializer.class})`.
    
    ---
    
    Not changed in this PR, but to work correctly this relies on the `ResultPartition` recycling the buffer if the `add` call fails. It might make sense to add a special test (to `ResultPartitionTest` or `RecordWriterTest`) that ensures this actually happens, to guard against future behaviour changes in `ResultPartition`. A better behaviour might be to let the `RecordWriter` recycle the buffer itself if an Exception occurs. That should be addressed in a different PR though.
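    For illustration, the retain/recycle contract discussed above can be sketched with a minimal stand-in class. Note that `RefCountedBuffer` below is hypothetical and only mimics the `retain()`/`recycle()`/`isRecycled()` contract of Flink's `Buffer`; it is not the actual implementation. The writer retains the event buffer once per channel, each channel recycles its own reference, and the writer drops its initial reference at the end, so the buffer ends up fully recycled exactly once:

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical minimal ref-counted buffer, for illustration only.
    // Flink's actual Buffer class exposes a similar retain()/recycle() contract.
    class RefCountedBuffer {
        // the creator starts out holding one reference
        private final AtomicInteger refCount = new AtomicInteger(1);

        void retain() {
            refCount.incrementAndGet();
        }

        void recycle() {
            refCount.decrementAndGet();
        }

        boolean isRecycled() {
            return refCount.get() == 0;
        }
    }

    public class BroadcastRefCountSketch {
        public static void main(String[] args) {
            int numChannels = 2;
            RefCountedBuffer eventBuffer = new RefCountedBuffer();

            for (int channel = 0; channel < numChannels; channel++) {
                // retain before handing the buffer to the channel, so each
                // channel may recycle its reference independently
                eventBuffer.retain();
                // ... the partition writer would eventually recycle it:
                eventBuffer.recycle();
            }

            // the writer releases its own initial reference at the end
            eventBuffer.recycle();

            System.out.println(eventBuffer.isRecycled());
        }
    }
    ```

    If any per-channel write throws before its matching `recycle()`, the count never reaches zero and the buffer leaks, which is exactly why the test above asserts `buffer.isRecycled()` after the broadcast.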


> only serialise events once in RecordWriter#broadcastEvent
> ---------------------------------------------------------
>
>                 Key: FLINK-5059
>                 URL: https://issues.apache.org/jira/browse/FLINK-5059
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> Currently, 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#broadcastEvent 
> serialises the event once per target channel. Instead, it could serialise the 
> event only once and use the serialised form for every channel and thus save 
> resources.
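Schematically, the improvement the issue describes amounts to hoisting the serialisation out of the per-channel loop. The sketch below is self-contained and hypothetical: the `toBuffer` stand-in only counts invocations and is not Flink's `EventSerializer.toBuffer`; it just shows that the reused buffer cuts the serialisation count from once per channel to once per broadcast:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SerializeOnceSketch {
    static final AtomicInteger serializeCalls = new AtomicInteger();

    // stand-in for EventSerializer.toBuffer(event); counts invocations
    static byte[] toBuffer(String event) {
        serializeCalls.incrementAndGet();
        return event.getBytes();
    }

    public static void main(String[] args) {
        int numChannels = 4;

        // before: one serialisation per target channel
        serializeCalls.set(0);
        for (int channel = 0; channel < numChannels; channel++) {
            byte[] perChannelBuffer = toBuffer("EndOfPartition");
        }
        System.out.println(serializeCalls.get());

        // after: serialise once, reuse the serialised form for every channel
        serializeCalls.set(0);
        byte[] sharedBuffer = toBuffer("EndOfPartition");
        for (int channel = 0; channel < numChannels; channel++) {
            byte[] perChannelView = sharedBuffer; // same bytes, no re-serialisation
        }
        System.out.println(serializeCalls.get());
    }
}
```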



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)