This is an automated email from the ASF dual-hosted git repository. yingjie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4275525fedd238a8b57edf46d22dc36ce19df846 Author: kevin.cyj <[email protected]> AuthorDate: Thu Jan 13 18:00:51 2022 +0800 [FLINK-25638][network] Increase the default write buffer size of sort-shuffle to 16M This closes #18350. --- .../shortcodes/generated/all_taskmanager_network_section.html | 4 ++-- .../generated/netty_shuffle_environment_configuration.html | 4 ++-- .../flink/configuration/NettyShuffleEnvironmentOptions.java | 10 ++++++---- .../org/apache/flink/test/runtime/BlockingShuffleITCase.java | 4 ++++ 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html index 9952de2..33a0ce3 100644 --- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html +++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html @@ -136,9 +136,9 @@ </tr> <tr> <td><h5>taskmanager.network.sort-shuffle.min-buffers</h5></td> - <td style="word-wrap: break-word;">64</td> + <td style="word-wrap: break-word;">512</td> <td>Integer</td> - <td>Minimum number of network buffers required per sort-merge blocking result partition. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid the 'ins [...] + <td>Minimum number of network buffers required per blocking result partition for sort-shuffle. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid th [...] </tr> <tr> <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td> diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html index 10023d8..602b015 100644 --- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html +++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html @@ -124,9 +124,9 @@ </tr> <tr> <td><h5>taskmanager.network.sort-shuffle.min-buffers</h5></td> - <td style="word-wrap: break-word;">64</td> + <td style="word-wrap: break-word;">512</td> <td>Integer</td> - <td>Minimum number of network buffers required per sort-merge blocking result partition. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid the 'ins [...] + <td>Minimum number of network buffers required per blocking result partition for sort-shuffle. For production usage, it is suggested to increase this config value to at least 2048 (64M memory if the default 32K memory segment size is used) to improve the data compression ratio and reduce the small network packets. Usually, several hundreds of megabytes memory is enough for large scale batch jobs. Note: you may also need to increase the size of total network memory to avoid th [...] </tr> <tr> <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td> diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java index 67c3280..526b4bd 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java @@ -197,15 +197,17 @@ public class NettyShuffleEnvironmentOptions { + " help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be" + " increased in case of higher round trip times between nodes and/or larger number of machines in the cluster."); - /** Minimum number of network buffers required per sort-merge blocking result partition. */ + /** + * Minimum number of network buffers required per blocking result partition for sort-shuffle. + */ @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK) public static final ConfigOption<Integer> NETWORK_SORT_SHUFFLE_MIN_BUFFERS = key("taskmanager.network.sort-shuffle.min-buffers") .intType() - .defaultValue(64) + .defaultValue(512) .withDescription( - "Minimum number of network buffers required per sort-merge blocking " - + "result partition. For production usage, it is suggested to " + "Minimum number of network buffers required per blocking result partition" + + " for sort-shuffle. For production usage, it is suggested to " + "increase this config value to at least 2048 (64M memory if " + "the default 32K memory segment size is used) to improve the " + "data compression ratio and reduce the small network packets." diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java index 4c50bc7..2991ba0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java @@ -65,6 +65,8 @@ public class BlockingShuffleITCase { Configuration configuration = new Configuration(); configuration.setInteger( NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1); + configuration.setInteger( + NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64); JobGraph jobGraph = createJobGraph(1000000); JobGraphRunningUtil.execute( @@ -76,6 +78,8 @@ public class BlockingShuffleITCase { Configuration configuration = new Configuration(); configuration.setInteger( NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1); + configuration.setInteger( + NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64); JobGraph jobGraph = createJobGraph(0); JobGraphRunningUtil.execute(
