akalash commented on code in PR #19499:
URL: https://github.com/apache/flink/pull/19499#discussion_r858777920


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -375,11 +399,14 @@ private MemorySegment requestMemorySegment(int 
targetChannel) {
             // target channel over quota; do not return a segment
             if (targetChannel != UNKNOWN_CHANNEL
                     && subpartitionBuffersCount[targetChannel] >= 
maxBuffersPerChannel) {
-                return null;
+                segment = 
requestOverdraftMemorySegmentFromGlobal(targetChannel);
+            } else {
+                segment =
+                        availableMemorySegments.isEmpty()
+                                ? 
requestOverdraftMemorySegmentFromGlobal(targetChannel)

Review Comment:
   It is up to you but in my opinion, `requestOverdraftMemorySegmentFromGlobal` 
which is used in `if` and `else` blocks looks weird. Don't you think that it is 
better just update `if` condition:
   ```
   if (targetChannel != UNKNOWN_CHANNEL
                               && subpartitionBuffersCount[targetChannel] >= 
maxBuffersPerChannel
                       || availableMemorySegments.isEmpty()) {
                   segment = 
requestOverdraftMemorySegmentFromGlobal(targetChannel);
               } else {
                   segment = availableMemorySegments.poll();
               }
   ```
   
   I also don't really sure that we should use overdraft if we still have 
`availableMemorySegments`. Perhaps we just need to ignore 
`maxBuffersPerChannel`:
   
   ```
   if (availableMemorySegments.isEmpty()) {
                   segment = 
requestOverdraftMemorySegmentFromGlobal(targetChannel);
               } else {
                   segment = availableMemorySegments.poll();
               }
   ```
   But I don't know how legal it is. I need to think ...
   



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of 
configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like 
flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each 
ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.

Review Comment:
   Too much-repeated information. `Number of max overdraft network buffers to 
use for each ResultPartition.` is enough, in my opinion, other sentences just 
repeat it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -430,6 +457,25 @@ private boolean requestMemorySegmentFromGlobal() {
         return false;
     }
 
+    private MemorySegment requestOverdraftMemorySegmentFromGlobal(int 
targetChannel) {
+        assert Thread.holdsLock(availableMemorySegments);
+
+        if (numberOfRequestedOverdraftMemorySegments >= 
maxOverdraftBuffersPerGate
+                || targetChannel == UNKNOWN_CHANNEL) {

Review Comment:
   Why we don't use overdraft for UNKNOWN_CHANNEL?



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of 
configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like 
flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each 
ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(10)

Review Comment:
   Default value should be discussed later.



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -273,6 +273,26 @@ public class NettyShuffleEnvironmentOptions {
                                     + " case of data skew and high number of 
configured floating buffers. This limit is not strictly guaranteed,"
                                     + " and can be ignored by things like 
flatMap operators, records spanning multiple buffers or single timer"
                                     + " producing large amount of data.");
+    /**
+     * Number of max overdraft network buffers to use for each 
ResultPartition. Currently, it just
+     * supports ResultPartition. InputGate can be supported if necessary.
+     */
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    public static final ConfigOption<Integer> 
NETWORK_MAX_OVERDRAFT_BUFFERS_PER_GATE =
+            key("taskmanager.network.memory.max-overdraft-buffers-per-gate")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription(
+                            String.format(
+                                    "Number of max overdraft network buffers 
to use for each ResultPartition."
+                                            + " The overdraft buffers will be 
used when the ResultPartition cannot apply to the normal buffers from the"
+                                            + " 
LocalBufferPool(LocalBufferPool is unavailable), e.g: the all buffers of 
LocalBufferPool be applied or"
+                                            + " the number of buffers for a 
single channel has reached %s. When the LocalBufferPool is unavailable,"
+                                            + " the LocalBufferPool can 
provide some additional buffers to allow overdraft. It can effectively solve"
+                                            + " the problem that multiple 
output buffers are required to process a single data, causing the Task to block 
in the requestMemory,"
+                                            + " such as: flatMap operator, 
records spanning multiple buffers or a single timer generates a large amount of 
data."
+                                            + " It doesn't need to wait for 
ResultPartition is available to start Unaligned Checkpoint directly.",
+                                    NETWORK_MAX_BUFFERS_PER_CHANNEL.key()));

Review Comment:
   I am not really sure that we should mention LocalBufferPool(we need to check 
another configuration). It is user documentation we should just simply explain 
how and when this configuration can help the user.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##########
@@ -543,6 +543,164 @@ public void testConsistentAvailability() throws Exception 
{
         }
     }
 
+    @Test
+    public void testOverdraftBuffer() throws IOException, InterruptedException 
{

Review Comment:
   Why do you keep all scenarios in one test? It looks too big and difficult to 
read. Can you please, split this test into several tests with the descriptive 
names?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##########
@@ -472,18 +518,22 @@ private void onGlobalPoolAvailable() {
     private boolean shouldBeAvailable() {
         assert Thread.holdsLock(availableMemorySegments);
 
-        return !availableMemorySegments.isEmpty() && 
unavailableSubpartitionsCount == 0;
+        return !availableMemorySegments.isEmpty()
+                && unavailableSubpartitionsCount == 0
+                && numberOfRequestedOverdraftMemorySegments == 0;

Review Comment:
   I see a lot of pattern `unavailableSubpartitionsCount == 0 && 
umberOfRequestedOverdraftMemorySegments == 0` I suppose we need to extract it 
to separated method in order to avoid misuse in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to