[ https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378192#comment-15378192 ]

Greg Hogan commented on FLINK-3163:
-----------------------------------

I think we can achieve "good enough" without changing the format of {{masters}} 
and {{slaves}}. Mesos and YARN provide cluster management, and it might be best 
to keep the Flink configuration simple.

What if we added
* a configuration parameter to enable NUMA, which would start a TaskManager 
on each NUMA node for each IP in {{slaves}}
* a configuration parameter (one or two?) for the JobManager and 
ResourceManager to run on their own NUMA node, not shared with a TaskManager 
(would the JM and RM share a NUMA node if on the same IP?)

These could be {{taskmanager.compute.numa}}, {{jobmanager.compute.numa}}, and 
{{resourcemanager.compute.numa}}.
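
As a sketch only (none of these keys exist yet; names and values are purely 
illustrative), the corresponding {{flink-conf.yaml}} entries might look like:

{code}
# Hypothetical keys proposed above; not part of the current configuration.
taskmanager.compute.numa: true
jobmanager.compute.numa: true
resourcemanager.compute.numa: true
{code}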

We could also add, as a related idea, {{taskmanager.compute.fraction}}. This 
would relate to {{taskmanager.numberOfTaskSlots}} as 
{{taskmanager.memory.fraction}} relates to {{taskmanager.memory.size}}: a 
fractional setting alongside the absolute one. If set to {{1.0}} you would get 
one slot per (hyper-threaded) processor.
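
As a rough illustration of that arithmetic (a sketch only; {{FRACTION}} stands 
in for the proposed configuration value, and the slot count is simply floored 
with a minimum of one):

{code}
#!/usr/bin/env bash
# Sketch: how taskmanager.compute.fraction might map to a slot count.
FRACTION=1.0
PROCESSORS=$(nproc)
SLOTS=$(awk -v f="$FRACTION" -v p="$PROCESSORS" \
    'BEGIN { s = int(f * p); if (s < 1) s = 1; print s }')
echo "taskmanager.numberOfTaskSlots: ${SLOTS}"
{code}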

As [~saliya] noted, binding processes is quite easy. Since I have only dealt 
with dual-socket systems I have temporarily hard-coded the following in my 
build:

{code}
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e579c0c..5f076d5 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -96,4 +96,10 @@ if [[ $STARTSTOP == "start" ]]; then
     args=("--configDir" "${FLINK_CONF_DIR}")
 fi
 
-"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+else
+    numactl --membind=0 --cpunodebind=0 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+    numactl --membind=1 --cpunodebind=1 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+fi
{code}
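
To avoid hard-coding the two nodes, the node count could be read from 
{{numactl --hardware}}. Untested sketch, assuming output of the form 
{{available: N nodes (...)}}:

{code}
# Sketch: enumerate NUMA nodes instead of hard-coding 0 and 1.
if command -v numactl >/dev/null 2>&1; then
    NUM_NODES=$(numactl --hardware | awk '/^available:/ { print $2; exit }')
    for ((node = 0; node < NUM_NODES; node++)); do
        numactl --membind=${node} --cpunodebind=${node} -- \
            "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
    done
else
    "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
fi
{code}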

> Configure Flink for NUMA systems
> --------------------------------
>
>                 Key: FLINK-3163
>                 URL: https://issues.apache.org/jira/browse/FLINK-3163
>             Project: Flink
>          Issue Type: Improvement
>          Components: Startup Shell Scripts
>    Affects Versions: 1.0.0
>            Reporter: Greg Hogan
>            Assignee: Greg Hogan
>
> On NUMA systems Flink can be pinned to a single physical processor ("node") 
> using {{numactl --membind=$node --cpunodebind=$node <command>}}. Commonly 
> available NUMA systems include the largest AWS and Google Compute instances.
> For example, on an AWS c4.8xlarge system with 36 hyperthreads the user could 
> configure a single TaskManager with 36 slots or have Flink create two 
> TaskManagers, each bound to one of the two NUMA nodes and each with 18 slots.
> There may be some extra overhead in transferring network buffers between 
> TaskManagers on the same system, though the fraction of data shuffled in this 
> manner decreases with the size of the cluster. The performance improvement 
> from only accessing local memory looks to be significant, though difficult to 
> benchmark.
> The JobManagers may fit into NUMA nodes rather than requiring full systems.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
