AHeise commented on a change in pull request #12353:
URL: https://github.com/apache/flink/pull/12353#discussion_r432768888



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
##########
@@ -130,7 +132,7 @@ public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOExcepti
 
                BufferBuilder builder = super.requestNewBufferBuilder(targetChannel);
                if (randomTriggered) {
-                       addBufferConsumer(builder.createBufferConsumer(), targetChannel);
+                       addBufferConsumer(randomConsumer = builder.createBufferConsumer(), targetChannel);

Review comment:
       You are right: currently LM gets replicated, which is not good. For some 
reason, I was expecting `#copy` to pick up the current builder position.
   
   Coming back to this bug, the issue is that the underlying memory segment 
will be recycled as soon as the first buffer consumer (with LM) is consumed. 
That causes buffers to be effectively skipped and streams to be corrupted.
   
   @pnowojski and I were also questioning why `BufferBuilder` only holds the 
memory segment and not the buffer. By definition, calling 
`#createBufferConsumer` more than once will create the aforementioned race 
condition.
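
   To make the failure mode concrete, here is a minimal toy model of it. None 
of these classes are Flink's; `ToySegment` and `ToyConsumer` are hypothetical 
stand-ins, and the only assumption carried over is that a finished consumer 
recycles the segment it was created on.

```java
// Toy model only: hypothetical stand-ins, not Flink's BufferBuilder/BufferConsumer.
class SharedSegmentDemo {
    public static void main(String[] args) {
        ToySegment segment = new ToySegment();

        // Two consumers created on the same raw segment, as happens when
        // a consumer is created more than once from one builder.
        ToyConsumer first = new ToyConsumer(segment);
        ToyConsumer second = new ToyConsumer(segment);

        // The first consumer finishes and recycles the shared segment.
        first.close();

        // The second consumer now points at recycled memory.
        System.out.println("second consumer can still read: " + second.canRead());
    }
}

/** Raw memory with no notion of how many readers still need it. */
class ToySegment {
    private boolean recycled;

    void recycle() {
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** A reader that recycles the segment directly when it is closed. */
class ToyConsumer {
    private final ToySegment segment;

    ToyConsumer(ToySegment segment) {
        this.segment = segment;
    }

    void close() {
        // No reference counting: the first close wins.
        segment.recycle();
    }

    boolean canRead() {
        return !segment.isRecycled();
    }
}
```

   In this sketch the second consumer loses its data as soon as the first one 
is closed, which is the skipped-buffer behaviour described above.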
   
   It would be solved if `BufferBuilder` held the buffer instead of the raw 
memory segment, as a buffer would only be recycled when the last buffer 
consumer has been closed. However, that seems so obvious that there must be a 
reason for the current implementation.
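
   A rough sketch of that idea, again with hypothetical toy classes rather 
than Flink's real ones: `ToyBuffer` is an assumed reference-counted wrapper, 
the builder holds one reference, and each consumer retains another. The memory 
is recycled only when the count drops to zero.

```java
// Toy model only: hypothetical classes illustrating reference counting,
// not Flink's actual Buffer or BufferConsumer implementation.
class RefCountedBufferDemo {
    public static void main(String[] args) {
        ToyBuffer buffer = new ToyBuffer();          // builder's reference
        ToyBufferConsumer first = buffer.newConsumer();
        ToyBufferConsumer second = buffer.newConsumer();

        buffer.release();                            // builder is finished
        first.close();
        System.out.println("recycled after first close: " + buffer.isRecycled());

        second.close();
        System.out.println("recycled after last close: " + buffer.isRecycled());
    }
}

/** Wraps the memory and counts how many holders still need it. */
class ToyBuffer {
    private int refCount = 1; // the creator (the builder) holds one reference
    private boolean recycled;

    synchronized ToyBufferConsumer newConsumer() {
        if (recycled) {
            throw new IllegalStateException("buffer already recycled");
        }
        refCount++;
        return new ToyBufferConsumer(this);
    }

    synchronized void release() {
        if (refCount <= 0) {
            throw new IllegalStateException("released more often than retained");
        }
        if (--refCount == 0) {
            recycled = true; // only the last holder triggers recycling
        }
    }

    synchronized boolean isRecycled() {
        return recycled;
    }
}

/** A reader that gives back exactly one reference when closed. */
class ToyBufferConsumer {
    private final ToyBuffer buffer;
    private boolean closed;

    ToyBufferConsumer(ToyBuffer buffer) {
        this.buffer = buffer;
    }

    void close() {
        if (!closed) { // closing twice must not release twice
            closed = true;
            buffer.release();
        }
    }
}
```

   With this shape, closing the first consumer leaves the memory intact for 
the second one, so creating several consumers from one builder would no longer 
race on recycling.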
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

