This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed699b6ee6b0539087632b68a444f79b95120d84
Author: kevin.cyj <[email protected]>
AuthorDate: Thu Jan 13 20:06:44 2022 +0800

    [FLINK-25637][network] Make sort-shuffle the default shuffle implementation for batch jobs
    
    This closes #18350.
---
 docs/content.zh/docs/ops/batch/blocking_shuffle.md |  6 +++---
 docs/content/docs/ops/batch/blocking_shuffle.md    |  6 +++---
 .../generated/all_taskmanager_network_section.html |  4 ++--
 .../netty_shuffle_environment_configuration.html   |  4 ++--
 .../NettyShuffleEnvironmentOptions.java            | 22 +++++++++++-----------
 .../test_high_parallelism_iterations.sh            |  1 +
 flink-end-to-end-tests/test-scripts/test_tpcds.sh  |  1 +
 .../minicluster/MiniClusterConfiguration.java      |  8 ++++++++
 ...tractTaskManagerProcessFailureRecoveryTest.java |  2 ++
 .../JobManagerHAProcessFailureRecoveryITCase.java  |  2 ++
 .../flink/test/runtime/BlockingShuffleITCase.java  | 12 ++++++++----
 .../test/runtime/ShuffleCompressionITCase.java     |  5 +++--
 12 files changed, 46 insertions(+), 27 deletions(-)
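
The option description in this commit defines the threshold rule: a blocking shuffle whose downstream parallelism is smaller than `taskmanager.network.sort-shuffle.min-parallelism` uses hash shuffle, and one whose parallelism is larger or equal uses sort shuffle. Below is a minimal, dependency-free sketch of that rule. The class `ShuffleSelection`, the enum `Kind`, and the method `choose` are hypothetical names for illustration, not Flink API. It shows why the old default `Integer.MAX_VALUE` selects hash shuffle for any realistic parallelism, while the new default `1` selects sort shuffle for every parallelism. It also shows why the ITCases that still exercise hash shuffle now pin the option to `Integer.MAX_VALUE`.

```java
// Hypothetical, dependency-free model of the threshold described by
// taskmanager.network.sort-shuffle.min-parallelism. Not Flink API.
public class ShuffleSelection {

    enum Kind { HASH, SORT }

    // Default before this commit (1.14 and lower): hash shuffle for any realistic parallelism.
    static final int OLD_DEFAULT_MIN_PARALLELISM = Integer.MAX_VALUE;

    // Default after this commit (1.15): sort shuffle for every parallelism >= 1.
    static final int NEW_DEFAULT_MIN_PARALLELISM = 1;

    // Smaller parallelism -> hash shuffle; larger or equal parallelism -> sort shuffle.
    static Kind choose(int downstreamParallelism, int minParallelism) {
        return downstreamParallelism < minParallelism ? Kind.HASH : Kind.SORT;
    }

    public static void main(String[] args) {
        for (int parallelism : new int[] {1, 8, 1000}) {
            System.out.printf(
                    "parallelism=%d old default -> %s, new default -> %s%n",
                    parallelism,
                    choose(parallelism, OLD_DEFAULT_MIN_PARALLELISM),
                    choose(parallelism, NEW_DEFAULT_MIN_PARALLELISM));
        }
    }
}
```

For each of the three sample parallelisms, `main` prints HASH under the old default and SORT under the new one.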

diff --git a/docs/content.zh/docs/ops/batch/blocking_shuffle.md b/docs/content.zh/docs/ops/batch/blocking_shuffle.md
index 74cc30c..da1384c 100644
--- a/docs/content.zh/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/blocking_shuffle.md
@@ -37,7 +37,7 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 
 ## Hash Shuffle
 
-`Hash Shuffle` 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 本地磁盘上。当下游任务运行时会向上游的 TaskManager 请求分片,TaskManager 读取文件之后通过网络传输(给下游任务)。
+对于 1.14 以及更低的版本,`Hash Shuffle` 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 本地磁盘上。当下游任务运行时会向上游的 TaskManager 请求分片,TaskManager 读取文件之后通过网络传输(给下游任务)。
 
 `Hash Shuffle` 为读写文件提供了不同的机制:
 
@@ -68,11 +68,11 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta
 
 ## Sort Shuffle
 
-`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现。不同于 `Hash Shuffle`,sort shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和 [FLINK-19614](https://issues.a [...]
+`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash Shuffle`,sort shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和 [FLINK-19614](h [...]
 
 当使用sort blocking shuffle的时候有些配置需要适配:
 - [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): 根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort shuffle`。
+- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): 根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort shuffle`。对于 1.15 以下的版本,它的默认值是 `Integer.MAX_VALUE`,所以默认情况下总是会使用 `hash shuffle`。从 1.15 开始,它的默认值是 1, 所以默认情况下总是会使用 `sort shuffle`。
 - [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): 配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): 配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
 
diff --git a/docs/content/docs/ops/batch/blocking_shuffle.md b/docs/content/docs/ops/batch/blocking_shuffle.md
index 7bf647b..c6654f4 100644
--- a/docs/content/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content/docs/ops/batch/blocking_shuffle.md
@@ -37,7 +37,7 @@ They will be detailed in the following sections.
 
 ## Hash Shuffle
 
-The default blocking shuffle implementation, `Hash Shuffle`, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they will request partitions from the upstream TaskManager's, which read the files and transmit data via the network.
+The default blocking shuffle implementation for 1.14 and lower, `Hash Shuffle`, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they will request partitions from the upstream TaskManager's, which read the files and transmit data via the network.
 
 `Hash Shuffle` provides different mechanisms for writing and reading files:
 
@@ -68,11 +68,11 @@ To further improve the performance, for most jobs we also recommend [enabling co
 
 ## Sort Shuffle 
 
-`Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13. Different from `Hash Shuffle`, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read da [...]
+`Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13 and it becomes the default blocking shuffle implementation in 1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furt [...]
 
 There are several config options that might need adjustment when using sort blocking shuffle:
 - [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option for shuffle data compression. it is suggested to enable it for most jobs except that the compression ratio of your data is low. Defaults to false for 1.14 and lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): Config option to enable sort shuffle depending on the parallelism of downstream tasks. If parallelism is lower than the configured value, `hash shuffle` will be used, otherwise `sort shuffle` will be used.
+- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): Config option to enable sort shuffle depending on the parallelism of downstream tasks. If parallelism is lower than the configured value, `hash shuffle` will be used, otherwise `sort shuffle` will be used. For versions lower than 1.15, its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used by default. Since 1.15, its default v [...]
 - [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control data writing buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to control data reading buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
 
diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 33a0ce3..3501528 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -142,9 +142,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
-            <td style="word-wrap: break-word;">2147483647</td>
+            <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-sh [...]
+            <td>Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking shuffle, which means for batch jobs of smaller parallelism, hash-shuffle will be used and for batch jobs of larger or equal parallelism, sort-shuffle will be used. The value 1 means that sort-shuffle is the default option. Note: For production usage, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.siz [...]
         </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 602b015..e64e6b8 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -130,9 +130,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
-            <td style="word-wrap: break-word;">2147483647</td>
+            <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-sh [...]
+            <td>Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking shuffle, which means for batch jobs of smaller parallelism, hash-shuffle will be used and for batch jobs of larger or equal parallelism, sort-shuffle will be used. The value 1 means that sort-shuffle is the default option. Note: For production usage, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.siz [...]
         </tr>
     </tbody>
 </table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 526b4bd..cafca62 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -218,24 +218,24 @@ public class NettyShuffleEnvironmentOptions {
                                     + " config value.");
 
     /**
-     * Parallelism threshold to switch between sort-merge based blocking shuffle and the default
-     * hash-based blocking shuffle.
+     * Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking
+     * shuffle.
      */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
     public static final ConfigOption<Integer> NETWORK_SORT_SHUFFLE_MIN_PARALLELISM =
             key("taskmanager.network.sort-shuffle.min-parallelism")
                     .intType()
-                    .defaultValue(Integer.MAX_VALUE)
+                    .defaultValue(1)
                     .withDescription(
                             String.format(
-                                    "Parallelism threshold to switch between sort-merge blocking "
-                                            + "shuffle and the default hash-based blocking shuffle,"
-                                            + " which means for batch jobs of small parallelism, "
-                                            + "the hash-based blocking shuffle will be used and for"
-                                            + " batch jobs of large parallelism, the sort-merge one"
-                                            + " will be used. Note: For production usage, if sort-"
-                                            + "merge blocking shuffle is enabled, you may also need"
-                                            + " to tune '%s' and '%s' for better performance.",
+                                    "Parallelism threshold to switch between sort-based blocking "
+                                            + "shuffle and hash-based blocking shuffle, which means"
+                                            + " for batch jobs of smaller parallelism, hash-shuffle"
+                                            + " will be used and for batch jobs of larger or equal "
+                                            + "parallelism, sort-shuffle will be used. The value 1 "
+                                            + "means that sort-shuffle is the default option. Note:"
+                                            + " For production usage, you may also need to tune "
+                                            + "'%s' and '%s' for better performance.",
                                     NETWORK_SORT_SHUFFLE_MIN_BUFFERS.key(),
                                     // raw string key is used here to avoid interdependence, a test
                                     // is implemented to guard that when the target key is modified,
diff --git a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
index cc1fb13..310f2bc 100755
--- a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
+++ b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
@@ -33,6 +33,7 @@ set_config_key "taskmanager.numberOfTaskSlots" "$SLOTS_PER_TM"
 set_config_key "taskmanager.memory.network.min" "160m"
 set_config_key "taskmanager.memory.network.max" "160m"
 set_config_key "taskmanager.memory.framework.off-heap.size" "300m"
+set_config_key "taskmanager.network.sort-shuffle.min-buffers" "64"
 
 print_mem_use
 start_cluster
diff --git a/flink-end-to-end-tests/test-scripts/test_tpcds.sh b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
index 8eaf51d..4f88c1f 100755
--- a/flink-end-to-end-tests/test-scripts/test_tpcds.sh
+++ b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
@@ -57,6 +57,7 @@ echo "[INFO]Preparing Flink cluster..."
 set_config_key "taskmanager.memory.process.size" "4096m"
 set_config_key "taskmanager.numberOfTaskSlots" "4"
 set_config_key "parallelism.default" "4"
+set_config_key "taskmanager.memory.network.fraction" "0.2"
 start_cluster
 
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 1deb221..980c0a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -71,6 +72,13 @@ public class MiniClusterConfiguration {
 
         TaskExecutorResourceUtils.adjustForLocalExecution(modifiedConfig);
 
+        // reduce the default number of network buffers used by sort-shuffle to avoid the
+        // "Insufficient number of network buffers" error.
+        if (!modifiedConfig.contains(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS)) {
+            modifiedConfig.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
+        }
+
         // set default io pool size.
         if (!modifiedConfig.contains(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE)) {
             modifiedConfig.set(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE, DEFAULT_IO_POOL_SIZE);
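
The MiniClusterConfiguration hunk lowers `taskmanager.network.sort-shuffle.min-buffers` to 16 for local execution only when the key is absent, so a value the user configured explicitly still wins. Its comment says this avoids the "Insufficient number of network buffers" error. The sketch below shows that set-if-absent pattern under one assumption: a plain `Map<String, String>` stands in for Flink's `Configuration`. The class `LocalDefaults` and the method `applyLocalDefaults` are hypothetical names, not Flink API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map<String, String> stands in for Flink's Configuration.
public class LocalDefaults {

    static final String SORT_SHUFFLE_MIN_BUFFERS =
            "taskmanager.network.sort-shuffle.min-buffers";

    // Value the MiniClusterConfiguration change applies for local runs.
    static final String LOCAL_MIN_BUFFERS = "16";

    static Map<String, String> applyLocalDefaults(Map<String, String> userConfig) {
        // Copy so the caller's map is left untouched.
        Map<String, String> modified = new HashMap<>(userConfig);
        // Only fill in the smaller buffer count when the user has not set the key.
        modified.putIfAbsent(SORT_SHUFFLE_MIN_BUFFERS, LOCAL_MIN_BUFFERS);
        return modified;
    }

    public static void main(String[] args) {
        Map<String, String> unset = new HashMap<>();
        System.out.println(applyLocalDefaults(unset).get(SORT_SHUFFLE_MIN_BUFFERS)); // 16

        Map<String, String> explicit = new HashMap<>();
        explicit.put(SORT_SHUFFLE_MIN_BUFFERS, "64");
        System.out.println(applyLocalDefaults(explicit).get(SORT_SHUFFLE_MIN_BUFFERS)); // 64
    }
}
```

`putIfAbsent` plays the role of the `contains(...)` check followed by `set(...)` in the hunk. Keeping the user's value untouched is what lets tests such as BlockingShuffleITCase still request 64 buffers explicitly.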
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 4010b41..c9900ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginManager;
@@ -105,6 +106,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
+        config.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
         config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
         config.set(TaskManagerOptions.CPU_CORES, 1.0);
         config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index aafd8c6..00c977f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -267,6 +268,7 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
+        config.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
         config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
         config.set(TaskManagerOptions.CPU_CORES, 1.0);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index 2991ba0..ce788a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -48,6 +48,10 @@ public class BlockingShuffleITCase {
     public void testBoundedBlockingShuffle() throws Exception {
         JobGraph jobGraph = createJobGraph(1000000);
         Configuration configuration = new Configuration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
+
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
@@ -56,6 +60,10 @@ public class BlockingShuffleITCase {
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
         JobGraph jobGraph = createJobGraph(0);
         Configuration configuration = new Configuration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
+
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
@@ -64,8 +72,6 @@ public class BlockingShuffleITCase {
     public void testSortMergeBlockingShuffle() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-        configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
         JobGraph jobGraph = createJobGraph(1000000);
@@ -77,8 +83,6 @@ public class BlockingShuffleITCase {
     public void testSortMergeBlockingShuffleWithoutData() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-        configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);
 
         JobGraph jobGraph = createJobGraph(0);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index d458136..b32db9d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -87,6 +87,9 @@ public class ShuffleCompressionITCase {
         configuration.setBoolean(
                 
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(1));
+        configuration.setInteger(
+                
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
         JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS);
@@ -97,8 +100,6 @@ public class ShuffleCompressionITCase {
         Configuration configuration = new Configuration();
         configuration.setBoolean(
                 
NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
-        configuration.setInteger(
-                
NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, 
Duration.ofMinutes(1));
 
         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
