AHeise commented on a change in pull request #12353:
URL: https://github.com/apache/flink/pull/12353#discussion_r432768888
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
##########
@@ -130,7 +132,7 @@ public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOExcepti
 		BufferBuilder builder = super.requestNewBufferBuilder(targetChannel);
 		if (randomTriggered) {
-			addBufferConsumer(builder.createBufferConsumer(), targetChannel);
+			addBufferConsumer(randomConsumer = builder.createBufferConsumer(), targetChannel);
Review comment:
You are right, currently LM gets replicated, which is not good. I was
expecting `#copy` to pick up the current builder position for some reason.
Coming back to this bug, the issue is that the underlying memory segment
is recycled as soon as the first buffer consumer (with LM) has been
consumed. That causes buffers to be effectively skipped and streams to be
corrupted.
@pnowojski and I were also questioning why `BufferBuilder` only holds the
memory segment and not the buffer. By definition, calling
`#createBufferConsumer` more than once creates the aforementioned race
condition.
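To make the failure mode concrete, here is a minimal sketch. `Segment`, `Consumer`, and `Builder` are hypothetical stand-ins, not Flink's actual classes; the only assumption carried over from the discussion is that closing a consumer recycles the raw segment directly.

```java
/** Hypothetical model of the race; none of these types are Flink's real classes. */
class SegmentRecycleSketch {

    /** A pooled chunk of memory; "recycled" means it went back to the pool. */
    static final class Segment {
        final byte[] data = new byte[4];
        boolean recycled = false;
    }

    /** Assumption: closing a consumer recycles the segment, with no reference counting. */
    static final class Consumer {
        private final Segment segment;

        Consumer(Segment segment) {
            this.segment = segment;
        }

        byte read(int index) {
            if (segment.recycled) {
                throw new IllegalStateException("segment already recycled");
            }
            return segment.data[index];
        }

        void close() {
            segment.recycled = true;
        }
    }

    /** Holds only the raw segment, so every consumer it creates shares that segment. */
    static final class Builder {
        private final Segment segment = new Segment();

        Consumer createConsumer() {
            return new Consumer(segment);
        }
    }

    /** Returns true if the second consumer can no longer read after the first one closes. */
    static boolean secondConsumerLosesData() {
        Builder builder = new Builder();
        Consumer first = builder.createConsumer();
        Consumer second = builder.createConsumer();
        first.close(); // recycles the shared segment
        try {
            second.read(0);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("second consumer loses data: " + secondConsumerLosesData());
    }
}
```

In this model the second consumer fails as soon as the first one is closed, which mirrors the skipped buffers described above.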
It would be solved if `BufferBuilder` held the buffer instead of the raw
memory segment, as a buffer would only be recycled when the last buffer consumer
has been closed. However, this seems so obvious that there must be a reason for
the current implementation.
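A rough sketch of the reference-counted alternative, again with hypothetical types (`Buffer`, `Consumer`, `Builder`) rather than Flink's real API. It assumes the builder holds one reference of its own and drops it in a hypothetical `finish()` method; that detail is my assumption for illustration only.

```java
/** Hypothetical model of a builder that holds a reference-counted buffer. */
class RefCountedBufferSketch {

    /** Buffer that is recycled only when its reference count drops to zero. */
    static final class Buffer {
        private int refCount = 1; // initial reference, owned by the builder
        private boolean recycled = false;

        Buffer retain() {
            if (recycled) {
                throw new IllegalStateException("buffer already recycled");
            }
            refCount++;
            return this;
        }

        void release() {
            if (--refCount == 0) {
                recycled = true;
            }
        }

        boolean isRecycled() {
            return recycled;
        }
    }

    /** Each consumer owns exactly one reference and gives it up on close. */
    static final class Consumer {
        private final Buffer buffer;

        Consumer(Buffer buffer) {
            this.buffer = buffer;
        }

        void close() {
            buffer.release();
        }
    }

    /** Holds the buffer (not the raw memory) and retains it once per consumer. */
    static final class Builder {
        final Buffer buffer = new Buffer();

        Consumer createConsumer() {
            return new Consumer(buffer.retain());
        }

        /** Assumption: the builder drops its own reference once writing is done. */
        void finish() {
            buffer.release();
        }
    }

    /** Returns { recycled after first close, recycled after second close }. */
    static boolean[] recycledAfterEachClose() {
        Builder builder = new Builder();
        Consumer first = builder.createConsumer();
        Consumer second = builder.createConsumer();
        builder.finish();

        first.close();
        boolean afterFirst = builder.buffer.isRecycled();
        second.close();
        boolean afterSecond = builder.buffer.isRecycled();
        return new boolean[] {afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        boolean[] result = recycledAfterEachClose();
        System.out.println("recycled after first close:  " + result[0]);
        System.out.println("recycled after second close: " + result[1]);
    }
}
```

With this ownership model, closing the first consumer leaves the memory intact, and it is only recycled once the last consumer has been closed.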