zhijiangW commented on a change in pull request #7713: [FLINK-10995][network]
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262788274
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
##########
@@ -247,136 +388,121 @@ public void testBroadcastEventMixedRecords() throws
Exception {
}
TestPooledBufferProvider bufferProvider = new
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
ResultPartitionWriter partitionWriter = new
CollectingPartitionWriter(queues, bufferProvider);
- RecordWriter<ByteArrayIO> writer = new
RecordWriter<>(partitionWriter);
- CheckpointBarrier barrier = new
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L,
CheckpointOptions.forCheckpointWithDefaultLocation());
+ RecordWriter<ByteArrayIO> writer = isBroadcastWriter ?
+ RecordWriter.createRecordWriter(partitionWriter, new
OutputEmitter(ShipStrategyType.BROADCAST, 0), "test") :
+ RecordWriter.createRecordWriter(partitionWriter);
+
+ CheckpointBarrier barrier = new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation());
// Emit records on some channels first (requesting buffers),
then
// broadcast the event. The record buffers should be emitted
first, then
// the event. After the event, no new buffer should be
requested.
- // (i) Smaller than the buffer size (single buffer request => 1)
+ // (i) Smaller than the buffer size
byte[] bytes = new byte[bufferSize / 2];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (ii) Larger than the buffer size (two buffer requests => 1 +
2)
+ // (ii) Larger than the buffer size
bytes = new byte[bufferSize + 1];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (iii) Exactly the buffer size (single buffer request => 1 +
2 + 1)
+ // (iii) Exactly the buffer size
bytes = new byte[bufferSize - lenBytes];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (iv) Nothing on the 4th channel (no buffer request => 1 + 2
+ 1 + 0 = 4)
-
- // (v) Broadcast the event
+ // (iv) Broadcast the event
writer.broadcastEvent(barrier);
- assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
+ if (isBroadcastWriter) {
+ assertEquals(3,
bufferProvider.getNumberOfCreatedBuffers());
- BufferOrEvent boe;
- assertEquals(2, queues[0].size()); // 1 buffer + 1 event
- assertTrue(parseBuffer(queues[0].remove(), 0).isBuffer());
- assertEquals(3, queues[1].size()); // 2 buffers + 1 event
- assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
- assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
- assertEquals(2, queues[2].size()); // 1 buffer + 1 event
- assertTrue(parseBuffer(queues[2].remove(), 2).isBuffer());
- assertEquals(1, queues[3].size()); // 0 buffers + 1 event
+ for (int i = 0; i < numberOfChannels; i++) {
+ assertEquals(4, queues[i].size()); // 3 buffer
+ 1 event
- // every queue's last element should be the event
- for (int i = 0; i < numberOfChannels; i++) {
- boe = parseBuffer(queues[i].remove(), i);
- assertTrue(boe.isEvent());
- assertEquals(barrier, boe.getEvent());
+ for (int j = 0; j < 3; j++) {
+
assertTrue(parseBuffer(queues[i].remove(), 0).isBuffer());
+ }
+
+ BufferOrEvent boe =
parseBuffer(queues[i].remove(), i);
+ assertTrue(boe.isEvent());
+ assertEquals(barrier, boe.getEvent());
+ }
+ } else {
+ assertEquals(4,
bufferProvider.getNumberOfCreatedBuffers());
+
+ assertEquals(2, queues[0].size()); // 1 buffer + 1 event
+ assertTrue(parseBuffer(queues[0].remove(),
0).isBuffer());
+ assertEquals(3, queues[1].size()); // 2 buffers + 1
event
+ assertTrue(parseBuffer(queues[1].remove(),
1).isBuffer());
+ assertTrue(parseBuffer(queues[1].remove(),
1).isBuffer());
+ assertEquals(2, queues[2].size()); // 1 buffer + 1 event
+ assertTrue(parseBuffer(queues[2].remove(),
2).isBuffer());
+ assertEquals(1, queues[3].size()); // 0 buffers + 1
event
+
+ // every queue's last element should be the event
+ for (int i = 0; i < numberOfChannels; i++) {
+ BufferOrEvent boe =
parseBuffer(queues[i].remove(), i);
+ assertTrue(boe.isEvent());
+ assertEquals(barrier, boe.getEvent());
+ }
}
}
- /**
- * Tests that event buffers are properly recycled when broadcasting
events
- * to multiple channels.
- */
- @Test
- public void testBroadcastEventBufferReferenceCounting() throws
Exception {
-
+ private void broadcastBufferOrEventReferenceCounting(boolean
isBroadcastWriter, boolean isBroadcastEvent) throws Exception {
@SuppressWarnings("unchecked")
ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new
ArrayDeque(), new ArrayDeque() };
- ResultPartitionWriter partition =
- new CollectingPartitionWriter(queues, new
TestPooledBufferProvider(Integer.MAX_VALUE));
- RecordWriter<?> writer = new RecordWriter<>(partition);
+ ResultPartitionWriter partition = new
CollectingPartitionWriter(queues, new
TestPooledBufferProvider(Integer.MAX_VALUE));
+ RecordWriter<IntValue> writer = isBroadcastWriter ?
+ RecordWriter.createRecordWriter(partition, new
OutputEmitter(ShipStrategyType.BROADCAST, 0), "test") :
+ RecordWriter.createRecordWriter(partition);
- writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
+ if (isBroadcastEvent) {
Review comment:
Yes, I agree that refactoring all of the tests makes the change confusing to
review. I could keep the previous tests unchanged, but my new tests reuse some
common code from the previous tests. Do you mind if I extract those common
parts? I would keep the old tests that are not related to my new tests
unchanged.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services