[ https://issues.apache.org/jira/browse/FLINK-3163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15378192#comment-15378192 ]
Greg Hogan commented on FLINK-3163:
-----------------------------------
I think we can achieve "good enough" without changing the format of {{masters}}
and {{slaves}}. Mesos and YARN provide cluster management, and it might be best
to keep the Flink configuration simple.
What if we added:
* a configuration parameter to enable NUMA, which would start a TaskManager
on each NUMA node for each IP in {{slaves}}
* a configuration parameter (one or two?) for the JobManager and
ResourceManager to run on their own NUMA node, not shared with a TaskManager
(would the JM and RM share a NUMA node if they are on the same IP?)
These could be {{taskmanager.compute.numa}}, {{jobmanager.compute.numa}}, and
{{resourcemanager.compute.numa}}.
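A rough sketch of how {{taskmanager.sh}} might gate NUMA binding on the proposed {{taskmanager.compute.numa}} key. The key names are only the proposals above, not existing Flink options, and {{read_bool_key}} is a hypothetical helper that does a simple line-based parse of {{flink-conf.yaml}} rather than using Flink's own configuration handling.

```shell
#!/usr/bin/env bash
# Sketch only: taskmanager.compute.numa is a *proposed* key, not an existing
# Flink option, and read_bool_key is a hypothetical helper.

# Print "true" if KEY is set to true (case-insensitive) in the YAML-style
# config FILE, otherwise print "false". The last occurrence of KEY wins.
read_bool_key() {
    local key=$1 file=$2 value
    value=$(awk -F':' -v k="$key" '
        { name = $1; gsub(/^[ \t]+|[ \t]+$/, "", name) }
        name == k { v = $0; sub(/^[^:]*:[ \t]*/, "", v); sub(/[ \t]+$/, "", v); last = tolower(v) }
        END { print last }' "$file" 2>/dev/null)
    if [[ $value == true ]]; then
        echo true
    else
        echo false
    fi
}

# Example: decide how taskmanager.sh would start TaskManagers.
conf="${FLINK_CONF_DIR:-conf}/flink-conf.yaml"
if [[ $(read_bool_key taskmanager.compute.numa "$conf") == true ]]; then
    echo "NUMA enabled: start one TaskManager per NUMA node"
else
    echo "NUMA disabled: start a single unbound TaskManager"
fi
```

A missing file or missing key falls back to "false", so existing configurations would keep today's single unbound TaskManager.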
As a related idea, we could also add {{taskmanager.compute.fraction}}. This
would relate to {{taskmanager.numberOfTaskSlots}} as
{{taskmanager.memory.fraction}} relates to {{taskmanager.memory.size}}: a
relative alternative to an absolute setting. If set to {{1.0}}, you would get
one slot per (hyper-threaded) processor.
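To make the {{taskmanager.compute.fraction}} idea concrete, here is one possible interpretation as a shell sketch. It assumes slots = floor(fraction * processors) with a minimum of one slot; the rounding rule and the helper name {{slots_from_fraction}} are my assumptions, not a settled design.

```shell
#!/usr/bin/env bash
# Sketch of one possible meaning for the proposed (hypothetical)
# taskmanager.compute.fraction key: slots = floor(fraction * processors),
# with a minimum of one slot.
slots_from_fraction() {
    local fraction=$1 processors=$2
    awk -v f="$fraction" -v p="$processors" 'BEGIN {
        s = int(f * p)    # int() truncates toward zero
        if (s < 1) s = 1
        print s
    }'
}

# nproc (GNU coreutils) counts processors available to this process;
# getconf _NPROCESSORS_ONLN is a fallback where nproc is missing.
processors=$(nproc 2>/dev/null || getconf _NPROCESSORS_ONLN)
echo "taskmanager.numberOfTaskSlots: $(slots_from_fraction 1.0 "$processors")"
```

GNU {{nproc}} honors the process's CPU affinity, so when run under {{numactl --cpunodebind}} it should report only that node's processors, which fits the one-TaskManager-per-node idea.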
As [~saliya] noted, binding processes is quite easy. Since I have only dealt
with dual-socket systems, I have temporarily hard-coded the following in my
build:
{code}
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index e579c0c..5f076d5 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -96,4 +96,10 @@ if [[ $STARTSTOP == "start" ]]; then
args=("--configDir" "${FLINK_CONF_DIR}")
fi
-"${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+else
+ numactl --membind=0 --cpunodebind=0 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+ numactl --membind=1 --cpunodebind=1 -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP taskmanager "${args[@]}"
+fi
{code}
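Rather than hard-coding nodes 0 and 1, a startup script could discover the NUMA nodes from Linux sysfs ({{/sys/devices/system/node/nodeN}}) and start one TaskManager per node. This is an untested sketch: {{numa_nodes}} and {{start_taskmanagers}} are hypothetical names, and {{start_taskmanagers}} assumes it runs inside {{taskmanager.sh}}, where {{FLINK_BIN_DIR}}, {{STARTSTOP}} and {{args}} are already set.

```shell
#!/usr/bin/env bash
# Print the NUMA node IDs found under the sysfs node directory, one per line,
# in numeric order. The directory is a parameter only so it can be tested.
numa_nodes() {
    local sysfs=${1:-/sys/devices/system/node} dir
    for dir in "$sysfs"/node[0-9]*; do
        [[ -d $dir ]] || continue      # glob matched nothing
        echo "${dir##*/node}"          # ".../node12" -> "12"
    done | sort -n
}

# Hypothetical replacement for the last line of taskmanager.sh: one TaskManager
# per NUMA node, falling back to a single unbound TaskManager when numactl is
# missing or no nodes are reported.
start_taskmanagers() {
    local node nodes
    nodes=$(numa_nodes)
    if ! command -v numactl >/dev/null 2>&1 || [[ -z $nodes ]]; then
        "${FLINK_BIN_DIR}"/flink-daemon.sh "$STARTSTOP" taskmanager "${args[@]}"
        return
    fi
    for node in $nodes; do
        numactl --membind="$node" --cpunodebind="$node" -- \
            "${FLINK_BIN_DIR}"/flink-daemon.sh "$STARTSTOP" taskmanager "${args[@]}"
    done
}
```

On a single-socket machine this starts one bound TaskManager, and on a dual-socket machine it reproduces the hard-coded two-node behavior above.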
> Configure Flink for NUMA systems
> --------------------------------
>
> Key: FLINK-3163
> URL: https://issues.apache.org/jira/browse/FLINK-3163
> Project: Flink
> Issue Type: Improvement
> Components: Startup Shell Scripts
> Affects Versions: 1.0.0
> Reporter: Greg Hogan
> Assignee: Greg Hogan
>
> On NUMA systems Flink can be pinned to a single physical processor ("node")
> using {{numactl --membind=$node --cpunodebind=$node <command>}}. Commonly
> available NUMA systems include the largest AWS and Google Compute instances.
> For example, on an AWS c4.8xlarge system with 36 hyperthreads, the user could
> configure a single TaskManager with 36 slots or have Flink create two
> TaskManagers, one bound to each of the NUMA nodes, each with 18 slots.
> There may be some extra overhead in transferring network buffers between
> TaskManagers on the same system, though the fraction of data shuffled in this
> manner decreases with the size of the cluster. The performance improvement
> from accessing only local memory looks to be significant, though it is
> difficult to benchmark.
> The JobManagers may fit into NUMA nodes rather than requiring full systems.
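For the c4.8xlarge split described in the issue, the per-TaskManager slot count could be derived from each node's {{cpulist}} file in sysfs instead of being configured by hand. A sketch, assuming the usual Linux range-list format (e.g. {{0-8,18-26}}); {{count_cpulist}} is a hypothetical helper.

```shell
#!/usr/bin/env bash
# count_cpulist is a hypothetical helper: count the CPUs in a Linux sysfs
# range list such as "0-8,18-26" (two ranges of nine CPUs each).
count_cpulist() {
    local list=$1 range lo hi count=0
    local IFS=,                        # split the list on commas
    for range in $list; do
        if [[ $range == *-* ]]; then
            lo=${range%-*}
            hi=${range#*-}
            count=$(( count + hi - lo + 1 ))
        elif [[ -n $range ]]; then
            count=$(( count + 1 ))     # single CPU, e.g. "4"
        fi
    done
    echo "$count"
}

# One TaskManager per NUMA node, with one slot per CPU on that node.
for dir in /sys/devices/system/node/node[0-9]*; do
    [[ -r $dir/cpulist ]] || continue
    echo "${dir##*/}: $(count_cpulist "$(<"$dir/cpulist")") slots"
done
```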
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)