akalash commented on a change in pull request #17024:
URL: https://github.com/apache/flink/pull/17024#discussion_r699524505
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -330,9 +330,12 @@ private BufferBuilder appendBroadcastDataForRecordContinuation(
     private void createBroadcastBufferConsumers(BufferBuilder buffer, int partialRecordBytes)
             throws IOException {
         try (final BufferConsumer consumer = buffer.createBufferConsumerFromBeginning()) {
+            int desirableBufferSize = Integer.MAX_VALUE;
             for (ResultSubpartition subpartition : subpartitions) {
-                subpartition.add(consumer.copy(), partialRecordBytes);
+                int subPartitionBufferSize = subpartition.add(consumer.copy(), partialRecordBytes);
Review comment:
I believe it would be more correct to skip only the failed value when
calculating the desirable buffer size, rather than discard the whole result. I mean:
```
if (subPartitionBufferSize > 0) {
    desirableBufferSize = Math.min(desirableBufferSize, subPartitionBufferSize);
}
```
I think this is better because if one of the subpartitions fails, we are
still able to send the data to the other ones. As I understand it, eventually
they will all be closed if at least one was closed, so maybe it is not so
important. Still, it is better to follow the current semantics, which do not
forbid sending data to subpartitions even if one of them is closed.
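To make the suggestion concrete, here is a minimal standalone sketch of the same min-over-positive-values logic. It is not code from the PR: the class `DesirableBufferSizeSketch` and its method are hypothetical, the `int[]` stands in for the values returned by `subpartition.add(...)`, and it assumes a non-positive return value means that subpartition failed or is closed. Returning an empty `OptionalInt` when no subpartition reports a usable size is also my assumption, in place of leaking `Integer.MAX_VALUE`.

```java
import java.util.OptionalInt;

/** Hypothetical helper, not part of Flink: isolates the min-over-positive-values logic. */
final class DesirableBufferSizeSketch {

    private DesirableBufferSizeSketch() {}

    /**
     * Computes the smallest positive buffer size among the given results.
     *
     * @param addResults stand-ins for the values returned by subpartition.add(...);
     *     a non-positive value is ASSUMED to mean the add failed for that subpartition
     * @return the minimum positive size, or empty if every subpartition failed
     */
    static OptionalInt desirableBufferSize(int[] addResults) {
        int desirable = Integer.MAX_VALUE;
        boolean anySucceeded = false;
        for (int size : addResults) {
            // Skip only the failed subpartition; the others still contribute.
            if (size > 0) {
                desirable = Math.min(desirable, size);
                anySucceeded = true;
            }
        }
        return anySucceeded ? OptionalInt.of(desirable) : OptionalInt.empty();
    }
}
```

With this shape, one failed subpartition does not discard the sizes reported by the healthy ones, and the caller can tell "no usable size" apart from a real value.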
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.