pnowojski commented on a change in pull request #7713: [FLINK-10995][network] Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262975197
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##########
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
        public void emit(T record) throws IOException, InterruptedException {
                broadcastEmit(record);
        }
+
+       @Override
+       public void broadcastEmit(T record) throws IOException, InterruptedException {
 
 Review comment:
   I also don't like spilling this logic over to the receiver. I thought about it a bit more, and there is one more option that should work.
   
   We have one shared `BufferBuilder` with multiple `BufferConsumer`s. Once `randomEmit()` is called for a single channel `channelId`, we:
   1. finish the current `BufferBuilder`;
   2. create a new `BufferBuilder` (let's call it `bb`);
   3. serialize the `LatencyMarker` into `bb`;
   4. enqueue a new `BufferConsumer` created from `bb` to `channelId`;
   5. for the other channels, create `BufferConsumer`s from `bb` with an offset starting position, chosen so that the `LatencyMarker` is not visible to those channels (i.e. initializing `BufferConsumer#currentReaderPosition` to `LatencyMarker.size()`).
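   To illustrate steps 2 to 5, here is a minimal, self-contained sketch of one shared buffer read by per-channel consumers whose reader positions differ only by the marker's size. `SharedBuffer`, `ChannelConsumer` and this `randomEmit` are hypothetical stand-ins, not Flink's `BufferBuilder`/`BufferConsumer` API; step 1 (finishing the previous builder) and the per-channel queues are not modeled.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model of the proposal. SharedBuffer and ChannelConsumer are
 * illustrative stand-ins, NOT Flink's BufferBuilder / BufferConsumer classes.
 */
public class SharedBufferSketch {

    /** One memory region with a single write position (the "builder" side). */
    static final class SharedBuffer {
        final byte[] memory;
        int writerPosition = 0;

        SharedBuffer(int capacity) {
            memory = new byte[capacity];
        }

        void append(byte[] data) {
            if (writerPosition + data.length > memory.length) {
                throw new IllegalStateException("buffer full");
            }
            System.arraycopy(data, 0, memory, writerPosition, data.length);
            writerPosition += data.length;
        }
    }

    /** Per-channel reader over the shared memory (the "consumer" side). */
    static final class ChannelConsumer {
        final SharedBuffer buffer;
        int readerPosition;

        ChannelConsumer(SharedBuffer buffer, int initialReaderPosition) {
            this.buffer = buffer;
            this.readerPosition = initialReaderPosition;
        }

        /** Returns everything written since the last read and advances the reader. */
        byte[] readAvailable() {
            byte[] out = Arrays.copyOfRange(buffer.memory, readerPosition, buffer.writerPosition);
            readerPosition = buffer.writerPosition;
            return out;
        }
    }

    /**
     * Steps 3-5: write the marker into the fresh buffer, give the target channel
     * a consumer starting at 0, and give every other channel a consumer starting
     * right after the marker so that the marker stays invisible to it.
     */
    static List<ChannelConsumer> randomEmit(SharedBuffer bb, int numChannels, int channelId, byte[] marker) {
        bb.append(marker);
        List<ChannelConsumer> consumers = new ArrayList<>();
        for (int ch = 0; ch < numChannels; ch++) {
            int start = (ch == channelId) ? 0 : marker.length;
            consumers.add(new ChannelConsumer(bb, start));
        }
        return consumers;
    }

    public static void main(String[] args) {
        SharedBuffer bb = new SharedBuffer(64); // step 2: a fresh buffer
        byte[] marker = "LM|".getBytes(StandardCharsets.UTF_8);
        List<ChannelConsumer> consumers = randomEmit(bb, 3, 1, marker);

        // A later broadcast record is copied once and seen by every channel.
        bb.append("rec1".getBytes(StandardCharsets.UTF_8));

        for (int ch = 0; ch < consumers.size(); ch++) {
            String seen = new String(consumers.get(ch).readAvailable(), StandardCharsets.UTF_8);
            System.out.println("channel " + ch + ": " + seen);
        }
    }
}
```

   Running it prints `rec1` for channels 0 and 2 and `LM|rec1` for channel 1: the record bytes exist once, and only the reader offsets differ.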
   
   There is one drawback to this solution: the reported latency would be slightly higher. Latency markers would always be written at the beginning of a buffer, so a marker would wait longer for the buffer to fill than regular records wait on average. However, the flush timeout would not be affected, so in the end this should be fine.
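   A back-of-the-envelope model of this drawback, under simplifying assumptions that are not Flink's actual flushing behavior: bytes arrive at a constant rate, and a buffer is sent when it is full or when a flush timeout (counted here from the buffer's first byte) expires, whichever comes first. The class and method names are hypothetical.

```java
/**
 * Hypothetical model: constant byte arrival rate; a buffer is sent when full
 * or when the flush timeout (counted from its first byte) fires, whichever is first.
 */
public class MarkerLatencySketch {

    /** Time at which the buffer is sent, in ms after its first byte was written. */
    static double sendTimeMs(int capacity, double bytesPerMs, double flushTimeoutMs) {
        return Math.min(capacity / bytesPerMs, flushTimeoutMs);
    }

    /** How long a record written at byte offset {@code position} waits until the buffer is sent. */
    static double waitMs(int position, int capacity, double bytesPerMs, double flushTimeoutMs) {
        return sendTimeMs(capacity, bytesPerMs, flushTimeoutMs) - position / bytesPerMs;
    }

    /** Average wait of fixed-size records that land in the buffer before it is sent. */
    static double averageRecordWaitMs(int recordSize, int capacity, double bytesPerMs, double flushTimeoutMs) {
        double sendAt = sendTimeMs(capacity, bytesPerMs, flushTimeoutMs);
        double total = 0;
        int count = 0;
        for (int pos = 0; pos + recordSize <= capacity && pos / bytesPerMs < sendAt; pos += recordSize) {
            total += waitMs(pos, capacity, bytesPerMs, flushTimeoutMs);
            count++;
        }
        return total / count;
    }

    public static void main(String[] args) {
        int capacity = 32 * 1024; // 32 KiB buffer
        int recordSize = 64;
        double flushTimeoutMs = 100.0;

        // Fast producer: the buffer fills (32 ms) before the timeout.
        // Slow producer: the timeout fires first and caps the marker's wait.
        for (double rate : new double[] {1024.0, 100.0}) {
            double marker = waitMs(0, capacity, rate, flushTimeoutMs); // marker sits at offset 0
            double avg = averageRecordWaitMs(recordSize, capacity, rate, flushTimeoutMs);
            System.out.printf("rate=%.0f B/ms: marker waits %.2f ms, average record %.2f ms%n", rate, marker, avg);
        }
    }
}
```

   In this model the marker at offset 0 waits roughly twice as long as an average record (32 ms vs. about 16 ms for the fast producer), but never longer than the flush timeout, which matches the argument above.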
   
   What do you think?
   
   > Immediately finishing the current BufferBuilder once randomEmit is done might increase how often BufferBuilders are finished. For example, if randomEmit is triggered very frequently and there are no regular emit or broadcastEmit calls for a while, one BufferBuilder could hold multiple randomly emitted elements; there is no need for one BufferBuilder per randomEmit. Or is this concern not worth considering in most scenarios?
   
   I think back-to-back `randomEmit` calls shouldn't happen often, and if they do, only for idling tasks, so we should be fine ignoring this. It might become an issue if we started using `randomEmit` for something other than latency markers, but that might never happen.
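   To make the concern concrete, this hypothetical sketch counts how many buffers get started for a sequence of events if every marker finishes the current buffer and opens a new one (steps 1 to 3), while records are appended to whatever buffer is open. Buffer capacity limits are ignored, and the names are illustrative only.

```java
import java.util.List;

/** Hypothetical count of buffers started under "every randomEmit starts a new buffer". */
public class BufferCountSketch {

    enum Event { RECORD, MARKER }

    static int countBuffers(List<Event> events) {
        int buffers = 0;
        boolean hasOpenBuffer = false;
        for (Event event : events) {
            if (event == Event.MARKER) {
                // Steps 1-3: finish the open buffer (if any) and start a new one holding the marker.
                buffers++;
                hasOpenBuffer = true;
            } else if (!hasOpenBuffer) {
                // The first record opens the initial buffer.
                buffers++;
                hasOpenBuffer = true;
            }
            // Otherwise the record is appended to the currently open buffer.
        }
        return buffers;
    }

    public static void main(String[] args) {
        List<Event> idle = List.of(Event.MARKER, Event.MARKER, Event.MARKER, Event.MARKER);
        List<Event> busy = List.of(Event.RECORD, Event.RECORD, Event.MARKER, Event.RECORD,
                Event.RECORD, Event.RECORD, Event.MARKER, Event.RECORD);
        System.out.println("idle task: " + countBuffers(idle) + " buffers"); // 4
        System.out.println("busy task: " + countBuffers(busy) + " buffers"); // 3
    }
}
```

   Only the idle case, where markers arrive back to back, ends up with one buffer per marker, which is the scenario the reply above considers rare enough to ignore.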

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
