gaoyunhaii commented on a change in pull request #18723:
URL: https://github.com/apache/flink/pull/18723#discussion_r805490452
##########
File path: docs/content.zh/docs/ops/batch/blocking_shuffle.md
##########
@@ -56,34 +56,59 @@ Flink [DataStream API]({{< ref
"docs/dev/datastream/execution_mode" >}}) 和 [Ta
{{< /hint >}}
{{< hint info >}}
-`mmap`使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 yarn 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
+`mmap`使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 YARN 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
{{< /hint >}}
-为了进一步的提升性能,对于绝大多数的任务我们推荐 [启用压缩]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
,除非数据很难被压缩。
-
`Hash Shuffle` 在小规模运行在固态硬盘的任务情况下效果显著,但是依旧有一些问题:
1. 如果任务的规模庞大将会创建很多文件,并且要求同时对这些文件进行大量的写操作。
2. 在机械硬盘情况下,当大量的下游任务同时读取数据,可能会导致随机读写问题。
## Sort Shuffle
-`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash
Shuffle`,sort shuffle
将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下
sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖
`sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref
"docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)。
+`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash
Shuffle`,`Sort Shuffle`
将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下
`Sort Shuffle` 可以获得比 `Hash Shuffle` 更好的性能。另外,`Sort Shuffle`
使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref
"docs/deployment/security/security-ssl" >}})。关于 `Sort Shuffle` 的更多细节请参考
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)。
当使用sort blocking shuffle的时候有些配置需要适配:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle
data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort
shuffle`。对于 1.15 以下的版本,它的默认值是 `Integer.MAX_VALUE`,所以默认情况下总是会使用 `hash shuffle`。从
1.15 开始,它的默认值是 1, 所以默认情况下总是会使用 `sort shuffle`。
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size):
配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
{{< hint info >}}
-目前 `sort shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
+目前 `Sort Shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
{{< /hint >}}
## 如何选择 Blocking Shuffle
总的来说,
- 对于在固态硬盘上运行的小规模任务而言,两者都可以。
-- 对于在机械硬盘上运行的大规模任务而言,`sort shuffle` 更为合适。
-- 在这两种情况下,你可以考虑 [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
来提升性能,除非数据很难被压缩。
+- 对于在机械硬盘上运行的大规模任务而言,`Sort Shuffle` 更为合适。
+
+要在 `Sort Shuffle` 和 `Hash Shuffle` 间切换,
你需要配置这个参数:[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)。
这个参数根据消费者Task的并发选择当前Task使用`Hash Shuffle` 或 `Sort Shuffle`,如果并发小于配置值则使用 `Hash
Shuffle`,否则使用 `Sort Shuffle`。 对于 1.15 以下版本,它的默认值是 `Integer.MAX_VALUE`,这意味着
`Hash Shuffle` 是默认实现。 从 1.15 起,它的默认值是 1,这意味着 `Sort Shuffle` 是默认实现。
+
+## 性能调优
+
+下面这些建议可以帮助你实现更好的性能,这些对于大规模批作业尤其重要:
+
+1. 如果你使用机械硬盘作为存储设备,请总是使用 `Sort Shuffle`,因为这可以极大的提升稳定性和性能。从 1.15 开始,`Sort
Shuffle` 已经成为默认实现,对于 1.14 以及更低版本,你需要通过将
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
配置为 1 以手动开启 `Sort Shuffle`。
+2. 对于 `Sort Shuffle` 和 `Hash Shuffle` 两种实现,你都可以考虑开启 [数据压缩]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
除非数据本身无法压缩。从 1.15 开启,数据压缩是默认开启的,对于 1.14 以及更低版本你需要手动开启。
+3. 当使用 `Sort Shuffle` 时, 减少[独占网络缓冲区]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) 并增加[流动网络缓冲区]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) 有利于性能提升。对于 1.14
以及更高版本,建议将 [taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 设为
0 并且将 [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) 设为一个较大的值 (比如, 4096)。
这有两个主要的好处:1) 首先这解耦了并发与网络内存使用量,对于大规模作业,这降低了遇到 "Insufficient number of network
buffers" 错误的可能性;2) 网络缓冲区可以根据需求在不同的数据通道间共享流动,这可以提高了网络缓冲区的利用率,进而可以提高性能。
Review comment:
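There is an extra space before `减少[独占网络缓冲区]`.
The spacing around [独占网络缓冲区] and [流动网络缓冲区] should be consistent: currently there is a space after each link but none before. The same applies to the links below.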
##########
File path: docs/content.zh/docs/ops/batch/blocking_shuffle.md
##########
@@ -56,34 +56,59 @@ Flink [DataStream API]({{< ref
"docs/dev/datastream/execution_mode" >}}) 和 [Ta
{{< /hint >}}
{{< hint info >}}
-`mmap`使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 yarn 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
+`mmap`使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 YARN 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
{{< /hint >}}
-为了进一步的提升性能,对于绝大多数的任务我们推荐 [启用压缩]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
,除非数据很难被压缩。
-
`Hash Shuffle` 在小规模运行在固态硬盘的任务情况下效果显著,但是依旧有一些问题:
1. 如果任务的规模庞大将会创建很多文件,并且要求同时对这些文件进行大量的写操作。
2. 在机械硬盘情况下,当大量的下游任务同时读取数据,可能会导致随机读写问题。
## Sort Shuffle
-`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash
Shuffle`,sort shuffle
将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下
sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖
`sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref
"docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)。
+`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash
Shuffle`,`Sort Shuffle`
将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下
`Sort Shuffle` 可以获得比 `Hash Shuffle` 更好的性能。另外,`Sort Shuffle`
使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref
"docs/deployment/security/security-ssl" >}})。关于 `Sort Shuffle` 的更多细节请参考
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)。
当使用sort blocking shuffle的时候有些配置需要适配:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle
data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort
shuffle`。对于 1.15 以下的版本,它的默认值是 `Integer.MAX_VALUE`,所以默认情况下总是会使用 `hash shuffle`。从
1.15 开始,它的默认值是 1, 所以默认情况下总是会使用 `sort shuffle`。
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size):
配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
{{< hint info >}}
-目前 `sort shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
+目前 `Sort Shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
{{< /hint >}}
## 如何选择 Blocking Shuffle
总的来说,
- 对于在固态硬盘上运行的小规模任务而言,两者都可以。
-- 对于在机械硬盘上运行的大规模任务而言,`sort shuffle` 更为合适。
-- 在这两种情况下,你可以考虑 [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
来提升性能,除非数据很难被压缩。
+- 对于在机械硬盘上运行的大规模任务而言,`Sort Shuffle` 更为合适。
+
+要在 `Sort Shuffle` 和 `Hash Shuffle` 间切换,
你需要配置这个参数:[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)。
这个参数根据消费者Task的并发选择当前Task使用`Hash Shuffle` 或 `Sort Shuffle`,如果并发小于配置值则使用 `Hash
Shuffle`,否则使用 `Sort Shuffle`。 对于 1.15 以下版本,它的默认值是 `Integer.MAX_VALUE`,这意味着
`Hash Shuffle` 是默认实现。 从 1.15 起,它的默认值是 1,这意味着 `Sort Shuffle` 是默认实现。
+
+## 性能调优
+
+下面这些建议可以帮助你实现更好的性能,这些对于大规模批作业尤其重要:
+
+1. 如果你使用机械硬盘作为存储设备,请总是使用 `Sort Shuffle`,因为这可以极大的提升稳定性和性能。从 1.15 开始,`Sort
Shuffle` 已经成为默认实现,对于 1.14 以及更低版本,你需要通过将
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
配置为 1 以手动开启 `Sort Shuffle`。
+2. 对于 `Sort Shuffle` 和 `Hash Shuffle` 两种实现,你都可以考虑开启 [数据压缩]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
除非数据本身无法压缩。从 1.15 开启,数据压缩是默认开启的,对于 1.14 以及更低版本你需要手动开启。
+3. 当使用 `Sort Shuffle` 时, 减少[独占网络缓冲区]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) 并增加[流动网络缓冲区]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) 有利于性能提升。对于 1.14
以及更高版本,建议将 [taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 设为
0 并且将 [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) 设为一个较大的值 (比如, 4096)。
这有两个主要的好处:1) 首先这解耦了并发与网络内存使用量,对于大规模作业,这降低了遇到 "Insufficient number of network
buffers" 错误的可能性;2) 网络缓冲区可以根据需求在不同的数据通道间共享流动,这可以提高了网络缓冲区的利用率,进而可以提高性能。
+4. 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更好的性能,建议将[网络内存比例]({{< ref
"docs/deployment/config" >}}#taskmanager-memory-network-fraction) 增加至至少
0.2。为了使调整生效,你可能需要同时调整[网络内存大小下界]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-min) 以及[网络内存大小上界]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-network-max)。要获取更多信息,你可以参考这个[内存配置文档]({{< ref
"docs/deployment/memory/mem_setup_tm/" >}})。
Review comment:
Do not add the trailing `/` to the ref path; otherwise the docs will not compile. The same
applies to the English version.
##########
File path: docs/content/docs/ops/batch/blocking_shuffle.md
##########
@@ -59,31 +59,55 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl"
>}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits,
but some resource frameworks like Yarn will track this memory usage and kill
the container if memory exceeds some threshold.
{{< /hint >}}
-To further improve the performance, for most jobs we also recommend [enabling
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
unless the data is hard to compress.
-
`Hash Shuffle` works well for small scale jobs with SSD, but it also have some
disadvantages:
1. If the job scale is large, it might create too many files, and it requires
a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it
might incur the issue of random IO.
## Sort Shuffle
-`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each
result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, sort shuffle can achieve better
performance than hash shuffle, especially on HDD. Additionally, `sort shuffle`
uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about sort shuffle.
+`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about `Sort Shuffle`.
-There are several config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option
for shuffle data compression. it is suggested to enable it for most jobs except
that the compression ratio of your data is low. Defaults to false for 1.14 and
lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
Config option to enable sort shuffle depending on the parallelism of downstream
tasks. If parallelism is lower than the configured value, `hash shuffle` will
be used, otherwise `sort shuffle` will be used. For versions lower than 1.15,
its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used
by default. Since 1.15, its default value is 1, so sort shuffle will be always
used by default.
+Here are some config options that might need adjustment when using sort
blocking shuffle:
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
{{< hint info >}}
-Currently `sort shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
+Currently `Sort Shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
{{< /hint >}}
## Choices of Blocking Shuffle
As a summary,
- For small scale jobs running on SSD, both implementation should work.
-- For large scale jobs or for jobs running on HDD, `sort shuffle` should be
more suitable.
-- In both case, you may consider [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress.
+- For large scale jobs or for jobs running on HDD, `Sort Shuffle` should be
more suitable.
+
+To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this
config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism).
It controls which shuffle implementation to use based on the parallelism of
downstream tasks, if the parallelism is lower than the configured value, `Hash
Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions
lower than 1.15, its default value is `Integer.MAX_VALUE`, so `Hash Shuffle`
will be always used by default. Since 1.15, its default value is 1, so `Sort
Shuffle` will be always used by default.
+
+## Performance Tuning
+
+The following guidelins may help you to achieve better performance especially
for large scale batch jobs:
+
+1. Always use `Sort Shuffle` on spinning disk because `Sort Shuffle` can
largely improve stability and IO performance. Since 1.15, `Sort Shuffle` is
already the default blocking shuffle implementation, for 1.14 and lower
version, you need to enable it manually by setting
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
to 1.
+2. For both blocking shuffle implementations, you may consider [enabling data
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress. Since 1.15,
data compression is already enabled by default, for 1.14 and lower version, you
need to enable it manually.
+3. When `Sort Shuffle` is used, decrease the number of [exclusive buffers per
channel]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) and increase the number of
[floating buffers per gate]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) can help. For 1.14
and higher version, it is suggested to set
[taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to
0 and set [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) to a larger value
(for example, 4096). This setting has two main advantages: 1) It decouples the
network memory consumption from parallelism so for large scale jobs, the
possibility of "Insufficient number of network buffers" error can be decreased;
2) Networker buffers are distributed among different channels according to
needs which can improve the network buffer utilization, thus can improve
performance.
+4. Increase the total size of network memory. Currently, the default network
memory size is pretty modest. For large scale jobs, it's suggested to increase
the total [network memory fraction]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better
performance. At the same time, you may also need to adjust the [lower
bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and
[upper bound]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) of the network memory size, please refer to
the [memory configuration document]({{< ref
"docs/deployment/memory/mem_setup_tm/" >}}) for more information.
+5. Increase the memory size for shuffle data write. As mentioned in the above
section, for large scale jobs, it's suggested to increase the number of [write
buffers per result partition]({{< ref "docs/deployment/config"
>}}#taskmanager-network-sort-shuffle-min-buffers) to at least (2 * parallelism)
if you have enough memory. Note that you may also need to increase the total
size of network memory to avoid the "Insufficient number of network buffers"
error after you increase this config value.
+6. Increase the memory size for shuffle data read. As mentioned in the above
section, for large scale jobs, it's suggested to increase the size of the
[shared read memory]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value
(for example, 256M or 512M). Because this memory is cut from the framework
off-heap memory, you must increase
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size) by the same size to avoid the
direct memory OOM error.
+
+## Trouble Shooting
+
+Here are some exceptions you may encounter (rarely) and the corresponding
solutions that may help:
+
+| Exceptions | Potential Solutions |
+| :--------- | :------------------ |
+| Insufficient number of network buffers | This means the amount of network
memory is not enough to run the target job and you need to increase the total
network memory size. Note that since 1.15, `Sort Shuffle` has become the
default blocking shuffle implementation and for some cases, it may need more
network memory than before, which means there is a small possibility that your
batch jobs may suffer this issue after upgrading to 1.15. If this is the case,
you just need to increase the total network memory size. |
+| Too many open files | This means that there is no enough file descriptors
left. If you are using `Hash Shuffle`, please switch to `Sort Shuffle`. If you
are already using `Sort Shuffle`, please consider to increase the system limit
for file descriptor and check if the user code consumes too many file
descriptors. |
+| Connection reset by peer | This usually means that the network if unstable
or or under heavy burden. Other issues like SSL handshake timeout mentioned
above may also cause this problem. If you are using `Hash Shuffle`, please
switch to `Sort Shuffle`. If you are already using `Sort Shuffle`, increasing
the [network backlog]({{< ref "docs/deployment/config"
>}}#taskmanager-network-netty-server-backlog) may help. |
Review comment:
if unstable -> is unstable
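
For readers following this thread, the selection rule that the hunk above documents for `taskmanager.network.sort-shuffle.min-parallelism` can be sketched as below. This is only an illustration of the rule as the doc text states it, not Flink's implementation; the function name `choose_blocking_shuffle` and the constant `INTEGER_MAX_VALUE` are hypothetical names introduced here.

```python
# Illustrative sketch of the documented rule; this is not Flink code.
# Java's Integer.MAX_VALUE, which the doc gives as the default before 1.15.
INTEGER_MAX_VALUE = 2**31 - 1


def choose_blocking_shuffle(consumer_parallelism: int, min_parallelism: int) -> str:
    """Return the blocking shuffle implementation the documented rule selects.

    If the downstream (consumer) parallelism is lower than the configured
    value, Hash Shuffle is used; otherwise Sort Shuffle is used.
    """
    if consumer_parallelism < min_parallelism:
        return "Hash Shuffle"
    return "Sort Shuffle"


# Default before 1.15: the threshold is never reached, so Hash Shuffle is used.
print(choose_blocking_shuffle(100, INTEGER_MAX_VALUE))
# Default since 1.15: any parallelism >= 1 meets the threshold, so Sort Shuffle is used.
print(choose_blocking_shuffle(100, 1))
```

Setting the option to 1 on 1.14 and lower therefore has the same effect as the 1.15 default, which matches the first tuning guideline in the hunk.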
##########
File path: docs/content/docs/ops/batch/blocking_shuffle.md
##########
@@ -59,31 +59,55 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl"
>}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits,
but some resource frameworks like Yarn will track this memory usage and kill
the container if memory exceeds some threshold.
{{< /hint >}}
-To further improve the performance, for most jobs we also recommend [enabling
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
unless the data is hard to compress.
-
`Hash Shuffle` works well for small scale jobs with SSD, but it also have some
disadvantages:
1. If the job scale is large, it might create too many files, and it requires
a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it
might incur the issue of random IO.
## Sort Shuffle
-`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each
result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, sort shuffle can achieve better
performance than hash shuffle, especially on HDD. Additionally, `sort shuffle`
uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about sort shuffle.
+`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about `Sort Shuffle`.
-There are several config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option
for shuffle data compression. it is suggested to enable it for most jobs except
that the compression ratio of your data is low. Defaults to false for 1.14 and
lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
Config option to enable sort shuffle depending on the parallelism of downstream
tasks. If parallelism is lower than the configured value, `hash shuffle` will
be used, otherwise `sort shuffle` will be used. For versions lower than 1.15,
its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used
by default. Since 1.15, its default value is 1, so sort shuffle will be always
used by default.
+Here are some config options that might need adjustment when using sort
blocking shuffle:
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
{{< hint info >}}
-Currently `sort shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
+Currently `Sort Shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
{{< /hint >}}
## Choices of Blocking Shuffle
As a summary,
- For small scale jobs running on SSD, both implementation should work.
-- For large scale jobs or for jobs running on HDD, `sort shuffle` should be
more suitable.
-- In both case, you may consider [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress.
+- For large scale jobs or for jobs running on HDD, `Sort Shuffle` should be
more suitable.
+
+To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this
config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism).
It controls which shuffle implementation to use based on the parallelism of
downstream tasks, if the parallelism is lower than the configured value, `Hash
Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions
lower than 1.15, its default value is `Integer.MAX_VALUE`, so `Hash Shuffle`
will be always used by default. Since 1.15, its default value is 1, so `Sort
Shuffle` will be always used by default.
+
+## Performance Tuning
+
+The following guidelins may help you to achieve better performance especially
for large scale batch jobs:
+
+1. Always use `Sort Shuffle` on spinning disk because `Sort Shuffle` can
largely improve stability and IO performance. Since 1.15, `Sort Shuffle` is
already the default blocking shuffle implementation, for 1.14 and lower
version, you need to enable it manually by setting
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
to 1.
+2. For both blocking shuffle implementations, you may consider [enabling data
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress. Since 1.15,
data compression is already enabled by default, for 1.14 and lower version, you
need to enable it manually.
+3. When `Sort Shuffle` is used, decrease the number of [exclusive buffers per
channel]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) and increase the number of
[floating buffers per gate]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) can help. For 1.14
and higher version, it is suggested to set
[taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to
0 and set [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) to a larger value
(for example, 4096). This setting has two main advantages: 1) It decouples the
network memory consumption from parallelism so for large scale jobs, the
possibility of "Insufficient number of network buffers" error can be decreased;
2) Networker buffers are distributed among different channels according to
needs which can improve the network buffer utilization, thus can improve
performance.
+4. Increase the total size of network memory. Currently, the default network
memory size is pretty modest. For large scale jobs, it's suggested to increase
the total [network memory fraction]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better
performance. At the same time, you may also need to adjust the [lower
bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and
[upper bound]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) of the network memory size, please refer to
the [memory configuration document]({{< ref
"docs/deployment/memory/mem_setup_tm/" >}}) for more information.
+5. Increase the memory size for shuffle data write. As mentioned in the above
section, for large scale jobs, it's suggested to increase the number of [write
buffers per result partition]({{< ref "docs/deployment/config"
>}}#taskmanager-network-sort-shuffle-min-buffers) to at least (2 * parallelism)
if you have enough memory. Note that you may also need to increase the total
size of network memory to avoid the "Insufficient number of network buffers"
error after you increase this config value.
+6. Increase the memory size for shuffle data read. As mentioned in the above
section, for large scale jobs, it's suggested to increase the size of the
[shared read memory]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value
(for example, 256M or 512M). Because this memory is cut from the framework
off-heap memory, you must increase
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size) by the same size to avoid the
direct memory OOM error.
+
+## Trouble Shooting
+
+Here are some exceptions you may encounter (rarely) and the corresponding
solutions that may help:
+
+| Exceptions | Potential Solutions |
+| :--------- | :------------------ |
+| Insufficient number of network buffers | This means the amount of network
memory is not enough to run the target job and you need to increase the total
network memory size. Note that since 1.15, `Sort Shuffle` has become the
default blocking shuffle implementation and for some cases, it may need more
network memory than before, which means there is a small possibility that your
batch jobs may suffer this issue after upgrading to 1.15. If this is the case,
you just need to increase the total network memory size. |
Review comment:
suffer this issue -> suffer from this issue ?
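
For context, the remedy the quoted row describes (increasing the total network memory size) might look roughly like the following `flink-conf.yaml` fragment. This is only a sketch: the fraction of 0.2 is the value suggested in item 4 of the quoted tuning list, while the lower and upper bounds shown here are illustrative assumptions, not recommendations from the PR.

```yaml
# Give the network stack a larger share of total Flink memory
# (item 4 of the quoted list suggests at least 0.2 for large scale jobs).
taskmanager.memory.network.fraction: 0.2
# Illustrative bounds only (assumed values); the fraction is clamped to this range,
# so the bounds may need adjusting for the larger fraction to take effect.
taskmanager.memory.network.min: 256mb
taskmanager.memory.network.max: 2gb
```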
##########
File path: docs/content/docs/ops/batch/blocking_shuffle.md
##########
@@ -59,31 +59,55 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl"
>}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits,
but some resource frameworks like Yarn will track this memory usage and kill
the container if memory exceeds some threshold.
{{< /hint >}}
-To further improve the performance, for most jobs we also recommend [enabling
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
unless the data is hard to compress.
-
`Hash Shuffle` works well for small scale jobs with SSD, but it also have some
disadvantages:
1. If the job scale is large, it might create too many files, and it requires
a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it
might incur the issue of random IO.
## Sort Shuffle
-`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each
result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, sort shuffle can achieve better
performance than hash shuffle, especially on HDD. Additionally, `sort shuffle`
uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about sort shuffle.
+`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about `Sort Shuffle`.
-There are several config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option
for shuffle data compression. it is suggested to enable it for most jobs except
that the compression ratio of your data is low. Defaults to false for 1.14 and
lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
Config option to enable sort shuffle depending on the parallelism of downstream
tasks. If parallelism is lower than the configured value, `hash shuffle` will
be used, otherwise `sort shuffle` will be used. For versions lower than 1.15,
its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used
by default. Since 1.15, its default value is 1, so sort shuffle will be always
used by default.
+Here are some config options that might need adjustment when using sort
blocking shuffle:
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
{{< hint info >}}
-Currently `sort shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
+Currently `Sort Shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
{{< /hint >}}
## Choices of Blocking Shuffle
As a summary,
- For small scale jobs running on SSD, both implementation should work.
-- For large scale jobs or for jobs running on HDD, `sort shuffle` should be
more suitable.
-- In both case, you may consider [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress.
+- For large scale jobs or for jobs running on HDD, `Sort Shuffle` should be
more suitable.
+
+To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this
config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism).
It controls which shuffle implementation to use based on the parallelism of
downstream tasks, if the parallelism is lower than the configured value, `Hash
Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions
lower than 1.15, its default value is `Integer.MAX_VALUE`, so `Hash Shuffle`
will be always used by default. Since 1.15, its default value is 1, so `Sort
Shuffle` will be always used by default.
+
+## Performance Tuning
+
+The following guidelins may help you to achieve better performance especially
for large scale batch jobs:
+
+1. Always use `Sort Shuffle` on spinning disk because `Sort Shuffle` can
largely improve stability and IO performance. Since 1.15, `Sort Shuffle` is
already the default blocking shuffle implementation, for 1.14 and lower
version, you need to enable it manually by setting
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
to 1.
Review comment:
Might unify the terminology: `spinning disk` -> `HDD`, or vice versa.
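
As a hedged illustration of the setting the quoted item refers to, enabling `Sort Shuffle` manually on 1.14 and lower could look like the entry below. The option name and the value 1 come from the quoted text; placing it in `flink-conf.yaml` is an assumption about the deployment.

```yaml
# Use Sort Shuffle whenever downstream parallelism is >= 1, i.e. always.
# Only needed on 1.14 and lower; per the quoted text, 1 is the default since 1.15.
taskmanager.network.sort-shuffle.min-parallelism: 1
```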
##########
File path: docs/content/docs/ops/batch/blocking_shuffle.md
##########
@@ -59,31 +59,55 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl"
>}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits,
but some resource frameworks like Yarn will track this memory usage and kill
the container if memory exceeds some threshold.
{{< /hint >}}
-To further improve the performance, for most jobs we also recommend [enabling
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
unless the data is hard to compress.
-
`Hash Shuffle` works well for small scale jobs with SSD, but it also have some
disadvantages:
1. If the job scale is large, it might create too many files, and it requires
a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it
might incur the issue of random IO.
## Sort Shuffle
-`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each
result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, sort shuffle can achieve better
performance than hash shuffle, especially on HDD. Additionally, `sort shuffle`
uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about sort shuffle.
+`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about `Sort Shuffle`.
-There are several config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option
for shuffle data compression. it is suggested to enable it for most jobs except
that the compression ratio of your data is low. Defaults to false for 1.14 and
lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
Config option to enable sort shuffle depending on the parallelism of downstream
tasks. If parallelism is lower than the configured value, `hash shuffle` will
be used, otherwise `sort shuffle` will be used. For versions lower than 1.15,
its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used
by default. Since 1.15, its default value is 1, so sort shuffle will be always
used by default.
+Here are some config options that might need adjustment when using sort
blocking shuffle:
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
{{< hint info >}}
-Currently `sort shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
+Currently `Sort Shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
{{< /hint >}}
## Choices of Blocking Shuffle
As a summary,
- For small scale jobs running on SSD, both implementation should work.
-- For large scale jobs or for jobs running on HDD, `sort shuffle` should be
more suitable.
-- In both case, you may consider [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress.
+- For large scale jobs or for jobs running on HDD, `Sort Shuffle` should be
more suitable.
+
+To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this
config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism).
It controls which shuffle implementation to use based on the parallelism of
downstream tasks, if the parallelism is lower than the configured value, `Hash
Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions
lower than 1.15, its default value is `Integer.MAX_VALUE`, so `Hash Shuffle`
will be always used by default. Since 1.15, its default value is 1, so `Sort
Shuffle` will be always used by default.
+
+## Performance Tuning
+
+The following guidelins may help you to achieve better performance especially
for large scale batch jobs:
+
+1. Always use `Sort Shuffle` on spinning disk because `Sort Shuffle` can
largely improve stability and IO performance. Since 1.15, `Sort Shuffle` is
already the default blocking shuffle implementation, for 1.14 and lower
version, you need to enable it manually by setting
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
to 1.
+2. For both blocking shuffle implementations, you may consider [enabling data
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress. Since 1.15,
data compression is already enabled by default, for 1.14 and lower version, you
need to enable it manually.
+3. When `Sort Shuffle` is used, decrease the number of [exclusive buffers per
channel]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) and increase the number of
[floating buffers per gate]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) can help. For 1.14
and higher version, it is suggested to set
[taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to
0 and set [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) to a larger value
(for example, 4096). This setting has two main advantages: 1) It decouples the
network memory consumption from parallelism so for large scale jobs, the
possibility of "Insufficient number of network buffers" error can be decreased;
2) Networker buffers are distributed among different channels according to
needs which can improve the network buffer utilization, thus can improve
performance.
Review comment:
decrease the number of -> decreasing the number of
increase the number of -> increasing the number of
`...according to needs which can improve the network buffer utilization, thus can improve performance.` ->
`...according to needs, which can improve the network buffer utilization and further improve the performance.`
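
For readers following along, the configuration that item 3 recommends could be sketched as the `flink-conf.yaml` fragment below. Both option names and both values are taken from the quoted text (4096 is given there only as an example); the appropriate floating-buffer count for a real job is an open tuning question.

```yaml
# No exclusive buffers per channel, so network memory no longer scales with parallelism.
taskmanager.network.memory.buffers-per-channel: 0
# Larger shared pool per input gate, handed out to channels on demand.
# 4096 is the example value from the quoted text, not a tuned recommendation.
taskmanager.network.memory.floating-buffers-per-gate: 4096
```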
##########
File path: docs/content/docs/ops/batch/blocking_shuffle.md
##########
@@ -59,31 +59,55 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl"
>}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits,
but some resource frameworks like Yarn will track this memory usage and kill
the container if memory exceeds some threshold.
{{< /hint >}}
-To further improve the performance, for most jobs we also recommend [enabling
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
unless the data is hard to compress.
-
`Hash Shuffle` works well for small scale jobs with SSD, but it also have some
disadvantages:
1. If the job scale is large, it might create too many files, and it requires
a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it
might incur the issue of random IO.
## Sort Shuffle
-`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each
result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, sort shuffle can achieve better
performance than hash shuffle, especially on HDD. Additionally, `sort shuffle`
uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about sort shuffle.
+`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more details about `Sort Shuffle`.
-There are several config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option
for shuffle data compression. it is suggested to enable it for most jobs except
that the compression ratio of your data is low. Defaults to false for 1.14 and
lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
Config option to enable sort shuffle depending on the parallelism of downstream
tasks. If parallelism is lower than the configured value, `hash shuffle` will
be used, otherwise `sort shuffle` will be used. For versions lower than 1.15,
its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used
by default. Since 1.15, its default value is 1, so sort shuffle will be always
used by default.
+Here are some config options that might need adjustment when using sort
blocking shuffle:
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
{{< hint info >}}
-Currently `sort shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
+Currently `Sort Shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
{{< /hint >}}
## Choices of Blocking Shuffle
As a summary,
- For small scale jobs running on SSD, both implementation should work.
-- For large scale jobs or for jobs running on HDD, `sort shuffle` should be
more suitable.
-- In both case, you may consider [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress.
+- For large scale jobs or for jobs running on HDD, `Sort Shuffle` should be
more suitable.
+
+To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this
config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism).
It controls which shuffle implementation to use based on the parallelism of
downstream tasks, if the parallelism is lower than the configured value, `Hash
Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions
lower than 1.15, its default value is `Integer.MAX_VALUE`, so `Hash Shuffle`
will be always used by default. Since 1.15, its default value is 1, so `Sort
Shuffle` will be always used by default.
+
+## Performance Tuning
+
+The following guidelins may help you to achieve better performance especially
for large scale batch jobs:
+
+1. Always use `Sort Shuffle` on spinning disk because `Sort Shuffle` can
largely improve stability and IO performance. Since 1.15, `Sort Shuffle` is
already the default blocking shuffle implementation, for 1.14 and lower
version, you need to enable it manually by setting
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
to 1.
+2. For both blocking shuffle implementations, you may consider [enabling data
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress. Since 1.15,
data compression is already enabled by default, for 1.14 and lower version, you
need to enable it manually.
+3. When `Sort Shuffle` is used, decrease the number of [exclusive buffers per
channel]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) and increase the number of
[floating buffers per gate]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) can help. For 1.14
and higher version, it is suggested to set
[taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to
0 and set [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) to a larger value
(for example, 4096). This setting has two main advantages: 1) It decouples the
network memory consumption from parallelism so for large scale jobs, the
possibility of "Insufficient number of network buffers" error can be decreased;
2) Networker buffers are distributed among different channels according to
needs which can improve the network buffer utilization, thus can improve
performance.
+4. Increase the total size of network memory. Currently, the default network
memory size is pretty modest. For large scale jobs, it's suggested to increase
the total [network memory fraction]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better
performance. At the same time, you may also need to adjust the [lower
bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and
[upper bound]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) of the network memory size, please refer to
the [memory configuration document]({{< ref
"docs/deployment/memory/mem_setup_tm/" >}}) for more information.
+5. Increase the memory size for shuffle data write. As mentioned in the above
section, for large scale jobs, it's suggested to increase the number of [write
buffers per result partition]({{< ref "docs/deployment/config"
>}}#taskmanager-network-sort-shuffle-min-buffers) to at least (2 * parallelism)
if you have enough memory. Note that you may also need to increase the total
size of network memory to avoid the "Insufficient number of network buffers"
error after you increase this config value.
+6. Increase the memory size for shuffle data read. As mentioned in the above
section, for large scale jobs, it's suggested to increase the size of the
[shared read memory]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value
(for example, 256M or 512M). Because this memory is cut from the framework
off-heap memory, you must increase
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size) by the same size to avoid the
direct memory OOM error.
+
+## Trouble Shooting
+
+Here are some exceptions you may encounter (rarely) and the corresponding
solutions that may help:
+
+| Exceptions | Potential Solutions |
+| :--------- | :------------------ |
+| Insufficient number of network buffers | This means the amount of network
memory is not enough to run the target job and you need to increase the total
network memory size. Note that since 1.15, `Sort Shuffle` has become the
default blocking shuffle implementation and for some cases, it may need more
network memory than before, which means there is a small possibility that your
batch jobs may suffer this issue after upgrading to 1.15. If this is the case,
you just need to increase the total network memory size. |
+| Too many open files | This means that there is no enough file descriptors
left. If you are using `Hash Shuffle`, please switch to `Sort Shuffle`. If you
are already using `Sort Shuffle`, please consider to increase the system limit
for file descriptor and check if the user code consumes too many file
descriptors. |
Review comment:
no enough -> not enough ?
consider to increase -> consider increasing?
##########
File path: docs/content.zh/docs/ops/batch/blocking_shuffle.md
##########
@@ -56,34 +56,59 @@ Flink [DataStream API]({{< ref
"docs/dev/datastream/execution_mode" >}}) 和 [Ta
{{< /hint >}}
{{< hint info >}}
-`mmap`使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 yarn 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
+`mmap`使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 YARN 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
{{< /hint >}}
-为了进一步的提升性能,对于绝大多数的任务我们推荐 [启用压缩]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
,除非数据很难被压缩。
-
`Hash Shuffle` 在小规模运行在固态硬盘的任务情况下效果显著,但是依旧有一些问题:
1. 如果任务的规模庞大将会创建很多文件,并且要求同时对这些文件进行大量的写操作。
2. 在机械硬盘情况下,当大量的下游任务同时读取数据,可能会导致随机读写问题。
## Sort Shuffle
-`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash
Shuffle`,sort shuffle
将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下
sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖
`sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref
"docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)。
+`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash
Shuffle`,`Sort Shuffle`
将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下
`Sort Shuffle` 可以获得比 `Hash Shuffle` 更好的性能。另外,`Sort Shuffle`
使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref
"docs/deployment/security/security-ssl" >}})。关于 `Sort Shuffle` 的更多细节请参考
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)。
当使用sort blocking shuffle的时候有些配置需要适配:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle
data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort
shuffle`。对于 1.15 以下的版本,它的默认值是 `Integer.MAX_VALUE`,所以默认情况下总是会使用 `hash shuffle`。从
1.15 开始,它的默认值是 1, 所以默认情况下总是会使用 `sort shuffle`。
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size):
配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
{{< hint info >}}
-目前 `sort shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
+目前 `Sort Shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
{{< /hint >}}
## 如何选择 Blocking Shuffle
总的来说,
- 对于在固态硬盘上运行的小规模任务而言,两者都可以。
-- 对于在机械硬盘上运行的大规模任务而言,`sort shuffle` 更为合适。
-- 在这两种情况下,你可以考虑 [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
来提升性能,除非数据很难被压缩。
+- 对于在机械硬盘上运行的大规模任务而言,`Sort Shuffle` 更为合适。
+
+要在 `Sort Shuffle` 和 `Hash Shuffle` 间切换,
你需要配置这个参数:[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)。
这个参数根据消费者Task的并发选择当前Task使用`Hash Shuffle` 或 `Sort Shuffle`,如果并发小于配置值则使用 `Hash
Shuffle`,否则使用 `Sort Shuffle`。 对于 1.15 以下版本,它的默认值是 `Integer.MAX_VALUE`,这意味着
`Hash Shuffle` 是默认实现。 从 1.15 起,它的默认值是 1,这意味着 `Sort Shuffle` 是默认实现。
+
+## 性能调优
+
+下面这些建议可以帮助你实现更好的性能,这些对于大规模批作业尤其重要:
+
+1. 如果你使用机械硬盘作为存储设备,请总是使用 `Sort Shuffle`,因为这可以极大的提升稳定性和性能。从 1.15 开始,`Sort
Shuffle` 已经成为默认实现,对于 1.14 以及更低版本,你需要通过将
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
配置为 1 以手动开启 `Sort Shuffle`。
+2. 对于 `Sort Shuffle` 和 `Hash Shuffle` 两种实现,你都可以考虑开启 [数据压缩]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
除非数据本身无法压缩。从 1.15 开启,数据压缩是默认开启的,对于 1.14 以及更低版本你需要手动开启。
+3. 当使用 `Sort Shuffle` 时, 减少[独占网络缓冲区]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) 并增加[流动网络缓冲区]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) 有利于性能提升。对于 1.14
以及更高版本,建议将 [taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 设为
0 并且将 [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) 设为一个较大的值 (比如, 4096)。
这有两个主要的好处:1) 首先这解耦了并发与网络内存使用量,对于大规模作业,这降低了遇到 "Insufficient number of network
buffers" 错误的可能性;2) 网络缓冲区可以根据需求在不同的数据通道间共享流动,这可以提高了网络缓冲区的利用率,
进而可以提高性能。
+4. 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更好的性能,建议将[网络内存比例]({{< ref
"docs/deployment/config" >}}#taskmanager-memory-network-fraction) 增加至至少
0.2。为了使调整生效,你可能需要同时调整[网络内存大小下界]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-min) 以及[网络内存大小上界]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-network-max)。要获取更多信息,你可以参考这个[内存配置文档]({{< ref
"docs/deployment/memory/mem_setup_tm/" >}})。
+5. 增大数据写出内存。像上面提到的那样,对于大规模作业,如果有充足的空闲内存,建议增大[数据写出内存]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers)
大小到至少 (2 * 并发数)。注意:在你增大这个配置后,为避免出现 "Insufficient number of network buffers"
错误,你可能还需要增大总的网络内存大小。
+6. 增大数据读取内存。像上面提到的那样,对于大规模作业,建议增大[数据读取内存]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 到一个较大的值 (比如,256M
或 512M)。因为这个内存是从框架的非堆内存切分出来的,因此你必须增加相同的内存大小到
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size) 以避免出现直接内存溢出错误。
+
+## Trouble Shooting
+
+尽管十分罕见,下面列举了一些你可能会碰到的异常情况以及对应的处理策略:
+
+| 异常情况 | 处理策略 |
+| :--------- | :------------------ |
+| Insufficient number of network buffers |
这意味着网络内存大小不足以支撑作业运行,你需要增加总的网络内存大小。注意:从 1.15 开始,`Sort Shuffle`
已经成为默认实现,对于一些场景,`Sort Shuffle` 可能比 `Hash Shuffle` 需要更多的网络内存,因此当你的批作业升级到 1.15
以后可能会遇到这个网络内存不足的问题。这种情况下,你只需要增大总的网络内存大小即可。|
+| Too many open files | 这意味着文件句柄不够用了。如果你使用的是 `Hash Shuffle`,请切换到 `Sort
Shuffle`。如果你已经在使用 `Sort Shuffle`,请考虑增大操作系统文件句柄上限并且检查是否是作业代码占用了过多的文件句柄。|
+| Connection reset by peer | 这通常意味着网络不太稳定或者压力较大。其他一些原因,比如上面提到的 SSL
握手超时等也可能会导致这一问题。如果你使用的是 `Hash Shuffle`,请切换到 `Sort Shuffle`。如果你已经在使用 `Sort
Shuffle`,增大[网络连接 backlog]({{< ref "docs/deployment/config"
>}}#taskmanager-network-netty-server-backlog) 可能会有所帮助。|
+| Network connection timeout | 这通常意味着网络不太稳定或者压力较大。增大[网络连接超时时间]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-netty-client-connectTimeoutSec) 或者开启[网络连接重试]({{< ref
"docs/deployment/config" >}}#taskmanager-network-retries) 可能会有所帮助。|
+| Socket read/write timeout | 这通常意味着网络传输速度较慢或者压力较大。增大[网络收发缓冲区]({{< ref
"docs/deployment/config" >}}#taskmanager-network-netty-sendReceiveBufferSize)
大小可能会有所帮助。如果作业运行在 Kubernetes 环境,使用 [host network]({{< ref
"docs/deployment/config" >}}#kubernetes-hostnetwork-enabled) 可能会有所帮助。|
+| Read buffer request timeout | 这个问题只会出现在 `Sort
Shuffle`,它意味着对数据读取缓冲区的激烈竞争。要解决这一问题,你可以增大
[taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 和
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size). |
Review comment:
Maybe switch to a Chinese full stop (。) here.
##########
File path: docs/content/docs/ops/batch/blocking_shuffle.md
##########
@@ -59,31 +59,55 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl"
>}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits,
but some resource frameworks like Yarn will track this memory usage and kill
the container if memory exceeds some threshold.
{{< /hint >}}
-To further improve the performance, for most jobs we also recommend [enabling
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
unless the data is hard to compress.
-
`Hash Shuffle` works well for small scale jobs with SSD, but it also have some
disadvantages:
1. If the job scale is large, it might create too many files, and it requires
a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it
might incur the issue of random IO.
## Sort Shuffle
-`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each
result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, sort shuffle can achieve better
performance than hash shuffle, especially on HDD. Additionally, `sort shuffle`
uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more
details about sort shuffle.
+`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more
details about `Sort Shuffle`.
-There are several config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option
for shuffle data compression. it is suggested to enable it for most jobs except
that the compression ratio of your data is low. Defaults to false for 1.14 and
lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
Config option to enable sort shuffle depending on the parallelism of downstream
tasks. If parallelism is lower than the configured value, `hash shuffle` will
be used, otherwise `sort shuffle` will be used. For versions lower than 1.15,
its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used
by default. Since 1.15, its default value is 1, so sort shuffle will be always
used by default.
+Here are some config options that might need adjustment when using sort
blocking shuffle:
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
{{< hint info >}}
-Currently `sort shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
+Currently `Sort Shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
{{< /hint >}}
## Choices of Blocking Shuffle
As a summary,
- For small scale jobs running on SSD, both implementation should work.
-- For large scale jobs or for jobs running on HDD, `sort shuffle` should be
more suitable.
-- In both case, you may consider [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress.
+- For large scale jobs or for jobs running on HDD, `Sort Shuffle` should be
more suitable.
+
+To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this
config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism).
It controls which shuffle implementation to use based on the parallelism of
downstream tasks, if the parallelism is lower than the configured value, `Hash
Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions
lower than 1.15, its default value is `Integer.MAX_VALUE`, so `Hash Shuffle`
will be always used by default. Since 1.15, its default value is 1, so `Sort
Shuffle` will be always used by default.
Review comment:
`will be always used by default` -> `will be used by default`?
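
For reference, the selection rule this paragraph describes can be sketched as below. This is only an illustration of the documented behaviour, not Flink's actual code: the function name `choose_blocking_shuffle` and the constant `INTEGER_MAX_VALUE` are hypothetical names made up for the example.

```python
# Hypothetical stand-in for Java's Integer.MAX_VALUE, which the paragraph
# names as the default of taskmanager.network.sort-shuffle.min-parallelism
# before 1.15.
INTEGER_MAX_VALUE = 2**31 - 1


def choose_blocking_shuffle(consumer_parallelism: int, min_parallelism: int) -> str:
    """Apply the documented rule: below the threshold, hash; otherwise, sort."""
    if consumer_parallelism < min_parallelism:
        return "Hash Shuffle"
    return "Sort Shuffle"


if __name__ == "__main__":
    # Default before 1.15: the threshold is Integer.MAX_VALUE.
    print(choose_blocking_shuffle(500, INTEGER_MAX_VALUE))
    # Default since 1.15: the threshold is 1.
    print(choose_blocking_shuffle(500, 1))
```

With the pre-1.15 default practically every parallelism falls below the threshold, and with the 1.15 default none does, which is why dropping "always" loses nothing.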
##########
File path: docs/content.zh/docs/ops/batch/blocking_shuffle.md
##########
@@ -56,34 +56,59 @@ Flink [DataStream API]({{< ref
"docs/dev/datastream/execution_mode" >}}) 和 [Ta
{{< /hint >}}
{{< hint info >}}
-`mmap`使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 yarn 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
+`mmap`使用的内存不计算进已有配置的内存限制中,但是一些资源管理框架比如 YARN 将追踪这块内存使用,并且如果容器使用内存超过阈值会被杀掉。
{{< /hint >}}
-为了进一步的提升性能,对于绝大多数的任务我们推荐 [启用压缩]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
,除非数据很难被压缩。
-
`Hash Shuffle` 在小规模运行在固态硬盘的任务情况下效果显著,但是依旧有一些问题:
1. 如果任务的规模庞大将会创建很多文件,并且要求同时对这些文件进行大量的写操作。
2. 在机械硬盘情况下,当大量的下游任务同时读取数据,可能会导致随机读写问题。
## Sort Shuffle
-`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash
Shuffle`,sort shuffle
将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下
sort shuffle 可以获得比 hash shuffle 更好的性能。另外,`sort shuffle` 使用额外管理的内存作为读数据缓存并不依赖
`sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref
"docs/deployment/security/security-ssl" >}})。关于 sort shuffle 的更多细节请参考
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)。
+`Sort Shuffle` 是 1.13 版中引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 `Hash
Shuffle`,`Sort Shuffle`
将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并共享给所有的读请求。因此,集群使用更少的资源。例如:节点和文件描述符以提升稳定性。此外,通过写更少的文件和尽可能线性的读取文件,尤其是在使用机械硬盘情况下
`Sort Shuffle` 可以获得比 `Hash Shuffle` 更好的性能。另外,`Sort Shuffle`
使用额外管理的内存作为读数据缓存并不依赖 `sendfile` 或 `mmap` 机制,因此也适用于 [SSL]({{< ref
"docs/deployment/security/security-ssl" >}})。关于 `Sort Shuffle` 的更多细节请参考
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) 和
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614)。
当使用sort blocking shuffle的时候有些配置需要适配:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): 配置该选项以启用 shuffle
data 压缩,大部分任务建议开启除非你的数据压缩比率比较低。对于 1.14 以及更低的版本默认为 false,1.15 版本起默认为 true。
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
根据下游任务的并行度配置该选项以启用 sort shuffle。如果并行度低于设置的值,则使用 `hash shuffle`,否则 `sort
shuffle`。对于 1.15 以下的版本,它的默认值是 `Integer.MAX_VALUE`,所以默认情况下总是会使用 `hash shuffle`。从
1.15 开始,它的默认值是 1, 所以默认情况下总是会使用 `sort shuffle`。
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
配置该选项以控制数据写缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size):
配置该选项以控制数据读取缓存大小。对于大规模的任务而言,你可能需要调大这个值,正常几百兆内存就足够了。
{{< hint info >}}
-目前 `sort shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
+目前 `Sort Shuffle` 只通过分区索引来排序而不是记录本身,也就是说 `sort` 只是被当成数据聚类算法使用。
{{< /hint >}}
## 如何选择 Blocking Shuffle
总的来说,
- 对于在固态硬盘上运行的小规模任务而言,两者都可以。
-- 对于在机械硬盘上运行的大规模任务而言,`sort shuffle` 更为合适。
-- 在这两种情况下,你可以考虑 [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
来提升性能,除非数据很难被压缩。
+- 对于在机械硬盘上运行的大规模任务而言,`Sort Shuffle` 更为合适。
+
+要在 `Sort Shuffle` 和 `Hash Shuffle` 间切换,
你需要配置这个参数:[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)。
这个参数根据消费者Task的并发选择当前Task使用`Hash Shuffle` 或 `Sort Shuffle`,如果并发小于配置值则使用 `Hash
Shuffle`,否则使用 `Sort Shuffle`。 对于 1.15 以下版本,它的默认值是 `Integer.MAX_VALUE`,这意味着
`Hash Shuffle` 是默认实现。 从 1.15 起,它的默认值是 1,这意味着 `Sort Shuffle` 是默认实现。
+
+## 性能调优
+
+下面这些建议可以帮助你实现更好的性能,这些对于大规模批作业尤其重要:
+
+1. 如果你使用机械硬盘作为存储设备,请总是使用 `Sort Shuffle`,因为这可以极大的提升稳定性和性能。从 1.15 开始,`Sort
Shuffle` 已经成为默认实现,对于 1.14 以及更低版本,你需要通过将
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
配置为 1 以手动开启 `Sort Shuffle`。
+2. 对于 `Sort Shuffle` 和 `Hash Shuffle` 两种实现,你都可以考虑开启 [数据压缩]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
除非数据本身无法压缩。从 1.15 开启,数据压缩是默认开启的,对于 1.14 以及更低版本你需要手动开启。
+3. 当使用 `Sort Shuffle` 时, 减少[独占网络缓冲区]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) 并增加[流动网络缓冲区]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) 有利于性能提升。对于 1.14
以及更高版本,建议将 [taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) 设为
0 并且将 [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) 设为一个较大的值 (比如, 4096)。
这有两个主要的好处:1) 首先这解耦了并发与网络内存使用量,对于大规模作业,这降低了遇到 "Insufficient number of network
buffers" 错误的可能性;2) 网络缓冲区可以根据需求在不同的数据通道间共享流动,这可以提高了网络缓冲区的利用率,
进而可以提高性能。
+4. 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更好的性能,建议将[网络内存比例]({{< ref
"docs/deployment/config" >}}#taskmanager-memory-network-fraction) 增加至至少
0.2。为了使调整生效,你可能需要同时调整[网络内存大小下界]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-min) 以及[网络内存大小上界]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-network-max)。要获取更多信息,你可以参考这个[内存配置文档]({{< ref
"docs/deployment/memory/mem_setup_tm/" >}})。
+5. 增大数据写出内存。像上面提到的那样,对于大规模作业,如果有充足的空闲内存,建议增大[数据写出内存]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers)
大小到至少 (2 * 并发数)。注意:在你增大这个配置后,为避免出现 "Insufficient number of network buffers"
错误,你可能还需要增大总的网络内存大小。
+6. 增大数据读取内存。像上面提到的那样,对于大规模作业,建议增大[数据读取内存]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 到一个较大的值 (比如,256M
或 512M)。因为这个内存是从框架的非堆内存切分出来的,因此你必须增加相同的内存大小到
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size) 以避免出现直接内存溢出错误。
+
+## Trouble Shooting
+
+尽管十分罕见,下面列举了一些你可能会碰到的异常情况以及对应的处理策略:
+
+| 异常情况 | 处理策略 |
+| :--------- | :------------------ |
+| Insufficient number of network buffers |
这意味着网络内存大小不足以支撑作业运行,你需要增加总的网络内存大小。注意:从 1.15 开始,`Sort Shuffle`
已经成为默认实现,对于一些场景,`Sort Shuffle` 可能比 `Hash Shuffle` 需要更多的网络内存,因此当你的批作业升级到 1.15
以后可能会遇到这个网络内存不足的问题。这种情况下,你只需要增大总的网络内存大小即可。|
+| Too many open files | 这意味着文件句柄不够用了。如果你使用的是 `Hash Shuffle`,请切换到 `Sort
Shuffle`。如果你已经在使用 `Sort Shuffle`,请考虑增大操作系统文件句柄上限并且检查是否是作业代码占用了过多的文件句柄。|
+| Connection reset by peer | 这通常意味着网络不太稳定或者压力较大。其他一些原因,比如上面提到的 SSL
握手超时等也可能会导致这一问题。如果你使用的是 `Hash Shuffle`,请切换到 `Sort Shuffle`。如果你已经在使用 `Sort
Shuffle`,增大[网络连接 backlog]({{< ref "docs/deployment/config"
>}}#taskmanager-network-netty-server-backlog) 可能会有所帮助。|
+| Network connection timeout | 这通常意味着网络不太稳定或者压力较大。增大[网络连接超时时间]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-netty-client-connectTimeoutSec) 或者开启[网络连接重试]({{< ref
"docs/deployment/config" >}}#taskmanager-network-retries) 可能会有所帮助。|
+| Socket read/write timeout | 这通常意味着网络传输速度较慢或者压力较大。增大[网络收发缓冲区]({{< ref
"docs/deployment/config" >}}#taskmanager-network-netty-sendReceiveBufferSize)
大小可能会有所帮助。如果作业运行在 Kubernetes 环境,使用 [host network]({{< ref
"docs/deployment/config" >}}#kubernetes-hostnetwork-enabled) 可能会有所帮助。|
+| Read buffer request timeout | 这个问题只会出现在 `Sort
Shuffle`,它意味着对数据读取缓冲区的激烈竞争。要解决这一问题,你可以增大
[taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) 和
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size). |
+| No space left on device | 这通常意味着磁盘存储空间或者 inodes 被耗尽。你可以考虑扩展磁盘存储空间或者做一些数据清理。|
+| Out of memory error | 如果你使用的是 `Hash Shuffle`,请切换到 `Sort Shuffle`。如果你已经在使用
`Sort Shuffle` 并且遵循了上面章节的建议,你可以考虑增大相应的内存大小。对于堆上内存,你可以增大
[taskmanager.memory.task.heap.size]({{< ref "docs/deployment/config"
>}}#ttaskmanager-memory-task-heap-size),对于直接内存,你可以增大
[taskmanager.memory.task.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-task-off-heap-size). |
Review comment:
Maybe switch to a Chinese full stop (。) here.
##########
File path: docs/content/docs/ops/batch/blocking_shuffle.md
##########
@@ -59,31 +59,55 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl"
>}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits,
but some resource frameworks like Yarn will track this memory usage and kill
the container if memory exceeds some threshold.
{{< /hint >}}
-To further improve the performance, for most jobs we also recommend [enabling
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
unless the data is hard to compress.
-
`Hash Shuffle` works well for small scale jobs with SSD, but it also have some
disadvantages:
1. If the job scale is large, it might create too many files, and it requires
a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it
might incur the issue of random IO.
## Sort Shuffle
-`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each
result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, sort shuffle can achieve better
performance than hash shuffle, especially on HDD. Additionally, `sort shuffle`
uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more
details about sort shuffle.
+`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more
details about `Sort Shuffle`.
-There are several config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option
for shuffle data compression. it is suggested to enable it for most jobs except
that the compression ratio of your data is low. Defaults to false for 1.14 and
lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
Config option to enable sort shuffle depending on the parallelism of downstream
tasks. If parallelism is lower than the configured value, `hash shuffle` will
be used, otherwise `sort shuffle` will be used. For versions lower than 1.15,
its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used
by default. Since 1.15, its default value is 1, so sort shuffle will be always
used by default.
+Here are some config options that might need adjustment when using sort
blocking shuffle:
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
{{< hint info >}}
-Currently `sort shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
+Currently `Sort Shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
{{< /hint >}}
## Choices of Blocking Shuffle
As a summary,
- For small scale jobs running on SSD, both implementation should work.
-- For large scale jobs or for jobs running on HDD, `sort shuffle` should be
more suitable.
-- In both case, you may consider [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress.
+- For large scale jobs or for jobs running on HDD, `Sort Shuffle` should be
more suitable.
+
+To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this
config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism).
It controls which shuffle implementation to use based on the parallelism of
downstream tasks, if the parallelism is lower than the configured value, `Hash
Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions
lower than 1.15, its default value is `Integer.MAX_VALUE`, so `Hash Shuffle`
will be always used by default. Since 1.15, its default value is 1, so `Sort
Shuffle` will be always used by default.
+
+## Performance Tuning
+
+The following guidelins may help you to achieve better performance especially
for large scale batch jobs:
+
+1. Always use `Sort Shuffle` on spinning disk because `Sort Shuffle` can
largely improve stability and IO performance. Since 1.15, `Sort Shuffle` is
already the default blocking shuffle implementation, for 1.14 and lower
version, you need to enable it manually by setting
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
to 1.
+2. For both blocking shuffle implementations, you may consider [enabling data
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress. Since 1.15,
data compression is already enabled by default, for 1.14 and lower version, you
need to enable it manually.
+3. When `Sort Shuffle` is used, decrease the number of [exclusive buffers per
channel]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) and increase the number of
[floating buffers per gate]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) can help. For 1.14
and higher version, it is suggested to set
[taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to
0 and set [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) to a larger value
(for example, 4096). This setting has two main advantages: 1) It decouples the
network memory consumption from parallelism so for large scale jobs, the
possibility of "Insufficient number of network buffers" error can be decreased;
2) Networker buffers are distributed among different channels according to
needs which can improve the network buffer utilization, thus can improve
performance.
+4. Increase the total size of network memory. Currently, the default network
memory size is pretty modest. For large scale jobs, it's suggested to increase
the total [network memory fraction]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better
performance. At the same time, you may also need to adjust the [lower
bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and
[upper bound]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) of the network memory size, please refer to
the [memory configuration document]({{< ref
"docs/deployment/memory/mem_setup_tm/" >}}) for more information.
+5. Increase the memory size for shuffle data write. As mentioned in the above
section, for large scale jobs, it's suggested to increase the number of [write
buffers per result partition]({{< ref "docs/deployment/config"
>}}#taskmanager-network-sort-shuffle-min-buffers) to at least (2 * parallelism)
if you have enough memory. Note that you may also need to increase the total
size of network memory to avoid the "Insufficient number of network buffers"
error after you increase this config value.
+6. Increase the memory size for shuffle data read. As mentioned in the above
section, for large scale jobs, it's suggested to increase the size of the
[shared read memory]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value
(for example, 256M or 512M). Because this memory is cut from the framework
off-heap memory, you must increase
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size) by the same size to avoid the
direct memory OOM error.
+
+## Trouble Shooting
+
+Here are some exceptions you may encounter (rarely) and the corresponding
solutions that may help:
+
+| Exceptions | Potential Solutions |
+| :--------- | :------------------ |
+| Insufficient number of network buffers | This means the amount of network
memory is not enough to run the target job and you need to increase the total
network memory size. Note that since 1.15, `Sort Shuffle` has become the
default blocking shuffle implementation and for some cases, it may need more
network memory than before, which means there is a small possibility that your
batch jobs may suffer this issue after upgrading to 1.15. If this is the case,
you just need to increase the total network memory size. |
+| Too many open files | This means that there is no enough file descriptors
left. If you are using `Hash Shuffle`, please switch to `Sort Shuffle`. If you
are already using `Sort Shuffle`, please consider to increase the system limit
for file descriptor and check if the user code consumes too many file
descriptors. |
+| Connection reset by peer | This usually means that the network if unstable
or or under heavy burden. Other issues like SSL handshake timeout mentioned
above may also cause this problem. If you are using `Hash Shuffle`, please
switch to `Sort Shuffle`. If you are already using `Sort Shuffle`, increasing
the [network backlog]({{< ref "docs/deployment/config"
>}}#taskmanager-network-netty-server-backlog) may help. |
+| Network connection timeout | This usually means that the network if unstable
or or under heavy burden and increasing the [network connection timeout]({{<
ref "docs/deployment/config"
>}}#taskmanager-network-netty-client-connectTimeoutSec) or enable [connection
retry]({{< ref "docs/deployment/config" >}}#taskmanager-network-retries) may
help. |
+| Socket read/write timeout | This may indicate that the network is slow or
under heavy burden and increase the [network send/receive buffer size]({{< ref
"docs/deployment/config" >}}#taskmanager-network-netty-sendReceiveBufferSize)
may help. If the job is running in Kubernetes environment, using [host
network]({{< ref "docs/deployment/config" >}}#kubernetes-hostnetwork-enabled)
may also help. |
Review comment:
increase the -> increasing the
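
Not blocking, but a small worked example might help readers of items 5 and 6 in the Performance Tuning list of this hunk. The sketch below only does the arithmetic those items describe. The helper names are hypothetical, the 32 KiB buffer size assumes `taskmanager.memory.segment-size` is left at its default, and the megabyte figures in the demo are illustrative inputs rather than claimed defaults.

```python
# Assumption: one network buffer is 32 KiB, i.e. taskmanager.memory.segment-size
# is left at its default value.
ASSUMED_SEGMENT_SIZE_BYTES = 32 * 1024


def suggested_min_write_buffers(parallelism: int) -> int:
    """Item 5: at least (2 * parallelism) write buffers per result partition."""
    return 2 * parallelism


def write_buffer_memory_bytes(num_buffers: int,
                              segment_size_bytes: int = ASSUMED_SEGMENT_SIZE_BYTES) -> int:
    """Network memory taken by that many write buffers for one result partition."""
    return num_buffers * segment_size_bytes


def raised_framework_off_heap_mb(framework_off_heap_mb: int,
                                 old_batch_shuffle_mb: int,
                                 new_batch_shuffle_mb: int) -> int:
    """Item 6: grow framework off-heap memory by the same amount as the read memory."""
    return framework_off_heap_mb + (new_batch_shuffle_mb - old_batch_shuffle_mb)


if __name__ == "__main__":
    buffers = suggested_min_write_buffers(1000)
    print(buffers, write_buffer_memory_bytes(buffers))
    # Illustrative sizes only: read memory raised from 64 MB to 256 MB.
    print(raised_framework_off_heap_mb(128, 64, 256))
```

The first calculation also shows why item 5 warns about "Insufficient number of network buffers": the extra write buffers come out of the total network memory.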
##########
File path: docs/content/docs/ops/batch/blocking_shuffle.md
##########
@@ -59,31 +59,55 @@ If [SSL]({{< ref "docs/deployment/security/security-ssl"
>}}) is enabled, the `f
The memory usage of `mmap` is not accounted for by configured memory limits,
but some resource frameworks like Yarn will track this memory usage and kill
the container if memory exceeds some threshold.
{{< /hint >}}
-To further improve the performance, for most jobs we also recommend [enabling
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
unless the data is hard to compress.
-
`Hash Shuffle` works well for small scale jobs with SSD, but it also have some
disadvantages:
1. If the job scale is large, it might create too many files, and it requires
a large write buffer to write these files at the same time.
2. On HDD, when multiple downstream tasks fetch their data simultaneously, it
might incur the issue of random IO.
## Sort Shuffle
-`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, sort shuffle writes only one file for each
result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, sort shuffle can achieve better
performance than hash shuffle, especially on HDD. Additionally, `sort shuffle`
uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more
details about sort shuffle.
+`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more
details about `Sort Shuffle`.
-There are several config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.blocking-shuffle.compression.enabled]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-blocking-shuffle-compression-enabled): Config option
for shuffle data compression. it is suggested to enable it for most jobs except
that the compression ratio of your data is low. Defaults to false for 1.14 and
lower, and true for 1.15 and higher.
-- [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism):
Config option to enable sort shuffle depending on the parallelism of downstream
tasks. If parallelism is lower than the configured value, `hash shuffle` will
be used, otherwise `sort shuffle` will be used. For versions lower than 1.15,
its default value is `Integer.MAX_VALUE`, so hash shuffle will be always used
by default. Since 1.15, its default value is 1, so sort shuffle will be always
used by default.
+Here are some config options that might need adjustment when using sort
blocking shuffle:
- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
{{< hint info >}}
-Currently `sort shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
+Currently `Sort Shuffle` only sort records by partition index instead of the
records themselves, that is to say, the `sort` is only used as a data
clustering algorithm.
{{< /hint >}}
## Choices of Blocking Shuffle
As a summary,
- For small scale jobs running on SSD, both implementation should work.
-- For large scale jobs or for jobs running on HDD, `sort shuffle` should be
more suitable.
-- In both case, you may consider [enabling compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress.
+- For large scale jobs or for jobs running on HDD, `Sort Shuffle` should be
more suitable.
+
+To switch between `Sort Shuffle` and `Hash Shuffle`, you need to adjust this
config option: [taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism).
It controls which shuffle implementation to use based on the parallelism of
downstream tasks, if the parallelism is lower than the configured value, `Hash
Shuffle` will be used, otherwise `Sort Shuffle` will be used. For versions
lower than 1.15, its default value is `Integer.MAX_VALUE`, so `Hash Shuffle`
will be always used by default. Since 1.15, its default value is 1, so `Sort
Shuffle` will be always used by default.
+
+## Performance Tuning
+
+The following guidelins may help you to achieve better performance especially
for large scale batch jobs:
+
+1. Always use `Sort Shuffle` on spinning disk because `Sort Shuffle` can
largely improve stability and IO performance. Since 1.15, `Sort Shuffle` is
already the default blocking shuffle implementation, for 1.14 and lower
version, you need to enable it manually by setting
[taskmanager.network.sort-shuffle.min-parallelism]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-parallelism)
to 1.
+2. For both blocking shuffle implementations, you may consider [enabling data
compression]({{< ref
"docs/deployment/config">}}#taskmanager-network-blocking-shuffle-compression-enabled)
to improve the performance unless the data is hard to compress. Since 1.15,
data compression is already enabled by default, for 1.14 and lower version, you
need to enable it manually.
+3. When `Sort Shuffle` is used, decrease the number of [exclusive buffers per
channel]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-buffers-per-channel) and increase the number of
[floating buffers per gate]({{< ref "docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) can help. For 1.14
and higher version, it is suggested to set
[taskmanager.network.memory.buffers-per-channel]({{< ref
"docs/deployment/config" >}}#taskmanager-network-memory-buffers-per-channel) to
0 and set [taskmanager.network.memory.floating-buffers-per-gate]({{< ref
"docs/deployment/config"
>}}#taskmanager-network-memory-floating-buffers-per-gate) to a larger value
(for example, 4096). This setting has two main advantages: 1) It decouples the
network memory consumption from parallelism so for large scale jobs, the
possibility of "Insufficient number of network buffers" error can be decreased;
2) Networker buffers are distributed among different channels according to
needs which can improve the network buffer utilization, thus can improve
performance.
+4. Increase the total size of network memory. Currently, the default network
memory size is pretty modest. For large scale jobs, it's suggested to increase
the total [network memory fraction]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) to at least 0.2 to achieve better
performance. At the same time, you may also need to adjust the [lower
bound]({{< ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and
[upper bound]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) of the network memory size, please refer to
the [memory configuration document]({{< ref
"docs/deployment/memory/mem_setup_tm/" >}}) for more information.
+5. Increase the memory size for shuffle data write. As mentioned in the above
section, for large scale jobs, it's suggested to increase the number of [write
buffers per result partition]({{< ref "docs/deployment/config"
>}}#taskmanager-network-sort-shuffle-min-buffers) to at least (2 * parallelism)
if you have enough memory. Note that you may also need to increase the total
size of network memory to avoid the "Insufficient number of network buffers"
error after you increase this config value.
+6. Increase the memory size for shuffle data read. As mentioned in the above
section, for large scale jobs, it's suggested to increase the size of the
[shared read memory]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size) to a larger value
(for example, 256M or 512M). Because this memory is cut from the framework
off-heap memory, you must increase
[taskmanager.memory.framework.off-heap.size]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size) by the same size to avoid the
direct memory OOM error.
+
+## Trouble Shooting
+
+Here are some exceptions you may encounter (rarely) and the corresponding
solutions that may help:
+
+| Exceptions | Potential Solutions |
+| :--------- | :------------------ |
+| Insufficient number of network buffers | This means the amount of network
memory is not enough to run the target job and you need to increase the total
network memory size. Note that since 1.15, `Sort Shuffle` has become the
default blocking shuffle implementation and for some cases, it may need more
network memory than before, which means there is a small possibility that your
batch jobs may suffer this issue after upgrading to 1.15. If this is the case,
you just need to increase the total network memory size. |
+| Too many open files | This means that there is no enough file descriptors
left. If you are using `Hash Shuffle`, please switch to `Sort Shuffle`. If you
are already using `Sort Shuffle`, please consider to increase the system limit
for file descriptor and check if the user code consumes too many file
descriptors. |
+| Connection reset by peer | This usually means that the network if unstable
or or under heavy burden. Other issues like SSL handshake timeout mentioned
above may also cause this problem. If you are using `Hash Shuffle`, please
switch to `Sort Shuffle`. If you are already using `Sort Shuffle`, increasing
the [network backlog]({{< ref "docs/deployment/config"
>}}#taskmanager-network-netty-server-backlog) may help. |
+| Network connection timeout | This usually means that the network if unstable
or or under heavy burden and increasing the [network connection timeout]({{<
ref "docs/deployment/config"
>}}#taskmanager-network-netty-client-connectTimeoutSec) or enable [connection
retry]({{< ref "docs/deployment/config" >}}#taskmanager-network-retries) may
help. |
Review comment:
if unstable or or under heavy burden -> is unstable or under heavy burden
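
For readers following this thread, the "Performance Tuning" items in the quoted hunk could be sketched as one `flink-conf.yaml` fragment. This is only an illustration, not part of the PR. The option keys are the ones linked in the diff, while the parallelism of 1000 and the previous memory sizes (128m framework off-heap, 64m batch shuffle read memory) are assumptions that should be checked against the version in use.

```yaml
# Illustrative flink-conf.yaml fragment for a large scale batch job.
# Values follow the guidance quoted in the diff; anything marked "assumed"
# is a placeholder and not taken from the PR.

# Items 1 and 2: already the defaults since 1.15; set manually on 1.14 and lower.
taskmanager.network.sort-shuffle.min-parallelism: 1
taskmanager.network.blocking-shuffle.compression.enabled: true

# Item 3 (1.14 and higher): no exclusive buffers, more floating buffers per gate.
taskmanager.network.memory.buffers-per-channel: 0
taskmanager.network.memory.floating-buffers-per-gate: 4096

# Item 4: raise the network memory fraction to at least 0.2.
# taskmanager.memory.network.min / .max may also need adjusting.
taskmanager.memory.network.fraction: 0.2

# Item 5: at least 2 * parallelism write buffers (assumed parallelism: 1000).
taskmanager.network.sort-shuffle.min-buffers: 2000

# Item 6: larger shared read memory, with framework off-heap memory raised
# by the same amount (assumed previous values: 64m and 128m, so +192m).
taskmanager.memory.framework.off-heap.batch-shuffle.size: 256m
taskmanager.memory.framework.off-heap.size: 320m
```

The last two values have to move together: per the quoted text, the read memory is cut from the framework off-heap memory, so raising one without the other risks the direct memory OOM error mentioned in item 6.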