pnowojski commented on a change in pull request #12353:
URL: https://github.com/apache/flink/pull/12353#discussion_r434550118



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
##########
@@ -84,7 +86,7 @@ void randomEmit(T record, int targetChannelIndex) throws IOException, Interrupte
                if (bufferBuilder != null) {
                        for (int index = 0; index < numberOfChannels; index++) {
                                if (index != targetChannelIndex) {
-                                       addBufferConsumer(bufferBuilder.createBufferConsumer(), index);
+                                       addBufferConsumer(randomTriggeredConsumer.copyAndSync(bufferBuilder), index);

Review comment:
   Uhhh. This is quite a bit of spaghetti (the way `randomTriggeredConsumer` is 
being set and used here).
   
   I don't know, but maybe we should rethink the inheritance and code 
deduplication between `BroadcastRecordWriter` and `RecordWriter`? Maybe we 
should drop this connection and live with the code duplication? Or maybe there 
is some better way, like extracting a class and switching to composition?
   
   If you and @curcur are fine with the current fix, I would be fine with doing 
the refactoring/rethinking outside of this bug fix.
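
   For illustration only, here is a minimal sketch of what "extract some class 
and switch to composition" could look like. Every name in it 
(`ChannelSelectorSketch`, `WriterSketch`, `SelectorsSketch`) is hypothetical and 
not taken from Flink, and records are plain strings rather than buffers. The 
only point is that the writer owns a strategy object instead of being subclassed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical strategy: decides which channel indexes receive a record.
interface ChannelSelectorSketch {
    List<Integer> targets(int numberOfChannels);
}

// Hypothetical writer that delegates channel selection to a strategy
// (composition) instead of relying on a broadcast subclass (inheritance).
final class WriterSketch {
    private final int numberOfChannels;
    private final ChannelSelectorSketch selector;
    // One list per channel; stands in for the per-channel buffer queues.
    final List<List<String>> channels = new ArrayList<>();

    WriterSketch(int numberOfChannels, ChannelSelectorSketch selector) {
        this.numberOfChannels = numberOfChannels;
        this.selector = selector;
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    void emit(String record) {
        for (int index : selector.targets(numberOfChannels)) {
            channels.get(index).add(record);
        }
    }
}

// Hypothetical factory for the two selection behaviours.
final class SelectorsSketch {
    private SelectorsSketch() {}

    // Every channel receives the record.
    static ChannelSelectorSketch broadcast() {
        return numberOfChannels -> {
            List<Integer> all = new ArrayList<>();
            for (int i = 0; i < numberOfChannels; i++) {
                all.add(i);
            }
            return all;
        };
    }

    // Only the given channel receives the record.
    static ChannelSelectorSketch single(int targetChannelIndex) {
        return numberOfChannels -> Collections.singletonList(targetChannelIndex);
    }
}
```

   With this shape, the broadcast and single-channel paths share the writer code 
without one inheriting from the other. Whether that fits the real buffer 
handling is exactly what the refactoring would have to work out.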

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
##########
@@ -84,7 +86,7 @@ void randomEmit(T record, int targetChannelIndex) throws IOException, Interrupte
                if (bufferBuilder != null) {
                        for (int index = 0; index < numberOfChannels; index++) {
                                if (index != targetChannelIndex) {
-                                       addBufferConsumer(bufferBuilder.createBufferConsumer(), index);

Review comment:
   Could you re-introduce the `checkState(!bufferConsumerCreated)` that was removed by 
https://github.com/apache/flink/commit/7f2382a532ab72e6ead12945eb2395e6af829eb5#diff-c3aa29406973e93a909773e89093b9caL43
 ?
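
   As a rough sketch of the guard being asked for: the class below is a 
hypothetical stand-in, not the actual Flink class, and `checkState` is defined 
locally so the example is self-contained. It assumes the intent is that a 
consumer may be created from a given builder only once.

```java
// Hypothetical stand-in for a buffer builder; not Flink's implementation.
final class BufferBuilderSketch {
    private boolean bufferConsumerCreated = false;

    // Local helper: throws IllegalStateException when the condition is false.
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    // Creating a second consumer from the same builder fails fast.
    Object createBufferConsumer() {
        checkState(!bufferConsumerCreated, "A consumer was already created for this builder");
        bufferConsumerCreated = true;
        return new Object(); // placeholder for the real consumer
    }
}
```

   The guard turns an accidental second `createBufferConsumer()` call into an 
immediate failure instead of a silently shared buffer.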




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

