This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4275525fedd238a8b57edf46d22dc36ce19df846
Author: kevin.cyj <[email protected]>
AuthorDate: Thu Jan 13 18:00:51 2022 +0800

    [FLINK-25638][network] Increase the default write buffer size of sort-shuffle to 16M
    
    This closes #18350.
---
 .../shortcodes/generated/all_taskmanager_network_section.html  |  4 ++--
 .../generated/netty_shuffle_environment_configuration.html     |  4 ++--
 .../flink/configuration/NettyShuffleEnvironmentOptions.java    | 10 ++++++----
 .../org/apache/flink/test/runtime/BlockingShuffleITCase.java   |  4 ++++
 4 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 9952de2..33a0ce3 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -136,9 +136,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-buffers</h5></td>
-            <td style="word-wrap: break-word;">64</td>
+            <td style="word-wrap: break-word;">512</td>
             <td>Integer</td>
-            <td>Minimum number of network buffers required per sort-merge 
blocking result partition. For production usage, it is suggested to increase 
this config value to at least 2048 (64M memory if the default 32K memory 
segment size is used) to improve the data compression ratio and reduce the 
small network packets. Usually, several hundreds of megabytes memory is enough 
for large scale batch jobs. Note: you may also need to increase the size of 
total network memory to avoid the 'ins [...]
+            <td>Minimum number of network buffers required per blocking result 
partition for sort-shuffle. For production usage, it is suggested to increase 
this config value to at least 2048 (64M memory if the default 32K memory 
segment size is used) to improve the data compression ratio and reduce the 
small network packets. Usually, several hundreds of megabytes memory is enough 
for large scale batch jobs. Note: you may also need to increase the size of 
total network memory to avoid th [...]
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 10023d8..602b015 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -124,9 +124,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-buffers</h5></td>
-            <td style="word-wrap: break-word;">64</td>
+            <td style="word-wrap: break-word;">512</td>
             <td>Integer</td>
-            <td>Minimum number of network buffers required per sort-merge 
blocking result partition. For production usage, it is suggested to increase 
this config value to at least 2048 (64M memory if the default 32K memory 
segment size is used) to improve the data compression ratio and reduce the 
small network packets. Usually, several hundreds of megabytes memory is enough 
for large scale batch jobs. Note: you may also need to increase the size of 
total network memory to avoid the 'ins [...]
+            <td>Minimum number of network buffers required per blocking result 
partition for sort-shuffle. For production usage, it is suggested to increase 
this config value to at least 2048 (64M memory if the default 32K memory 
segment size is used) to improve the data compression ratio and reduce the 
small network packets. Usually, several hundreds of megabytes memory is enough 
for large scale batch jobs. Note: you may also need to increase the size of 
total network memory to avoid th [...]
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 67c3280..526b4bd 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -197,15 +197,17 @@ public class NettyShuffleEnvironmentOptions {
                                     + " help relieve back-pressure caused by 
unbalanced data distribution among the subpartitions. This value should be"
                                     + " increased in case of higher round trip 
times between nodes and/or larger number of machines in the cluster.");
 
-    /** Minimum number of network buffers required per sort-merge blocking 
result partition. */
+    /**
+     * Minimum number of network buffers required per blocking result 
partition for sort-shuffle.
+     */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
     public static final ConfigOption<Integer> NETWORK_SORT_SHUFFLE_MIN_BUFFERS 
=
             key("taskmanager.network.sort-shuffle.min-buffers")
                     .intType()
-                    .defaultValue(64)
+                    .defaultValue(512)
                     .withDescription(
-                            "Minimum number of network buffers required per 
sort-merge blocking "
-                                    + "result partition. For production usage, 
it is suggested to "
+                            "Minimum number of network buffers required per 
blocking result partition"
+                                    + " for sort-shuffle. For production 
usage, it is suggested to "
                                     + "increase this config value to at least 
2048 (64M memory if "
                                     + "the default 32K memory segment size is 
used) to improve the "
                                     + "data compression ratio and reduce the 
small network packets."
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index 4c50bc7..2991ba0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -65,6 +65,8 @@ public class BlockingShuffleITCase {
         Configuration configuration = new Configuration();
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+        configuration.setInteger(
+                
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
         JobGraph jobGraph = createJobGraph(1000000);
         JobGraphRunningUtil.execute(
@@ -76,6 +78,8 @@ public class BlockingShuffleITCase {
         Configuration configuration = new Configuration();
         configuration.setInteger(
                 
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
+        configuration.setInteger(
+                
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
         JobGraph jobGraph = createJobGraph(0);
         JobGraphRunningUtil.execute(

Reply via email to