Repository: flink
Updated Branches:
  refs/heads/master 0c9c04d19 -> 11c868f91


[FLINK-3163] [scripts] Configure Flink for NUMA systems

Start a TaskManager on each NUMA node on each worker when the new
configuration option 'taskmanager.compute.numa' is enabled.

This closes #3249


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11c868f9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11c868f9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11c868f9

Branch: refs/heads/master
Commit: 11c868f91db773af626ac6ac4dcba9820c13fa8a
Parents: 0c9c04d
Author: Greg Hogan <[email protected]>
Authored: Wed Feb 1 12:13:49 2017 -0500
Committer: Greg Hogan <[email protected]>
Committed: Tue Feb 21 11:00:22 2017 -0500

----------------------------------------------------------------------
 docs/setup/config.md                            |  4 ++++
 flink-dist/src/main/flink-bin/bin/config.sh     | 13 +++++++++++++
 .../src/main/flink-bin/bin/taskmanager.sh       | 20 +++++++++++++++++++-
 3 files changed, 36 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index c4618da..b21c647 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -72,6 +72,10 @@ without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to
 
 ## Advanced Options
 
+### Compute
+
+- `taskmanager.compute.numa`: When enabled a TaskManager is started on each NUMA node for each worker listed in *conf/slaves* (DEFAULT: false). Note: only supported when deploying Flink as a standalone cluster.
+
 ### Managed Memory
 
 By default, Flink allocates a fraction of `0.7` of the total memory configured via `taskmanager.heap.mb` for its managed memory. Managed memory helps Flink to run the batch operators efficiently. It prevents `OutOfMemoryException`s because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.
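
Enabling the option is a one-line change to conf/flink-conf.yaml, the file config.sh reads as YAML_CONF when the default configuration directory is used. The sketch below shows a hypothetical helper, `set_numa_option`, that sets the key idempotently in a given file; the helper and its grep/sed approach are illustrative assumptions, not part of Flink's scripts.

```shell
#!/usr/bin/env bash

# Hypothetical helper: set taskmanager.compute.numa to true or false in the
# given Flink YAML config file, replacing an existing entry or appending one.
set_numa_option() {
    local conf_file="$1" value="$2"
    local key="taskmanager.compute.numa"
    local key_re='^taskmanager\.compute\.numa:'   # dots escaped for grep/sed
    case "$value" in
        true|false) ;;
        *) echo "set_numa_option: value must be true or false" >&2; return 1 ;;
    esac
    touch "$conf_file"
    if grep -q "$key_re" "$conf_file"; then
        # Rewrite through a temp file (sed -i differs between GNU and BSD) and
        # copy back with cat so the original file's permissions are kept
        local tmp
        tmp=$(mktemp)
        sed "s/${key_re}.*/${key}: ${value}/" "$conf_file" > "$tmp"
        cat "$tmp" > "$conf_file"
        rm -f "$tmp"
    else
        echo "${key}: ${value}" >> "$conf_file"
    fi
}
```

For example, `set_numa_option conf/flink-conf.yaml true`, followed by bin/stop-cluster.sh and bin/start-cluster.sh, should start one TaskManager per NUMA node on each worker listed in conf/slaves, provided numactl is installed on those workers.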

http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 0f24034..568aba3 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -96,6 +96,8 @@ KEY_TASKM_MEM_MANAGED_FRACTION="taskmanager.memory.fraction"
 KEY_TASKM_OFFHEAP="taskmanager.memory.off-heap"
 KEY_TASKM_MEM_PRE_ALLOCATE="taskmanager.memory.preallocate"
 
+KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
+
 KEY_ENV_PID_DIR="env.pid.dir"
 KEY_ENV_LOG_DIR="env.log.dir"
 KEY_ENV_LOG_MAX="env.log.max"
@@ -217,6 +219,17 @@ if [ -z "${FLINK_TM_MEM_PRE_ALLOCATE}" ]; then
     FLINK_TM_MEM_PRE_ALLOCATE=$(readFromConfig ${KEY_TASKM_MEM_PRE_ALLOCATE} "false" "${YAML_CONF}")
 fi
 
+# Verify that NUMA tooling is available
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+    FLINK_TM_COMPUTE_NUMA="false"
+else
+    # Define FLINK_TM_COMPUTE_NUMA if it is not already set
+    if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
+        FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
+    fi
+fi
+
 if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
     MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
 fi
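
The config.sh hunk resolves FLINK_TM_COMPUTE_NUMA in a fixed order: if numactl is not on the PATH the flag is forced to false; otherwise a value already set in the environment wins; otherwise the key is read from the YAML file with a default of false. The standalone sketch below mirrors that precedence. `read_yaml_value` is a hypothetical, simplified stand-in for Flink's readFromConfig (whose body is not part of this diff), and the `NUMA_TOOL` override is a hypothetical knob that exists only so the missing-numactl branch can be exercised.

```shell
#!/usr/bin/env bash

# Hypothetical, simplified stand-in for readFromConfig: print the value of a
# top-level "key: value" line in a YAML file, or the default if the key is absent.
read_yaml_value() {
    local key="$1" default="$2" file="$3" value=""
    if [[ -f "$file" ]]; then
        # Split on the colon (and any following spaces) and keep the value of the
        # last line whose first field is exactly the key; comments do not match
        value=$(awk -F': *' -v k="$key" '$1 == k { v = $2 } END { print v }' "$file")
    fi
    echo "${value:-$default}"
}

# Resolve the NUMA flag with the same precedence as the config.sh hunk:
#   1. numactl not found                   -> false
#   2. FLINK_TM_COMPUTE_NUMA already set   -> that value
#   3. otherwise taskmanager.compute.numa from the YAML file, default false
resolve_numa_flag() {
    local yaml_conf="$1"
    # NUMA_TOOL is a hypothetical override used only for testing
    local tool="${NUMA_TOOL:-numactl}"
    if ! command -v "$tool" >/dev/null 2>&1; then
        echo "false"
    elif [[ -n "${FLINK_TM_COMPUTE_NUMA:-}" ]]; then
        echo "${FLINK_TM_COMPUTE_NUMA}"
    else
        read_yaml_value "taskmanager.compute.numa" "false" "$yaml_conf"
    fi
}
```

Because the tool check runs first, exporting `FLINK_TM_COMPUTE_NUMA=true` cannot turn the feature on for a host without numactl; the scripts silently fall back to a single TaskManager.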

http://git-wip-us.apache.org/repos/asf/flink/blob/11c868f9/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e579c0c..6a745cb 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -96,4 +96,22 @@ if [[ $STARTSTOP == "start" ]]; then
     args=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
-"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+TM_COMMAND="${FLINK_BIN_DIR}/flink-daemon.sh $STARTSTOP taskmanager ${args[@]}"
+
+if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
+    # Start a single TaskManager
+    $TM_COMMAND
+else
+    # Example output from `numactl --show` on an AWS c4.8xlarge:
+    # policy: default
+    # preferred node: current
+    # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
+    # cpubind: 0 1
+    # nodebind: 0 1
+    # membind: 0 1
+    read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
+    for NODE_ID in "${NODE_LIST[@]:1}"; do
+        # Start a TaskManager for each NUMA node
+        numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- $TM_COMMAND
+    done
+fi
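
The taskmanager.sh loop takes its node IDs from the `nodebind:` line of `numactl --show`: `read -ra` splits that line into words and `${NODE_LIST[@]:1}` drops the `nodebind:` label. The sketch below repeats that parsing as a hypothetical `numa_node_ids` function and, as a variant worth considering, keeps the TaskManager command in a bash array instead of the string `TM_COMMAND`, because the unquoted `$TM_COMMAND` expansion is word-split and would break on an installation path containing spaces. `print_tm_commands` is also hypothetical and only prints the per-node numactl invocations (a dry run) rather than executing them.

```shell
#!/usr/bin/env bash

# Hypothetical helper: read "numactl --show" output on stdin and print the
# NUMA node IDs from its "nodebind:" line, one per line.
numa_node_ids() {
    local line
    local -a fields
    # "|| true" keeps a missing nodebind line from failing under "set -e"
    line=$(grep "^nodebind: " || true)
    read -ra fields <<< "$line"
    # fields[0] is the "nodebind:" label; nothing to print if no IDs follow
    (( ${#fields[@]} > 1 )) || return 0
    printf '%s\n' "${fields[@]:1}"
}

# Hypothetical dry run: for each node ID on stdin, print the numactl command
# that would start one TaskManager bound to that node's CPUs and memory.
# The TaskManager command is passed as separate arguments and kept in an
# array, so arguments containing spaces are not split.
print_tm_commands() {
    local -a tm_command=("$@")
    local node_id
    while read -r node_id; do
        [[ -n "$node_id" ]] || continue
        local -a cmd=(numactl "--membind=${node_id}" "--cpunodebind=${node_id}" -- "${tm_command[@]}")
        # %q prints each argument in a form the shell could re-read
        printf '%q ' "${cmd[@]}"
        echo
    done
}
```

On a worker, `numactl --show | numa_node_ids | print_tm_commands "${FLINK_BIN_DIR}/flink-daemon.sh" start taskmanager "${args[@]}"` would list one command per node; replacing the printf/echo pair with `"${cmd[@]}"` would run them instead.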
