pnowojski commented on a change in pull request #7713: [FLINK-10995][network]
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262487320
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
##########
@@ -247,136 +388,121 @@ public void testBroadcastEventMixedRecords() throws
Exception {
}
TestPooledBufferProvider bufferProvider = new
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
ResultPartitionWriter partitionWriter = new
CollectingPartitionWriter(queues, bufferProvider);
- RecordWriter<ByteArrayIO> writer = new
RecordWriter<>(partitionWriter);
- CheckpointBarrier barrier = new
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L,
CheckpointOptions.forCheckpointWithDefaultLocation());
+ RecordWriter<ByteArrayIO> writer = isBroadcastWriter ?
+ RecordWriter.createRecordWriter(partitionWriter, new
OutputEmitter(ShipStrategyType.BROADCAST, 0), "test") :
+ RecordWriter.createRecordWriter(partitionWriter);
+
+ CheckpointBarrier barrier = new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation());
// Emit records on some channels first (requesting buffers),
then
// broadcast the event. The record buffers should be emitted
first, then
// the event. After the event, no new buffer should be
requested.
- // (i) Smaller than the buffer size (single buffer request => 1)
+ // (i) Smaller than the buffer size
byte[] bytes = new byte[bufferSize / 2];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (ii) Larger than the buffer size (two buffer requests => 1 +
2)
+ // (ii) Larger than the buffer size
bytes = new byte[bufferSize + 1];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (iii) Exactly the buffer size (single buffer request => 1 +
2 + 1)
+ // (iii) Exactly the buffer size
bytes = new byte[bufferSize - lenBytes];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (iv) Nothing on the 4th channel (no buffer request => 1 + 2
+ 1 + 0 = 4)
-
- // (v) Broadcast the event
+ // (iv) Broadcast the event
writer.broadcastEvent(barrier);
- assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
+ if (isBroadcastWriter) {
+ assertEquals(3,
bufferProvider.getNumberOfCreatedBuffers());
- BufferOrEvent boe;
- assertEquals(2, queues[0].size()); // 1 buffer + 1 event
- assertTrue(parseBuffer(queues[0].remove(), 0).isBuffer());
- assertEquals(3, queues[1].size()); // 2 buffers + 1 event
- assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
- assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
- assertEquals(2, queues[2].size()); // 1 buffer + 1 event
- assertTrue(parseBuffer(queues[2].remove(), 2).isBuffer());
- assertEquals(1, queues[3].size()); // 0 buffers + 1 event
+ for (int i = 0; i < numberOfChannels; i++) {
+ assertEquals(4, queues[i].size()); // 3 buffer
+ 1 event
- // every queue's last element should be the event
- for (int i = 0; i < numberOfChannels; i++) {
- boe = parseBuffer(queues[i].remove(), i);
- assertTrue(boe.isEvent());
- assertEquals(barrier, boe.getEvent());
+ for (int j = 0; j < 3; j++) {
+
assertTrue(parseBuffer(queues[i].remove(), 0).isBuffer());
+ }
+
+ BufferOrEvent boe =
parseBuffer(queues[i].remove(), i);
+ assertTrue(boe.isEvent());
+ assertEquals(barrier, boe.getEvent());
+ }
+ } else {
+ assertEquals(4,
bufferProvider.getNumberOfCreatedBuffers());
+
+ assertEquals(2, queues[0].size()); // 1 buffer + 1 event
+ assertTrue(parseBuffer(queues[0].remove(),
0).isBuffer());
+ assertEquals(3, queues[1].size()); // 2 buffers + 1
event
+ assertTrue(parseBuffer(queues[1].remove(),
1).isBuffer());
+ assertTrue(parseBuffer(queues[1].remove(),
1).isBuffer());
+ assertEquals(2, queues[2].size()); // 1 buffer + 1 event
+ assertTrue(parseBuffer(queues[2].remove(),
2).isBuffer());
+ assertEquals(1, queues[3].size()); // 0 buffers + 1
event
+
+ // every queue's last element should be the event
+ for (int i = 0; i < numberOfChannels; i++) {
+ BufferOrEvent boe =
parseBuffer(queues[i].remove(), i);
+ assertTrue(boe.isEvent());
+ assertEquals(barrier, boe.getEvent());
+ }
}
}
- /**
- * Tests that event buffers are properly recycled when broadcasting
events
- * to multiple channels.
- */
- @Test
- public void testBroadcastEventBufferReferenceCounting() throws
Exception {
-
+ private void broadcastBufferOrEventReferenceCounting(boolean
isBroadcastWriter, boolean isBroadcastEvent) throws Exception {
@SuppressWarnings("unchecked")
ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new
ArrayDeque(), new ArrayDeque() };
- ResultPartitionWriter partition =
- new CollectingPartitionWriter(queues, new
TestPooledBufferProvider(Integer.MAX_VALUE));
- RecordWriter<?> writer = new RecordWriter<>(partition);
+ ResultPartitionWriter partition = new
CollectingPartitionWriter(queues, new
TestPooledBufferProvider(Integer.MAX_VALUE));
+ RecordWriter<IntValue> writer = isBroadcastWriter ?
+ RecordWriter.createRecordWriter(partition, new
OutputEmitter(ShipStrategyType.BROADCAST, 0), "test") :
+ RecordWriter.createRecordWriter(partition);
- writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
+ if (isBroadcastEvent) {
Review comment:
+1 for the code deduplication, but could you do it in a separate commit
(preferably before the actual change)? As it stands, it's confusing which changes
are functional and which are caused by deduplication/refactoring.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services