pnowojski commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262486384
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##########
 @@ -247,136 +388,121 @@ public void testBroadcastEventMixedRecords() throws 
Exception {
                }
 
                TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
                ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
-               RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter);
-               CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               RecordWriter<ByteArrayIO> writer = isBroadcastWriter ?
+                       RecordWriter.createRecordWriter(partitionWriter, new 
OutputEmitter(ShipStrategyType.BROADCAST, 0), "test") :
+                       RecordWriter.createRecordWriter(partitionWriter);
+
+               CheckpointBarrier barrier = new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                // Emit records on some channels first (requesting buffers), 
then
                // broadcast the event. The record buffers should be emitted 
first, then
                // the event. After the event, no new buffer should be 
requested.
 
-               // (i) Smaller than the buffer size (single buffer request => 1)
+               // (i) Smaller than the buffer size
                byte[] bytes = new byte[bufferSize / 2];
                rand.nextBytes(bytes);
 
                writer.emit(new ByteArrayIO(bytes));
 
-               // (ii) Larger than the buffer size (two buffer requests => 1 + 
2)
+               // (ii) Larger than the buffer size
                bytes = new byte[bufferSize + 1];
                rand.nextBytes(bytes);
 
                writer.emit(new ByteArrayIO(bytes));
 
-               // (iii) Exactly the buffer size (single buffer request => 1 + 
2 + 1)
+               // (iii) Exactly the buffer size
                bytes = new byte[bufferSize - lenBytes];
                rand.nextBytes(bytes);
 
                writer.emit(new ByteArrayIO(bytes));
 
-               // (iv) Nothing on the 4th channel (no buffer request => 1 + 2 
+ 1 + 0 = 4)
-
-               // (v) Broadcast the event
+               // (iv) Broadcast the event
                writer.broadcastEvent(barrier);
 
-               assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
+               if (isBroadcastWriter) {
 
 Review comment:
   With a `BroadcastRecordWriterTest` class, this `if` could be changed to an 
overloaded method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to