SAMZA-614 - Document coordinator stream

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/2ba1ea69
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/2ba1ea69
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/2ba1ea69

Branch: refs/heads/samza-sql
Commit: 2ba1ea69edc5699aaf31e4ee0609c31aa1324f47
Parents: fbfee75
Author: Navina
Authored: Tue Nov 24 12:53:36 2015 -0800
Committer: Navina
Committed: Tue Nov 24 12:53:49 2015 -0800

----------------------------------------------------------------------
 .../versioned/container/coordinator-stream.md   | 150 +++++++++++++++++++
 .../versioned/container/serialization.md        |   2 +-
 docs/learn/documentation/versioned/index.html   |   8 +-
 .../versioned/jobs/configuration-table.html     |   8 +-
 4 files changed, 163 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/2ba1ea69/docs/learn/documentation/versioned/container/coordinator-stream.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/coordinator-stream.md b/docs/learn/documentation/versioned/container/coordinator-stream.md
new file mode 100644
index 0000000..98c1a84
--- /dev/null
+++ b/docs/learn/documentation/versioned/container/coordinator-stream.md
@@ -0,0 +1,150 @@
+---
+layout: page
+title: Coordinator Stream
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+A Samza job is completely driven by its configuration, so job configurations tend to be quite large. To serialize such large configurations easily and persist them between job executions, Samza writes all configurations to a durable stream called the *Coordinator Stream* when a job is submitted.
+
+The Coordinator Stream is a single-partition stream to which the configurations are written. Like any input stream that can be configured in Samza, it is ordered, replayable, and fault-tolerant. The stream contains three major types of messages:
+
+1. Job configuration messages
+2. Task changelog partition assignment messages
+3. Container locality messages
+
+### Coordinator Stream Naming
+
+The naming convention is very similar to that of the checkpoint topic:
+
+```scala
+"__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+```
+
+### Coordinator Stream Message Model
+Coordinator stream messages are modeled as key/value pairs. The key is a list of well-defined fields: *version*, *type*, and *key*. The value is a *map*, with some pre-defined fields (such as timestamp, host, etc.) that are common to all messages.
+
+The full structure for a CoordinatorStreamMessage is:
+
+```json
+key => ["<version-number>", "<message-type>", "<key>"]
+
+message => {
+    "host": "<hostname>",
+    "username": "<username>",
+    "source": "<source-for-this-message>",
+    "timestamp": <timestamp-of-the-message>,
+    "values": { }
+}
+```
+
+The messages are serialized and transmitted over the wire as JSON blobs. For serialization to work correctly, it is very important not to include any unnecessary whitespace. The whitespace in the JSON blob above is shown for legibility only.
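To make the "no unnecessary whitespace" rule concrete, here is a minimal sketch of a compact JSON encoder for the key list and message map described above. It is a hand-rolled illustration, not Samza's actual serde; the version value `"1"`, host, username, source, and timestamp are hypothetical placeholders.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactJson {
    // Minimal JSON encoder for the shapes used in this sketch: strings,
    // numbers, lists, and maps. It never emits whitespace between tokens.
    static String toJson(Object o) {
        if (o instanceof String) {
            return quote((String) o);
        } else if (o instanceof Number) {
            return o.toString();
        } else if (o instanceof List) {
            StringBuilder sb = new StringBuilder("[");
            List<?> list = (List<?>) o;
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) sb.append(',');
                sb.append(toJson(list.get(i)));
            }
            return sb.append(']').toString();
        } else if (o instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) o).entrySet()) {
                if (!first) sb.append(',');
                first = false;
                sb.append(quote(String.valueOf(e.getKey()))).append(':').append(toJson(e.getValue()));
            }
            return sb.append('}').toString();
        }
        throw new IllegalArgumentException("Unsupported type: " + o);
    }

    // Quotes a string, escaping quotes, backslashes, and control characters.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') sb.append('\\').append(c);
            else if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
            else sb.append(c);
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        // Hypothetical set-config message for yarn.container.count.
        List<String> key = Arrays.asList("1", "set-config", "yarn.container.count");
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("value", "8");
        Map<String, Object> message = new LinkedHashMap<>();
        message.put("host", "localhost");
        message.put("username", "samza");
        message.put("source", "example");
        message.put("timestamp", 1448398416000L);
        message.put("values", values);
        System.out.println(toJson(key));
        // prints ["1","set-config","yarn.container.count"]
        System.out.println(toJson(message));
        // prints {"host":"localhost","username":"samza","source":"example","timestamp":1448398416000,"values":{"value":"8"}}
    }
}
```

A `LinkedHashMap` is used so that field order, and therefore the serialized bytes, is deterministic from run to run.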
+
+The most important fields are type, key, and values:
+
+* type - defines the kind of message
+* key - defines a key to associate with the values
+* values map - defined on a per-message-type basis, and defines a set of values associated with the type
+
+The coordinator stream messages that are currently supported are listed below:
+<style>
+            table th, table td {
+                text-align: left;
+                vertical-align: top;
+                padding: 12px;
+                border-bottom: 1px solid #ccc;
+                border-top: 1px solid #ccc;
+                border-left: 0;
+                border-right: 0;
+            }
+
+            table td.property, table td.default {
+                white-space: nowrap;
+            }
+
+            table th {
+                background-color: #eee;
+            }
+</style>
+<table>
+    <tr>
+        <th>Message</th>
+        <th>Type</th>
+        <th>Key</th>
+        <th>Values Map</th>
+    </tr>
+    <tr>
+        <td> Configuration Message <br />
+            (Applies to all configuration <br />
+             options listed in <a href="../jobs/configuration-table.html">Configuration</a>) </td>
+        <td> set-config </td>
+        <td> &lt;config-name&gt; </td>
+        <td> 'value' => &lt;config-value&gt; </td>
+    </tr>
+    <tr>
+        <td> Task-ChangelogPartition Assignment Message </td>
+        <td> set-changelog </td>
+        <td> &lt;<a href="../api/org/apache/samza/container/TaskName.java">TaskName</a>&gt; </td>
+        <td> 'partition' => &lt;Changelog-Partition-Id&gt;
+        </td>
+    </tr>
+    <tr>
+        <td> Container Locality Message </td>
+        <td> set-container-host-assignment </td>
+        <td> &lt;Container-Id&gt; </td>
+        <td> 'hostname' => &lt;HostName&gt;
+        </td>
+    </tr>
+</table>
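The three message types in the table can be folded into in-memory state by dispatching on the *type* field. The sketch below assumes a hypothetical decoded `Message` class with string-valued `values` maps, and assumes that a later message with the same type and key overrides an earlier one; it is an illustration, not Samza's actual consumer.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CoordinatorState {
    // Hypothetical decoded form of a coordinator stream message.
    static final class Message {
        final String type;
        final String key;
        final Map<String, String> values;

        Message(String type, String key, Map<String, String> values) {
            this.type = type;
            this.key = key;
            this.values = values;
        }
    }

    final Map<String, String> config = new HashMap<>();               // config name -> value
    final Map<String, Integer> changelogPartitions = new HashMap<>(); // task name -> changelog partition
    final Map<String, String> containerHosts = new HashMap<>();       // container ID -> hostname

    // Applies one message; a later message with the same type and key
    // overwrites the earlier one (an assumption of this sketch).
    void apply(Message m) {
        switch (m.type) {
            case "set-config":
                config.put(m.key, m.values.get("value"));
                break;
            case "set-changelog":
                changelogPartitions.put(m.key, Integer.parseInt(m.values.get("partition")));
                break;
            case "set-container-host-assignment":
                containerHosts.put(m.key, m.values.get("hostname"));
                break;
            default:
                // Message types unknown to this sketch are ignored.
                break;
        }
    }

    public static void main(String[] args) {
        // Hypothetical stream contents, in stream order.
        List<Message> stream = Arrays.asList(
            new Message("set-config", "yarn.container.count", Collections.singletonMap("value", "4")),
            new Message("set-changelog", "Partition 0", Collections.singletonMap("partition", "0")),
            new Message("set-container-host-assignment", "0", Collections.singletonMap("hostname", "host-a")),
            new Message("set-config", "yarn.container.count", Collections.singletonMap("value", "8")));
        CoordinatorState state = new CoordinatorState();
        for (Message m : stream) state.apply(m);
        System.out.println(state.config.get("yarn.container.count")); // prints 8
        System.out.println(state.changelogPartitions);
        System.out.println(state.containerHosts);
    }
}
```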
+
+### Coordinator Stream Writer
+Samza provides a command-line tool to write Job Configuration messages to the coordinator stream. The tool can be used as follows:
+{% highlight bash %}
+samza-example/target/bin/run-coordinator-stream-writer.sh \
+  --config-path=file:///path/to/job/config.properties \
+  --type set-config \
+  --key yarn.container.count \
+  --value 8
+{% endhighlight %}
+
+
+## <a name="JobCoordinator"></a>Job Coordinator
+
+The Job Coordinator bootstraps the configuration from the coordinator stream every time the job starts up. It then periodically catches up with any new data written to the coordinator stream and updates the *Job Model*.
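The bootstrap-then-catch-up pattern can be sketched as a reader that remembers the offset of the next message to read. Here the single-partition stream is simulated by a hypothetical in-memory list of `{configName, value}` pairs; a real Job Coordinator would read from the configured coordinator system instead, and the last-write-wins rule is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfigBootstrap {
    // Hypothetical stand-in for the single-partition coordinator stream:
    // each entry is a set-config message {configName, value}.
    static final List<String[]> LOG = new ArrayList<>();

    private final Map<String, String> config = new HashMap<>();
    private int nextOffset = 0; // offset of the next message to read

    // Reads from the current offset to the end of the log and returns how
    // many messages were consumed. Because the log is ordered, replaying it
    // from offset 0 rebuilds the same configuration every time.
    int catchUp() {
        int read = 0;
        while (nextOffset < LOG.size()) {
            String[] msg = LOG.get(nextOffset++);
            config.put(msg[0], msg[1]); // later writes win
            read++;
        }
        return read;
    }

    Map<String, String> config() {
        return config;
    }

    public static void main(String[] args) {
        LOG.add(new String[] {"job.name", "wikipedia-feed"});
        LOG.add(new String[] {"yarn.container.count", "4"});

        ConfigBootstrap coordinator = new ConfigBootstrap();
        coordinator.catchUp(); // bootstrap on start-up
        System.out.println(coordinator.config().get("yarn.container.count")); // prints 4

        // A new value is written later, for example with the writer tool.
        LOG.add(new String[] {"yarn.container.count", "8"});
        int newMessages = coordinator.catchUp(); // periodic catch-up reads only the new message
        System.out.println(newMessages + " " + coordinator.config().get("yarn.container.count")); // prints 1 8
    }
}
```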
+
+The Job Model is the data model used to represent a Samza job, and it also incorporates the job configuration. It encapsulates the hierarchy of a Samza job (a job has containers, and each container has tasks), along with relevant information such as container IDs, task names, and partition information.
+
+The Job Coordinator exposes the Job Model and job configuration via an HTTP service. The URL of this service is passed to the Samza containers as an environment variable when they are launched. Containers may write meta-information to the coordinator stream, such as locality (the hostname of the machine on which the container is running). However, they read the Job Model and configuration by querying the Job Coordinator's HTTP service.
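A container-side fetch might look roughly like the sketch below: resolve the coordinator URL from the environment, then issue an HTTP GET with timeouts. The environment variable name `SAMZA_COORDINATOR_URL` and the response body are assumptions for illustration; to keep the example self-contained, a throwaway local JDK `HttpServer` stands in for the Job Coordinator.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class JobModelClient {
    // Hypothetical environment variable name; check your Samza version for the real one.
    static final String COORDINATOR_URL_ENV = "SAMZA_COORDINATOR_URL";

    // Looks up the coordinator URL in the given environment map.
    static String coordinatorUrl(Map<String, String> env) {
        String url = env.get(COORDINATOR_URL_ENV);
        if (url == null) throw new IllegalStateException(COORDINATOR_URL_ENV + " is not set");
        return url;
    }

    // Issues a GET with connect/read timeouts and returns the body as a UTF-8 string.
    static String fetch(String url, int timeoutMs) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setConnectTimeout(timeoutMs);
        conn.setReadTimeout(timeoutMs);
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) out.write(buf, 0, n);
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            conn.disconnect();
        }
    }

    // Starts a local HTTP server on a free port that stands in for the Job Coordinator.
    static HttpServer startFakeCoordinator(String body) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        server.createContext("/", exchange -> {
            exchange.getResponseHeaders().set("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, bytes.length);
            try (OutputStream os = exchange.getResponseBody()) {
                os.write(bytes);
            }
        });
        server.start();
        return server;
    }

    public static void main(String[] args) throws IOException {
        // Placeholder body; not the real Job Model JSON layout.
        HttpServer server = startFakeCoordinator("{\"containers\":{}}");
        try {
            String url = "http://127.0.0.1:" + server.getAddress().getPort() + "/";
            Map<String, String> env = Collections.singletonMap(COORDINATOR_URL_ENV, url);
            System.out.println(fetch(coordinatorUrl(env), 5000));
        } finally {
            server.stop(0);
        }
    }
}
```

In a real container the environment map would come from `System.getenv()`; passing it as a parameter simply keeps the lookup testable.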
+
+Thus, the Job Coordinator is the single component that has the latest view of the entire job status. This is useful because it allows the Job Coordinator to be extended in the future to manage the lifecycle of the job (such as starting/stopping containers, modifying task assignment, etc.).
+
+
+### Job Coordinator Availability
+
+The Job Coordinator resides in the same container as the Samza Application Master (AM). Thus, the availability of the Job Coordinator is tied to the availability of the AM in the YARN cluster. The Samza containers are started only after the Job Coordinator has been initialized from the Coordinator Stream, so under stable conditions, a Samza container that comes up should be able to read the Job Model from the Job Coordinator without timing out.
+
+## Benefits of Coordinator Stream Model
+Writing the configuration to a durable stream opens the door for Samza to do several things:
+
+1. Removes the size limit on the job configuration
+2. Exposes job-related configuration and metadata to the containers using a standard data model and communication interface (see [Job Coordinator](#JobCoordinator) for details)
+3. Certain configurations should be set only once. Changing them in a later deployment amounts to resetting the entire state of the job, because it may reshuffle input partitions across the containers. For example, changing the [SystemStreamPartitionGrouper](../api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.java) on a stateful Samza job would intermingle state from different StreamTasks in a single changelog partition. Without persistent configuration, there is no easy way to check whether a job's current configuration is valid.
+4. Job configuration can be changed dynamically by writing to the Coordinator Stream. This can enable features that require the job to react to configuration changes (e.g. host-affinity, auto-scaling, dynamic reconfiguration, etc.).
+5. Provides a unified view of the job state, giving Samza more powerful ways to control containers (see [Job Coordinator](#JobCoordinator) for details)
+6. Enables a future design for Job Coordinator fail-over, since the stream serves as a single source of truth for the current job state
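The validity check described in item 3 becomes possible once the previous configuration is persisted: compare it with the newly submitted configuration and flag any "set-once" setting that changed. The sketch below is hypothetical; the set of protected keys is an illustrative assumption, using `job.systemstreampartition.grouper.factory` as the setting that selects the grouper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

public class ConfigChangeCheck {
    // Hypothetical list of settings that must not change once a stateful job has run.
    static final Set<String> SET_ONCE_KEYS = new TreeSet<>(Arrays.asList(
        "job.systemstreampartition.grouper.factory"));

    // Returns the set-once keys whose value differs between the configuration
    // persisted in the coordinator stream and the newly submitted one.
    static List<String> violations(Map<String, String> persisted, Map<String, String> submitted) {
        List<String> changed = new ArrayList<>();
        for (String key : SET_ONCE_KEYS) {
            // Only keys that were already persisted are checked; on a first
            // deployment there is nothing to compare against. Removing a
            // persisted key counts as a change.
            if (persisted.containsKey(key) && !Objects.equals(persisted.get(key), submitted.get(key))) {
                changed.add(key);
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> persisted = new HashMap<>();
        persisted.put("job.systemstreampartition.grouper.factory",
            "org.apache.samza.container.grouper.stream.GroupByPartitionFactory");
        Map<String, String> submitted = new HashMap<>();
        submitted.put("job.systemstreampartition.grouper.factory",
            "org.apache.samza.container.grouper.stream.GroupBySystemStreamPartitionFactory");
        System.out.println(violations(persisted, submitted));
        // prints [job.systemstreampartition.grouper.factory]
    }
}
```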
+
+
+For other interesting features that can leverage this model, please refer to the [design document](https://issues.apache.org/jira/secure/attachment/12670650/DESIGN-SAMZA-348-1.pdf).

http://git-wip-us.apache.org/repos/asf/samza/blob/2ba1ea69/docs/learn/documentation/versioned/container/serialization.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/serialization.md b/docs/learn/documentation/versioned/container/serialization.md
index 6bcb641..3f6c76a 100644
--- a/docs/learn/documentation/versioned/container/serialization.md
+++ b/docs/learn/documentation/versioned/container/serialization.md
@@ -75,7 +75,7 @@ Each serde is defined with a factory class. Samza comes with several builtin ser
 </style>
 <table>
     <tr>
-        <th class="section"> Serde Name</th>
+        <th> Serde Name</th>
         <th> Data Handled </th>
     </tr>
     <tr>

http://git-wip-us.apache.org/repos/asf/samza/blob/2ba1ea69/docs/learn/documentation/versioned/index.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/index.html b/docs/learn/documentation/versioned/index.html
index dec5be1..d01a958 100644
--- a/docs/learn/documentation/versioned/index.html
+++ b/docs/learn/documentation/versioned/index.html
@@ -49,7 +49,7 @@ title: Documentation
   <li><a href="api/javadocs">Javadocs</a></li>
 </ul>
 
-<h4>Container</h4>
+<h4>Core</h4>
 
 <ul class="documentation-list">
   <li><a href="container/samza-container.html">SamzaContainer</a></li>
@@ -57,13 +57,14 @@ title: Documentation
   <li><a href="container/serialization.html">Serialization</a></li>
   <li><a href="container/checkpointing.html">Checkpointing</a></li>
   <li><a href="container/state-management.html">State Management</a></li>
-  <li><a href="container/metrics.html">Metrics</a></li>
   <li><a href="container/windowing.html">Windowing</a></li>
+  <li><a href="container/coordinator-stream.html">Coordinator Stream</a></li>
   <li><a href="container/event-loop.html">Event Loop</a></li>
+  <li><a href="container/metrics.html">Metrics</a></li>
   <li><a href="container/jmx.html">JMX</a></li>
 </ul>
 
-<h4>Jobs</h4>
+<h4>Job Deployment</h4>
 
 <ul class="documentation-list">
   <li><a href="jobs/job-runner.html">JobRunner</a></li>
@@ -92,4 +93,5 @@ title: Documentation
 <ul class="documentation-list">
   <li><a href="operations/security.html">Security</a></li>
   <li><a href="operations/kafka.html">Kafka</a></li>
+</ul>
 </div>

http://git-wip-us.apache.org/repos/asf/samza/blob/2ba1ea69/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index b5d3813..09f2b6f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -156,7 +156,13 @@
                         others' checkpoints, and perhaps interfere with each other in other ways.
                     </td>
                 </tr>
-
+                <tr>
+                    <td class="property" id="job-coordinator-system">job.coordinator.system</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        <strong>Required:</strong> The <span class="system">system-name</span> to use for creating and maintaining the <a href="../container/coordinator-stream.html">Coordinator Stream</a>.
+                    </td>
+                </tr>
                 <tr>
                     <td class="property" id="job-config-rewriter-class">job.config.rewriter.<br><span class="rewriter">rewriter-name</span>.class</td>
                     <td class="default"></td>
