Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2805#discussion_r90777962
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
 ---
    @@ -71,21 +68,26 @@ public void writeBuffer(Buffer buffer, int 
targetChannel) throws IOException {
                partition.add(buffer, targetChannel);
        }
     
    -   public void writeEvent(AbstractEvent event, int targetChannel) throws 
IOException {
    -           partition.add(EventSerializer.toBuffer(event), targetChannel);
    -   }
    -
    -   public void writeEventToAllChannels(AbstractEvent event) throws 
IOException {
    -           for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
    -                   Buffer buffer = EventSerializer.toBuffer(event);
    -                   partition.add(buffer, i);
    -           }
    -   }
    -
    -   public void writeEndOfSuperstep() throws IOException {
    -           for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
    -                   Buffer buffer = 
EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE);
    -                   partition.add(buffer, i);
    +   /**
    +    * Writes the given buffer to all available target channels.
    +    *
    +    * The buffer is taken over and used for each of the channels.
    +    * It will be recycled afterwards.
    +    *
    +    * @param eventBuffer the buffer to write
    +    * @throws IOException
    +    */
    +   public void writeBufferToAllChannels(final Buffer eventBuffer) throws 
IOException {
    +           try {
    +                   for (int targetChannel = 0; targetChannel < 
partition.getNumberOfSubpartitions(); targetChannel++) {
    +                           // retain the buffer so that it can be recycled 
by each channel of targetPartition
    +                           eventBuffer.retain();
    --- End diff --
    
    Similar comments as in the `RecordWriter` apply here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to