tillrohrmann commented on a change in pull request #8654: [FLINK-12647][network] Add feature flag to disable release of consumed blocking partitions
URL: https://github.com/apache/flink/pull/8654#discussion_r293359411
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##########
 @@ -100,6 +103,37 @@ public void testAddOnFinishedBlockingPartition() throws Exception {
                testAddOnFinishedPartition(ResultPartitionType.BLOCKING);
        }
 
+       @Test
+       public void testExternallyManagedBlockingPartitionIsConsumableMultipleTimes() throws IOException {
+               ResultPartitionManager manager = new ResultPartitionManager();
+
+               final ResultPartition partition = new ResultPartitionBuilder()
+                       .setIsExternallyManaged(true)
+                       .setResultPartitionManager(manager)
+                       .setResultPartitionType(ResultPartitionType.BLOCKING)
+                       .build();
+
+               manager.registerResultPartition(partition);
+               partition.finish();
+
+               assertThat(manager.getUnreleasedPartitions(), contains(partition.getPartitionId()));
+
+               ResultSubpartitionView subpartitionView1 = partition.createSubpartitionView(0, () -> {});
+               subpartitionView1.notifySubpartitionConsumed();
+
+               // partition should not be released on consumption
+               assertThat(manager.getUnreleasedPartitions(), contains(partition.getPartitionId()));
+               assertFalse(partition.isReleased());
+
+               // an externally managed blocking partition should be consumable multiple times
+               ResultSubpartitionView subpartitionView2 = partition.createSubpartitionView(0, () -> { });
+               subpartitionView2.notifySubpartitionConsumed();
+
+               // partition should not be released on consumption
+               assertThat(manager.getUnreleasedPartitions(), contains(partition.getPartitionId()));
+               assertFalse(partition.isReleased());
 
 Review comment:
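   There is some code duplication here: the create-view, notify-consumed, and assert steps are repeated verbatim for `subpartitionView1` and `subpartitionView2`. We could simply execute these calls in a loop.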
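
   A minimal sketch of what such a loop could look like. To keep it runnable on its own, it uses hypothetical stand-ins (`StubPartition`, `StubView`, `assertConsumableRepeatedly`) rather than Flink's real `ResultPartition` and `ResultSubpartitionView`; in the actual test the loop body would presumably hold the `createSubpartitionView(0, () -> {})`, `notifySubpartitionConsumed()` and assertion calls from the diff above.

```java
public class LoopedConsumptionSketch {

    /** Hypothetical stand-in for a blocking result partition; NOT Flink's ResultPartition. */
    public static final class StubPartition {
        private final boolean externallyManaged;
        private boolean released;
        private int viewsCreated;

        public StubPartition(boolean externallyManaged) {
            this.externallyManaged = externallyManaged;
        }

        /** Creates a new view; fails if the partition was already released. */
        public StubView createSubpartitionView() {
            if (released) {
                throw new IllegalStateException("partition already released");
            }
            viewsCreated++;
            return new StubView(this);
        }

        /** Assumed behaviour: only partitions that are not externally managed release on consumption. */
        void onConsumed() {
            if (!externallyManaged) {
                released = true;
            }
        }

        public boolean isReleased() {
            return released;
        }

        public int getViewsCreated() {
            return viewsCreated;
        }
    }

    /** Hypothetical stand-in for a subpartition view; NOT Flink's ResultSubpartitionView. */
    public static final class StubView {
        private final StubPartition parent;

        StubView(StubPartition parent) {
            this.parent = parent;
        }

        public void notifySubpartitionConsumed() {
            parent.onConsumed();
        }
    }

    /** Runs the create/consume/assert sequence 'rounds' times instead of copy-pasting it. */
    public static void assertConsumableRepeatedly(StubPartition partition, int rounds) {
        for (int i = 0; i < rounds; i++) {
            StubView view = partition.createSubpartitionView();
            view.notifySubpartitionConsumed();

            // partition should not be released on consumption
            if (partition.isReleased()) {
                throw new AssertionError("partition released after consumption round " + i);
            }
        }
    }

    public static void main(String[] args) {
        StubPartition partition = new StubPartition(true);
        assertConsumableRepeatedly(partition, 2);
        System.out.println("views created: " + partition.getViewsCreated());
    }
}
```

   With a loop, raising the number of consumption rounds is a one-argument change, and the "should not be released" assertion lives in a single place.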

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
