zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r263377268
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##########
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
        public void emit(T record) throws IOException, InterruptedException {
                broadcastEmit(record);
        }
+
+       @Override
+       public void broadcastEmit(T record) throws IOException, 
InterruptedException {
 
 Review comment:
   Considering initiating `BufferConsumer#currentReaderPosition` to 
`LatencyMarker.size()` for other channels in 
`BroadcastRecordWriter#randomEmit`, I meant this way of getting marker size 
seems coupled with `LatencyMarker` currently. Maybe we do not need consider it 
now, if there are other usages in future, we can make it smarter then.
   
   For `copyFromSerializerToTargetChannel`, this process might be called either 
by `randomEmit` or `BroadcastRecordWriter#emit`. And the 
`requestNewBufferBuilder` would be triggered in 
`copyFromSerializerToTargetChannel`. If it is in the `randomEmit` process, we 
need to initialize `BufferConsumer#currentReaderPosition` for other sharing 
channels, otherwise we do not need to do. So it needs a flag to distinguish the 
different ways.  It would be easy for explanation after I submit the codes 
later. :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to