pnowojski commented on a change in pull request #7713: [FLINK-10995][network]
Copy intermediate serialization results only once for broadcast mode
URL: https://github.com/apache/flink/pull/7713#discussion_r262486384
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
##########
@@ -247,136 +388,121 @@ public void testBroadcastEventMixedRecords() throws
Exception {
}
TestPooledBufferProvider bufferProvider = new
TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
ResultPartitionWriter partitionWriter = new
CollectingPartitionWriter(queues, bufferProvider);
- RecordWriter<ByteArrayIO> writer = new
RecordWriter<>(partitionWriter);
- CheckpointBarrier barrier = new
CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L,
CheckpointOptions.forCheckpointWithDefaultLocation());
+ RecordWriter<ByteArrayIO> writer = isBroadcastWriter ?
+ RecordWriter.createRecordWriter(partitionWriter, new
OutputEmitter(ShipStrategyType.BROADCAST, 0), "test") :
+ RecordWriter.createRecordWriter(partitionWriter);
+
+ CheckpointBarrier barrier = new CheckpointBarrier(0, 0,
CheckpointOptions.forCheckpointWithDefaultLocation());
// Emit records on some channels first (requesting buffers),
then
// broadcast the event. The record buffers should be emitted
first, then
// the event. After the event, no new buffer should be
requested.
- // (i) Smaller than the buffer size (single buffer request => 1)
+ // (i) Smaller than the buffer size
byte[] bytes = new byte[bufferSize / 2];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (ii) Larger than the buffer size (two buffer requests => 1 +
2)
+ // (ii) Larger than the buffer size
bytes = new byte[bufferSize + 1];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (iii) Exactly the buffer size (single buffer request => 1 +
2 + 1)
+ // (iii) Exactly the buffer size
bytes = new byte[bufferSize - lenBytes];
rand.nextBytes(bytes);
writer.emit(new ByteArrayIO(bytes));
- // (iv) Nothing on the 4th channel (no buffer request => 1 + 2
+ 1 + 0 = 4)
-
- // (v) Broadcast the event
+ // (iv) Broadcast the event
writer.broadcastEvent(barrier);
- assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
+ if (isBroadcastWriter) {
Review comment:
with `BroadcastRecordWriterTest` class, this if could be changed to
overloaded method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services