zhijiangW commented on a change in pull request #9132: [FLINK-13245][network]
Fix the bug of file resource leak while canceling partition request
URL: https://github.com/apache/flink/pull/9132#discussion_r307670761
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
##########
@@ -348,6 +376,68 @@ public void testEnqueueReaderByNotifyingBufferAndCredit()
throws Exception {
assertNull(channel.readOutbound());
}
+ @Test
+ public void testCancelPartitionRequestForUnavailableView() throws
Exception {
+ testCancelPartitionRequest(false);
+ }
+
+ @Test
+ public void testCancelPartitionRequestForAvailableView() throws
Exception {
+ testCancelPartitionRequest(true);
+ }
+
+ private void testCancelPartitionRequest(boolean isAvailableView) throws
Exception {
+ // setup
+ final ResultPartitionManager partitionManager = new
ResultPartitionManager();
+ final ResultPartition partition =
createPartitionWithFilledData(partitionManager);
+ final InputChannelID receiverId = new InputChannelID();
+ final PartitionRequestQueue queue = new PartitionRequestQueue();
+ final CreditBasedSequenceNumberingViewReader reader = new
CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
+ final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+ reader.requestSubpartitionView(partitionManager,
partition.getPartitionId(), 0);
+ // add this reader into allReaders queue
+ queue.notifyReaderCreated(reader);
+
+ // block the channel so that we see an intermediate state in
the test
+ blockChannel(channel);
+
+ // add credit to make this reader available for adding into
availableReaders queue
+ if (isAvailableView) {
+ queue.addCredit(receiverId, 1);
+
assertThat(queue.getAvailableReaders().contains(reader), is(true));
+ }
+
+ // cancel this subpartition view
+ queue.cancel(receiverId);
+ channel.runPendingTasks();
+
+ assertThat(queue.getAvailableReaders().contains(reader),
is(false));
+ // the partition and its reader view should all be released
+ assertThat(reader.isReleased(), is(true));
+ assertThat(partition.isReleased(), is(true));
+ for (ResultSubpartition subpartition :
partition.getAllPartitions()) {
+ assertThat(subpartition.isReleased(), is(true));
+ }
+
+ // cleanup
+ channel.close();
+ }
+
+ private static ResultPartition
createPartitionWithFilledData(ResultPartitionManager partitionManager) throws
Exception {
+ final ResultPartition partition = new ResultPartitionBuilder()
+ .setResultPartitionType(ResultPartitionType.BLOCKING)
+ .setFileChannelManager(fileChannelManager)
+ .setResultPartitionManager(partitionManager)
+ .isReleasedOnConsumption(true)
+ .build();
+
+ partitionManager.registerResultPartition(partition);
+ PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE);
Review comment:
I think both ways are ok here, since we already had this helper method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services