pnowojski commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262975197
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##########
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
        public void emit(T record) throws IOException, InterruptedException {
                broadcastEmit(record);
        }
+
+       @Override
+       public void broadcastEmit(T record) throws IOException, InterruptedException {
 
 Review comment:
   I also do not like pushing this logic to the receiver side. I thought about it a bit more, and there is one more option that should work.
   
   We have one shared `BufferBuilder` with multiple `BufferConsumer`s. Once we get `randomEmit()` for a single channel `channelId`, we:
   1. finish the current `BufferBuilder`,
   2. create a new `BufferBuilder` (let's call it `bb`),
   3. serialize the `LatencyMarker` into `bb`,
   4. enqueue a new `BufferConsumer` created from `bb` to `channelId`,
   5. for the other channels, create `BufferConsumer`s from `bb` with an offset starting position, chosen so that the `LatencyMarker` is not visible to those channels (i.e. initializing `BufferConsumer#currentReaderPosition` to `LatencyMarker.size()`).
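   The five steps above can be sketched as a small, self-contained Java model. This is a hypothetical simplification, not Flink code: `SharedBuffer` and `Reader` stand in for `BufferBuilder` and `BufferConsumer`, `randomEmitMarker` stands in for the `randomEmit()` path carrying a `LatencyMarker`, records and the marker are plain byte arrays, and every record is assumed to fit into a single buffer (no spanning across buffers).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the proposal. SharedBuffer/Reader stand in
// for BufferBuilder/BufferConsumer; this is NOT Flink's actual API.
public class OffsetConsumerSketch {
    /** Fixed-size buffer filled by a single writer (the "builder"). */
    static final class SharedBuffer {
        final byte[] data;
        int writerPosition = 0;
        boolean finished = false;

        SharedBuffer(int capacity) { data = new byte[capacity]; }
        int remaining() { return data.length - writerPosition; }
        void finish() { finished = true; }

        void append(byte[] bytes) {
            if (finished || bytes.length > remaining()) {
                throw new IllegalStateException("finished or not enough space");
            }
            System.arraycopy(bytes, 0, data, writerPosition, bytes.length);
            writerPosition += bytes.length;
        }

        /** A per-channel view that ignores everything before startOffset. */
        Reader createReader(int startOffset) { return new Reader(this, startOffset); }
    }

    static final class Reader {
        final SharedBuffer buffer;
        int currentReaderPosition;
        Reader(SharedBuffer buffer, int startOffset) {
            this.buffer = buffer;
            this.currentReaderPosition = startOffset;
        }

        /** Returns the bytes written since the last read and advances. */
        byte[] readAvailable() {
            int end = buffer.writerPosition;
            byte[] out = Arrays.copyOfRange(buffer.data, currentReaderPosition, end);
            currentReaderPosition = end;
            return out;
        }
    }

    static final class BroadcastWriterSketch {
        final List<List<Reader>> channelQueues = new ArrayList<>();
        final int capacity;
        SharedBuffer current;
        BroadcastWriterSketch(int numChannels, int capacity) {
            this.capacity = capacity;
            for (int i = 0; i < numChannels; i++) channelQueues.add(new ArrayList<>());
        }

        /** Each record is copied once into the shared buffer (assumes it fits). */
        void broadcastEmit(byte[] record) {
            if (current == null || current.remaining() < record.length) {
                if (current != null) current.finish();
                current = new SharedBuffer(capacity);
                for (List<Reader> queue : channelQueues) queue.add(current.createReader(0));
            }
            current.append(record);
        }

        /** Steps 1-5 for a marker that only channelId should see. */
        void randomEmitMarker(byte[] marker, int channelId) {
            if (current != null) current.finish();  // 1. finish the current buffer
            current = new SharedBuffer(capacity);    // 2. create a new buffer "bb"
            current.append(marker);                  // 3. serialize the marker into bb
            for (int ch = 0; ch < channelQueues.size(); ch++) {
                // 4. the target channel reads from 0; 5. the others start past the marker
                int start = (ch == channelId) ? 0 : marker.length;
                channelQueues.get(ch).add(current.createReader(start));
            }
        }
    }

    static byte[] bytes(String s) { return s.getBytes(StandardCharsets.UTF_8); }

    /** Drains every reader queued for one channel into a string. */
    static String readAll(BroadcastWriterSketch writer, int channel) {
        StringBuilder sb = new StringBuilder();
        for (Reader r : writer.channelQueues.get(channel)) {
            sb.append(new String(r.readAvailable(), StandardCharsets.UTF_8));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        BroadcastWriterSketch writer = new BroadcastWriterSketch(3, 64);
        writer.broadcastEmit(bytes("a;"));
        writer.randomEmitMarker(bytes("M;"), 1);  // marker for channel 1 only
        writer.broadcastEmit(bytes("b;"));
        for (int ch = 0; ch < 3; ch++) System.out.println("channel " + ch + ": " + readAll(writer, ch));
    }
}
```

   Running `main` prints `a;b;` for channels 0 and 2 and `a;M;b;` for channel 1. The marker bytes are written only once, the records emitted after it still land in the shared `bb`, and the other channels skip the marker purely by starting their read position at its length.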
   
   This solution has one drawback: reported latency would be slightly higher. Latency markers would always be written at the beginning of a buffer, so a marker would have to wait longer for the buffer to fill up than regular records wait on average. However, the flush timeout would not be affected, so in the end this should be fine.
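   To make the drawback concrete, here is a toy Java model under explicit assumptions (all names are hypothetical and none of this is Flink code): entries arrive at a fixed interval, a buffer holds a fixed number of slots, and it is sent once full or once the flush timeout has elapsed since its first entry. Treating the flush timeout as a per-buffer deadline is a simplification.

```java
// Hypothetical toy model, not Flink code: entries arrive every intervalMs, a
// buffer holds `slots` entries and is sent when full or flushTimeoutMs after
// its first entry, whichever comes first.
public class MarkerLatencySketch {

    /** Send time of a buffer, measured from the arrival of its first entry. */
    static double sendTime(int slots, double intervalMs, double flushTimeoutMs) {
        double fullAt = (slots - 1) * intervalMs;  // arrival of the last slot
        return Math.min(fullAt, flushTimeoutMs);
    }

    /** The marker always sits in slot 0, so it waits for the whole send time. */
    static double markerWait(int slots, double intervalMs, double flushTimeoutMs) {
        return sendTime(slots, intervalMs, flushTimeoutMs);
    }

    /** Average wait over every slot that actually made it into the buffer. */
    static double averageRecordWait(int slots, double intervalMs, double flushTimeoutMs) {
        double send = sendTime(slots, intervalMs, flushTimeoutMs);
        int lastSlot = (int) Math.min(slots - 1, Math.floor(send / intervalMs));
        double total = 0;
        for (int i = 0; i <= lastSlot; i++) {
            total += send - i * intervalMs;
        }
        return total / (lastSlot + 1);
    }

    public static void main(String[] args) {
        // Busy stream: the buffer fills before the timeout.
        System.out.printf("busy: marker %.1f ms, records avg %.1f ms%n",
                markerWait(100, 1.0, 100.0), averageRecordWait(100, 1.0, 100.0));
        // Slow stream: the flush timeout still bounds the marker's wait.
        System.out.printf("slow: marker %.1f ms, records avg %.1f ms%n",
                markerWait(100, 10.0, 100.0), averageRecordWait(100, 10.0, 100.0));
    }
}
```

   In both scenarios the marker waits roughly twice as long as the average entry (99.0 ms vs 49.5 ms when busy, 100.0 ms vs 50.0 ms when slow), yet it never waits longer than the flush timeout, which is why the reported latency grows only by a bounded amount.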
   
   What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
