zhijiangW commented on a change in pull request #7713: [FLINK-10995][network]
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r263377268
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
##########
@@ -44,4 +52,67 @@ public BroadcastRecordWriter(
public void emit(T record) throws IOException, InterruptedException {
broadcastEmit(record);
}
+
+ @Override
+ public void broadcastEmit(T record) throws IOException,
InterruptedException {
Review comment:
Considering initiating `BufferConsumer#currentReaderPosition` to
`LatencyMarker.size()` for other channels in
`BroadcastRecordWriter#randomEmit`, I meant this way of getting marker size
seems coupled with `LatencyMarker` currently. Maybe we do not need consider it
now, if there are other usages in future, we can make it smarter then.
For `copyFromSerializerToTargetChannel`, this process might be called either
by `randomEmit` or `BroadcastRecordWriter#emit`. And the
`requestNewBufferBuilder` would be triggered in
`copyFromSerializerToTargetChannel`. If it is in the `randomEmit` process, we
need to initialize `BufferConsumer#currentReaderPosition` for other sharing
channels, otherwise we do not need to do. So it needs a flag to distinguish the
different ways. It would be easy for explanation after I submit the codes
later. :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services