Repository: flink
Updated Branches:
  refs/heads/release-1.0 115d12aa1 -> d6a1d09f5

[docs] Adjust network buffer config for slots and add tl;dr

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6a1d09f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6a1d09f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6a1d09f

Branch: refs/heads/release-1.0
Commit: d6a1d09f5c37c0d4b0a262f13651b6fe8e81f42c
Parents: 115d12a
Author: Ufuk Celebi <[email protected]>
Authored: Mon May 9 10:59:31 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Mon May 9 11:00:57 2016 +0200

----------------------------------------------------------------------
 docs/setup/config.md | 44 +++++++++++++++++++++++++++-----------------
 1 file changed, 27 insertions(+), 17 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d6a1d09f/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index d39cf9c..9f3ac4e 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -54,11 +54,11 @@ The configuration files for the TaskManagers can be different, Flink does not as
 - `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwriten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int parallelism)` on an operator.
 See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for more information about the parallelism.
-- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
-By default, this is set to `file:///` which points to the local filesystem. This means that the local
-filesystem is going to be used to search for user-specified files **without** an explicit scheme
-definition. As another example, if this is set to `hdfs://localhost:9000/`, then a user-specified file path
-without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed into
+- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
+By default, this is set to `file:///` which points to the local filesystem. This means that the local
+filesystem is going to be used to search for user-specified files **without** an explicit scheme
+definition. As another example, if this is set to `hdfs://localhost:9000/`, then a user-specified file path
+without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed into
 `hdfs://localhost:9000/user/USERNAME/in.txt`. This scheme is used **ONLY** if no other scheme is specified (explicitly) in the user-provided `URI`.
 - `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration **directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors.
 Flink will look for the "core-site.xml" and "hdfs-site.xml" files in teh specified directory.
@@ -133,11 +133,11 @@ To use the fixed delay strategy you have to specify "fixed-delay".
 To turn the restart behaviour off you have to specify "none".
 Default value "none".
-- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if the default restart strategy is set to "fixed-delay".
+- `restart-strategy.fixed-delay.attempts`: Number of restart attempts, used if the default restart strategy is set to "fixed-delay".
 Default value is 1.
-
-- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
-Default value is the `akka.ask.timeout`.
+
+- `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
+Default value is the `akka.ask.timeout`.
 ## Full Reference
@@ -203,9 +203,9 @@ The following parameters configure Flink's JobManager and TaskManagers.
 The parameters define the behavior of tasks that create result files.
-- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
-By default, this is set to `file:///` which points to the local filesystem. This means that the local
-filesystem is going to be used to search for user-specified files **without** an explicit scheme
+- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
+By default, this is set to `file:///` which points to the local filesystem. This means that the local
+filesystem is going to be used to search for user-specified files **without** an explicit scheme
 definition. This scheme is used **ONLY** if no other scheme is specified (explicitly) in the user-provided `URI`.
 - `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false)
@@ -239,10 +239,10 @@ definition. This scheme is used **ONLY** if no other scheme is specified (explic
 - `yarn.properties-file.location` (Default: temp directory). When a Flink job is submitted to YARN, the JobManager's host and the number of available processing slots is written into a properties file, so that the Flink clientis able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users)
 - `yarn.application-master.env.`*ENV_VAR1=value* Configuration values prefixed with `yarn.application-master.env.` will be passed as environment variables to the ApplicationMaster/JobManager process. For example for passing `LD_LIBRARY_PATH` as an env variable to the ApplicationMaster, set:
-
+
 yarn.application-master.env.LD_LIBRARY_PATH: "/usr/lib/native"
-- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.
+- `yarn.containers.vcores` The number of virtual cores (vcores) per YARN container. By default, the number of `vcores` is set to the number of slots per TaskManager, if set, or to 1, otherwise.
 - `yarn.taskmanager.env.` Similar to the configuration prefix about, this prefix allows setting custom environment variables for the TaskManager processes.
@@ -273,7 +273,7 @@ For example when running Flink on YARN on an environment with a restrictive fire
 - `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
-- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.
+- `recovery.job.delay`: (Default 'akka.ask.timeout') Defines the delay before persisted jobs are recovered in case of a recovery situation.
 ## Environment
@@ -283,12 +283,22 @@ For example when running Flink on YARN on an environment with a restrictive fire
 ### Configuring the Network Buffers
+If you ever see the Exception `java.io.IOException: Insufficient number of network buffers`, please use the following formula to adjust the number of network buffers:
+
+```
+#slots-per-TM^2 * #TMs * 4
+```
+
+Where `#slots per TM` are the [number of slots per TaskManager](#configuring-taskmanager-processing-slots) and `#TMs` are the total number of task managers.
+
 Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the application. A sufficient number of network buffers is critical to achieve a good throughput.
-In general, configure the task manager to have enough buffers that each logical network connection on you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning- or broadcasting steps. In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks. Hence, the required number of buffers on a task manager is *total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism* (number of sources in one task manager) \* *n*. Here, *n* is a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time.
+In general, configure the task manager to have enough buffers that each logical network connection you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning- or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks. Hence, the required number of buffers on a task manager is *total-degree-of-parallelism* (number of targets) \* *intra-node-parallelism* (number of sources in one task manager) \* *n*. Here, *n* is a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time.
+
+Since the *intra-node-parallelism* is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to `#slots-per-TM^2 * #TMs * 4`.
-Since the *intra-node-parallelism* is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to *\#cores\^2\^* \* *\#machines* \* 4. To support for example a cluster of 20 8-core machines, you should use roughly 5000 network buffers for optimal throughput.
+To support for example a cluster of 20 8-slot machines, you should use roughly 5000 network buffers for optimal throughput.
 Each network buffer has by default a size of 32 KiBytes. In the above example, the system would allocate roughly 300 MiBytes for network buffers.
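
The sizing rule added by this commit can be checked with a few lines of arithmetic. The sketch below is illustrative only and not part of the commit: the function names `network_buffers` and `buffer_memory_mib` are hypothetical, the factor 4 is the "rarely more than 4 repartitioning or broadcasting channels" constant from the text, and 32 KiBytes is the default buffer size the text states.

```python
def network_buffers(slots_per_tm: int, num_tms: int, concurrent_shuffles: int = 4) -> int:
    """Apply the rule of thumb from the text: #slots-per-TM^2 * #TMs * 4.

    concurrent_shuffles is the constant n from the text (hypothetical parameter name).
    """
    return slots_per_tm ** 2 * num_tms * concurrent_shuffles


def buffer_memory_mib(num_buffers: int, buffer_size_kib: int = 32) -> float:
    """Memory taken by the buffers in MiBytes, assuming the stated 32 KiByte default size."""
    return num_buffers * buffer_size_kib / 1024


if __name__ == "__main__":
    # The example from the text: 20 TaskManagers with 8 slots each.
    buffers = network_buffers(slots_per_tm=8, num_tms=20)
    print(buffers)                      # 5120, the "roughly 5000" from the text
    print(buffer_memory_mib(buffers))   # 160.0
```

For the 20 x 8-slot example this gives 5120 buffers. At 32 KiBytes each, that comes to 160 MiBytes, which is lower than the "roughly 300 MiBytes" quoted in the text, so treat the memory figure as a rough estimate and recompute it for the buffer size actually configured. Assuming this Flink version uses the `taskmanager.network.numberOfBuffers` key, the computed count would be set there in `flink-conf.yaml`.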
