Repository: flink
Updated Branches:
  refs/heads/master 0c9c04d19 -> 11c868f91


[FLINK-3163] [scripts] Configure Flink for NUMA systems

Start a TaskManager on each NUMA node on each worker when the new
configuration option 'taskmanager.compute.numa' is enabled.

This closes #3249


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11c868f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11c868f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11c868f9

Branch: refs/heads/master
Commit: 11c868f91db773af626ac6ac4dcba9820c13fa8a
Parents: 0c9c04d
Author: Greg Hogan <[email protected]>
Authored: Wed Feb 1 12:13:49 2017 -0500
Committer: Greg Hogan <[email protected]>
Committed: Tue Feb 21 11:00:22 2017 -0500

----------------------------------------------------------------------
 docs/setup/config.md                            |  4 ++++
 flink-dist/src/main/flink-bin/bin/config.sh     | 13 +++++++++++++
 .../src/main/flink-bin/bin/taskmanager.sh       | 20 +++++++++++++++++++-
 3 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index c4618da..b21c647 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -72,6 +72,10 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to
 
 ## Advanced Options
 
+### Compute
+
+- `taskmanager.compute.numa`: When enabled a TaskManager is started on each NUMA node for each worker listed in *conf/slaves* (DEFAULT: false). Note: only supported when deploying Flink as a standalone cluster.
+
 ### Managed Memory
 
 By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 0f24034..568aba3 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -96,6 +96,8 @@ KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
 KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
 KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"
 
+KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
+
 KEY_ENV_PID_DIR="env.pid.dir"
 KEY_ENV_LOG_DIR="env.log.dir"
 KEY_ENV_LOG_MAX="env.log.max"
@@ -217,6 +219,17 @@ if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
     FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
 fi
 
+# Verify that NUMA tooling is available
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+    FLINK_TM_COMPUTE_NUMA="false"
+else
+    # Define FLINK_TM_COMPUTE_NUMA if it is not already set
+    if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
+        FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
+    fi
+fi
+
 if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
     MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
 fi

http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e579c0c..6a745cb 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -96,4 +96,22 @@ if [[ $STARTSTOP == "start" ]]; then
     args=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
-"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+TM_COMMAND="${FLINK_BIN_DIR}/flink-daemon.sh $STARTSTOP taskmanager ${args[@]}"
+
+if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
+    # Start a single TaskManager
+    $TM_COMMAND
+else
+    # Example output from `numactl --show` on an AWS c4.8xlarge:
+    # policy: default
+    # preferred node: current
+    # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
+    # cpubind: 0 1
+    # nodebind: 0 1
+    # membind: 0 1
+    read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
+    for NODE_ID in "${NODE_LIST[@]:1}"; do
+        # Start a TaskManager for each NUMA node
+        numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- $TM_COMMAND
+    done
+fi
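
Editor's note on the taskmanager.sh change: the loop above depends on splitting the `nodebind:` line of `numactl --show` into words and skipping the leading label. The following is a minimal sketch of that parsing step only, run against a hard-coded sample taken from the comment in the diff so that it works without numactl installed. The names SAMPLE_OUTPUT and list_numa_nodes are hypothetical, introduced for illustration, and are not part of the commit.

```shell
#!/usr/bin/env bash
# Illustrative sketch only; not part of the commit.
# SAMPLE_OUTPUT (hypothetical) mimics the `numactl --show` output quoted
# in the taskmanager.sh comment, with the long physcpubind line omitted.
SAMPLE_OUTPUT='policy: default
preferred node: current
cpubind: 0 1
nodebind: 0 1
membind: 0 1'

# list_numa_nodes (hypothetical helper) prints one NUMA node ID per line.
list_numa_nodes() {
    local node_list
    # Split the "nodebind:" line on whitespace; element 0 is the label.
    read -ra node_list <<< "$(grep "^nodebind: " <<< "$1")"
    # Skip the label and print the remaining node IDs.
    printf '%s\n' "${node_list[@]:1}"
}

for NODE_ID in $(list_numa_nodes "${SAMPLE_OUTPUT}"); do
    # The commit runs numactl with --membind and --cpunodebind here;
    # this sketch only reports which node would be used.
    echo "would start a TaskManager bound to NUMA node ${NODE_ID}"
done
```

With the sample above, the loop reports nodes 0 and 1, matching the two-node c4.8xlarge example in the diff. On a real worker the sample string would be replaced by the live output of `numactl --show`, as the commit does.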
