SAMZA-668 - Document YARN host affinity in the website

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9b849bb1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9b849bb1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9b849bb1

Branch: refs/heads/samza-sql
Commit: 9b849bb14fd8a8fc50e0ae4fd1a66674d42335d1
Parents: 2ba1ea6
Author: Navina <[email protected]>
Authored: Tue Nov 24 14:01:44 2015 -0800
Committer: Navina <[email protected]>
Committed: Tue Nov 24 14:01:44 2015 -0800

----------------------------------------------------------------------
 docs/learn/documentation/versioned/index.html   |   1 +
 .../versioned/yarn/application-master.md        |  11 +-
 .../versioned/yarn/yarn-host-affinity.md        | 115 +++++++++++++++++++
 3 files changed, 122 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9b849bb1/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index d01a958..c2a7b29 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -81,6 +81,7 @@ title: Documentation
 <ul class="documentation-list">
   <li><a href="yarn/application-master.html">Application Master</a></li>
   <li><a href="yarn/isolation.html">Isolation</a></li>
+  <li><a href="yarn/yarn-host-affinity.html">Host Affinity &amp; YARN</a></li>
   <li><a href="hdfs/producer.html">Writing to HDFS</a></li>
 <!-- TODO write yarn pages
   <li><a href="">Fault Tolerance</a></li>

http://git-wip-us.apache.org/repos/asf/samza/blob/9b849bb1/docs/learn/documentation/versioned/yarn/application-master.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/yarn/application-master.md b/docs/learn/documentation/versioned/yarn/application-master.md
index 807f6f3..ea33706 100644
--- a/docs/learn/documentation/versioned/yarn/application-master.md
+++ b/docs/learn/documentation/versioned/yarn/application-master.md
@@ -27,17 +27,18 @@ Samza's main integration with YARN comes in the form of a Samza ApplicationMaste
 
 When the Samza ApplicationMaster starts up, it does the following:
 
-1. Receives configuration from YARN via the STREAMING_CONFIG environment variable.
+1. Creates the [Job Coordinator](../container/coordinator-stream.html#JobCoordinator) which bootstraps the Job Model and config from the [Coordinator Stream](../container/coordinator-stream.html).
 2. Starts a JMX server on a random port.
 3. Instantiates a metrics registry and reporters to keep track of relevant metrics.
 4. Registers the AM with YARN's RM.
-5. Get the total number of partitions for the Samza job using each input stream's PartitionManager (see the [Streams](../container/streams.html) page for details).
+5. Gets the total number of partitions for the Samza job using each input stream's PartitionManager (see the [Streams](../container/streams.html) page for details).
 6. Read the total number of containers requested from the Samza job's configuration.
 7. Assign each partition to a container (called a Task Group in Samza's AM dashboard).
-8. Make a [ResourceRequest](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/api/records/ResourceRequest.html) to YARN for each container.
-9. Poll the YARN RM every second to check for allocated and released containers.
+8. Makes a [ResourceRequest](http://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/api/records/ResourceRequest.html) to YARN for each container. If [host-affinity](yarn-host-affinity.html) is enabled on the job, the AM uses the container locality information provided by the Job Coordinator and requests the same host in the ResourceRequest.
+9. Starts a ContainerAllocator thread that matches allocated containers to the pending requests and starts the container processes.
+10. Polls the YARN RM every second to check for allocated and released containers.
 
-From this point on, the ApplicationMaster just reacts to events from the RM.
+From this point on, the ApplicationMaster just reacts to events from the RM and delegates them to the ContainerAllocator thread.
 
 ### Fault Tolerance
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9b849bb1/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md b/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
new file mode 100644
index 0000000..108dfbc
--- /dev/null
+++ b/docs/learn/documentation/versioned/yarn/yarn-host-affinity.md
@@ -0,0 +1,115 @@
+---
+layout: page
+title: Host Affinity & YARN
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+In Samza, containers are the units of physical parallelism that run on a set of machines. Each container is essentially a process that runs one or more stream tasks. Each task instance consumes one or more partitions of the input streams and is associated with its own durable data store.
+
+We define a *Stateful Samza Job* as a Samza job that uses a key-value store in its implementation, along with an associated changelog stream. In stateful Samza jobs, there is a 1:1 mapping between the task instance and the data store. Since the allocation of containers to machines in the YARN cluster is left entirely to YARN, Samza does not guarantee that a container (and hence, its associated tasks) is deployed on the same machine across restarts. Containers can get shuffled in any of the following cases:
+
+1. When a job is upgraded by pointing <code>yarn.package.path</code> to the new package path and re-submitted.
+2. When a job is simply restarted by YARN or the user.
+3. When a container failure or preemption triggers the SamzaAppMaster to re-allocate the container on another available resource.
+
+In any of the above cases, the task's co-located data needs to be restored every time a container starts up. Restoring data each time can be expensive, especially for applications that have a large data set, and can slow job start-up so much that the job is no longer "near realtime". Furthermore, if multiple stateful Samza jobs restart around the same time in the cluster and they all share the same changelog system, they can quickly saturate the changelog system's network, effectively causing a DDoS.
+
+For instance, consider a Samza job performing a Stream-Table join. Typically, such a job requires the dataset to be available on all processors before they begin processing the input stream. The dataset is usually large (often 1 TB or more), read-only data that is used to join with or add attributes to incoming messages. The job may initialize this cache by populating it with data directly from a remote store or a changelog stream. This cache initialization happens each time the container is restarted, causing significant latency during job start-up.
+
+The solution, then, is to persist the state store on the machine where the container process runs and to allocate the same host to the container each time the job restarts, so that the persisted state can be re-used. The ability of Samza to allocate a container to the same machine across job restarts is referred to as ***host-affinity***. Samza leverages host-affinity to improve its support for local state re-use.
+
+## How does it work?
+
+When a stateful Samza job is deployed in YARN, the state stores for the tasks are co-located in the current working directory of the YARN container.
+{% highlight bash %}
+container_working_dir=${yarn.nodemanager.local-dirs}/usercache/${user}/appcache/application_${appid}/container_${contid}/
+
+# Data Stores
+ls ${container_working_dir}/state/${store_name}/${task_name}/
+{% endhighlight %}
+
+Keeping the stores in the container's working directory lets the NodeManager's (NM) DeletionService clean up the directory once the application completes or fails. To re-use a local state store, the store must be persisted outside the scope of the NM's deletion service. The cluster administrator should set this location in the <code>LOGGED\_STORE\_BASE\_DIR</code> environment variable in YARN.
+
+![samza-host-affinity](/img/{{site.version}}/learn/documentation/yarn/samza-host-affinity.png)
+
+When a container is *cleanly shut down*, Samza also writes the last materialized offset of the changelog stream to a checksummed file on disk. Thus, there is an *OFFSET* file associated with each state store's changelog partition that is consumed by the tasks in the container.
+
+{% highlight bash %}
+${LOGGED_STORE_BASE_DIR}/${job.name}-${job.id}/${store.name}/${task.name}/OFFSET
+{% endhighlight %}
+
+Now, when a container restarts on the same machine after a clean shutdown and the OFFSET file exists, the Samza container:
+
+1. Opens the persisted store on disk
+2. Reads the OFFSET file
+3. Deletes the OFFSET file
+4. Restores the state store from the OFFSET value
+
+If the OFFSET file doesn't exist, the container creates the state store and consumes the changelog from the oldest offset to re-create the state. Restoring from the OFFSET value instead significantly reduces the state restoration time on container start-up, because the container no longer consumes from the beginning of the changelog stream. Note that Samza deletes the OFFSET file in step 3, before restoration begins, to prevent data from getting corrupted by any kind of failure during state restoration.
+
+Unused or orphaned state stores must be cleaned up periodically to manage disk space on the machines. Support for this clean-up is being worked on in [SAMZA-656](https://issues.apache.org/jira/browse/SAMZA-656).
+
+In order to re-use local state, Samza has to successfully claim the specific hosts from the Resource Manager (RM). To support this, the Samza containers write their locality information to the [Coordinator Stream](../container/coordinator-stream.html) every time they start up successfully. The Samza Application Master (AM) can then identify the last known host of a container via the [Job Coordinator](../container/coordinator-stream.html#JobCoordinator) (JC), so the application is no longer agnostic of container locality. When a container has to be started again (for any of the reasons cited above), the AM includes the hostname of the expected resource in the [ResourceRequest](https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java#L239).
+
+Note that the YARN cluster has to be configured to use the [Fair Scheduler](https://hadoop.apache.org/docs/r2.6.0/hadoop-yarn/hadoop-yarn-site/FairScheduler.html) with continuous scheduling enabled. With continuous scheduling, the scheduler continuously iterates through all nodes in the cluster, instead of relying on the nodes' heartbeats, and schedules work based on the previously known status of each node. If the requested node cannot be allocated within the configured delay, the scheduler relaxes locality. This approach can be considered a "*best-effort stickiness*" policy, because the requested node may not be running or may not have sufficient resources at the time of the request (even though the state in the data stores may be persisted there). For more details on the choice of Fair Scheduler, please refer to the [design doc](https://issues.apache.org/jira/secure/attachment/12726945/DESIGN-SAMZA-617-2.pdf).
+
+
+## Configuring YARN cluster to support Host Affinity
+
+1. Enable local state re-use by setting the <code>LOGGED\_STORE\_BASE\_DIR</code> environment variable in yarn-env.sh {% highlight bash %}
+export LOGGED_STORE_BASE_DIR=<path-for-state-stores>
+{% endhighlight %} Without this configuration, the state stores are not persisted when a container shuts down. In effect, local state is never re-used, and host-affinity provides no benefit.
+2. Configure YARN to use the Fair Scheduler and enable continuous scheduling in yarn-site.xml {% highlight xml %}
+<property>
+    <name>yarn.resourcemanager.scheduler.class</name>
+    <description>The class to use as the resource scheduler.</description>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
+</property>
+<property>
+    <name>yarn.scheduler.fair.continuous-scheduling-enabled</name>
+    <description>Enable Continuous Scheduling of Resource Requests</description>
+    <value>true</value>
+</property>
+<property>
+    <name>yarn.scheduler.fair.locality-delay-node-ms</name>
+    <description>Delay time in milliseconds before relaxing locality at node-level</description>
+    <value>1000</value>  <!-- Should be tuned per requirement -->
+</property>
+<property>
+    <name>yarn.scheduler.fair.locality-delay-rack-ms</name>
+    <description>Delay time in milliseconds before relaxing locality at rack-level</description>
+    <value>1000</value> <!-- Should be tuned per requirement -->
+</property>
+{% endhighlight %}
+
+
+## Configuring a Samza job to use Host Affinity
+Any stateful Samza job can leverage this feature to reduce the Mean Time To Restore (MTTR) of its state stores by setting <code>yarn.samza.host-affinity</code> to true.
+{% highlight bash %}
+yarn.samza.host-affinity=true  # Default: false
+{% endhighlight %}
+
+Enabling this feature for a stateless Samza job should not have any adverse effect on the job.
+
+
+## Host-affinity Guarantees
+As described above, host-affinity cannot be guaranteed all the time, due to variable load distribution in the YARN cluster. Hence, Samza provides it only as a best-effort policy. Certain scenarios are worth calling out, however, where even best-effort host-affinity may be hard to achieve or not applicable.
+
+1. _When the number of containers and/or the container-task assignment changes across successive application runs_ - Local state may be re-usable for only a subset of partitions. Currently, there is no logic in the Job Coordinator to intelligently handle the partitioning of tasks among containers. Handling this is more involved, as it relates to [auto-scaling](https://issues.apache.org/jira/browse/SAMZA-336) of the containers.
+2. _When the SystemStreamPartitionGrouper changes across successive application runs_ - When the grouper logic used to distribute partitions across tasks changes, the data in the Coordinator Stream (for changelog-task partition assignment, etc.) and the data stores become invalid. To be safe, all state-related data should therefore be flushed out of the Coordinator Stream. An alternative is to overwrite the Task-ChangelogPartition assignment message and the Container Locality message in the Coordinator Stream before starting up the job again.
+
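
The start-up restore decision described in yarn-host-affinity.md above (steps 1-4, plus the fallback when no OFFSET file exists) can be sketched in shell. This is a minimal, hypothetical illustration, not Samza's implementation. The function names `store_dir` and `restore_start_offset` are made up. The sketch also assumes the OFFSET file holds the offset as plain text, whereas Samza itself writes a checksummed file.

```shell
#!/usr/bin/env bash
# Hypothetical sketch of the restore decision a container makes on start-up.
# ASSUMPTION: OFFSET holds a plain-text offset; real Samza writes a checksummed file.

# Prints a task's persisted store directory, following the documented layout
# ${LOGGED_STORE_BASE_DIR}/${job.name}-${job.id}/${store.name}/${task.name}
store_dir() {
  local base_dir=$1 job_name=$2 job_id=$3 store_name=$4 task_name=$5
  printf '%s/%s-%s/%s/%s\n' "$base_dir" "$job_name" "$job_id" "$store_name" "$task_name"
}

# Prints where changelog restoration should start for the store in $1:
# the saved offset if an OFFSET file exists, otherwise the word "oldest".
restore_start_offset() {
  local dir=$1
  local offset_file="$dir/OFFSET"
  local offset
  if [ -f "$offset_file" ]; then
    # Steps 1-2: the persisted store is on disk; read the saved offset.
    offset=$(cat "$offset_file")
    # Step 3: delete OFFSET before restoring, so a failure during
    # restoration does not leave the old offset behind for the next start.
    rm -f "$offset_file"
    # Step 4: restore (catch up) from the saved offset.
    printf '%s\n' "$offset"
  else
    # No clean shutdown recorded: create the store directory and replay
    # the changelog from its oldest offset.
    mkdir -p "$dir"
    printf 'oldest\n'
  fi
}
```

For example, suppose a clean shutdown wrote `42` to the OFFSET file. The first call to `restore_start_offset` then prints `42` and removes the file. A second call, with no OFFSET file present, prints `oldest`. Removing the file before any restoration work mirrors the ordering in step 3.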
