akalash commented on a change in pull request #17024:
URL: https://github.com/apache/flink/pull/17024#discussion_r698661266



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -692,6 +692,17 @@ public void testDifferentBufferSizeForSubpartitions() throws IOException {
         assertEquals(12, subpartition0.pollBuffer().buffer().getSize());
         assertEquals(5, subpartition1.pollBuffer().buffer().getSize());
         assertEquals(4, subpartition1.pollBuffer().buffer().getSize());
+
+        // when: Set the different buffers size.
+        subpartition0.bufferSize(12);
+        subpartition1.bufferSize(5);
+
+        // and: Add the buffer.
+        bufferWritingResultPartition.broadcastRecord(ByteBuffer.allocate(5));
+
+        // then: The buffer less or equal to configured.
+        assertEquals(5, subpartition0.pollBuffer().buffer().getSize());
+        assertEquals(5, subpartition1.pollBuffer().buffer().getSize());

Review comment:
       Please take a more careful look at this test
(`testDifferentBufferSizeForSubpartitions`). It covers several scenarios:
sending a buffer smaller than, larger than, or equal to the buffer size, and
changing the buffer size and sending again. Your change adds only one test
case, which is not enough, so please increase the test coverage.
   Also, I suggest extracting your test into a separate method, e.g.
`testDynamicBufferSizeForBroadcast` or something similar.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -330,9 +330,12 @@ private BufferBuilder appendBroadcastDataForRecordContinuation(
     private void createBroadcastBufferConsumers(BufferBuilder buffer, int partialRecordBytes)
             throws IOException {
         try (final BufferConsumer consumer = buffer.createBufferConsumerFromBeginning()) {
+            int desirableBufferSize = Integer.MAX_VALUE;
             for (ResultSubpartition subpartition : subpartitions) {
-                subpartition.add(consumer.copy(), partialRecordBytes);
+                int subPartitionBufferSize = subpartition.add(consumer.copy(), partialRecordBytes);

Review comment:
       If `subpartition#add` fails, it returns a negative value (`-1`), so we
should take this case into account, for example by ignoring such a value.
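
   A minimal sketch of the idea, not the actual Flink code: it assumes the
loop is meant to keep the smallest size returned by `subpartition#add`, and it
replaces the real subpartitions with a plain array of return values. The class
`BroadcastBufferSizeSketch` and the method `minDesirableBufferSize` are
hypothetical names used only for illustration.

```java
public class BroadcastBufferSizeSketch {

    /**
     * Hypothetical helper: returns the smallest non-negative value among the
     * results of the add calls, or Integer.MAX_VALUE if every call failed.
     */
    static int minDesirableBufferSize(int[] addResults) {
        int desirableBufferSize = Integer.MAX_VALUE;
        for (int subPartitionBufferSize : addResults) {
            // Assumption from the review comment: a failed add returns a
            // negative value (-1), which must not take part in the minimum.
            if (subPartitionBufferSize < 0) {
                continue;
            }
            desirableBufferSize = Math.min(desirableBufferSize, subPartitionBufferSize);
        }
        return desirableBufferSize;
    }

    public static void main(String[] args) {
        // The second subpartition "failed" and is skipped.
        System.out.println(minDesirableBufferSize(new int[] {12, -1, 5})); // prints 5
    }
}
```

   Without the check, a single failed add would pull the minimum down to `-1`.
If every add fails, the sketch leaves the value at `Integer.MAX_VALUE`, so the
caller would still need to decide how to treat that case.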



