Repository: samza
Updated Branches:
  refs/heads/master 3228ed9c4 -> 40ffe4ea5
SAMZA-977: User doc for samza multi-threading

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/40ffe4ea
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/40ffe4ea
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/40ffe4ea

Branch: refs/heads/master
Commit: 40ffe4ea5d45f048e12849ca3ac98325bbd9e99b
Parents: 3228ed9
Author: Xinyu Liu <xi...@linkedin.com>
Authored: Thu Sep 22 16:44:30 2016 -0700
Committer: vjagadish1989 <jvenk...@linkedin.com>
Committed: Thu Sep 22 16:45:34 2016 -0700

----------------------------------------------------------------------
 docs/Gemfile.lock                                 |   2 +-
 .../documentation/versioned/api/overview.md       |  21 ++-
 .../versioned/container/event-loop.md             |  38 ++++--
 .../versioned/jobs/configuration-table.html       |  43 ++++++-
 docs/learn/tutorials/versioned/index.md           |   2 +
 .../versioned/samza-async-user-guide.md           | 127 +++++++++++++++++++
 6 files changed, 219 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/Gemfile.lock
----------------------------------------------------------------------
diff --git a/docs/Gemfile.lock b/docs/Gemfile.lock
index 8a236e6..8c75832 100644
--- a/docs/Gemfile.lock
+++ b/docs/Gemfile.lock
@@ -48,7 +48,7 @@ GEM
     rb-inotify (0.9.5)
       ffi (>= 0.5.0)
     redcarpet (3.1.2)
-    safe_yaml (1.0.3)
+    safe_yaml (1.0.4)
     sass (3.3.8)
     timers (1.1.0)
     toml (0.1.1)

http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/documentation/versioned/api/overview.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/api/overview.md b/docs/learn/documentation/versioned/api/overview.md
index 67e259c..d589bc6 100644
--- a/docs/learn/documentation/versioned/api/overview.md
+++ b/docs/learn/documentation/versioned/api/overview.md
@@ -19,7 +19,7 @@ title: API Overview
    limitations under the
License. --> -When writing a stream processor for Samza, you must implement the [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) interface: +When writing a stream processor for Samza, you must implement either the [StreamTask](javadocs/org/apache/samza/task/StreamTask.html) or the [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) interface. You should implement StreamTask for synchronous processing, where processing of a message is complete once the *process* method returns. An example of a StreamTask is a computation that does not involve remote calls: {% highlight java %} package com.example.samza; @@ -34,6 +34,23 @@ public class MyTaskClass implements StreamTask { } {% endhighlight %} +The AsyncStreamTask interface, on the other hand, supports asynchronous processing, where processing of a message may not be complete when the *processAsync* method returns. Concurrency libraries such as Java NIO, ParSeq and Akka can be used here to make asynchronous calls, and completion is signaled by invoking the [TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html). Samza will continue processing the next message or shut down the container, depending on the callback status. An example of an AsyncStreamTask is a computation that makes remote calls but does not block waiting for them to complete: + +{% highlight java %} +package com.example.samza; + +public class MyAsyncTaskClass implements AsyncStreamTask { + + public void processAsync(IncomingMessageEnvelope envelope, + MessageCollector collector, + TaskCoordinator coordinator, + TaskCallback callback) { + // process the message with asynchronous calls + // fire the callback upon completion, e.g. from the thread that completes the asynchronous call + } +} +{% endhighlight %} + When you run your job, Samza will create several instances of your class (potentially on multiple machines). These task instances process the messages in the input streams.
In your job's configuration you can tell Samza which streams you want to consume. An incomplete example could look like this (see the [configuration documentation](../jobs/configuration.html) for more detail): @@ -139,5 +156,5 @@ public class SplitStringIntoWords implements StreamTask { } {% endhighlight %} -For more details on APIs, please refer to [Configuration](../jobs/configuration-table.html) and [Javadocs](javadocs) +For an AsyncStreamTask example, follow the tutorial in the [Samza Async API and Multithreading User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide.html). For more details on APIs, please refer to [Configuration](../jobs/configuration-table.html) and [Javadocs](javadocs). ## [SamzaContainer »](../container/samza-container.html) http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/documentation/versioned/container/event-loop.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/event-loop.md b/docs/learn/documentation/versioned/container/event-loop.md index 9dcf92c..13a5ea8 100644 --- a/docs/learn/documentation/versioned/container/event-loop.md +++ b/docs/learn/documentation/versioned/container/event-loop.md @@ -19,11 +19,13 @@ title: Event Loop limitations under the License. --> -The event loop is the [container](samza-container.html)'s single thread that is in charge of [reading and writing messages](streams.html), [flushing metrics](metrics.html), [checkpointing](checkpointing.html), and [windowing](windowing.html). +The event loop orchestrates [reading and processing messages](streams.html), [checkpointing](checkpointing.html), [windowing](windowing.html) and [flushing metrics](metrics.html) among tasks. -Samza uses a single thread because every container is designed to use a single CPU core; to get more parallelism, simply run more containers.
This uses a bit more memory than multithreaded parallelism, because each JVM has some overhead, but it simplifies resource management and improves isolation between jobs. This helps Samza jobs run reliably on a multitenant cluster, where many different jobs written by different people are running at the same time. +By default, Samza uses a single thread in each [container](samza-container.html) to run the tasks. This fits CPU-bound jobs well; to use more CPU cores, simply add more containers. Single-threaded execution also simplifies sharing task state and managing resources. -You are strongly discouraged from using threads in your job's code. Samza uses multiple threads internally for communicating with input and output streams, but all message processing and user code runs on a single-threaded event loop. In general, Samza is not thread-safe. +For IO-bound jobs, Samza supports finer-grained parallelism for both synchronous and asynchronous tasks. Synchronous tasks ([StreamTask](../api/javadocs/org/apache/samza/task/StreamTask.html) and [WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html)) can be scheduled to run in parallel by configuring the built-in thread pool with [job.container.thread.pool.size](../jobs/configuration-table.html). This fits tasks that perform blocking IO. For asynchronous tasks ([AsyncStreamTask](../api/javadocs/org/apache/samza/task/AsyncStreamTask.html)), you can make asynchronous IO calls and trigger callbacks upon completion. The finest degree of parallelism Samza provides is within a single task, configured by [task.max.concurrency](../jobs/configuration-table.html). + +The latest version of Samza is thread-safe. You can safely access your job's state in the [key-value store](state-management.html), write messages and checkpoint offsets from the task threads.
If you have other data shared among tasks, such as global variables or static data, access to it is not thread-safe when it can be accessed concurrently by multiple threads, e.g. by a StreamTask running in a thread pool configured with more than one thread. For state within a task, such as member variables, Samza guarantees that process, window and commit are mutually exclusive, so there are no concurrent modifications among these operations and any state change made by one operation is fully visible to the others. ### Event Loop Internals @@ -31,17 +33,35 @@ A container may have multiple [SystemConsumers](../api/javadocs/org/apache/samza The event loop works as follows: -1. Take a message from the incoming message queue; -2. Give the message to the appropriate [task instance](samza-container.html) by calling process() on it; -3. Call window() on the task instance if it implements [WindowableTask](../api/javadocs/org/apache/samza/task/WindowableTask.html), and the window time has expired; +1. Choose a message from the incoming message queue; +2. Schedule the appropriate [task instance](samza-container.html) to process the message; +3. Schedule window() on the task instance to run if it implements WindowableTask and the window timer has been triggered; 4. Send any output from the process() and window() calls to the appropriate [SystemProducers](../api/javadocs/org/apache/samza/system/SystemProducer.html); -5. Write checkpoints for any tasks whose [commit interval](checkpointing.html) has elapsed. +5. Write checkpoints and flush the state stores for any tasks whose [commit interval](checkpointing.html) has elapsed. +6. Block if all task instances are busy processing outstanding messages, windowing or checkpointing. + +The container does this, in a loop, until it is shut down. + +### Semantics for Synchronous Tasks vs. Asynchronous Tasks + +The semantics of the event loop differ between synchronous and asynchronous tasks: + +* For synchronous tasks (StreamTask and WindowableTask), process() and window() run on the single main thread by default. If you configure job.container.thread.pool.size to be greater than 1, the event loop schedules process() and window() to run in the thread pool. +* For asynchronous tasks (AsyncStreamTask), processAsync() is always invoked on a single thread, while callbacks can be triggered from a different user thread. + +In both cases, the default concurrency within a task is 1, meaning at most one message is being processed per task at any time. This guarantees in-order processing of the messages in a topic partition. You can increase the concurrency by setting task.max.concurrency to a value greater than 1, which allows a task to process multiple outstanding messages in parallel. This increases the parallelism within a task, but may result in out-of-order processing and completion. + +The following semantics are guaranteed in all of the above cases (for happens-before semantics, see [here](https://docs.oracle.com/javase/tutorial/essential/concurrency/memconsist.html)): + +* If task.max.concurrency = 1, the completion of each message's processing in a task is guaranteed to happen-before the next invocation of process()/processAsync() on the same task. If task.max.concurrency > 1, there is no such happens-before constraint, and users should synchronize access to any shared/global variables in the task. +* WindowableTask.window() is called when no invocations of process()/processAsync() are pending, and no new process()/processAsync() invocations can be scheduled until it completes. Therefore, all previous process()/processAsync() invocations are guaranteed to happen-before an invocation of WindowableTask.window(), and an invocation of WindowableTask.window() is guaranteed to happen-before any subsequent process()/processAsync() invocations.
The Samza engine is responsible for ensuring that window is invoked in a timely manner. +* Checkpointing is guaranteed to only cover events that are fully processed. It happens only when there are no pending process()/processAsync() or WindowableTask.window() invocations. All preceding invocations happen-before checkpointing and checkpointing happens-before all subsequent invocations. -The container does this, in a loop, until it is shut down. Note that although there can be multiple task instances within a container (depending on the number of input stream partitions), their process() and window() methods are all called on the same thread, never concurrently on different threads. +More details and examples can be found in [Samza Async API and Multithreading User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide.html). ### Lifecycle -The only way in which a developer can hook into a SamzaContainer's lifecycle is through the standard InitableTask, ClosableTask, StreamTask, and WindowableTask. In cases where pluggable logic needs to be added to wrap a StreamTask, the StreamTask can be wrapped by another StreamTask implementation that handles the custom logic before calling into the wrapped StreamTask. +The only way in which a developer can hook into a SamzaContainer's lifecycle is through the standard InitableTask, ClosableTask, StreamTask/AsyncStreamTask, and WindowableTask. In cases where pluggable logic needs to be added to wrap a StreamTask, the StreamTask can be wrapped by another StreamTask implementation that handles the custom logic before calling into the wrapped StreamTask. A concrete example is a set of StreamTasks that all want to share the same try/catch logic in their process() method. A StreamTask can be implemented that wraps the original StreamTasks, and surrounds the original process() call with the appropriate try/catch logic. For more details, see [this discussion](https://issues.apache.org/jira/browse/SAMZA-437). 
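The wrapping approach described under Lifecycle can be sketched as follows. To keep the sketch runnable without Samza on the classpath, it declares simplified, hypothetical stand-ins for Samza's `StreamTask`, `IncomingMessageEnvelope`, `MessageCollector` and `TaskCoordinator`; the wrapper name `ErrorHandlingTask` and its count-and-skip error policy are illustrative assumptions, not something Samza provides. The failure counter is an `AtomicLong` because, per the semantics above, process() calls on one task may overlap when task.max.concurrency > 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-ins for the Samza types (not the real API).
final class IncomingMessageEnvelope {
  private final Object message;
  IncomingMessageEnvelope(Object message) { this.message = message; }
  Object getMessage() { return message; }
}

interface MessageCollector { void send(Object message); }

interface TaskCoordinator { }

interface StreamTask {
  void process(IncomingMessageEnvelope envelope, MessageCollector collector,
               TaskCoordinator coordinator) throws Exception;
}

// Wraps any StreamTask and surrounds its process() call with shared try/catch logic.
final class ErrorHandlingTask implements StreamTask {
  private final StreamTask wrapped;
  // Atomic because process() may run concurrently when task.max.concurrency > 1.
  private final AtomicLong failures = new AtomicLong();

  ErrorHandlingTask(StreamTask wrapped) { this.wrapped = wrapped; }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    try {
      wrapped.process(envelope, collector, coordinator);
    } catch (Exception e) {
      // Illustrative policy only: count the failure and skip the message.
      failures.incrementAndGet();
    }
  }

  long failureCount() { return failures.get(); }
}
```

A real wrapper would also delegate init() and close() when the wrapped task implements InitableTask or ClosableTask; those are omitted here for brevity.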
http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 14945e2..1d16f52 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -367,6 +367,22 @@ </tr> <tr> + <td class="property" id="job-container-single-thread-mode">job.container.single.thread.mode</td> + <td class="default">false</td> + <td class="description"> + If set to true, Samza falls back to the legacy single-threaded event loop. Default is false, which enables <a href="../container/event-loop.html">multithreaded execution</a>. + </td> + </tr> + + <tr> + <td class="property" id="job-container-thread-pool-size">job.container.thread.pool.size</td> + <td class="default"></td> + <td class="description"> + If configured, the container thread pool is used to run the synchronous operations of each task in parallel. These operations include StreamTask.process(), WindowableTask.window(), and, internally, Task.commit(). Note that the thread pool does not apply to AsyncStreamTask.processAsync(). The size must be greater than zero. If not configured, all task operations run in a single thread. + </td> + </tr> + + <tr> <td class="property" id="job-host_affinity-enabled">job.host-affinity.enabled</td> <td class="default">false</td> <td class="description"> @@ -377,7 +393,6 @@ </td> </tr> - <tr> <th colspan="3" class="section" id="task"><a href="../api/overview.html">Task configuration</a></th> </tr> @@ -388,7 +403,9 @@ <td class="description"> <strong>Required:</strong> The fully-qualified name of the Java class which processes incoming messages from input streams.
The class must implement - <a href="../api/javadocs/org/apache/samza/task/StreamTask.html">StreamTask</a>, and may optionally implement + <a href="../api/javadocs/org/apache/samza/task/StreamTask.html">StreamTask</a> or + <a href="../api/javadocs/org/apache/samza/task/AsyncStreamTask.html">AsyncStreamTask</a>, + and may optionally implement <a href="../api/javadocs/org/apache/samza/task/InitableTask.html">InitableTask</a>, <a href="../api/javadocs/org/apache/samza/task/ClosableTask.html">ClosableTask</a> and/or <a href="../api/javadocs/org/apache/samza/task/WindowableTask.html">WindowableTask</a>. @@ -634,6 +651,28 @@ </tr> <tr> + <td class="property" id="task-max-concurrency">task.max.concurrency</td> + <td class="default">1</td> + <td class="description"> + The maximum number of outstanding messages processed per task at a time. It applies to both StreamTask and AsyncStreamTask. The values can be: + <dl> + <dt><code>1</code></dt> + <dd>Each task processes one message at a time. The next message waits until processing of the current message completes. This ensures strict in-order processing.</dd> + <dt><code>>1</code></dt> + <dd>Multiple outstanding messages can be processed per task at a time, and they may complete out of order. This option increases the parallelism within a task, but may result in out-of-order processing.</dd> + </dl> + </td> + </tr> + + <tr> + <td class="property" id="task-callback-timeout-ms">task.callback.timeout.ms</td> + <td class="default"></td> + <td class="description"> + This property applies to AsyncStreamTask only. It defines the maximum time allowed between a processAsync() invocation and the firing of its callback. When the timeout is exceeded, a TaskCallbackTimeoutException is thrown and the container shuts down. Default is no timeout.
+ </td> + </tr> + + <tr> <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th> </tr> http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/tutorials/versioned/index.md ---------------------------------------------------------------------- diff --git a/docs/learn/tutorials/versioned/index.md b/docs/learn/tutorials/versioned/index.md index ca2b08f..6d6295f 100644 --- a/docs/learn/tutorials/versioned/index.md +++ b/docs/learn/tutorials/versioned/index.md @@ -31,6 +31,8 @@ title: Tutorials [Getting Started with Samza REST](samza-rest-getting-started.html) +[Samza Async API and Multithreading User Guide](samza-async-user-guide.html) + <!-- TODO a bunch of tutorials [Log Walkthrough](log-walkthrough.html) <a href="configuring-kafka-system.html">Configuring a Kafka System</a><br/> http://git-wip-us.apache.org/repos/asf/samza/blob/40ffe4ea/docs/learn/tutorials/versioned/samza-async-user-guide.md ---------------------------------------------------------------------- diff --git a/docs/learn/tutorials/versioned/samza-async-user-guide.md b/docs/learn/tutorials/versioned/samza-async-user-guide.md new file mode 100644 index 0000000..30865a8 --- /dev/null +++ b/docs/learn/tutorials/versioned/samza-async-user-guide.md @@ -0,0 +1,127 @@ +--- +layout: page +title: Samza Async API and Multithreading User Guide +--- +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +This tutorial provides examples and guidance on using the Samza asynchronous API and multithreading. + +### Synchronous Process with Multithreading + +If your job's processing involves synchronous (blocking) IO, you can simply configure Samza's built-in thread pool to run your tasks in parallel. In the following example, SyncRestTask uses the Jersey client to make REST calls in each process() call. + +{% highlight java %} +public class SyncRestTask implements StreamTask, InitableTask, ClosableTask { + private Client client; + private WebTarget target; + + @Override + public void init(Config config, TaskContext taskContext) throws Exception { + client = ClientBuilder.newClient(); + target = client.target("http://example.com/resource/").path("hello"); + } + + @Override + public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) { + Response response = target.request().get(); + System.out.println("Response status code " + response.getStatus() + " received."); + response.close(); // release the underlying connection + } + + @Override + public void close() throws Exception { + client.close(); + } +} +{% endhighlight %} + +By default, Samza runs this task sequentially in a single thread. Below, we configure a thread pool of size 16 to run the tasks in parallel: + +{% highlight jproperties %} +# Thread pool to run synchronous tasks in parallel. +job.container.thread.pool.size=16 +{% endhighlight %} + +**NOTE:** The thread pool is used to run all the synchronous operations of a task, including StreamTask.process(), WindowableTask.window(), and, internally, Task.commit().
This maximizes parallelism across tasks and reduces blocking time. When running tasks on multiple threads, Samza still guarantees in-order processing of the messages within a task by default. + +### Asynchronous Process with AsyncStreamTask API + +If your job's processing is asynchronous, e.g. it makes non-blocking remote IO calls, the [AsyncStreamTask](javadocs/org/apache/samza/task/AsyncStreamTask.html) interface supports it. In the following example, AsyncRestTask makes an asynchronous REST call and triggers the callback once the call completes. + +{% highlight java %} +public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask { + private Client client; + private WebTarget target; + + @Override + public void init(Config config, TaskContext taskContext) throws Exception { + client = ClientBuilder.newClient(); + target = client.target("http://example.com/resource/").path("hello"); + } + + @Override + public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, + TaskCoordinator coordinator, final TaskCallback callback) { + target.request().async().get(new InvocationCallback<Response>() { + @Override + public void completed(Response response) { + System.out.println("Response status code " + response.getStatus() + " received."); + response.close(); // release the underlying connection + callback.complete(); + } + + @Override + public void failed(Throwable throwable) { + System.out.println("Invocation failed."); + callback.failure(throwable); + } + }); + } + + @Override + public void close() throws Exception { + client.close(); + } +} +{% endhighlight %} + +In the above example, processing is not complete when processAsync() returns. In the Jersey client's callback thread, we invoke the [TaskCallback](javadocs/org/apache/samza/task/TaskCallback.html) to indicate that processing is done. To make sure the callback is triggered within a certain time interval, e.g.
5 seconds, you can configure the following property: + +{% highlight jproperties %} +# Timeout for the processAsync() callback. When the timeout is exceeded, a TaskCallbackTimeoutException is thrown and the container shuts down. +task.callback.timeout.ms=5000 +{% endhighlight %} + +**NOTE:** Samza also guarantees in-order processing of the messages within an AsyncStreamTask by default, meaning the next processAsync() call on a task is not made until the callback of the previous processAsync() has been triggered. + +### Out-of-order Process + +In both cases above, Samza processes messages in order by default. Further parallelism is supported by allowing a task to process multiple outstanding messages in parallel. The following config allows a task to process at most 4 outstanding messages at a time: + +{% highlight jproperties %} +# Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask. +task.max.concurrency=4 +{% endhighlight %} + +**NOTE:** For AsyncStreamTask, processAsync() is still invoked in message arrival order, but completion can be out of order. For StreamTask with multithreading, process() invocations can run out of order since they are dispatched to a thread pool. This option should **NOT** be used when strict ordering of the output is required. + +### Guaranteed Semantics + +In all of these scenarios, Samza guarantees the following semantics: + +* Samza is thread-safe. You can safely access your job's state in the key-value store, write messages and checkpoint offsets from the task threads. If you have other data shared among tasks, such as global variables or static data, access to it is not thread-safe when it can be accessed concurrently by multiple threads, e.g. by a StreamTask running in a thread pool configured with more than one thread.
For state within a task, such as member variables, Samza guarantees that process, window and commit are mutually exclusive, so there are no concurrent modifications among these operations and any state change made by one operation is fully visible to the others. +* WindowableTask.window is called when there are no outstanding process/processAsync invocations, and no new process/processAsync invocations can be scheduled until it completes. The Samza engine is responsible for ensuring that window is invoked in a timely manner. +* Checkpointing is guaranteed to only cover events that are fully processed. It is persisted in the commit() method. \ No newline at end of file
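The AsyncStreamTask contract from the guide (processAsync() returns before the work is done, and completion is signaled only through TaskCallback) can be illustrated with a small, self-contained sketch. It declares simplified, hypothetical stand-ins for Samza's `IncomingMessageEnvelope`, `MessageCollector`, `TaskCoordinator`, `TaskCallback` and `AsyncStreamTask` so it runs without Samza on the classpath. `AsyncLengthTask` is an illustrative name, and a `CompletableFuture` running on an `ExecutorService` stands in for a non-blocking client such as Jersey's async API. Because callbacks for different messages may run concurrently when task.max.concurrency > 1, the shared counter is an `AtomicLong`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-ins for the Samza types (not the real API).
final class IncomingMessageEnvelope {
  private final Object message;
  IncomingMessageEnvelope(Object message) { this.message = message; }
  Object getMessage() { return message; }
}

interface MessageCollector { void send(Object message); }

interface TaskCoordinator { }

interface TaskCallback {
  void complete();
  void failure(Throwable t);
}

interface AsyncStreamTask {
  void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
                    TaskCoordinator coordinator, TaskCallback callback);
}

// Emits the length of each string message; the work finishes on another thread.
final class AsyncLengthTask implements AsyncStreamTask {
  private final ExecutorService ioPool;                // simulates an async client's threads
  private final AtomicLong processed = new AtomicLong(); // safe under concurrent callbacks

  AsyncLengthTask(ExecutorService ioPool) { this.ioPool = ioPool; }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
                           TaskCoordinator coordinator, TaskCallback callback) {
    CompletableFuture
        .supplyAsync(() -> ((String) envelope.getMessage()).length(), ioPool) // simulated remote call
        .whenComplete((length, error) -> {
          if (error != null) {
            callback.failure(error);   // report the failure through the callback
          } else {
            collector.send(length);
            processed.incrementAndGet();
            callback.complete();       // marks this message as fully processed
          }
        });
    // Returns immediately: processing is not complete yet.
  }

  long processedCount() { return processed.get(); }
}
```

The interval between the processAsync() call and the callback firing is what task.callback.timeout.ms bounds in a real job.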