akalash commented on a change in pull request #17024:
URL: https://github.com/apache/flink/pull/17024#discussion_r699524505



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -330,9 +330,12 @@ private BufferBuilder appendBroadcastDataForRecordContinuation(
     private void createBroadcastBufferConsumers(BufferBuilder buffer, int partialRecordBytes)
             throws IOException {
         try (final BufferConsumer consumer = buffer.createBufferConsumerFromBeginning()) {
+            int desirableBufferSize = Integer.MAX_VALUE;
             for (ResultSubpartition subpartition : subpartitions) {
-                subpartition.add(consumer.copy(), partialRecordBytes);
+                int subPartitionBufferSize = subpartition.add(consumer.copy(), partialRecordBytes);

Review comment:
       I believe it is more correct to ignore the individual value in the calculation of the desirable buffer size, rather than to discard the whole calculation. I mean:
   ```
   if (subPartitionBufferSize > 0) {
       desirableBufferSize = Math.min(desirableBufferSize, subPartitionBufferSize);
   }
   ```
   I think this is better because if one of the subpartitions fails, we can still send data to the other ones. As I understand it, they will all eventually be closed once at least one is closed, so maybe this is not so important, but it is still better to follow the current semantics, which do not forbid sending data to the remaining subpartitions even if one of them is closed.
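   For context, a minimal sketch of how the whole loop from the hunk above might look with this guard applied. This just combines the diff with the suggested `if`; it assumes `subpartition.add(...)` returns a non-positive size when that subpartition is closed or the add fails, per the discussion above:
   ```
   // Sketch only, not the actual patch: the loop from the hunk with the guard.
   int desirableBufferSize = Integer.MAX_VALUE;
   for (ResultSubpartition subpartition : subpartitions) {
       int subPartitionBufferSize = subpartition.add(consumer.copy(), partialRecordBytes);
       if (subPartitionBufferSize > 0) {
           // Only healthy subpartitions contribute to the minimum; one failed
           // subpartition no longer invalidates the whole calculation.
           desirableBufferSize = Math.min(desirableBufferSize, subPartitionBufferSize);
       }
   }
   ```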



