zhijiangW commented on a change in pull request #9132: [FLINK-13245][network] 
Fix the bug of file resource leak while canceling partition request
URL: https://github.com/apache/flink/pull/9132#discussion_r307670761
 
 

 ##########
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
 ##########
 @@ -348,6 +376,68 @@ public void testEnqueueReaderByNotifyingBufferAndCredit() 
throws Exception {
                assertNull(channel.readOutbound());
        }
 
+       @Test
+       public void testCancelPartitionRequestForUnavailableView() throws 
Exception {
+               testCancelPartitionRequest(false);
+       }
+
+       @Test
+       public void testCancelPartitionRequestForAvailableView() throws 
Exception {
+               testCancelPartitionRequest(true);
+       }
+
+       private void testCancelPartitionRequest(boolean isAvailableView) throws 
Exception {
+               // setup
+               final ResultPartitionManager partitionManager = new 
ResultPartitionManager();
+               final ResultPartition partition = 
createPartitionWithFilledData(partitionManager);
+               final InputChannelID receiverId = new InputChannelID();
+               final PartitionRequestQueue queue = new PartitionRequestQueue();
+               final CreditBasedSequenceNumberingViewReader reader = new 
CreditBasedSequenceNumberingViewReader(receiverId, 0, queue);
+               final EmbeddedChannel channel = new EmbeddedChannel(queue);
+
+               reader.requestSubpartitionView(partitionManager, 
partition.getPartitionId(), 0);
+               // add this reader into allReaders queue
+               queue.notifyReaderCreated(reader);
+
+               // block the channel so that we see an intermediate state in 
the test
+               blockChannel(channel);
+
+               // add credit to make this reader available for adding into 
availableReaders queue
+               if (isAvailableView) {
+                       queue.addCredit(receiverId, 1);
+                       
assertThat(queue.getAvailableReaders().contains(reader), is(true));
+               }
+
+               // cancel this subpartition view
+               queue.cancel(receiverId);
+               channel.runPendingTasks();
+
+               assertThat(queue.getAvailableReaders().contains(reader), 
is(false));
+               // the partition and its reader view should all be released
+               assertThat(reader.isReleased(), is(true));
+               assertThat(partition.isReleased(), is(true));
+               for (ResultSubpartition subpartition : 
partition.getAllPartitions()) {
+                       assertThat(subpartition.isReleased(), is(true));
+               }
+
+               // cleanup
+               channel.close();
+       }
+
+       private static ResultPartition 
createPartitionWithFilledData(ResultPartitionManager partitionManager) throws 
Exception {
+               final ResultPartition partition = new ResultPartitionBuilder()
+                       .setResultPartitionType(ResultPartitionType.BLOCKING)
+                       .setFileChannelManager(fileChannelManager)
+                       .setResultPartitionManager(partitionManager)
+                       .isReleasedOnConsumption(true)
+                       .build();
+
+               partitionManager.registerResultPartition(partition);
+               PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE);
 
 Review comment:
   I think both ways are ok here, since we already had this helper method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to