This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed699b6ee6b0539087632b68a444f79b95120d84
Author: kevin.cyj <[email protected]>
AuthorDate: Thu Jan 13 20:06:44 2022 +0800

    [FLINK-25637][network] Make sort-shuffle the default shuffle implementation for batch jobs

    This closes #18350.
---
 docs/content.zh/docs/ops/batch/blocking_shuffle.md | 6 +++---
 docs/content/docs/ops/batch/blocking_shuffle.md | 6 +++---
 .../generated/all_taskmanager_network_section.html | 4 ++--
 .../netty_shuffle_environment_configuration.html | 4 ++--
 .../NettyShuffleEnvironmentOptions.java | 22 +++++++++++-----------
 .../test_high_parallelism_iterations.sh | 1 +
 flink-end-to-end-tests/test-scripts/test_tpcds.sh | 1 +
 .../minicluster/MiniClusterConfiguration.java | 8 ++++++++
 ...tractTaskManagerProcessFailureRecoveryTest.java | 2 ++
 .../JobManagerHAProcessFailureRecoveryITCase.java | 2 ++
 .../flink/test/runtime/BlockingShuffleITCase.java | 12 ++++++++----
 .../test/runtime/ShuffleCompressionITCase.java | 5 +++--
 12 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/docs/content.zh/docs/ops/batch/blocking_shuffle.md b/docs/content.zh/docs/ops/batch/blocking_shuffle.md
index 74cc30c..da1384c 100644
--- a/docs/content.zh/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content.zh/docs/ops/batch/blocking_shuffle.md
@@ -37,7 +37,7 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta

 ## Hash Shuffle

-`Hash Shuffle` 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 本地磁盘上。当下游任务运行时会向上游的 TaskManager 请求分片,TaskManager 读取文件之后通过网络传输(给下游任务)。
+对于 1.14 以及更低的版本,`Hash Shuffle` 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 本地磁盘上。当下游任务运行时会向上游的 TaskManager 请求分片,TaskManager 读取文件之后通过网络传输(给下游任务)。

 `Hash Shuffle` 为读写文件提供了不同的机制:

@@ -68,11 +68,11 @@ Flink [DataStream API]({{< ref "docs/dev/datastream/execution_mode" >}}) 和 [Ta

 ## Sort Shuffle

-`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现。不同于 `Hash Shuffle`,sort shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和 [FLINK-19614](https://issues.a [...]
+`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash Shuffle`,sort shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下 sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref "docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考 [FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和 [FLINK-19614](h [...]

 当使用sort blocking shuffle的时候有些配置需要适配:
 - [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): 根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort shuffle`。
+- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): 根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort shuffle`。对于 1.15 以下的版本,它的默认值是 `Integer.MAX_VALUE`,所以默认情况下总是会使用 `hash shuffle`。从 1.15 开始,它的默认值是 1, 所以默认情况下总是会使用 `sort shuffle`。
 - [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): 配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): 配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。

diff --git a/docs/content/docs/ops/batch/blocking_shuffle.md b/docs/content/docs/ops/batch/blocking_shuffle.md
index 7bf647b..c6654f4 100644
--- a/docs/content/docs/ops/batch/blocking_shuffle.md
+++ b/docs/content/docs/ops/batch/blocking_shuffle.md
@@ -37,7 +37,7 @@ They will be detailed in the following sections.

 ## Hash Shuffle

-The default blocking shuffle implementation, `Hash Shuffle`, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they will request partitions from the upstream TaskManager's, which read the files and transmit data via the network.
+The default blocking shuffle implementation for 1.14 and lower, `Hash Shuffle`, has each upstream task persist its results in a separate file for each downstream task on the local disk of the TaskManager. When the downstream tasks run, they will request partitions from the upstream TaskManager's, which read the files and transmit data via the network.

 `Hash Shuffle` provides different mechanisms for writing and reading files:

@@ -68,11 +68,11 @@ To further improve the performance, for most jobs we also recommend [enabling co

 ## Sort Shuffle

-`Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13. Different from `Hash Shuffle`, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furthermore, by writing fewer files and making a best effort to read da [...]
+`Sort Shuffle` is another blocking shuffle implementation introduced in version 1.13 and it becomes the default blocking shuffle implementation in 1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each result partition. When the result partition is read by multiple downstream tasks concurrently, the data file is opened only once and shared by all readers. As a result, the cluster uses fewer resources like inode and file descriptors, which improves stability. Furt [...]

 There are several config options that might need adjustment when using sort blocking shuffle:
 - [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref "docs/deployment/config" >}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option for shuffle data compression. it is suggested to enable it for most jobs except that the compression ratio of your data is low. Defaults to false for 1.14 and lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): Config option to enable sort shuffle depending on the parallelism of downstream tasks. If parallelism is lower than the configured value, `hash shuffle` will be used, otherwise `sort shuffle` will be used.
+- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism): Config option to enable sort shuffle depending on the parallelism of downstream tasks. If parallelism is lower than the configured value, `hash shuffle` will be used, otherwise `sort shuffle` will be used. For versions lower than 1.15, its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used by default. Since 1.15, its default v [...]
 - [taskmanager.network.sort-shuffle.min-buffers]({{< ref "docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers): Config option to control data writing buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.
 - [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref "docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to control data reading buffer size. For large scale jobs, you may need to increase this value, usually, several hundreds of megabytes memory is enough.

diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index 33a0ce3..3501528 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -142,9 +142,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
-            <td style="word-wrap: break-word;">2147483647</td>
+            <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-sh [...]
+            <td>Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking shuffle, which means for batch jobs of smaller parallelism, hash-shuffle will be used and for batch jobs of larger or equal parallelism, sort-shuffle will be used. The value 1 means that sort-shuffle is the default option. Note: For production usage, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.siz [...]
         </tr>
     </tbody>
 </table>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index 602b015..e64e6b8 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -130,9 +130,9 @@
         </tr>
         <tr>
             <td><h5>taskmanager.network.sort-shuffle.min-parallelism</h5></td>
-            <td style="word-wrap: break-word;">2147483647</td>
+            <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>Parallelism threshold to switch between sort-merge blocking shuffle and the default hash-based blocking shuffle, which means for batch jobs of small parallelism, the hash-based blocking shuffle will be used and for batch jobs of large parallelism, the sort-merge one will be used. Note: For production usage, if sort-merge blocking shuffle is enabled, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-sh [...]
+            <td>Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking shuffle, which means for batch jobs of smaller parallelism, hash-shuffle will be used and for batch jobs of larger or equal parallelism, sort-shuffle will be used. The value 1 means that sort-shuffle is the default option. Note: For production usage, you may also need to tune 'taskmanager.network.sort-shuffle.min-buffers' and 'taskmanager.memory.framework.off-heap.batch-shuffle.siz [...]
         </tr>
     </tbody>
 </table>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 526b4bd..cafca62 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -218,24 +218,24 @@ public class NettyShuffleEnvironmentOptions {
                                             + " config value.");

     /**
-     * Parallelism threshold to switch between sort-merge based blocking shuffle and the default
-     * hash-based blocking shuffle.
+     * Parallelism threshold to switch between sort-based blocking shuffle and hash-based blocking
+     * shuffle.
      */
     @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
     public static final ConfigOption<Integer> NETWORK_SORT_SHUFFLE_MIN_PARALLELISM =
             key("taskmanager.network.sort-shuffle.min-parallelism")
                     .intType()
-                    .defaultValue(Integer.MAX_VALUE)
+                    .defaultValue(1)
                     .withDescription(
                             String.format(
-                                    "Parallelism threshold to switch between sort-merge blocking "
-                                            + "shuffle and the default hash-based blocking shuffle,"
-                                            + " which means for batch jobs of small parallelism, "
-                                            + "the hash-based blocking shuffle will be used and for"
-                                            + " batch jobs of large parallelism, the sort-merge one"
-                                            + " will be used. Note: For production usage, if sort-"
-                                            + "merge blocking shuffle is enabled, you may also need"
-                                            + " to tune '%s' and '%s' for better performance.",
+                                    "Parallelism threshold to switch between sort-based blocking "
+                                            + "shuffle and hash-based blocking shuffle, which means"
+                                            + " for batch jobs of smaller parallelism, hash-shuffle"
+                                            + " will be used and for batch jobs of larger or equal "
+                                            + "parallelism, sort-shuffle will be used. The value 1 "
+                                            + "means that sort-shuffle is the default option. Note:"
+                                            + " For production usage, you may also need to tune "
+                                            + "'%s' and '%s' for better performance.",
                                     NETWORK_SORT_SHUFFLE_MIN_BUFFERS.key(),
                                     // raw string key is used here to avoid interdependence, a test
                                     // is implemented to guard that when the target key is modified,
diff --git a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
index cc1fb13..310f2bc 100755
--- a/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
+++ b/flink-end-to-end-tests/test-scripts/test_high_parallelism_iterations.sh
@@ -33,6 +33,7 @@ set_config_key "taskmanager.numberOfTaskSlots" "$SLOTS_PER_TM"
 set_config_key "taskmanager.memory.network.min" "160m"
 set_config_key "taskmanager.memory.network.max" "160m"
 set_config_key "taskmanager.memory.framework.off-heap.size" "300m"
+set_config_key "taskmanager.network.sort-shuffle.min-buffers" "64"

 print_mem_use
 start_cluster
diff --git a/flink-end-to-end-tests/test-scripts/test_tpcds.sh b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
index 8eaf51d..4f88c1f 100755
--- a/flink-end-to-end-tests/test-scripts/test_tpcds.sh
+++ b/flink-end-to-end-tests/test-scripts/test_tpcds.sh
@@ -57,6 +57,7 @@ echo "[INFO]Preparing Flink cluster..."
 set_config_key "taskmanager.memory.process.size" "4096m"
 set_config_key "taskmanager.numberOfTaskSlots" "4"
 set_config_key "parallelism.default" "4"
+set_config_key "taskmanager.memory.network.fraction" "0.2"

 start_cluster

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
index 1deb221..980c0a8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterConfiguration.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -71,6 +72,13 @@ public class MiniClusterConfiguration {

         TaskExecutorResourceUtils.adjustForLocalExecution(modifiedConfig);

+        // reduce the default number of network buffers used by sort-shuffle to avoid the
+        // "Insufficient number of network buffers" error.
+        if (!modifiedConfig.contains(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS)) {
+            modifiedConfig.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
+        }
+
         // set default io pool size.
         if (!modifiedConfig.contains(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE)) {
             modifiedConfig.set(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE, DEFAULT_IO_POOL_SIZE);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 4010b41..c9900ac 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.HeartbeatManagerOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginManager;
@@ -105,6 +106,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
+        config.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
         config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
         config.set(TaskManagerOptions.CPU_CORES, 1.0);
         config.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
index aafd8c6..00c977f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -267,6 +268,7 @@ public class JobManagerHAProcessFailureRecoveryITCase extends TestLogger {
         config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
         config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
+        config.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 16);
         config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
         config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
         config.set(TaskManagerOptions.CPU_CORES, 1.0);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
index 2991ba0..ce788a8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/BlockingShuffleITCase.java
@@ -48,6 +48,10 @@ public class BlockingShuffleITCase {
     public void testBoundedBlockingShuffle() throws Exception {
         JobGraph jobGraph = createJobGraph(1000000);
         Configuration configuration = new Configuration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
+
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
@@ -56,6 +60,10 @@ public class BlockingShuffleITCase {
     public void testBoundedBlockingShuffleWithoutData() throws Exception {
         JobGraph jobGraph = createJobGraph(0);
         Configuration configuration = new Configuration();
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);
+
         JobGraphRunningUtil.execute(
                 jobGraph, configuration, numTaskManagers, numSlotsPerTaskManager);
     }
@@ -64,8 +72,6 @@ public class BlockingShuffleITCase {
     public void testSortMergeBlockingShuffle() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-        configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);

         JobGraph jobGraph = createJobGraph(1000000);
@@ -77,8 +83,6 @@ public class BlockingShuffleITCase {
     public void testSortMergeBlockingShuffleWithoutData() throws Exception {
         Configuration configuration = new Configuration();
         configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
-        configuration.setInteger(
                 NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, 64);

         JobGraph jobGraph = createJobGraph(0);
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index d458136..b32db9d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -87,6 +87,9 @@ public class ShuffleCompressionITCase {
         configuration.setBoolean(
                 NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM,
+                Integer.MAX_VALUE);

         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
         JobGraphRunningUtil.execute(jobGraph, configuration, NUM_TASKMANAGERS, NUM_SLOTS);
@@ -97,8 +100,6 @@ public class ShuffleCompressionITCase {
         Configuration configuration = new Configuration();
         configuration.setBoolean(
                 NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, false);
-        configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
         configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMinutes(1));

         JobGraph jobGraph = createJobGraph(ResultPartitionType.BLOCKING, ExecutionMode.BATCH);
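
The selection rule behind `taskmanager.network.sort-shuffle.min-parallelism` can be modeled in a few lines. What follows is a hypothetical, stdlib-only Java sketch. It is not Flink code. The class name `ShuffleSelectionSketch`, the `ShuffleKind` enum, and the use of a plain `Map<String, String>` in place of Flink's `Configuration` are all illustrative assumptions.

The sketch follows the option description in this commit:

- A parallelism below the threshold selects hash shuffle.
- A parallelism greater than or equal to the threshold selects sort shuffle.

It also contrasts the old default, `Integer.MAX_VALUE`, with the new default, `1`. Finally, it mirrors the `MiniClusterConfiguration` change, which sets `taskmanager.network.sort-shuffle.min-buffers` to 16 only when the key has not been set explicitly.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, stdlib-only model of the min-parallelism rule described in this commit.
 * Not Flink code: a plain Map stands in for org.apache.flink.configuration.Configuration.
 */
final class ShuffleSelectionSketch {

    /** The two blocking shuffle implementations the threshold chooses between. */
    enum ShuffleKind { HASH, SORT }

    static final String MIN_PARALLELISM_KEY = "taskmanager.network.sort-shuffle.min-parallelism";
    static final String MIN_BUFFERS_KEY = "taskmanager.network.sort-shuffle.min-buffers";

    /** Default for 1.14 and lower: no realistic parallelism reaches it, so hash shuffle wins. */
    static final int OLD_DEFAULT_MIN_PARALLELISM = Integer.MAX_VALUE;

    /** Default introduced by this commit: any parallelism >= 1 selects sort shuffle. */
    static final int NEW_DEFAULT_MIN_PARALLELISM = 1;

    /** Smaller parallelism selects hash shuffle; larger or equal parallelism selects sort shuffle. */
    static ShuffleKind select(int downstreamParallelism, int minParallelism) {
        return downstreamParallelism >= minParallelism ? ShuffleKind.SORT : ShuffleKind.HASH;
    }

    /** Reads the threshold, falling back to the new default when the key is absent. */
    static int minParallelism(Map<String, String> conf) {
        String value = conf.get(MIN_PARALLELISM_KEY);
        return value == null ? NEW_DEFAULT_MIN_PARALLELISM : Integer.parseInt(value);
    }

    /** Like the MiniClusterConfiguration guard: set min-buffers to 16 only if not configured. */
    static void applyMiniClusterDefaults(Map<String, String> conf) {
        conf.putIfAbsent(MIN_BUFFERS_KEY, "16");
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        applyMiniClusterDefaults(conf);
        System.out.println(select(4, minParallelism(conf))); // prints SORT (new default 1)

        // Opting back into hash shuffle, as the updated ITCases do.
        conf.put(MIN_PARALLELISM_KEY, String.valueOf(Integer.MAX_VALUE));
        System.out.println(select(4, minParallelism(conf))); // prints HASH
        System.out.println(conf.get(MIN_BUFFERS_KEY)); // prints 16
    }
}
```

Under the new default of 1, every batch job selects sort shuffle. The ITCases that still exercise hash shuffle restore the old behavior by setting the threshold to `Integer.MAX_VALUE`, which is what the `main` method above imitates.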
