Repository: samza
Updated Branches:
  refs/heads/master 3902e2047 -> f493f5b9e


SAMZA-1004: Fix some logging and javadoc issues for AsyncStreamTask


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f493f5b9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f493f5b9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f493f5b9

Branch: refs/heads/master
Commit: f493f5b9e7b1537eaf3cadfa3ec5fca6804a7f79
Parents: 3902e20
Author: Xinyu Liu <[email protected]>
Authored: Wed Sep 7 15:29:37 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Wed Sep 7 15:29:37 2016 -0700

----------------------------------------------------------------------
 .../src/main/java/org/apache/samza/task/AsyncStreamTask.java | 8 ++++----
 .../scala/org/apache/samza/container/SamzaContainer.scala    | 4 +++-
 2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f493f5b9/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
index 684ba0b..de68c79 100644
--- a/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
+++ b/samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java
@@ -22,7 +22,7 @@ package org.apache.samza.task;
 import org.apache.samza.system.IncomingMessageEnvelope;
 
 /**
- * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. It’s provided for better
+ * An AsyncStreamTask is the basic class to support multithreading execution in Samza container. It's provided for better
  * parallelism and resource utilization. This class allows task to make asynchronous calls and fire callbacks upon completion.
  * Similar to {@link StreamTask}, an AsyncStreamTask may be augmented by implementing other interfaces, such as
  * {@link InitableTask}, {@link WindowableTask}, or {@link ClosableTask}. The following invariants hold with these mix-ins:
@@ -33,8 +33,8 @@ import org.apache.samza.system.IncomingMessageEnvelope;
  * CloseableTask.close - always the last method invoked on an AsyncStreamTask and all other AsyncStreamTask are guaranteed
  * to happen-before it.
  *
- * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.process.max.inflight.messages=1),
- * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.process.max.inflight.messages&gt;1),
+ * AsyncStreamTask.processAsync - can run in either a serialized or parallel mode. In the serialized mode (task.max.concurrency=1),
+ * each invocation of processAsync is guaranteed to happen-before the next. In a parallel execution mode (task.max.concurrency&gt;1),
  * there is no such happens-before constraint and the AsyncStreamTask is required to coordinate any shared state.
  *
  * WindowableTask.window - in either above mode, it is called when no invocations to processAsync are pending and no new
@@ -57,4 +57,4 @@ public interface AsyncStreamTask {
    * @param callback Triggers the completion of the process.
    */
   void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback);
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f493f5b9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index f786fc0..4ab4bce 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -222,7 +222,9 @@ object SamzaContainer extends Logging {
     info("Got system consumers: %s" format consumers.keys)
 
     val isAsyncTask = classOf[AsyncStreamTask].isAssignableFrom(Class.forName(taskClassName))
-    info("%s is AsyncStreamTask" format taskClassName)
+    if (isAsyncTask) {
+      info("%s is AsyncStreamTask" format taskClassName)
+    }
 
     val producers = systemFactories
       .map {

Reply via email to