Repository: flink
Updated Branches:
  refs/heads/master afd0f16b8 -> a522c7aae


[docs] Adjust network buffer config for slots and add tl;dr


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a522c7aa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a522c7aa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a522c7aa

Branch: refs/heads/master
Commit: a522c7aae025daa9447cf587a4c8f02e1fa1a1f8
Parents: afd0f16
Author: Ufuk Celebi <[email protected]>
Authored: Mon May 9 10:59:31 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Mon May 9 11:00:35 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md | 44 +++++++++++++++++++++++++++-----------------
 1 file changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a522c7aa/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 6e6e0a6..db189a0 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -54,11 +54,11 @@ The configuration files for the TaskManagers can be 
different, Flink does not as
 - `parallelism.default`: The default parallelism to use for programs that have 
no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs 
running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will 
cause the system to use all available execution resources for the program's 
execution. **Note**: The default parallelism can be overwriten for an entire 
job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` 
or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be 
overwritten for single transformations by calling `setParallelism(int
 parallelism)` on an operator. See the [programming 
guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for 
more information about the parallelism.
 
-- `fs.default-scheme`: The default filesystem scheme to be used, with the 
necessary authority to contact, e.g. the host:port of the NameNode in the case 
of HDFS (if needed). 
-By default, this is set to `file:///` which points to the local filesystem. 
This means that the local 
-filesystem is going to be used to search for user-specified files **without** 
an explicit scheme 
-definition. As another example, if this is set to `hdfs://localhost:9000/`, 
then a user-specified file path 
-without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going 
to be transformed into 
+- `fs.default-scheme`: The default filesystem scheme to be used, with the 
necessary authority to contact, e.g. the host:port of the NameNode in the case 
of HDFS (if needed).
+By default, this is set to `file:///` which points to the local filesystem. 
This means that the local
+filesystem is going to be used to search for user-specified files **without** 
an explicit scheme
+definition. As another example, if this is set to `hdfs://localhost:9000/`, 
then a user-specified file path
+without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going 
to be transformed into
 `hdfs://localhost:9000/user/USERNAME/in.txt`. This scheme is used **ONLY** if 
no other scheme is specified (explicitly) in the user-provided `URI`.
 
 - `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) 
configuration **directory** (OPTIONAL VALUE). Specifying this value allows 
programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, 
without including the address and port of the NameNode in the file URI). 
Without this option, HDFS files can be accessed, but require fully qualified 
URIs like `hdfs://address:port/path/to/files`. This option also causes file 
writers to pick up the HDFS's default values for block sizes and replication 
factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in 
the specified directory.
@@ -133,11 +133,11 @@ To use the fixed delay strategy you have to specify 
"fixed-delay".
 To turn the restart behaviour off you have to specify "none".
 Default value "none".
 
-- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if 
the default restart strategy is set to "fixed-delay". 
+- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if 
the default restart strategy is set to "fixed-delay".
 Default value is 1.
- 
-- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used 
if the default restart strategy is set to "fixed-delay". 
-Default value is the `akka.ask.timeout`. 
+
+- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used 
if the default restart strategy is set to "fixed-delay".
+Default value is the `akka.ask.timeout`.
 
 ## Full Reference
 
@@ -204,9 +204,9 @@ The following parameters configure Flink's JobManager and 
TaskManagers.
 
 The parameters define the behavior of tasks that create result files.
 
-- `fs.default-scheme`: The default filesystem scheme to be used, with the 
necessary authority to contact, e.g. the host:port of the NameNode in the case 
of HDFS (if needed). 
-By default, this is set to `file:///` which points to the local filesystem. 
This means that the local 
-filesystem is going to be used to search for user-specified files **without** 
an explicit scheme 
+- `fs.default-scheme`: The default filesystem scheme to be used, with the 
necessary authority to contact, e.g. the host:port of the NameNode in the case 
of HDFS (if needed).
+By default, this is set to `file:///` which points to the local filesystem. 
This means that the local
+filesystem is going to be used to search for user-specified files **without** 
an explicit scheme
 definition. This scheme is used **ONLY** if no other scheme is specified 
(explicitly) in the user-provided `URI`.
 
 - `fs.overwrite-files`: Specifies whether file output writers should overwrite 
existing files by default. Set to *true* to overwrite by default, *false* 
otherwise. (DEFAULT: false)
@@ -240,10 +240,10 @@ definition. This scheme is used **ONLY** if no other 
scheme is specified (explic
 - `yarn.properties-file.location` (Default: temp directory). When a Flink job 
is submitted to YARN, the JobManager's host and the number of available 
processing slots is written into a properties file, so that the Flink clientis 
able to pick those details up. This configuration parameter allows changing the 
default location of that file (for example for environments sharing a Flink 
installation between users)
 
 - `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed 
with `yarn.application-master.env.` will be passed as environment variables to 
the ApplicationMaster/JobManager process. For example for passing 
`LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set:
-       
+
        yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
 
-- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN 
container. By default, the number of `vcores` is set to the number of slots per 
TaskManager, if set, or to 1, otherwise. 
+- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN 
container. By default, the number of `vcores` is set to the number of slots per 
TaskManager, if set, or to 1, otherwise.
 
 - `yarn.taskmanager.env.` Similar to the configuration prefix about, this 
prefix allows setting custom environment variables for the TaskManager 
processes.
 
@@ -274,7 +274,7 @@ For example when running Flink on YARN on an environment 
with a restrictive fire
 
 - `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the 
number of connection retries before the client gives up.
 
-- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before 
persisted jobs are recovered in case of a recovery situation. 
+- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before 
persisted jobs are recovered in case of a recovery situation.
 
 ## Environment
 
@@ -284,12 +284,22 @@ For example when running Flink on YARN on an environment 
with a restrictive fire
 
 ### Configuring the Network Buffers
 
+If you ever see the Exception `java.io.IOException: Insufficient number of 
network buffers`, please use the following formula to adjust the number of 
network buffers:
+
+```
+#slots-per-TM^2 * #TMs * 4
+```
+
+Where `#slots per TM` are the [number of slots per 
TaskManager](#configuring-taskmanager-processing-slots) and `#TMs` are the 
total number of task managers.
+
 Network buffers are a critical resource for the communication layers. They are 
used to buffer records before transmission over a network, and to buffer 
incoming data before dissecting it into records and handing them to the
 application. A sufficient number of network buffers is critical to achieve a 
good throughput.
 
-In general, configure the task manager to have enough buffers that each 
logical network connection on you expect to be open at the same time has a 
dedicated buffer. A logical network connection exists for each point-to-point 
exchange of data over the network, which typically happens at repartitioning- 
or broadcasting steps. In those, each parallel task inside the TaskManager has 
to be able to talk to all other parallel tasks. Hence, the required number of 
buffers on a task manager is *total-degree-of-parallelism* (number of targets) 
\* *intra-node-parallelism* (number of sources in one task manager) \* *n*. 
Here, *n* is a constant that defines how many repartitioning-/broadcasting 
steps you expect to be active at the same time.
+In general, configure the task manager to have enough buffers that each 
logical network connection you expect to be open at the same time has a 
dedicated buffer. A logical network connection exists for each point-to-point 
exchange of data over the network, which typically happens at repartitioning- 
or broadcasting steps (shuffle phase). In those, each parallel task inside the 
TaskManager has to be able to talk to all other parallel tasks. Hence, the 
required number of buffers on a task manager is *total-degree-of-parallelism* 
(number of targets) \* *intra-node-parallelism* (number of sources in one task 
manager) \* *n*. Here, *n* is a constant that defines how many 
repartitioning-/broadcasting steps you expect to be active at the same time.
+
+Since the *intra-node-parallelism* is typically the number of cores, and more 
than 4 repartitioning or broadcasting channels are rarely active in parallel, 
it frequently boils down to `#slots-per-TM^2 * #TMs * 4`.
 
-Since the *intra-node-parallelism* is typically the number of cores, and more 
than 4 repartitioning or broadcasting channels are rarely active in parallel, 
it frequently boils down to *\#cores\^2\^* \* *\#machines* \* 4. To support for 
example a cluster of 20 8-core machines, you should use roughly 5000 network 
buffers for optimal throughput.
+To support for example a cluster of 20 8-slot machines, you should use roughly 
5000 network buffers for optimal throughput.
 
 Each network buffer has by default a size of 32 KiBytes. In the above example, 
the system would allocate roughly 300 MiBytes for network buffers.
 

Reply via email to