zhijiangW commented on a change in pull request #9132: [FLINK-13245][network]
Fix the bug of file resource leak while canceling partition request
URL: https://github.com/apache/flink/pull/9132#discussion_r307669449
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
##########
@@ -348,6 +376,68 @@ public void testEnqueueReaderByNotifyingBufferAndCredit()
throws Exception {
assertNull(channel.readOutbound());
}
+ @Test
+ public void testCancelPartitionRequestForUnavailableView() throws
Exception {
+ testCancelPartitionRequest(false);
+ }
+
+ @Test
+ public void testCancelPartitionRequestForAvailableView() throws
Exception {
+ testCancelPartitionRequest(true);
+ }
+
+ private void testCancelPartitionRequest(boolean isAvailableView) throws
Exception {
+ // setup
+ final ResultPartitionManager partitionManager = new
ResultPartitionManager();
+ final ResultPartition partition =
createPartitionWithFilledData(partitionManager);
+ final InputChannelID receiverId = new InputChannelID();
+ final PartitionRequestQueue queue = new PartitionRequestQueue();
+ final CreditBasedSequenceNumberingViewReader reader = new
CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
+ final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+ reader.requestSubpartitionView(partitionManager,
partition.getPartitionId(), 0);
+ // add this reader into allReaders queue
+ queue.notifyReaderCreated(reader);
+
+ // block the channel so that we see an intermediate state in
the test
+ blockChannel(channel);
+
+ // add credit to make this reader available for adding into
availableReaders queue
+ if (isAvailableView) {
+ queue.addCredit(receiverId, 1);
+
assertThat(queue.getAvailableReaders().contains(reader), is(true));
+ }
+
+ // cancel this subpartition view
+ queue.cancel(receiverId);
+ channel.runPendingTasks();
+
+ assertThat(queue.getAvailableReaders().contains(reader),
is(false));
+ // the partition and its reader view should all be released
+ assertThat(reader.isReleased(), is(true));
+ assertThat(partition.isReleased(), is(true));
+ for (ResultSubpartition subpartition :
partition.getAllPartitions()) {
+ assertThat(subpartition.isReleased(), is(true));
+ }
+
+ // cleanup
+ channel.close();
+ }
+
+ private static ResultPartition
createPartitionWithFilledData(ResultPartitionManager partitionManager) throws
Exception {
Review comment:
Agree, it could be moved to `PartitionTestUtils` if to be used in other
relevant classes future.
The current motivation is making the existing test logic clean and it might
also be used for other new unit tests in this class future.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services