Repository: samza Updated Branches: refs/heads/master 3902e2047 -> f493f5b9e
SAMZA-1004: Fix some logging and javadoc issues for AsyncStreamTask Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f493f5b9 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f493f5b9 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f493f5b9 Branch: refs/heads/master Commit: f493f5b9e7b1537eaf3cadfa3ec5fca6804a7f79 Parents: 3902e20 Author: Xinyu Liu <[email protected]> Authored: Wed Sep 7 15:29:37 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Wed Sep 7 15:29:37 2016 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/samza/task/AsyncStreamTask.java | 8 ++++---- .../scala/org/apache/samza/container/SamzaContainer.scala | 4 +++- 2 files changed, 7 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/f493f5b9/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java index 684ba0b..de68c79 100644 --- a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java +++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java @@ -22,7 +22,7 @@ package org.apache.samza.task; import org.apache.samza.system.IncomingMessageEnvelope; /** - * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. Itâs provided for better + * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. It's provided for better * parallelism and resource utilization. This class allows task to make asynchronous calls and fire callbacks upon completion. * Similar to {@link StreamTask}, an AsyncStreamTask may be augmented by implementing other interfaces, such as * {@link InitableTask}, {@link WindowableTask}, or {@link ClosableTask}. The following invariants hold with these mix-ins: @@ -33,8 +33,8 @@ import org.apache.samza.system.IncomingMessageEnvelope; * CloseableTask.close - always the last method invoked on an AsyncStreamTask and all other AsyncStreamTask are guaranteed * to happen-before it. * - * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.process.max.inflight.messages=1), - * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.process.max.inflight.messages>1), + * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.max.concurrency=1), + * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.max.concurrency>1), * there is no such happens-before constraint and the AsyncStreamTask is required to coordinate any shared state. * * WindowableTask.window - in either above mode, it is called when no invocations to processAsync are pending and no new @@ -57,4 +57,4 @@ public interface AsyncStreamTask { * @param callback Triggers the completion of the process. */ void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/samza/blob/f493f5b9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index f786fc0..4ab4bce 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -222,7 +222,9 @@ object SamzaContainer extends Logging { info("Got system consumers: %s" format consumers.keys) val isAsyncTask = classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName)) - info("%s is AsyncStreamTask" format taskClassName) + if (isAsyncTask) { + info("%s is AsyncStreamTask" format taskClassName) + } val producers = systemFactories .map {
