xintongsong commented on code in PR #21843:
URL: https://github.com/apache/flink/pull/21843#discussion_r1099806797
##########
docs/content/docs/deployment/memory/network_mem_tuning.md:
##########
@@ -105,12 +105,14 @@ The size of the buffer can be configured by setting
`taskmanager.memory.segment-
### Input network buffers
-Buffers in the input channel are divided into exclusive and floating buffers.
Exclusive buffers can be used by only one particular channel. A channel can
request additional floating buffers from a buffer pool shared across all
channels belonging to the given input gate. The remaining floating buffers are
optional and are acquired only if there are enough resources available.
+The number of buffers in one pool, calculated according to the above formula,
can be divided into two parts. The part up to the value configured via
`taskmanager.network.memory.read-buffer.required-per-gate.max` is required; the
excess buffers, if any, are optional. A task will fail if the required buffers
cannot be obtained at runtime. A task will not fail if optional buffers cannot
be obtained, but its performance may suffer. If not explicitly configured, the
default value is Integer.MAX_VALUE for streaming workloads and 1000 for batch
workloads.
+
+Generally, `taskmanager.network.memory.read-buffer.required-per-gate.max` does
not need to be configured; the default value suits most scenarios. Setting the
option to a smaller value (at least 1) makes the "insufficient number of
network buffers" exception less likely, but may reduce performance. Setting the
option to Integer.MAX_VALUE disables the required-buffer limit. With the limit
disabled, more read buffers may be acquired, which benefits performance, but
also makes the "insufficient number of network buffers" exception more likely.
In the initialization phase:
-- Flink will try to acquire the configured amount of exclusive buffers for
each channel
-- all exclusive buffers must be fulfilled or the job will fail with an
exception
-- a single floating buffer has to be allocated for Flink to be able to make
progress
+- Determine the effectively required buffers as the minimum of the number of
buffers calculated according to the above formula and the value configured via
`taskmanager.network.memory.read-buffer.required-per-gate.max`.
+- If the total number of network buffers is less than the effectively required
buffers, an exception is thrown.
+- If memory is sufficient, try to allocate the configured number of exclusive
buffers to each channel. If memory is insufficient, gradually reduce the number
of exclusive buffers per channel until all buffers are floating.
Review Comment:
These can be removed.
##########
docs/content/docs/deployment/memory/mem_setup_tm.md:
##########
@@ -147,7 +147,7 @@ which affect the size of the respective components:
| [Managed memory](#managed-memory) |
[`taskmanager.memory.managed.size`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-managed-size) <br/>
[`taskmanager.memory.managed.fraction`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-managed-fraction)
|
Native memory managed by Flink, reserved for sorting, hash tables, caching of
intermediate results and RocksDB state backend
|
| [Framework Off-heap Memory](#framework-memory) |
[`taskmanager.memory.framework.off-heap.size`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size)
| [Off-heap direct (or native)
memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink
framework (advanced option)
|
| [Task Off-heap Memory](#configure-off-heap-memory-direct-or-native)|
[`taskmanager.memory.task.off-heap.size`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-task-off-heap-size)
| [Off-heap direct (or native)
memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink
application to run operators
|
-| Network Memory |
[`taskmanager.memory.network.min`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-min) <br/>
[`taskmanager.memory.network.max`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) <br/>
[`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) | Direct
memory reserved for data record exchange between tasks (e.g. buffering for the
transfer over the network), is a [capped fractionated component]({{< ref
"docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the
[total Flink memory]({{< ref "docs/deployment/memory/mem_setup"
>}}#configure-total-memory). This memory is used for allocation of [network
buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}) |
+| Network Memory |
[`taskmanager.memory.network.min`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-min) <br/>
[`taskmanager.memory.network.max`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) <br/>
[`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) | Direct
memory reserved for data record exchange between tasks (e.g. buffering for the
transfer over the network), is a [capped fractionated component]({{< ref
"docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the
[total Flink memory]({{< ref "docs/deployment/memory/mem_setup"
>}}#configure-total-memory). This memory is used for allocation of [network
buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}). Since 1.17,
the default value of [`taskmanager.memory.network.max`]({{< ref
"docs/deployment/config" >}}#taskmanager-memory-network-max) is Long.MAX_VALUE,
which means the maximum network memory size is no longer limited by default. If
you still want to limit it, set the option to an explicit value, for example,
1g. |
Review Comment:
This change is not necessary. The documentation corresponds to Flink 1.17.
##########
docs/content/docs/deployment/memory/network_mem_tuning.md:
##########
@@ -105,12 +105,14 @@ The size of the buffer can be configured by setting
`taskmanager.memory.segment-
### Input network buffers
-Buffers in the input channel are divided into exclusive and floating buffers.
Exclusive buffers can be used by only one particular channel. A channel can
request additional floating buffers from a buffer pool shared across all
channels belonging to the given input gate. The remaining floating buffers are
optional and are acquired only if there are enough resources available.
+The number of buffers in one pool, calculated according to the above formula,
can be divided into two parts. The part up to the value configured via
`taskmanager.network.memory.read-buffer.required-per-gate.max` is required; the
excess buffers, if any, are optional. A task will fail if the required buffers
cannot be obtained at runtime. A task will not fail if optional buffers cannot
be obtained, but its performance may suffer. If not explicitly configured, the
default value is Integer.MAX_VALUE for streaming workloads and 1000 for batch
workloads.
+
+Generally, `taskmanager.network.memory.read-buffer.required-per-gate.max` does
not need to be configured; the default value suits most scenarios. Setting the
option to a smaller value (at least 1) makes the "insufficient number of
network buffers" exception less likely, but may reduce performance. Setting the
option to Integer.MAX_VALUE disables the required-buffer limit. With the limit
disabled, more read buffers may be acquired, which benefits performance, but
also makes the "insufficient number of network buffers" exception more likely.
Review Comment:
How do users know when to tune this config?
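It might also help to show a concrete example here, e.g. a minimal sketch of
lowering the limit when jobs hit "insufficient number of network buffers"
(hypothetical class name; the value 10 is purely illustrative, not a
recommendation):

```java
import org.apache.flink.configuration.Configuration;

public class ReadBufferLimitExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Lower the per-gate cap on required read buffers so that large jobs
        // are less likely to fail with "insufficient number of network
        // buffers", at the possible cost of some throughput.
        conf.setString(
                "taskmanager.network.memory.read-buffer.required-per-gate.max",
                "10");
    }
}
```

In practice the same key would usually be set in flink-conf.yaml.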
##########
docs/content/docs/deployment/memory/mem_setup_tm.md:
##########
@@ -147,7 +147,7 @@ which affect the size of the respective components:
| [Managed memory](#managed-memory) |
[`taskmanager.memory.managed.size`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-managed-size) <br/>
[`taskmanager.memory.managed.fraction`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-managed-fraction)
|
Native memory managed by Flink, reserved for sorting, hash tables, caching of
intermediate results and RocksDB state backend
|
| [Framework Off-heap Memory](#framework-memory) |
[`taskmanager.memory.framework.off-heap.size`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-size)
| [Off-heap direct (or native)
memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink
framework (advanced option)
|
| [Task Off-heap Memory](#configure-off-heap-memory-direct-or-native)|
[`taskmanager.memory.task.off-heap.size`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-task-off-heap-size)
| [Off-heap direct (or native)
memory](#configure-off-heap-memory-direct-or-native) dedicated to Flink
application to run operators
|
-| Network Memory |
[`taskmanager.memory.network.min`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-min) <br/>
[`taskmanager.memory.network.max`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) <br/>
[`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) | Direct
memory reserved for data record exchange between tasks (e.g. buffering for the
transfer over the network), is a [capped fractionated component]({{< ref
"docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the
[total Flink memory]({{< ref "docs/deployment/memory/mem_setup"
>}}#configure-total-memory). This memory is used for allocation of [network
buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}) |
+| Network Memory |
[`taskmanager.memory.network.min`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-min) <br/>
[`taskmanager.memory.network.max`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) <br/>
[`taskmanager.memory.network.fraction`]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction) | Direct
memory reserved for data record exchange between tasks (e.g. buffering for the
transfer over the network), is a [capped fractionated component]({{< ref
"docs/deployment/memory/mem_setup" >}}#capped-fractionated-components) of the
[total Flink memory]({{< ref "docs/deployment/memory/mem_setup"
>}}#configure-total-memory). This memory is used for allocation of [network
buffers]({{< ref "docs/deployment/memory/network_mem_tuning" >}}). Since 1.17,
the default value of [`taskmanager.memory.network.max`]({{< ref
"docs/deployment/config" >}}#taskmanager-memory-network-max) is Long.MAX_VALUE,
which means the maximum network memory size is no longer limited by default. If
you still want to limit it, set the option to an explicit value, for example,
1g. |
Review Comment:
However, I do find the default value `Long#MAX_VALUE` is displayed as
`9223372036854775807`. It should be overridden to `infinite`. See
`CleanupOptions#CLEANUP_STRATEGY_FIXED_DELAY_ATTEMPTS` for an example.
There are other options with the same problem. We can add a hotfix commit
for all of them.
##########
docs/content/docs/ops/batch/batch_shuffle.md:
##########
@@ -76,8 +76,9 @@ The memory usage of `mmap` is not accounted for by configured
memory limits, but
`Sort Shuffle` is another blocking shuffle implementation introduced in
version 1.13 and it becomes the default blocking shuffle implementation in
1.15. Different from `Hash Shuffle`, `Sort Shuffle` writes only one file for
each result partition. When the result partition is read by multiple downstream
tasks concurrently, the data file is opened only once and shared by all
readers. As a result, the cluster uses fewer resources like inode and file
descriptors, which improves stability. Furthermore, by writing fewer files and
making a best effort to read data sequentially, `Sort Shuffle` can achieve
better performance than `Hash Shuffle`, especially on HDD. Additionally, `Sort
Shuffle` uses extra managed memory as data reading buffer and does not rely on
`sendfile` or `mmap` mechanism, thus it also works well with [SSL]({{< ref
"docs/deployment/security/security-ssl" >}}). Please refer to
[FLINK-19582](https://issues.apache.org/jira/browse/FLINK-19582) and
[FLINK-19614](https://issues.apache.org/jira/browse/FLINK-19614) for more
details about `Sort Shuffle`.
Here are some config options that might need adjustment when using sort
blocking shuffle:
-- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control data writing buffer size. For large scale jobs, you
may need to increase this value, usually, several hundreds of megabytes memory
is enough. Because this memory is allocated from network memory, to increase
this value, you may also need to increase the total network memory by adjusting
[taskmanager.memory.network.fraction]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-fraction), [taskmanager.memory.network.min]({{<
ref "docs/deployment/config" >}}#taskmanager-memory-network-min) and
[taskmanager.memory.network.max]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max) to avoid the potential "Insufficient number
of network buffers" error.
+- [taskmanager.network.sort-shuffle.min-buffers]({{< ref
"docs/deployment/config" >}}#taskmanager-network-sort-shuffle-min-buffers):
Config option to control the data writing buffer size. For large scale jobs,
you may need to increase this value; usually several hundred megabytes of
memory is enough. Because this memory is allocated from network memory, to
increase this value, you may also need to increase the total network memory by
adjusting [taskmanager.memory.network.fraction]({{< ref
"docs/deployment/config" >}}#taskmanager-memory-network-fraction) and
[taskmanager.memory.network.min]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-min) to avoid the potential "Insufficient number
of network buffers" error.
- [taskmanager.memory.framework.off-heap.batch-shuffle.size]({{< ref
"docs/deployment/config"
>}}#taskmanager-memory-framework-off-heap-batch-shuffle-size): Config option to
control data reading buffer size. For large scale jobs, you may need to
increase this value, usually, several hundreds of megabytes memory is enough.
Because this memory is cut from the framework off-heap memory, to increase this
value, you need also to increase the total framework off-heap memory by
adjusting [taskmanager.memory.framework.off-heap.size]({{< ref
"docs/deployment/config" >}}#taskmanager-memory-framework-off-heap-size) to
avoid the potential direct memory OOM error.
+- [taskmanager.memory.network.max]({{< ref "docs/deployment/config"
>}}#taskmanager-memory-network-max): Config option to control the maximum total
network memory. Since 1.17, this option defaults to Long.MAX_VALUE, which means
the maximum network memory size is no longer limited by default. If you still
want to limit it, set the option to an explicit value, for example, 1g.
Review Comment:
Why this change?
##########
docs/content/docs/deployment/memory/network_mem_tuning.md:
##########
@@ -105,12 +105,14 @@ The size of the buffer can be configured by setting
`taskmanager.memory.segment-
### Input network buffers
-Buffers in the input channel are divided into exclusive and floating buffers.
Exclusive buffers can be used by only one particular channel. A channel can
request additional floating buffers from a buffer pool shared across all
channels belonging to the given input gate. The remaining floating buffers are
optional and are acquired only if there are enough resources available.
+The number of buffers in one pool, calculated according to the above formula,
can be divided into two parts. The part up to the value configured via
`taskmanager.network.memory.read-buffer.required-per-gate.max` is required; the
excess buffers, if any, are optional. A task will fail if the required buffers
cannot be obtained at runtime. A task will not fail if optional buffers cannot
be obtained, but its performance may suffer. If not explicitly configured, the
default value is Integer.MAX_VALUE for streaming workloads and 1000 for batch
workloads.
Review Comment:
"The number of buffers in one pool calculated according to the above
formula" - give it a name