akalash commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r899148430


##########
docs/layouts/shortcodes/generated/all_taskmanager_network_section.html:
##########
@@ -80,6 +80,12 @@
             <td>Integer</td>
             <td>Number of max buffers that can be used for each channel. If a 
channel exceeds the number of max buffers, it will make the task become 
unavailable, cause the back pressure and block the data processing. This might 
speed up checkpoint alignment by preventing excessive growth of the buffered 
in-flight data in case of data skew and high number of configured floating 
buffers. This limit is not strictly guaranteed, and can be ignored by things 
like flatMap operators, records spanning multiple buffers or single timer 
producing large amount of data.</td>
         </tr>
+        <tr>
+            
<td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>Number of max overdraft network buffers to use for each 
ResultPartition. The overdraft buffers will be used when the ResultPartition 
cannot apply to the normal buffers, e.g: the all buffers of ResultPartition be 
applied or the number of buffers for a single channel has reached 
taskmanager.network.memory.max-buffers-per-channel. When the ResultPartition is 
unavailable, the ResultPartition can provide some additional buffers to allow 
overdraft. It can effectively solve the problem that multiple output buffers 
are required to process a single data, causing the Task to block in the 
requestMemory, such as: flatMap operator, records spanning multiple buffers or 
a single timer generates a large amount of data. It doesn't need to wait for 
ResultPartition is available to start Unaligned Checkpoint directly.</td>
+        </tr>

Review Comment:
   Just for the check. Did you generate this documentation automatically(`mvn 
-Pgenerate-config-docs ...`)?



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -549,6 +551,157 @@ public void testConsistentAvailability() throws Exception 
{
         }
     }
 
+    @Test
+    public void testWithoutOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 0;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    @Test
+    public void testWithOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 2;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    private void testUseAllBuffersForSingleChannel(
+            int poolSize, int maxBuffersPerChannel, int maxOverdraftBuffers) 
throws Exception {
+        int numberOfSubpartitions = 2;
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfSubpartitions);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfSubpartitions,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        AutoCloseableRegistry channelCloseableRegistry = new 
AutoCloseableRegistry();
+        for (int i = 0; i < maxBuffersPerChannel; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            channelCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        AutoCloseableRegistry poolCloseableRegistry = new 
AutoCloseableRegistry();
+        for (int i = maxBuffersPerChannel; i < poolSize; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            poolCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        // request overdraft buffer
+        AutoCloseableRegistry overdraftCloseableRegistry = new 
AutoCloseableRegistry();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        for (int i = 0; i < maxOverdraftBuffers; i++) {
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            overdraftCloseableRegistry.registerCloseable(bufferBuilder);
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, i + 1, 
false);
+        }
+
+        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(1);

Review Comment:
   I think that it makes sense to check that `requestBufferBuilder` for `0` 
channel returns NULL value as well 



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -549,6 +551,157 @@ public void testConsistentAvailability() throws Exception 
{
         }
     }
 
+    @Test
+    public void testWithoutOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 0;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    @Test
+    public void testWithOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 2;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    private void testUseAllBuffersForSingleChannel(
+            int poolSize, int maxBuffersPerChannel, int maxOverdraftBuffers) 
throws Exception {
+        int numberOfSubpartitions = 2;
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfSubpartitions);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfSubpartitions,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        AutoCloseableRegistry channelCloseableRegistry = new 
AutoCloseableRegistry();
+        for (int i = 0; i < maxBuffersPerChannel; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            channelCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        AutoCloseableRegistry poolCloseableRegistry = new 
AutoCloseableRegistry();
+        for (int i = maxBuffersPerChannel; i < poolSize; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            poolCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        // request overdraft buffer
+        AutoCloseableRegistry overdraftCloseableRegistry = new 
AutoCloseableRegistry();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        for (int i = 0; i < maxOverdraftBuffers; i++) {
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            overdraftCloseableRegistry.registerCloseable(bufferBuilder);
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, i + 1, 
false);
+        }
+
+        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(1);
+        assertNull(bufferBuilder);
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 
maxOverdraftBuffers, false);
+
+        // release all bufferBuilder
+        overdraftCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        poolCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        channelCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+
+        bufferPool.lazyDestroy();
+    }
+
+    private void testUseAllBuffersForMultiChannel(

Review Comment:
   I can be mistaken but this method looks the same as above 
(`testUseAllBuffersForSingleChannel`). I think you can merge them into one but 
where you want to use the different channels you can use (`i % numberOfChannel`)



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -549,6 +551,157 @@ public void testConsistentAvailability() throws Exception 
{
         }
     }
 
+    @Test
+    public void testWithoutOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 0;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    @Test
+    public void testWithOverdraftBuffer() throws Exception {
+        localBufferPool.lazyDestroy();
+        int maxOverdraftBuffers = 2;
+
+        testUseAllBuffersForSingleChannel(4, 3, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(4, 3, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(8, 5, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(8, 5, maxOverdraftBuffers);
+
+        testUseAllBuffersForSingleChannel(12, 10, maxOverdraftBuffers);
+        testUseAllBuffersForMultiChannel(12, 10, maxOverdraftBuffers);
+    }
+
+    private void testUseAllBuffersForSingleChannel(
+            int poolSize, int maxBuffersPerChannel, int maxOverdraftBuffers) 
throws Exception {
+        int numberOfSubpartitions = 2;
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfSubpartitions);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfSubpartitions,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        AutoCloseableRegistry channelCloseableRegistry = new 
AutoCloseableRegistry();
+        for (int i = 0; i < maxBuffersPerChannel; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            channelCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        AutoCloseableRegistry poolCloseableRegistry = new 
AutoCloseableRegistry();
+        for (int i = maxBuffersPerChannel; i < poolSize; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            poolCloseableRegistry.registerCloseable(bufferBuilder);
+        }
+
+        // request overdraft buffer
+        AutoCloseableRegistry overdraftCloseableRegistry = new 
AutoCloseableRegistry();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        for (int i = 0; i < maxOverdraftBuffers; i++) {
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);
+            assertNotNull(bufferBuilder);
+            overdraftCloseableRegistry.registerCloseable(bufferBuilder);
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, i + 1, 
false);
+        }
+
+        BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(1);
+        assertNull(bufferBuilder);
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 
maxOverdraftBuffers, false);
+
+        // release all bufferBuilder
+        overdraftCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        poolCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, false);
+        channelCloseableRegistry.close();
+        assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+
+        bufferPool.lazyDestroy();
+    }
+
+    private void testUseAllBuffersForMultiChannel(
+            int poolSize, int maxBuffersPerChannel, int maxOverdraftBuffers) 
throws Exception {
+        int numberOfSubpartitions = 2;
+        checkArgument(maxBuffersPerChannel > poolSize / numberOfSubpartitions);
+        LocalBufferPool bufferPool =
+                new LocalBufferPool(
+                        networkBufferPool,
+                        1,
+                        Integer.MAX_VALUE,
+                        numberOfSubpartitions,
+                        maxBuffersPerChannel,
+                        maxOverdraftBuffers);
+        bufferPool.setNumBuffers(poolSize);
+
+        // Request all buffers inside the buffer pool
+        AutoCloseableRegistry channel0CloseableRegistry = new 
AutoCloseableRegistry();
+        for (int i = 0; i < poolSize / 2; i++) {
+            assertRequestedOverdraftBufferAndIsAvailable(bufferPool, 0, true);
+            BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(0);

Review Comment:
   For example here:
   ```
   BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(i % 
numberOfChannels);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to