This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch release-1.17 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9749699cddb8caae7af4a840bcb1441371f19277 Author: Weijie Guo <[email protected]> AuthorDate: Thu Mar 2 15:29:25 2023 +0800 [FLINK-31288][runtime] Disable overdraft buffer for non pipelined result partition. --- .../generated/all_taskmanager_network_section.html | 2 +- .../netty_shuffle_environment_configuration.html | 2 +- .../configuration/NettyShuffleEnvironmentOptions.java | 3 ++- .../io/network/partition/ResultPartitionFactory.java | 15 ++++++++++++++- 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html index 202b574a0e1..7d80b82ada8 100644 --- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html +++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html @@ -102,7 +102,7 @@ <td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td> <td style="word-wrap: break-word;">5</td> <td>Integer</td> - <td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the subtask cannot apply to the normal buffers due to back pressure, while subtask is performing an action that can not be interrupted in the middle, like serializing a large record, flatMap operator producing multiple records for one single input record or processing time timer producing large output. In situations like that system will allow subtask to requ [...] + <td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the subtask cannot apply to the normal buffers due to back pressure, while subtask is performing an action that can not be interrupted in the middle, like serializing a large record, flatMap operator producing multiple records for one single input record or processing time timer producing large output. In situations like that system will allow subtask to requ [...] 
</tr> <tr> <td><h5>taskmanager.network.memory.read-buffer.required-per-gate.max</h5></td> diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html index 6a6aa6913b9..cbea22beb26 100644 --- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html +++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html @@ -90,7 +90,7 @@ <td><h5>taskmanager.network.memory.max-overdraft-buffers-per-gate</h5></td> <td style="word-wrap: break-word;">5</td> <td>Integer</td> - <td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the subtask cannot apply to the normal buffers due to back pressure, while subtask is performing an action that can not be interrupted in the middle, like serializing a large record, flatMap operator producing multiple records for one single input record or processing time timer producing large output. In situations like that system will allow subtask to requ [...] + <td>Number of max overdraft network buffers to use for each ResultPartition. The overdraft buffers will be used when the subtask cannot apply to the normal buffers due to back pressure, while subtask is performing an action that can not be interrupted in the middle, like serializing a large record, flatMap operator producing multiple records for one single input record or processing time timer producing large output. In situations like that system will allow subtask to requ [...] 
</tr> <tr> <td><h5>taskmanager.network.memory.read-buffer.required-per-gate.max</h5></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java index b0bd65292e6..cab12fe1972 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java @@ -363,7 +363,8 @@ public class NettyShuffleEnvironmentOptions { + " such uninterruptible action, without blocking unaligned checkpoints for long period of" + " time. Overdraft buffers are provided on best effort basis only if the system has some" + " unused buffers available. Subtask that has used overdraft buffers won't be allowed to" - + " process any more records until the overdraft buffers are returned to the pool."); + + " process any more records until the overdraft buffers are returned to the pool." + + " It should be noted that this config option only takes effect for Pipelined Shuffle."); /** The timeout for requesting exclusive buffers for each channel. */ @Documentation.ExcludeFromDocumentation( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index 3d8c27bc09d..069536e8664 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -301,6 +301,19 @@ public class ResultPartitionFactory { } } + /** Return whether this result partition need overdraft buffer. 
*/ + private static boolean isOverdraftBufferNeeded(ResultPartitionType resultPartitionType) { + // Only pipelined / pipelined-bounded partitions need overdraft buffers. More + // specifically, there is no reason to request more buffers for non-pipelined (i.e. + // batch) shuffle. The reasons are as follows: + // 1. For BoundedBlockingShuffle, each full buffer will be directly released. + // 2. For SortMergeShuffle, the maximum capacity of the buffer pool is 4 * numSubpartitions. + // It is efficient enough to spill this part of memory to disk. + // 3. For Hybrid Shuffle, the buffer pool is unbounded. If it can't get a normal buffer, it + // also can't get an overdraft buffer. + return resultPartitionType.isPipelinedOrPipelinedBoundedResultPartition(); + } + /** * The minimum pool size should be <code>numberOfSubpartitions + 1</code> for two * considerations: @@ -330,7 +343,7 @@ public class ResultPartitionFactory { pair.getRight(), numberOfSubpartitions, maxBuffersPerChannel, - maxOverdraftBuffersPerGate); + isOverdraftBufferNeeded(type) ? maxOverdraftBuffersPerGate : 0); }; }
