pnowojski commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262504889
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 ##########
 @@ -44,4 +52,67 @@ public BroadcastRecordWriter(
        public void emit(T record) throws IOException, InterruptedException {
                broadcastEmit(record);
        }
+
+       @Override
+       public void broadcastEmit(T record) throws IOException, InterruptedException {
 
 Review comment:
   I see a couple of issues/potential issues here:
   1. I'm not sure I like the `randomTriggered` state field and how its 
value affects the logic in different methods. It took me quite some time to 
understand that two successive `randomEmit` calls work correctly (and how 
they actually work), because `requestNewBufferBuilder` behaves differently 
when `randomTriggered = true`.
   2. I'm not sure, but I think that `BroadcastRecordWriter`'s logic would 
actually be easier to follow if it had a single `bufferBuilder` field instead 
of the `bufferBuilders` array. Maybe it would be worth creating an 
`AbstractRecordWriter` without the `bufferBuilders`/`bufferBuilder` field and 
then having `RecordWriter` and `BroadcastRecordWriter` as two concrete 
implementations?
   3. `LatencyMarker`s are a bit broken here, since effectively they will 
only be flushed on the first emit following the `LatencyMarker`, so they do 
not measure the actual latency. This might not be a critical issue, but it 
would be good to have a correct metric for that.
   
   Point 1 is not that difficult to address: `randomEmit()` could flush the 
current buffer, request a new one just for that single `randomEmit()` and 
immediately finish it, so that the special handling of 
`randomTriggered = true` wouldn't leak outside of the `randomEmit()` method.
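
   To illustrate what I mean for point 1, here is a rough sketch. It does not 
use the real Flink classes: `SketchBroadcastWriter`, `flushCurrent()` and the 
string "buffers" are hypothetical stand-ins, and the target channel is passed 
in explicitly instead of being chosen at random. The only thing it is meant to 
show is that `randomEmit()` can clean up after itself, so no flag has to 
survive the call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a broadcast writer; not Flink's actual classes. */
class SketchBroadcastWriter {
    /** Finished "buffers" received by each channel, in arrival order. */
    final List<List<String>> channels = new ArrayList<>();

    /** The single shared buffer that broadcastEmit() is currently filling. */
    private StringBuilder current = new StringBuilder();

    SketchBroadcastWriter(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Appends the record to the shared buffer; nothing is sent yet. */
    void broadcastEmit(String record) {
        current.append(record);
    }

    /** Finishes the shared buffer (if non-empty) and hands it to every channel. */
    void flushCurrent() {
        if (current.length() == 0) {
            return;
        }
        String finished = current.toString();
        for (List<String> channel : channels) {
            channel.add(finished);
        }
        current = new StringBuilder();
    }

    /**
     * Emits the record to one channel only. The shared buffer is flushed
     * first, the record gets its own buffer, and that buffer is finished
     * right away, so no state outlives this method.
     */
    void randomEmit(String record, int targetChannel) {
        flushCurrent();
        channels.get(targetChannel).add(record);
    }
}
```

   With this shape, two `randomEmit()` calls in a row need no special casing: 
the second one simply finds an empty shared buffer to flush.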
   
   Point 2 might not be worth doing.
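
   For reference, the split from point 2 could look roughly like the sketch 
below. All class and method names here are made up for illustration 
(`AbstractWriterSketch`, `bufferFor()` and so on), and `StringBuilder` stands 
in for the real buffer builder type. The base class owns no buffer field at 
all, and each subclass decides how buffers map to channels.

```java
/** Hypothetical base class: emit logic only, no buffer fields. */
abstract class AbstractWriterSketch {
    /** Returns the buffer that records for the given channel are written to. */
    abstract StringBuilder bufferFor(int channel);

    void emit(String record, int channel) {
        bufferFor(channel).append(record);
    }
}

/** Stand-in for the regular writer: one buffer per channel. */
class PerChannelWriterSketch extends AbstractWriterSketch {
    private final StringBuilder[] bufferBuilders;

    PerChannelWriterSketch(int numChannels) {
        bufferBuilders = new StringBuilder[numChannels];
        for (int i = 0; i < numChannels; i++) {
            bufferBuilders[i] = new StringBuilder();
        }
    }

    @Override
    StringBuilder bufferFor(int channel) {
        return bufferBuilders[channel];
    }
}

/** Stand-in for the broadcast writer: a single buffer shared by all channels. */
class SingleBufferBroadcastWriterSketch extends AbstractWriterSketch {
    private final StringBuilder bufferBuilder = new StringBuilder();

    @Override
    StringBuilder bufferFor(int channel) {
        return bufferBuilder;
    }

    /** Writes the record once; every channel reads the same buffer. */
    void broadcastEmit(String record) {
        bufferBuilder.append(record);
    }
}
```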
   
   Point 3 is the more problematic one, I guess.
   
   a) One way to deal with it would be to broadcast the `LatencyMarker`. This 
is easy to do, but with a huge number of output channels and multiple 
broadcast channels one after another, it would result in a gigantic 
cascade/avalanche of `LatencyMarker`s. 
   b) Another idea would be to broadcast the `LatencyMarker` to everybody, but 
add some kind of marker to let the receiver know that it should ignore it, 
because it was targeted at someone else.
   c) Maybe there are other solutions that I haven't thought about?
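
   A rough sketch of option b), just to make the idea concrete. None of this 
is existing Flink API: `TargetedLatencyMarker`, `ReceiverSketch` and the 
`targetChannel` field are assumptions made up for this example. The marker is 
sent to every channel together with the index of the channel it was meant for, 
and every other receiver drops it instead of forwarding it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical marker that records which channel it was meant for. */
final class TargetedLatencyMarker {
    final long markedTimeMillis;
    final int targetChannel;

    TargetedLatencyMarker(long markedTimeMillis, int targetChannel) {
        this.markedTimeMillis = markedTimeMillis;
        this.targetChannel = targetChannel;
    }
}

/** Hypothetical receiver sitting behind one output channel. */
final class ReceiverSketch {
    final int channelIndex;
    final List<Long> observedLatencies = new ArrayList<>();

    ReceiverSketch(int channelIndex) {
        this.channelIndex = channelIndex;
    }

    /** Records the latency only if this receiver was the intended target. */
    void onMarker(TargetedLatencyMarker marker, long nowMillis) {
        if (marker.targetChannel != channelIndex) {
            return; // meant for someone else: ignore, do not forward
        }
        observedLatencies.add(nowMillis - marker.markedTimeMillis);
    }
}
```

   Since only the targeted receiver handles the marker, this would avoid the 
cascade from a), at the cost of a little extra traffic on the other channels.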

