wsry commented on a change in pull request #15259:
URL: https://github.com/apache/flink/pull/15259#discussion_r603742889
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
##########
@@ -246,6 +257,36 @@ public void testWriteLargeRecord() throws Exception {
assertEquals(recordWritten, recordRead);
}
+ @Test
+ public void testDataBroadcast() throws Exception {
+ int numSubpartitions = 10;
+ int numBuffers = 100;
+ int numRecords = 10000;
+
+ BufferPool bufferPool = globalPool.createBufferPool(numBuffers, numBuffers);
+ SortMergeResultPartition partition =
+ createSortMergedPartition(numSubpartitions, bufferPool);
+
+ for (int i = 0; i < numRecords; ++i) {
+ ByteBuffer record = generateRandomData(bufferSize, new Random());
+ partition.broadcastRecord(record);
+ }
+ partition.finish();
+ partition.close();
+
+ int eventSize = EventSerializer.toSerializedEvent(EndOfPartitionEvent.INSTANCE).remaining();
+ long dataSize = numSubpartitions * numRecords * bufferSize + numSubpartitions * eventSize;
+ assertNotNull(partition.getResultFile());
+ assertEquals(2, checkNotNull(fileChannelManager.getPaths()[0].list()).length);
+ for (File file : checkNotNull(fileChannelManager.getPaths()[0].listFiles())) {
+ assertTrue(file.length() < numSubpartitions * numRecords * bufferSize);
Review comment:
This check is for the broadcast optimization. The file size should be smaller
than the total bytes because we write only one copy of the data, while the size
of the data read back from the file should still equal the total bytes.
I will adjust the code to check only the data file.
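
To make the size argument concrete, here is a minimal arithmetic sketch. It is not Flink code: the class and method names are hypothetical, the record size of 1024 bytes and the event size of 4 bytes are assumed values (the test's actual bufferSize and serialized event size are not shown in this hunk), and file headers or index data are not modeled. It only shows why one stored copy of the payload is smaller than the bytes that all subpartitions read in total.

```java
// Hypothetical helper, not part of Flink: plain arithmetic for the size check.
public class BroadcastSizeSketch {

    // Total bytes consumed by all readers: every subpartition reads every
    // record plus one end-of-partition event (mirrors dataSize in the test).
    static long totalBytesRead(
            int numSubpartitions, int numRecords, int recordSize, int eventSize) {
        return (long) numSubpartitions * numRecords * recordSize
                + (long) numSubpartitions * eventSize;
    }

    // Payload bytes stored when broadcast data is written only once.
    // Headers, events, and index entries are deliberately ignored here.
    static long broadcastPayloadBytes(int numRecords, int recordSize) {
        return (long) numRecords * recordSize;
    }

    public static void main(String[] args) {
        int numSubpartitions = 10;
        int numRecords = 10000;
        int recordSize = 1024; // assumed value, stands in for bufferSize
        int eventSize = 4;     // assumed value, stands in for the serialized event

        long read = totalBytesRead(numSubpartitions, numRecords, recordSize, eventSize);
        long stored = broadcastPayloadBytes(numRecords, recordSize);

        System.out.println("bytes read by all subpartitions: " + read);
        System.out.println("payload bytes stored once:       " + stored);
        System.out.println("stored < unoptimized total:      "
                + (stored < (long) numSubpartitions * numRecords * recordSize));
    }
}
```

The casts to long are a precaution in the sketch: with larger record sizes the int product numSubpartitions * numRecords * recordSize could overflow.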