Repository: samza
Updated Branches:
  refs/heads/master 3228ed9c4 -> 40ffe4ea5


SAMZA-977: User doc for samza multi-threading


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40ffe4ea
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40ffe4ea
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40ffe4ea

Branch: refs/heads/master
Commit: 40ffe4ea5d45f048e12849ca3ac98325bbd9e99b
Parents: 3228ed9
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Thu Sep 22 16:44:30 2016 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Thu Sep 22 16:45:34 2016 -0700

----------------------------------------------------------------------
 docs/Gemfile.lock                               |   2 +-
 .../documentation/versioned/api/overview.md     |  21 ++-
 .../versioned/container/event-loop.md           |  38 ++++--
 .../versioned/jobs/configuration-table.html     |  43 ++++++-
 docs/learn/tutorials/versioned/index.md         |   2 +
 .../versioned/samza-async-user-guide.md         | 127 +++++++++++++++++++
 6 files changed, 219 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/Gemfile.lock
----------------------------------------------------------------------
diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock
index 8a236e6..8c75832 100644
--- a/docs/Gemfile.lock
+++ b/docs/Gemfile.lock
@@ -48,7 +48,7 @@ GEM
     rb-inotify (0.9.5)
       ffi (>= 0.5.0)
     redcarpet (3.1.2)
-    safe_yaml (1.0.3)
+    safe_yaml (1.0.4)
     sass (3.3.8)
     timers (1.1.0)
     toml (0.1.1)

http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/documentation/versioned/api/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/overview.md 
b/docs/learn/documentation/versioned/api/overview.md
index 67e259c..d589bc6 100644
--- a/docs/learn/documentation/versioned/api/overview.md
+++ b/docs/learn/documentation/versioned/api/overview.md
@@ -19,7 +19,7 @@ title: API Overview
    limitations under the License.
 -->
 
-When writing a stream processor for Samza, you must implement the 
[StreamTask](javadocs/org/apache/samza/task/StreamTask.html) interface:
+When writing a stream processor for Samza, you must implement either the 
[StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or the 
[AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) 
interface. Implement StreamTask for synchronous processing, where the 
message processing is complete after the *process* method returns. An example 
of StreamTask is a computation that does not involve remote calls:
 
 {% highlight java %}
 package com.example.samza;
@@ -34,6 +34,23 @@ public class MyTaskClass implements StreamTask {
 }
 {% endhighlight %}
 
+The AsyncStreamTask interface, on the other hand, supports asynchronous 
processing, where the message processing may not be complete after the 
*processAsync* method returns. Various concurrency libraries such as Java NIO, 
ParSeq and Akka can be used here to make asynchronous calls, and the completion 
is marked by invoking the 
[TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html). Samza will 
continue to process the next message or shut down the container based on the 
callback status. An example of AsyncStreamTask is a computation that makes 
remote calls but doesn't block on the call completion:
+
+{% highlight java %}
+package com.example.samza;
+
+public class MyAsyncTaskClass implements AsyncStreamTask {
+
+  public void processAsync(IncomingMessageEnvelope envelope,
+                           MessageCollector collector,
+                           TaskCoordinator coordinator,
+                           TaskCallback callback) {
+    // process message with asynchronous calls
+    // fire callback upon completion, e.g. invoking callback from asynchronous call completion thread
+  }
+}
+{% endhighlight %}
+
 When you run your job, Samza will create several instances of your class 
(potentially on multiple machines). These task instances process the messages 
in the input streams.
 
 In your job's configuration you can tell Samza which streams you want to 
consume. An incomplete example could look like this (see the [configuration 
documentation](../jobs/configuration.html) for more detail):
@@ -139,5 +156,5 @@ public class SplitStringIntoWords implements StreamTask {
 }
 {% endhighlight %}
 
-For more details on APIs, please refer to 
[Configuration](../jobs/configuration-table.html) and [Javadocs](javadocs)
+For AsyncStreamTask example, follow the tutorial in [Samza Async API and 
Multithreading User 
Guide](../../../tutorials/{{site.version}}/samza-async-user-guide.html). For 
more details on APIs, please refer to 
[Configuration](../jobs/configuration-table.html) and [Javadocs](javadocs).
 ## [SamzaContainer &raquo;](../container/samza-container.html)
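
Note on the AsyncStreamTask contract described above: the key point is that processAsync() returns before the work is done, and completion is signalled later from another thread. The sketch below illustrates that shape using only the JDK. `SketchCallback`, `remoteCall` and `runOnce` are hypothetical names invented for this illustration; they are not Samza classes, and the simulated remote call is an assumption standing in for a real non-blocking client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a completion callback; not Samza's TaskCallback. */
interface SketchCallback {
  void complete();
  void failure(Throwable t);
}

final class AsyncContractSketch {

  /** Simulated non-blocking remote call: returns a future immediately. */
  static CompletableFuture<String> remoteCall(String message) {
    return CompletableFuture.supplyAsync(() -> "echo:" + message);
  }

  /** Same shape as processAsync(): start the call, return, fire the callback later. */
  static void processAsync(String message, SketchCallback callback) {
    remoteCall(message).whenComplete((result, error) -> {
      if (error != null) {
        callback.failure(error);   // runs on the completion thread
      } else {
        callback.complete();       // runs on the completion thread
      }
    });
  }

  /** Sends one message through processAsync() and reports how the callback ended. */
  static String runOnce(String message) {
    CountDownLatch done = new CountDownLatch(1);
    AtomicReference<String> status = new AtomicReference<>("pending");
    processAsync(message, new SketchCallback() {
      @Override public void complete() { status.set("complete"); done.countDown(); }
      @Override public void failure(Throwable t) { status.set("failure"); done.countDown(); }
    });
    try {
      // Wait for the callback; processAsync() itself has already returned.
      if (!done.await(5, TimeUnit.SECONDS)) {
        return "timeout";
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
    return status.get();
  }
}
```

In this sketch the caller learns the outcome only through the callback, which is the same reason the documentation says Samza decides whether to continue or shut down based on the callback status.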

http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/documentation/versioned/container/event-loop.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/event-loop.md 
b/docs/learn/documentation/versioned/container/event-loop.md
index 9dcf92c..13a5ea8 100644
--- a/docs/learn/documentation/versioned/container/event-loop.md
+++ b/docs/learn/documentation/versioned/container/event-loop.md
@@ -19,11 +19,13 @@ title: Event Loop
    limitations under the License.
 -->
 
-The event loop is the [container](samza-container.html)'s single thread that 
is in charge of [reading and writing messages](streams.html), [flushing 
metrics](metrics.html), [checkpointing](checkpointing.html), and 
[windowing](windowing.html).
+The event loop orchestrates [reading and processing messages](streams.html), 
[checkpointing](checkpointing.html), [windowing](windowing.html) and [flushing 
metrics](metrics.html) among tasks. 
 
-Samza uses a single thread because every container is designed to use a single 
CPU core; to get more parallelism, simply run more containers. This uses a bit 
more memory than multithreaded parallelism, because each JVM has some overhead, 
but it simplifies resource management and improves isolation between jobs. This 
helps Samza jobs run reliably on a multitenant cluster, where many different 
jobs written by different people are running at the same time.
+By default Samza uses a single thread in each 
[container](samza-container.html) to run the tasks. This fits CPU-bound jobs 
well; to get more CPU processors, simply add more containers. The single thread 
execution also simplifies sharing task state and resource management.
 
-You are strongly discouraged from using threads in your job's code. Samza uses 
multiple threads internally for communicating with input and output streams, 
but all message processing and user code runs on a single-threaded event loop. 
In general, Samza is not thread-safe.
+For IO-bound jobs, Samza supports finer-grained parallelism for both 
synchronous and asynchronous tasks. For synchronous tasks 
([StreamTask](../api/javadocs/org/apache/samza/task/StreamTask.html) and 
[WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html)), 
you can schedule them to run in parallel by configuring the built-in thread 
pool [job.container.thread.pool.size](../jobs/configuration-table.html). This 
fits the blocking-IO task scenario. For asynchronous tasks 
([AsyncStreamTask](../api/javadocs/org/apache/samza/task/AsyncStreamTask.html)),
 you can make async IO calls and trigger callbacks upon completion. The finest 
degree of parallelism Samza provides is within a task, and is configured by 
[task.max.concurrency](../jobs/configuration-table.html).
+
+The latest version of Samza is thread-safe. You can safely access your job’s 
state in the [key-value store](state-management.html), write messages and 
checkpoint offsets in the task threads. Other data shared among tasks, such as 
global variables or static data, is not thread-safe if it can be accessed 
concurrently by multiple threads, e.g. by StreamTasks running in a configured 
thread pool with more than one thread. For state within a task, such as member 
variables, Samza guarantees the mutual exclusion of process, window and commit, 
so there are no concurrent modifications among these operations and any state 
change made by one operation is fully visible to the others.
 
 ### Event Loop Internals
 
@@ -31,17 +33,35 @@ A container may have multiple 
[SystemConsumers](../api/javadocs/org/apache/samza
 
 The event loop works as follows:
 
-1. Take a message from the incoming message queue;
-2. Give the message to the appropriate [task instance](samza-container.html) 
by calling process() on it;
-3. Call window() on the task instance if it implements 
[WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html), 
and the window time has expired;
+1. Choose a message from the incoming message queue;
+2. Schedule the appropriate [task instance](samza-container.html) to process 
the message;
+3. Schedule window() on the task instance to run if it implements 
WindowableTask, and the window timer has been triggered;
 4. Send any output from the process() and window() calls to the appropriate 
[SystemProducers](../api/javadocs/org/apache/samza/system/SystemProducer.html);
-5. Write checkpoints for any tasks whose [commit interval](checkpointing.html) 
has elapsed.
+5. Write checkpoints and flush the state stores for any tasks whose [commit 
interval](checkpointing.html) has elapsed.
+6. Block if all task instances are busy with processing outstanding messages, 
windowing or checkpointing.
+
+The container does this, in a loop, until it is shut down.
+
+### Semantics for Synchronous Tasks vs. Asynchronous Tasks
+
+The semantics of the event loop differ between synchronous tasks and 
asynchronous tasks:
+
+* For synchronous tasks (StreamTask and WindowableTask), process() and 
window() run on the single main thread by default. You can configure 
job.container.thread.pool.size to be greater than 1, and the event loop will 
schedule process() and window() to run in the thread pool.
+* For asynchronous tasks (AsyncStreamTask), processAsync() is always 
invoked on a single thread, while callbacks can be triggered from a different 
user thread.
+
+In both cases, the default concurrency within a task is 1, meaning at most one 
outstanding message in processing per task. This guarantees in-order message 
processing in a topic partition. You can further increase it by configuring 
task.max.concurrency to be greater than 1. This allows multiple outstanding 
messages to be processed in parallel by a task. This option increases the 
parallelism within a task, but may result in out-of-order processing and 
completion.
+
+The following semantics are guaranteed in any of the above cases (for 
happens-before semantics, see 
[here](https://docs.oracle.com/javase/tutorial/essential/concurrency/memconsist.html)):
+
+* If task.max.concurrency = 1, the completion of each message's processing in 
a task is guaranteed to happen-before the next invocation of 
process()/processAsync() of the same task. If task.max.concurrency > 1, there 
is no such happens-before constraint and the user should synchronize access to 
any shared/global variables in the task.
+* WindowableTask.window() is called only when no invocations of 
process()/processAsync() are pending, and no new process()/processAsync() 
invocations can be scheduled until it completes. Therefore, all previous 
process()/processAsync() invocations are guaranteed to happen-before an 
invocation of WindowableTask.window(), and an invocation of 
WindowableTask.window() is guaranteed to happen-before any subsequent 
process()/processAsync() invocations. The Samza engine is responsible for 
ensuring that window() is invoked in a timely manner.
+* Checkpointing is guaranteed to only cover events that are fully processed. 
It happens only when there are no pending process()/processAsync() or 
WindowableTask.window() invocations. All preceding invocations happen-before 
checkpointing and checkpointing happens-before all subsequent invocations.
 
-The container does this, in a loop, until it is shut down. Note that although 
there can be multiple task instances within a container (depending on the 
number of input stream partitions), their process() and window() methods are 
all called on the same thread, never concurrently on different threads.
+More details and examples can be found in [Samza Async API and Multithreading 
User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide.html).
 
 ### Lifecycle
 
-The only way in which a developer can hook into a SamzaContainer's lifecycle 
is through the standard InitableTask, ClosableTask, StreamTask, and 
WindowableTask. In cases where pluggable logic needs to be added to wrap a 
StreamTask, the StreamTask can be wrapped by another StreamTask implementation 
that handles the custom logic before calling into the wrapped StreamTask.
+The only way in which a developer can hook into a SamzaContainer's lifecycle 
is through the standard InitableTask, ClosableTask, StreamTask/AsyncStreamTask, 
and WindowableTask. In cases where pluggable logic needs to be added to wrap a 
StreamTask, the StreamTask can be wrapped by another StreamTask implementation 
that handles the custom logic before calling into the wrapped StreamTask.
 
 A concrete example is a set of StreamTasks that all want to share the same 
try/catch logic in their process() method. A StreamTask can be implemented that 
wraps the original StreamTasks, and surrounds the original process() call with 
the appropriate try/catch logic. For more details, see [this 
discussion](https://issues.apache.org/jira/browse/SAMZA-437).
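
Note on the task.max.concurrency semantics described in this file: one way to picture "at most N outstanding messages per task" is a dispatcher that holds N permits and blocks when all are taken. The sketch below is a JDK-only illustration under that assumption. `MaxConcurrencySketch` and `peakInFlight` are hypothetical names, the pool size of 8 and the 5 ms simulated work are arbitrary choices, and this is not how Samza's run loop is implemented.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class MaxConcurrencySketch {

  /**
   * Dispatches messageCount messages while allowing at most maxConcurrency
   * of them to be in flight, and returns the highest in-flight count seen.
   */
  static int peakInFlight(int maxConcurrency, int messageCount) {
    Semaphore permits = new Semaphore(maxConcurrency);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(8);
    try {
      for (int i = 0; i < messageCount; i++) {
        // The dispatching loop blocks here while the task is at its limit.
        permits.acquire();
        pool.execute(() -> {
          int now = inFlight.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          try {
            Thread.sleep(5); // simulated blocking work
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            inFlight.decrementAndGet();
            permits.release(); // "completion" frees a slot for the next message
          }
        });
      }
      // Wait until every outstanding message has completed.
      permits.acquire(maxConcurrency);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }
}
```

With a limit of 1, each message in the sketch starts only after the previous one has released its permit, which mirrors the in-order guarantee; with a larger limit, completions may interleave in any order.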
 

http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 14945e2..1d16f52 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -367,6 +367,22 @@
                 </tr>
 
                 <tr>
+                    <td class="property" 
id="job-container-single-thread-mode">job.container.single.thread.mode</td>
+                    <td class="default">false</td>
+                    <td class="description">
+                        If set to true, Samza will fall back to the legacy 
single-threaded event loop. Default is false, which enables <a 
href="../container/event-loop.html">multithreaded execution</a>.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" 
id="job-container-thread-pool-size">job.container.thread.pool.size</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        If configured, the container thread pool will be used 
to run synchronous operations of each task in parallel. The operations include 
StreamTask.process(), WindowableTask.window(), and internally Task.commit(). 
Note that the thread pool is not applicable to AsyncStreamTask.processAsync(). 
The size should always be greater than zero. If not configured, all task 
operations will run in a single thread.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" 
id="job-host_affinity-enabled">job.host-affinity.enabled</td>
                     <td class="default">false</td>
                     <td class="description">
@@ -377,7 +393,6 @@
                     </td>
                 </tr>
 
-
                 <tr>
                     <th colspan="3" class="section" id="task"><a 
href="../api/overview.html">Task configuration</a></th>
                 </tr>
@@ -388,7 +403,9 @@
                     <td class="description">
                         <strong>Required:</strong> The fully-qualified name of 
the Java class which processes
                         incoming messages from input streams. The class must 
implement
-                        <a 
href="../api/javadocs/org/apache/samza/task/StreamTask.html">StreamTask</a>, 
and may optionally implement
+                        <a 
href="../api/javadocs/org/apache/samza/task/StreamTask.html">StreamTask</a> or
+                        <a 
href="../api/javadocs/org/apache/samza/task/AsyncStreamTask.html">AsyncStreamTask</a>,
+                        and may optionally implement
                         <a 
href="../api/javadocs/org/apache/samza/task/InitableTask.html">InitableTask</a>,
                         <a 
href="../api/javadocs/org/apache/samza/task/ClosableTask.html">ClosableTask</a> 
and/or
                         <a 
href="../api/javadocs/org/apache/samza/task/WindowableTask.html">WindowableTask</a>.
@@ -634,6 +651,28 @@
                 </tr>
 
                 <tr>
+                    <td class="property" 
id="task-max-concurrency">task.max.concurrency</td>
+                    <td class="default">1</td>
+                    <td class="description">
+                        Max number of outstanding messages being processed per 
task at a time, and it’s applicable to both StreamTask and AsyncStreamTask. 
The values can be:
+                        <dl>
+                            <dt><code>1</code></dt>
+                            <dd>Each task processes one message at a time. 
Next message will wait until the current message process completes. This 
ensures strict in-order processing.</dd>
+                            <dt><code>&gt;1</code></dt>
+                            <dd>Multiple outstanding messages are allowed to 
be processed per task at a time. The completion can be out of order. This 
option increases the parallelism within a task, but may result in out-of-order 
processing.</dd>
+                        </dl>
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" 
id="task-callback-timeout-ms">task.callback.timeout.ms</td>
+                    <td class="default"></td>
+                    <td class="description">
+                        This property is for AsyncStreamTask only. It defines 
the maximum time allowed between the invocation of processAsync() and the 
firing of its callback. When the timeout expires, Samza throws a 
TaskCallbackTimeoutException and shuts down the container. Default is no timeout.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="streams"><a 
href="../container/streams.html">Systems (input and output streams)</a></th>
                 </tr>
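
Note on task.callback.timeout.ms as documented in this table: the behaviour amounts to waiting a bounded time for a completion signal. The sketch below shows that idea with a plain CompletableFuture. `CallbackTimeoutSketch` and `awaitCallback` are hypothetical names for this illustration only; the real container throws a TaskCallbackTimeoutException and shuts down, whereas this sketch merely returns a status string.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class CallbackTimeoutSketch {

  /**
   * Waits up to timeoutMs for the future (standing in for a task callback)
   * and reports whether it completed, failed, or timed out.
   */
  static String awaitCallback(CompletableFuture<Void> callback, long timeoutMs) {
    try {
      callback.get(timeoutMs, TimeUnit.MILLISECONDS);
      return "completed";
    } catch (TimeoutException e) {
      // The callback was not fired in time.
      return "timed out";
    } catch (ExecutionException e) {
      // The callback reported a failure.
      return "failed";
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }
}
```

A future that is never completed plays the role of a lost callback here, which is the case the timeout exists to catch.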
 

http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/tutorials/versioned/index.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/index.md 
b/docs/learn/tutorials/versioned/index.md
index ca2b08f..6d6295f 100644
--- a/docs/learn/tutorials/versioned/index.md
+++ b/docs/learn/tutorials/versioned/index.md
@@ -31,6 +31,8 @@ title: Tutorials
 
 [Getting Started with Samza REST](samza-rest-getting-started.html)
 
+[Samza Async API and Multithreading User Guide](samza-async-user-guide.html)
+
 <!-- TODO a bunch of tutorials
 [Log Walkthrough](log-walkthrough.html)
 <a href="configuring-kafka-system.html">Configuring a Kafka System</a><br/>

http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/tutorials/versioned/samza-async-user-guide.md
----------------------------------------------------------------------
diff --git a/docs/learn/tutorials/versioned/samza-async-user-guide.md 
b/docs/learn/tutorials/versioned/samza-async-user-guide.md
new file mode 100644
index 0000000..30865a8
--- /dev/null
+++ b/docs/learn/tutorials/versioned/samza-async-user-guide.md
@@ -0,0 +1,127 @@
+---
+layout: page
+title: Samza Async API and Multithreading User Guide
+---
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+This tutorial provides examples and guidance for using the Samza asynchronous 
API and multithreading.
+
+### Synchronous Process with Multithreading
+
+If your job's processing involves synchronous (blocking) IO, you can simply 
configure the Samza built-in thread pool to run your tasks in parallel. In the 
following example, SyncRestTask uses the Jersey client to make REST calls in each 
process().
+
+{% highlight java %}
+public class SyncRestTask implements StreamTask, InitableTask, ClosableTask {
+  private Client client;
+  private WebTarget target;
+
+  @Override
+  public void init(Config config, TaskContext taskContext) throws Exception {
+    client = ClientBuilder.newClient();
+    target = client.target("http://example.com/resource/").path("hello");
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector 
collector, TaskCoordinator coordinator) {
+    Response response = target.request().get();
+    System.out.println("Response status code " + response.getStatus() + " received.");
+  }
+
+  @Override
+  public void close() throws Exception {
+    client.close();
+  }
+}
+{% endhighlight %}
+
+By default, Samza runs this task sequentially in a single thread. Below 
we configure a thread pool of size 16 to run the tasks in parallel:
+
+{% highlight jproperties %}
+# Thread pool to run synchronous tasks in parallel.
+job.container.thread.pool.size=16
+{% endhighlight %}
+
+**NOTE:** The thread pool is used to run all the synchronous operations 
of a task, including StreamTask.process(), WindowableTask.window(), and 
internally Task.commit(). This maximizes the parallelism between tasks 
and reduces blocking time. When running tasks with multithreading, 
Samza still guarantees in-order processing of the messages within a task by 
default.
+
+### Asynchronous Process with AsyncStreamTask API
+
+If your job's processing is asynchronous, e.g. making non-blocking remote IO 
calls, the [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) 
interface provides support for it. In the following example, AsyncRestTask 
makes an asynchronous REST call and triggers the callback once it's complete.
+
+{% highlight java %}
+public class AsyncRestTask implements AsyncStreamTask, InitableTask, 
ClosableTask {
+  private Client client;
+  private WebTarget target;
+
+  @Override
+  public void init(Config config, TaskContext taskContext) throws Exception {
+    client = ClientBuilder.newClient();
+    target = client.target("http://example.com/resource/").path("hello");
+  }
+
+  @Override
+  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector 
collector,
+      TaskCoordinator coordinator, final TaskCallback callback) {
+    target.request().async().get(new InvocationCallback<Response>() {
+      @Override
+      public void completed(Response response) {
+        System.out.println("Response status code " + response.getStatus() + " received.");
+        callback.complete();
+      }
+
+      @Override
+      public void failed(Throwable throwable) {
+        System.out.println("Invocation failed.");
+        callback.failure(throwable);
+      }
+    });
+  }
+
+  @Override
+  public void close() throws Exception {
+    client.close();
+  }
+}
+{% endhighlight %}
+
+In the above example, processing is not complete when processAsync() returns. 
In the callback thread from the Jersey client, we trigger 
[TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html) to indicate 
that processing is done. To make sure the callback is triggered 
within a certain time interval, e.g. 5 seconds, you can configure the following 
property:
+
+{% highlight jproperties %}
+# Timeout for processAsync() callback. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.
+task.callback.timeout.ms=5000
+{% endhighlight %}
+
+**NOTE:** Samza also guarantees in-order processing of the messages within an 
AsyncStreamTask by default, meaning the next processAsync() of a task won't be 
called until the previous processAsync() callback has been triggered.
+
+### Out-of-order Process
+
+In both cases above, Samza processes messages in order by default. Further 
parallelism is also supported by allowing a task to process multiple 
outstanding messages in parallel. The following config allows one task to 
process at most 4 outstanding messages at a time:
+
+{% highlight jproperties %}
+# Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.
+task.max.concurrency=4
+{% endhighlight %}
+
+**NOTE:** In case of AsyncStreamTask, processAsync() is still invoked in the 
order of the message arrivals, but the completion can go out of order. In case 
of StreamTask with multithreading, process() can run out-of-order since they 
are dispatched to a thread pool. This option should **NOT** be used when strict 
ordering of the output is required.
+
+### Guaranteed Semantics
+
+In any of the scenarios, Samza guarantees the following semantics:
+
+* Samza is thread-safe. You can safely access your job’s state in the key-value 
store, write messages and checkpoint offsets in the task threads. Other data 
shared among tasks, such as global variables or static data, is not thread-safe 
if it can be accessed concurrently by multiple threads, e.g. by StreamTasks 
running in a configured thread pool with more than one thread. For state within 
a task, such as member variables, Samza guarantees the mutual exclusion of 
process, window and commit, so there are no concurrent modifications among 
these operations and any state change made by one operation is fully visible to 
the others.
+* WindowableTask.window() is called only when there are no outstanding 
process()/processAsync() invocations, and no new process()/processAsync() 
invocations can be scheduled until it completes. The Samza engine is 
responsible for ensuring that window() is invoked in a timely manner.
+* Checkpointing is guaranteed to only cover events that are fully processed. 
It is persisted in the commit() method.
\ No newline at end of file
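
Note on the thread-safety caveat in the guide above: data shared across task instances, such as static fields, needs its own synchronization once tasks run in a thread pool. The sketch below shows one common JDK approach, an AtomicLong counter updated from several pooled workers. `SharedStateSketch`, `PROCESSED` and `run` are hypothetical names, and the workers only simulate tasks; no Samza API is involved.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class SharedStateSketch {

  /** Static data shared by every simulated task; AtomicLong makes updates safe. */
  static final AtomicLong PROCESSED = new AtomicLong();

  /**
   * Runs `tasks` simulated tasks on a pool of `poolSize` threads, each
   * counting `messagesPerTask` messages, and returns the shared total.
   */
  static long run(int tasks, int messagesPerTask, int poolSize) {
    PROCESSED.set(0);
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    for (int t = 0; t < tasks; t++) {
      pool.execute(() -> {
        for (int m = 0; m < messagesPerTask; m++) {
          // Atomic increment: no update is lost even under contention.
          PROCESSED.incrementAndGet();
        }
      });
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return PROCESSED.get();
  }
}
```

Replacing the AtomicLong with a plain `long` field and `++` would make lost updates possible when more than one pool thread runs, which is the hazard the guide warns about for global or static data.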
