zhijiangW commented on a change in pull request #7713: [FLINK-10995][network]
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262784641
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
##########
@@ -44,4 +52,67 @@ public BroadcastRecordWriter(
public void emit(T record) throws IOException, InterruptedException {
broadcastEmit(record);
}
+
+ @Override
+ public void broadcastEmit(T record) throws IOException, InterruptedException {
Review comment:
Thanks for these detailed suggestions. You pointed out a critical potential
problem that I had not thought through before.
Regarding the `LatencyMarker`, I have not thought of a perfect solution yet.
1. If we finish the `LatencyMarker` immediately, it can only reflect the
latency before this marker, and it loses a bit of the latency of the current
`BufferConsumer` containing the `LatencyMarker`.
2. If we broadcast the `LatencyMarker`, there are two concerns. First, we
might add a `LatencyMarkerID` to the marker, similar to `CheckpointID`, and
ignore duplicate IDs in `StreamInputProcessor`, which would then need to
maintain a set of `LatencyMarkerID`s for filtering. We could add this logic to
the processor to avoid a gigantic avalanche of markers. With that in place, the
logic becomes very easy for `BroadcastRecordWriter`, because `emit` and
`randomEmit` are both processed in broadcast mode. But if we later extend other
elements to use `randomEmit`, we might need to add corresponding special
handling logic for them, as for `LatencyMarker`. The second concern is that
broadcasting the `LatencyMarker` still loses latency precision compared with
the current behavior. Currently we choose a random channel to send this marker,
so it reflects the average latency statistically. In broadcast mode it would
always reflect the latency of the fastest channel, because once the downstream
receives the first marker, it ignores the later copies of the same marker from
slower channels.
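To make the receiver-side filtering in option 2 concrete, here is a minimal sketch. `LatencyMarkerDeduplicator`, `shouldProcess` and `maxTrackedIds` are hypothetical names I made up for illustration; nothing like this exists in `StreamInputProcessor` today, and the bounded set is only one possible answer to the memory question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical receiver-side filter for broadcast latency markers; not Flink code. */
final class LatencyMarkerDeduplicator {

    // Insertion-ordered map used as a bounded set of already-seen marker IDs.
    private final Map<Long, Boolean> seenIds;

    LatencyMarkerDeduplicator(final int maxTrackedIds) {
        this.seenIds = new LinkedHashMap<Long, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Boolean> eldest) {
                // Evict the oldest ID once the bound is exceeded, to cap memory use.
                return size() > maxTrackedIds;
            }
        };
    }

    /**
     * Returns true only for the first copy of a marker ID. Copies arriving later
     * from slower channels are dropped, which is why the measured latency would
     * be that of the fastest channel.
     */
    boolean shouldProcess(long markerId) {
        return seenIds.putIfAbsent(markerId, Boolean.TRUE) == null;
    }

    /** Number of IDs currently kept in memory. */
    int trackedIds() {
        return seenIds.size();
    }
}
```

Note that with a bound like this, an ID that has already been evicted would be accepted again if a very late copy arrives, so the bound would have to be chosen with the marker interval and channel skew in mind.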
Considering the above two options, I would prefer the first option because
the latency loss seems smaller than with the broadcasting approach, and we do
not need to adjust the logic on the receiver side (how many IDs would have to
be kept in memory might be another concern with broadcasting).
Regarding the first option, I agree with your suggestion of abstracting an
`AbstractRecordWriter` so that `BroadcastRecordWriter` maintains a single
`BufferBuilder`. I also understand your concerns about the `randomTriggered`
field. Your suggestion of finishing the `LatencyMarker` immediately in
`randomEmit` would avoid this field. I would like to give it a try based on
these suggestions if you also approve the first option. :)
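A rough sketch of how I picture the first option, as a toy model only: `SharedBufferBroadcastWriter` and its members are hypothetical, plain lists stand in for `BufferBuilder`/`BufferConsumer`, and the target channel is passed in explicitly rather than chosen at random. The point is just that finishing the shared buffer inside `randomEmit` removes the need for a `randomTriggered` flag.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a broadcast writer with one shared buffer; none of these types are Flink's. */
final class SharedBufferBroadcastWriter {

    // For each channel, the ordered list of buffers handed to it.
    private final List<List<List<String>>> channels = new ArrayList<>();

    // Stand-in for the single BufferBuilder shared by all channels; null means "none open".
    private List<String> sharedBuffer;

    SharedBufferBroadcastWriter(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Writes the record once into the shared buffer that every channel references. */
    void broadcastEmit(String record) {
        if (sharedBuffer == null) {
            sharedBuffer = new ArrayList<>();
            for (List<List<String>> channel : channels) {
                channel.add(sharedBuffer); // same instance for all channels, no per-channel copy
            }
        }
        sharedBuffer.add(record);
    }

    /** Sends the marker to one channel only, in a buffer of its own that is finished at once. */
    void randomEmit(String marker, int targetChannel) {
        // Finish the shared buffer first so the marker cannot leak to the other channels.
        sharedBuffer = null;
        List<String> markerBuffer = new ArrayList<>();
        markerBuffer.add(marker);
        channels.get(targetChannel).add(markerBuffer);
        // No flag is kept: the next broadcastEmit simply opens a fresh shared buffer.
    }

    List<List<String>> buffersOf(int channel) {
        return channels.get(channel);
    }
}
```

For example, `broadcastEmit("a")`, `randomEmit("m", 1)`, `broadcastEmit("b")` on two channels leaves channel 0 with buffers `[[a], [b]]` and channel 1 with `[[a], [m], [b]]`.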