AHeise commented on a change in pull request #10029: [FLINK-14553][runtime] Respect non-blocking output in StreamTask#processInput
URL: https://github.com/apache/flink/pull/10029#discussion_r342422212
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
 ##########
 @@ -327,6 +329,48 @@ private void testReleaseMemory(final ResultPartitionType resultPartitionType) th
                }
        }
 
+       @Test
+       public void testPipelinedPartitionBufferPool() throws Exception {
+               testPartitionBufferPool(ResultPartitionType.PIPELINED_BOUNDED);
+       }
+
+       @Test
+       public void testBlockingPartitionBufferPool() throws Exception {
+               testPartitionBufferPool(ResultPartitionType.BLOCKING);
+       }
+
+       private void testPartitionBufferPool(ResultPartitionType type) throws Exception {
+               //setup
+               final int networkBuffersPerChannel = 2;
+               final int floatingNetworkBuffersPerGate = 8;
+               final NetworkBufferPool globalPool = new NetworkBufferPool(20, 1, 1);
+               final ResultPartition partition = new ResultPartitionBuilder()
+                       .setResultPartitionType(type)
+                       .setFileChannelManager(fileChannelManager)
+                       .setNetworkBuffersPerChannel(networkBuffersPerChannel)
+                       .setFloatingNetworkBuffersPerGate(floatingNetworkBuffersPerGate)
+                       .setNetworkBufferPool(globalPool)
+                       .build();
+
+               try {
+                       partition.setup();
+                       BufferPool bufferPool = partition.getBufferPool();
+                       // verify the amount of buffers in created local pool
+                       assertEquals(partition.getNumberOfSubpartitions() + 1, bufferPool.getNumberOfRequiredMemorySegments());
+                       if (type.isBounded()) {
+                               final int maxNumBuffers = networkBuffersPerChannel * partition.getNumberOfSubpartitions() + floatingNetworkBuffersPerGate;
+                               assertEquals(maxNumBuffers, bufferPool.getMaxNumberOfMemorySegments());
+                       } else {
+                               assertEquals(Integer.MAX_VALUE, bufferPool.getMaxNumberOfMemorySegments());
+                       }
+
+               } finally {
+                       // cleanup
+                       globalPool.destroyAllBufferPools();
 
 Review comment:
   Again, it would be cool if we could use JUnit's setup and teardown mechanism instead of implementing it manually.
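
   A rough sketch of the shape this suggestion points at, not the actual change to ResultPartitionTest. To stay runnable without JUnit or Flink on the classpath, it uses a hypothetical stand-in pool (`FakeGlobalPool`, not Flink's `NetworkBufferPool`) and a hand-written `runLikeJUnit` helper that imitates the order in which a JUnit 4 runner calls the lifecycle methods. In a real test class the three methods would carry `@Before`, `@Test`, and `@After`, and the helper would not exist.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a global buffer pool; NOT Flink's NetworkBufferPool. */
class FakeGlobalPool {
    private final List<String> localPools = new ArrayList<>();
    private boolean destroyed;

    void createLocalPool(String name) {
        localPools.add(name);
    }

    int numberOfLocalPools() {
        return localPools.size();
    }

    void destroyAllBufferPools() {
        localPools.clear();
        destroyed = true;
    }

    boolean isDestroyed() {
        return destroyed;
    }
}

/** Test-class shape: the fixture lives in a field, the lifecycle in setUp/tearDown. */
class PartitionBufferPoolLifecycleSketch {
    FakeGlobalPool globalPool;

    // In JUnit 4 this method would be annotated with @Before.
    void setUp() {
        globalPool = new FakeGlobalPool();
    }

    // In JUnit 4 this method would be annotated with @After.
    // The null check covers the case where setUp itself failed.
    void tearDown() {
        if (globalPool != null) {
            globalPool.destroyAllBufferPools();
        }
    }

    // In JUnit 4 this method would be annotated with @Test.
    // Note that the body no longer needs its own try/finally for cleanup.
    void testPartitionBufferPool() {
        globalPool.createLocalPool("partition");
        if (globalPool.numberOfLocalPools() != 1) {
            throw new AssertionError("expected exactly one local pool");
        }
    }

    /** Imitates the per-test order of a JUnit 4 runner: setUp, test, then tearDown in finally. */
    static FakeGlobalPool runLikeJUnit() {
        PartitionBufferPoolLifecycleSketch test = new PartitionBufferPoolLifecycleSketch();
        try {
            test.setUp();
            test.testPartitionBufferPool();
        } finally {
            test.tearDown();
        }
        return test.globalPool;
    }
}
```

   The cleanup then lives in one place for both the pipelined and the blocking variant, and it still runs when an assertion in the test body fails.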

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services
