zhijiangW commented on a change in pull request #7713: [FLINK-10995][network] 
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262788274
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
 ##########
 @@ -247,136 +388,121 @@ public void testBroadcastEventMixedRecords() throws 
Exception {
                }
 
                TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
                ResultPartitionWriter partitionWriter = new 
CollectingPartitionWriter(queues, bufferProvider);
-               RecordWriter<ByteArrayIO> writer = new 
RecordWriter<>(partitionWriter);
-               CheckpointBarrier barrier = new 
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               RecordWriter<ByteArrayIO> writer = isBroadcastWriter ?
+                       RecordWriter.createRecordWriter(partitionWriter, new 
OutputEmitter(ShipStrategyType.BROADCAST, 0), "test") :
+                       RecordWriter.createRecordWriter(partitionWriter);
+
+               CheckpointBarrier barrier = new CheckpointBarrier(0, 0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                // Emit records on some channels first (requesting buffers), 
then
                // broadcast the event. The record buffers should be emitted 
first, then
                // the event. After the event, no new buffer should be 
requested.
 
-               // (i) Smaller than the buffer size (single buffer request => 1)
+               // (i) Smaller than the buffer size
                byte[] bytes = new byte[bufferSize / 2];
                rand.nextBytes(bytes);
 
                writer.emit(new ByteArrayIO(bytes));
 
-               // (ii) Larger than the buffer size (two buffer requests => 1 + 
2)
+               // (ii) Larger than the buffer size
                bytes = new byte[bufferSize + 1];
                rand.nextBytes(bytes);
 
                writer.emit(new ByteArrayIO(bytes));
 
-               // (iii) Exactly the buffer size (single buffer request => 1 + 
2 + 1)
+               // (iii) Exactly the buffer size
                bytes = new byte[bufferSize - lenBytes];
                rand.nextBytes(bytes);
 
                writer.emit(new ByteArrayIO(bytes));
 
-               // (iv) Nothing on the 4th channel (no buffer request => 1 + 2 
+ 1 + 0 = 4)
-
-               // (v) Broadcast the event
+               // (iv) Broadcast the event
                writer.broadcastEvent(barrier);
 
-               assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
+               if (isBroadcastWriter) {
+                       assertEquals(3, 
bufferProvider.getNumberOfCreatedBuffers());
 
-               BufferOrEvent boe;
-               assertEquals(2, queues[0].size()); // 1 buffer + 1 event
-               assertTrue(parseBuffer(queues[0].remove(), 0).isBuffer());
-               assertEquals(3, queues[1].size()); // 2 buffers + 1 event
-               assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
-               assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
-               assertEquals(2, queues[2].size()); // 1 buffer + 1 event
-               assertTrue(parseBuffer(queues[2].remove(), 2).isBuffer());
-               assertEquals(1, queues[3].size()); // 0 buffers + 1 event
+                       for (int i = 0; i < numberOfChannels; i++) {
+                               assertEquals(4, queues[i].size()); // 3 buffer 
+ 1 event
 
-               // every queue's last element should be the event
-               for (int i = 0; i < numberOfChannels; i++) {
-                       boe = parseBuffer(queues[i].remove(), i);
-                       assertTrue(boe.isEvent());
-                       assertEquals(barrier, boe.getEvent());
+                               for (int j = 0; j < 3; j++) {
+                                       
assertTrue(parseBuffer(queues[i].remove(), 0).isBuffer());
+                               }
+
+                               BufferOrEvent boe = 
parseBuffer(queues[i].remove(), i);
+                               assertTrue(boe.isEvent());
+                               assertEquals(barrier, boe.getEvent());
+                       }
+               } else {
+                       assertEquals(4, 
bufferProvider.getNumberOfCreatedBuffers());
+
+                       assertEquals(2, queues[0].size()); // 1 buffer + 1 event
+                       assertTrue(parseBuffer(queues[0].remove(), 
0).isBuffer());
+                       assertEquals(3, queues[1].size()); // 2 buffers + 1 
event
+                       assertTrue(parseBuffer(queues[1].remove(), 
1).isBuffer());
+                       assertTrue(parseBuffer(queues[1].remove(), 
1).isBuffer());
+                       assertEquals(2, queues[2].size()); // 1 buffer + 1 event
+                       assertTrue(parseBuffer(queues[2].remove(), 
2).isBuffer());
+                       assertEquals(1, queues[3].size()); // 0 buffers + 1 
event
+
+                       // every queue's last element should be the event
+                       for (int i = 0; i < numberOfChannels; i++) {
+                               BufferOrEvent boe = 
parseBuffer(queues[i].remove(), i);
+                               assertTrue(boe.isEvent());
+                               assertEquals(barrier, boe.getEvent());
+                       }
                }
        }
 
-       /**
-        * Tests that event buffers are properly recycled when broadcasting 
events
-        * to multiple channels.
-        */
-       @Test
-       public void testBroadcastEventBufferReferenceCounting() throws 
Exception {
-
+       private void broadcastBufferOrEventReferenceCounting(boolean 
isBroadcastWriter, boolean isBroadcastEvent) throws Exception {
                @SuppressWarnings("unchecked")
                ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new 
ArrayDeque(), new ArrayDeque() };
 
-               ResultPartitionWriter partition =
-                       new CollectingPartitionWriter(queues, new 
TestPooledBufferProvider(Integer.MAX_VALUE));
-               RecordWriter<?> writer = new RecordWriter<>(partition);
+               ResultPartitionWriter partition = new 
CollectingPartitionWriter(queues, new 
TestPooledBufferProvider(Integer.MAX_VALUE));
+               RecordWriter<IntValue> writer = isBroadcastWriter ?
+                       RecordWriter.createRecordWriter(partition, new 
OutputEmitter(ShipStrategyType.BROADCAST, 0), "test") :
+                       RecordWriter.createRecordWriter(partition);
 
-               writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
+               if (isBroadcastEvent) {
 
 Review comment:
   Yes, it would confuse you after I refactor the whole tests.
   I could keep the previous tests unchanged, but if my new tests reuse some 
common codes from previous tests. Do you mind I extract the common parts 
together?  For the old tests not related to my new tests, I would keep them 
unchanged. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to