This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9749699cddb8caae7af4a840bcb1441371f19277
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Mar 2 15:29:25 2023 +0800

    [FLINK-31288][runtime] Disable overdraft buffer for non pipelined result partition.
---
 .../generated/all_taskmanager_network_section.html        |  2 +-
 .../netty_shuffle_environment_configuration.html          |  2 +-
 .../configuration/NettyShuffleEnvironmentOptions.java     |  3 ++-
 .../io/network/partition/ResultPartitionFactory.java      | 15 ++++++++++++++-
 4 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html 
b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 202b574a0e1..7d80b82ada8 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -102,7 +102,7 @@
             
<td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td>
             <td style="word-wrap: break-word;">5</td>
             <td>Integer</td>
-            <td>Number of max overdraft network buffers to use for each 
ResultPartition. The overdraft buffers will be used when the subtask cannot 
apply to the normal buffers  due to back pressure, while subtask is performing 
an action that can not be interrupted in the middle,  like serializing a large 
record, flatMap operator producing multiple records for one single input record 
or processing time timer producing large output. In situations like that system 
will allow subtask to requ [...]
+            <td>Number of max overdraft network buffers to use for each 
ResultPartition. The overdraft buffers will be used when the subtask cannot 
apply to the normal buffers  due to back pressure, while subtask is performing 
an action that can not be interrupted in the middle,  like serializing a large 
record, flatMap operator producing multiple records for one single input record 
or processing time timer producing large output. In situations like that system 
will allow subtask to requ [...]
         </tr>
         <tr>
             
<td><h5>taskmanager.network.memory.read-buffer.required-per-gate.max</h5></td>
diff --git 
a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
 
b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 6a6aa6913b9..cbea22beb26 100644
--- 
a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -90,7 +90,7 @@
             
<td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td>
             <td style="word-wrap: break-word;">5</td>
             <td>Integer</td>
-            <td>Number of max overdraft network buffers to use for each 
ResultPartition. The overdraft buffers will be used when the subtask cannot 
apply to the normal buffers  due to back pressure, while subtask is performing 
an action that can not be interrupted in the middle,  like serializing a large 
record, flatMap operator producing multiple records for one single input record 
or processing time timer producing large output. In situations like that system 
will allow subtask to requ [...]
+            <td>Number of max overdraft network buffers to use for each 
ResultPartition. The overdraft buffers will be used when the subtask cannot 
apply to the normal buffers  due to back pressure, while subtask is performing 
an action that can not be interrupted in the middle,  like serializing a large 
record, flatMap operator producing multiple records for one single input record 
or processing time timer producing large output. In situations like that system 
will allow subtask to requ [...]
         </tr>
         <tr>
             
<td><h5>taskmanager.network.memory.read-buffer.required-per-gate.max</h5></td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index b0bd65292e6..cab12fe1972 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -363,7 +363,8 @@ public class NettyShuffleEnvironmentOptions {
                                     + " such uninterruptible action, without 
blocking unaligned checkpoints for long period of"
                                     + " time. Overdraft buffers are provided 
on best effort basis only if the system has some"
                                     + " unused buffers available. Subtask that 
has used overdraft buffers won't be allowed to"
-                                    + " process any more records until the 
overdraft buffers are returned to the pool.");
+                                    + " process any more records until the 
overdraft buffers are returned to the pool."
+                                    + " It should be noted that this config 
option only takes effect for Pipelined Shuffle.");
 
     /** The timeout for requesting exclusive buffers for each channel. */
     @Documentation.ExcludeFromDocumentation(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 3d8c27bc09d..069536e8664 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -301,6 +301,19 @@ public class ResultPartitionFactory {
         }
     }
 
+    /** Returns whether the given result partition type needs overdraft buffers. */
+    private static boolean isOverdraftBufferNeeded(ResultPartitionType 
resultPartitionType) {
+        // Only pipelined / pipelined-bounded partitions need overdraft buffers. More
+        // specifically, there is no reason to request more buffers for non-pipelined (i.e.
+        // batch) shuffle. The reasons are as follows:
+        // 1. For BoundedBlockingShuffle, each full buffer will be directly released.
+        // 2. For SortMergeShuffle, the maximum capacity of buffer pool is 4 * numSubpartitions. It
+        // is efficient enough to spill this part of memory to disk.
+        // 3. For Hybrid Shuffle, the buffer pool is unbounded. If it can't get a normal buffer, it
+        // also can't get an overdraft buffer.
+        return 
resultPartitionType.isPipelinedOrPipelinedBoundedResultPartition();
+    }
+
     /**
      * The minimum pool size should be <code>numberOfSubpartitions + 1</code> 
for two
      * considerations:
@@ -330,7 +343,7 @@ public class ResultPartitionFactory {
                     pair.getRight(),
                     numberOfSubpartitions,
                     maxBuffersPerChannel,
-                    maxOverdraftBuffersPerGate);
+                    isOverdraftBufferNeeded(type) ? maxOverdraftBuffersPerGate 
: 0);
         };
     }
 

Reply via email to