http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala
index 7fcaccf..ddd2037 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/Stream.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,55 +18,62 @@
 
 package io.gearpump.streaming.dsl
 
+import scala.language.implicitConversions
+
+import org.slf4j.{Logger, LoggerFactory}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.dsl.op._
 import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.task.{TaskContext, Task}
-import io.gearpump.util.{Graph, LogUtil}
-import org.slf4j.{Logger, LoggerFactory}
+import io.gearpump.streaming.task.{Task, TaskContext}
+import io.gearpump.util.Graph
 
-class Stream[T](private val graph: Graph[Op,OpEdge], private val thisNode:Op, 
private val edge: Option[OpEdge] = None) {
+class Stream[T](
+    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+    private val edge: Option[OpEdge] = None) {
 
   /**
-   * convert a value[T] to a list of value[R]
-   * @param fun function
-   * @param description the description message for this operation
-   * @param <R>  the result message type
-   * @return a new stream with type [R]
+   * converts a value[T] to a list of value[R]
+   *
+   * @param fun FlatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
    */
   def flatMap[R](fun: T => TraversableOnce[R], description: String = null): 
Stream[R] = {
     val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
-    graph.addVertex(flatMapOp )
+    graph.addVertex(flatMapOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
     new Stream[R](graph, flatMapOp)
   }
 
   /**
-   * convert value[T] to value[R]
-   * @param fun function
-   * @param <R>  the result message type
-   * @return a new stream with type [R]
+   * Maps message of type T to message of type R
+   *
+   * @param fun Function
+   * @return A new stream with type [R]
    */
   def map[R](fun: T => R, description: String = null): Stream[R] = {
-    this.flatMap ({ data =>
+    this.flatMap({ data =>
       Option(fun(data))
     }, Option(description).getOrElse("map"))
   }
 
   /**
-   * reserve records when fun(T) == true
+   * Keeps records when fun(T) == true
+   *
    * @param fun  the filter
    * @return  a new stream after filter
    */
   def filter(fun: T => Boolean, description: String = null): Stream[T] = {
-    this.flatMap ({ data =>
+    this.flatMap({ data =>
       if (fun(data)) Option(data) else None
     }, Option(description).getOrElse("filter"))
   }
 
   /**
-   * Reduce opeartion
+   * Reduces messages into one using the reduction function.
+   *
    * @param fun  reduction function
    * @param description description message for this operator
    * @return a new stream after reduction
@@ -86,7 +93,8 @@ class Stream[T](private val graph: Graph[Op,OpEdge], private 
val thisNode:Op, pr
   }
 
   /**
-   * Merge data from two stream into one
+   * Merges data from two streams into one
+   *
    * @param other the other stream
    * @return  the merged stream
    */
@@ -99,7 +107,7 @@ class Stream[T](private val graph: Graph[Op,OpEdge], private 
val thisNode:Op, pr
   }
 
   /**
-   * Group by fun(T)
+   * Group by function (T => Group)
    *
    * For example, we have T type, People(name: String, gender: String, age: 
Int)
    * groupBy[People](_.gender) will group the people by gender.
@@ -107,16 +115,17 @@ class Stream[T](private val graph: Graph[Op,OpEdge], 
private val thisNode:Op, pr
    * You can append other combinators after groupBy
    *
    * For example,
-   *
+   * {{{
    * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+   * }}}
    *
-   * @param fun  group by function
-   * @param parallelism   parallelism level
-   * @param description  the description
-   * @param <Group>  the group type
+   * @param fun  Group by function
+   * @param parallelism  Parallelism level
+   * @param description  The description
    * @return  the grouped stream
    */
-  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: 
String = null): Stream[T] = {
+  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: 
String = null)
+    : Stream[T] = {
     val groupOp = GroupByOp(fun, parallelism, 
Option(description).getOrElse("groupBy"))
     graph.addVertex(groupOp)
     graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
@@ -124,33 +133,36 @@ class Stream[T](private val graph: Graph[Op,OpEdge], 
private val thisNode:Op, pr
   }
 
   /**
-   * connect with a low level Processor(TaskDescription)
+   * Connects with a low level Processor(TaskDescription)
+   *
    * @param processor  a user defined processor
    * @param parallelism  parallelism level
-   * @param <R>  the result message type
    * @return  new stream after processing with type [R]
    */
-  def process[R](processor: Class[_ <: Task], parallism: Int, conf: UserConfig 
= UserConfig.empty, description: String = null): Stream[R] = {
-    val processorOp = ProcessorOp(processor, parallism, conf, 
Option(description).getOrElse("process"))
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = 
UserConfig.empty,
+      description: String = null): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf,
+      Option(description).getOrElse("process"))
     graph.addVertex(processorOp)
     graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
     new Stream[R](graph, processorOp, Some(Shuffle))
   }
 }
 
-class KVStream[K, V](stream: Stream[Tuple2[K, V]]){
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
   /**
-   * Apply to Stream[Tuple2[K,V]]
-   * Group by the key of a KV tuple
-   * For (key, value) will groupby key
+   * GroupBy key
+   *
+   * Applies to Stream[Tuple2[K,V]]
+   *
    * @param parallelism  the parallelism for this operation
    * @return  the new KV stream
    */
-  def groupByKey(parallism: Int = 1): Stream[Tuple2[K, V]] = {
-    stream.groupBy(Stream.getTupleKey[K, V], parallism, "groupByKey")
+  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+    stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
   }
 
-
   /**
    * Sum the value of the tuples
    *
@@ -160,31 +172,39 @@ class KVStream[K, V](stream: Stream[Tuple2[K, V]]){
    * @param numeric  the numeric operations
    * @return  the sum stream
    */
-  def sum(implicit numeric: Numeric[V]) = {
+  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
     stream.reduce(Stream.sumByValue[K, V](numeric), "sum")
   }
 }
 
 object Stream {
 
-  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]) = new 
Stream[T](graph, node, edge)
+  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): 
Stream[T] = {
+    new Stream[T](graph, node, edge)
+  }
 
   def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
 
   def sumByValue[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => 
Tuple2[K, V]
   = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
 
-  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): 
KVStream[K, V] = new KVStream(stream)
+  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): 
KVStream[K, V] = {
+    new KVStream(stream)
+  }
 
   implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
-    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, 
description: String): Stream[T] = {
-      implicit val sink = DataSinkOp[T](dataSink, parallism, conf, 
Some(description).getOrElse("traversable"))
+    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, 
description: String)
+      : Stream[T] = {
+      implicit val sink = DataSinkOp[T](dataSink, parallism, conf,
+        Some(description).getOrElse("traversable"))
       stream.graph.addVertex(sink)
       stream.graph.addEdge(stream.thisNode, Shuffle, sink)
       new Stream[T](stream.graph, sink)
     }
 
-    def sink[T](sink: Class[_ <: Task], parallism: Int, conf: UserConfig = 
UserConfig.empty, description: String = null): Stream[T] = {
+    def sink[T](
+        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = 
UserConfig.empty,
+        description: String = null): Stream[T] = {
       val sinkOp = ProcessorOp(sink, parallism, conf, 
Option(description).getOrElse("source"))
       stream.graph.addVertex(sinkOp)
       stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
@@ -198,8 +218,7 @@ class LoggerSink[T] extends DataSink {
 
   private var context: TaskContext = null
 
-
-  override def open(context: TaskContext) = {
+  override def open(context: TaskContext): Unit = {
     this.logger = context.logger
   }
 
@@ -207,6 +226,5 @@ class LoggerSink[T] extends DataSink {
     logger.info("logging message " + message.msg)
   }
 
-  override def close() = Unit
-}
-
+  override def close(): Unit = Unit
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
index cfcfed2..525000d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/StreamApp.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,36 +18,40 @@
 
 package io.gearpump.streaming.dsl
 
-import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import scala.language.implicitConversions
+
+import akka.actor.ActorSystem
+
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
 import io.gearpump.streaming.StreamApplication
-import io.gearpump.streaming.dsl.op.{Shuffle, ProcessorOp, DataSourceOp, 
OpEdge, Op}
+import io.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
 import io.gearpump.streaming.dsl.plan.Planner
 import io.gearpump.streaming.source.DataSource
 import io.gearpump.streaming.task.{Task, TaskContext}
-import io.gearpump.cluster.UserConfig
-import io.gearpump.cluster.client.ClientContext
 import io.gearpump.util.Graph
 import io.gearpump.{Message, TimeStamp}
 
 /**
  * Example:
+ * {{{
+ * val data = "This is a good start, bingo!! bingo!!"
+ * app.fromCollection(data.lines.toList).
+ * // word => (word, count)
+ * flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ * // (word, count1), (word, count2) => (word, count1 + count2)
+ * groupBy(kv => kv._1).reduce(sum(_, _))
  *
- *
-    val data = "This is a good start, bingo!! bingo!!"
-    app.fromCollection(data.lines.toList).
-      // word => (word, count)
-      flatMap(line => line.split("[\\s]+")).map((_, 1)).
-      // (word, count1), (word, count2) => (word, count1 + count2)
-      groupBy(kv => kv._1).reduce(sum(_, _))
-
-    val appId = context.submit(app)
-    context.close()
+ * val appId = context.submit(app)
+ * context.close()
+ * }}}
  *
  * @param name name of app
  */
-class StreamApp(val name: String, system: ActorSystem, userConfig: UserConfig, 
val graph: Graph[Op, OpEdge]) {
+class StreamApp(
+    val name: String, system: ActorSystem, userConfig: UserConfig, val graph: 
Graph[Op, OpEdge]) {
 
-  def this(name: String ,system: ActorSystem, userConfig: UserConfig) = {
+  def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
     this(name, system, userConfig, Graph.empty[Op, OpEdge])
   }
 
@@ -60,7 +64,10 @@ class StreamApp(val name: String, system: ActorSystem, 
userConfig: UserConfig, v
 }
 
 object StreamApp {
-  def apply(name: String, context: ClientContext, userConfig: UserConfig = 
UserConfig.empty) = new StreamApp(name, context.system, userConfig)
+  def apply(name: String, context: ClientContext, userConfig: UserConfig = 
UserConfig.empty)
+    : StreamApp = {
+    new StreamApp(name, context.system, userConfig)
+  }
 
   implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication 
= {
     streamApp.plan
@@ -80,7 +87,8 @@ object StreamApp {
       source(dataSource, parallism, conf, description = null)
     }
 
-    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, 
description: String): Stream[T] = {
+    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, 
description: String)
+      : Stream[T] = {
       implicit val sourceOp = DataSourceOp(dataSource, parallism, conf, 
description)
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
@@ -89,7 +97,8 @@ object StreamApp {
       this.source(new CollectionDataSource[T](seq), parallism, 
UserConfig.empty, description)
     }
 
-    def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, 
description: String): Stream[T] = {
+    def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, 
description: String)
+      : Stream[T] = {
       val sourceOp = ProcessorOp(source, parallism, conf, 
Option(description).getOrElse("source"))
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
@@ -97,6 +106,7 @@ object StreamApp {
   }
 }
 
+/** A test message source which generates the message sequence repeatedly. */
 class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
   val list = seq.toList
   var index = 0

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 03bfa81..549cc6e 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,47 +18,60 @@
 
 package io.gearpump.streaming.dsl.javaapi
 
+import scala.collection.JavaConverters._
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.dsl.Stream
 import io.gearpump.streaming.javaapi.dsl.functions._
 import io.gearpump.streaming.task.Task
 
-import scala.collection.JavaConverters._
-
 /**
  * Java DSL
  */
 class JavaStream[T](val stream: Stream[T]) {
 
+  /** FlatMap on stream */
   def flatMap[R](fn: FlatMapFunction[T, R], description: String): 
JavaStream[R] = {
-    new JavaStream[R](stream.flatMap({t: T => fn(t).asScala}, description))
+    new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description))
   }
 
+  /** Map on stream */
   def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
-    new JavaStream[R](stream.map({t: T => fn(t)}, description))
+    new JavaStream[R](stream.map({ t: T => fn(t) }, description))
   }
 
+  /** Only keeps the messages for which FilterFunction returns true. */
   def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.filter({t: T => fn(t)}, description))
+    new JavaStream[T](stream.filter({ t: T => fn(t) }, description))
   }
 
+  /** Does aggregation on the stream */
   def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.reduce({(t1: T, t2: T) => fn(t1, t2)}, 
description))
+    new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, 
description))
   }
 
   def log(): Unit = {
     stream.log()
   }
 
+  /** Merges streams of the same type together */
   def merge(other: JavaStream[T], description: String): JavaStream[T] = {
     new JavaStream[T](stream.merge(other.stream, description))
   }
 
-  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, 
description: String): JavaStream[T] = {
+  /**
+   * Groups a stream into a list of sub-streams. Operations 
chained after
+   * groupBy apply to the sub-streams.
+   */
+  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, 
description: String)
+    : JavaStream[T] = {
     new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, 
description))
   }
 
-  def process[R](processor: Class[_ <: Task], parallelism: Int, conf: 
UserConfig, description: String): JavaStream[R] = {
+  /** Adds a low-level Processor to process messages */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig, 
description: String)
+    : JavaStream[R] = {
     new JavaStream[R](stream.process(processor, parallelism, conf, 
description))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index 0ad03cd..e39e054 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,31 +19,29 @@
 package io.gearpump.streaming.dsl.javaapi
 
 import java.util.Collection
+import scala.collection.JavaConverters._
 
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
 import io.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
 import io.gearpump.streaming.source.DataSource
 
-import scala.collection.JavaConverters._
-
 class JavaStreamApp(name: String, context: ClientContext, userConfig: 
UserConfig) {
 
   private val streamApp = StreamApp(name, context, userConfig)
 
   def source[T](collection: Collection[T], parallelism: Int,
-                conf: UserConfig, description: String): JavaStream[T] = {
+      conf: UserConfig, description: String): JavaStream[T] = {
     val dataSource = new CollectionDataSource(collection.asScala.toSeq)
     source(dataSource, parallelism, conf, description)
   }
 
   def source[T](dataSource: DataSource, parallelism: Int,
-                conf: UserConfig, description: String): JavaStream[T] = {
+      conf: UserConfig, description: String): JavaStream[T] = {
     new JavaStream[T](streamApp.source(dataSource, parallelism, conf, 
description))
   }
 
   def run(): Unit = {
     context.submit(streamApp)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala
index e0b89ce..f0a86fa 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/op/OP.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -34,27 +34,39 @@ sealed trait Op {
 /**
  * When translated to running DAG, SlaveOP can be attach to MasterOP or other 
SlaveOP
  * "Attach" means running in same Actor.
- *
  */
 trait SlaveOp[T] extends Op
 
-case class FlatMapOp[T, R](fun: (T) => TraversableOnce[R], description: 
String, conf: UserConfig = UserConfig.empty) extends SlaveOp[T]
+case class FlatMapOp[T, R](
+    fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = 
UserConfig.empty)
+  extends SlaveOp[T]
 
-case class ReduceOp[T](fun: (T, T) =>T, description: String, conf: UserConfig 
= UserConfig.empty) extends SlaveOp[T]
+case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig 
= UserConfig.empty)
+  extends SlaveOp[T]
 
 trait MasterOp extends Op
 
 trait ParameterizedOp[T] extends MasterOp
 
-case class MergeOp(description: String, override val conf: UserConfig = 
UserConfig.empty) extends MasterOp
+case class MergeOp(description: String, override val conf: UserConfig = 
UserConfig.empty)
+  extends MasterOp
 
-case class GroupByOp[T, R](fun: T => R, parallelism: Int, description: String, 
override val conf: UserConfig = UserConfig.empty) extends ParameterizedOp[T]
+case class GroupByOp[T, R](
+    fun: T => R, parallelism: Int, description: String,
+    override val conf: UserConfig = UserConfig.empty)
+  extends ParameterizedOp[T]
 
-case class ProcessorOp[T <: Task](processor: Class[T], parallelism: Int, conf: 
UserConfig, description: String) extends ParameterizedOp[T]
+case class ProcessorOp[T <: Task](
+    processor: Class[T], parallelism: Int, conf: UserConfig, description: 
String)
+  extends ParameterizedOp[T]
 
-case class DataSourceOp[T](dataSource: DataSource, parallelism: Int, conf: 
UserConfig, description: String) extends ParameterizedOp[T]
+case class DataSourceOp[T](
+    dataSource: DataSource, parallelism: Int, conf: UserConfig, description: 
String)
+  extends ParameterizedOp[T]
 
-case class DataSinkOp[T](dataSink: DataSink, parallelism: Int, conf: 
UserConfig, description: String) extends ParameterizedOp[T]
+case class DataSinkOp[T](
+    dataSink: DataSink, parallelism: Int, conf: UserConfig, description: 
String)
+  extends ParameterizedOp[T]
 
 /**
  * Contains operators which can be chained to single one.
@@ -70,8 +82,8 @@ case class OpChain(ops: List[Op]) extends Op {
   def description: String = null
 
   override def conf: UserConfig = {
-    // the head's conf has priority
-    ops.reverse.foldLeft(UserConfig.empty){(conf, op) =>
+    // The head's conf has priority
+    ops.reverse.foldLeft(UserConfig.empty) { (conf, op) =>
       conf.withConfig(op.conf)
     }
   }
@@ -84,7 +96,6 @@ trait OpEdge
  *
  * For example, map, flatmap operation doesn't require network shuffle, we can 
use Direct
  * to represent the relation with upstream operators.
- *
  */
 case object Direct extends OpEdge
 
@@ -93,7 +104,6 @@ case object Direct extends OpEdge
  *
  * For example, map, flatmap operation doesn't require network shuffle, we can 
use Direct
  * to represent the relation with upstream operators.
- *
  */
 case object Shuffle extends OpEdge
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
index 7dad9cc..b842c7b 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,13 +20,12 @@ package io.gearpump.streaming.dsl.partitioner
 
 import io.gearpump.Message
 import io.gearpump.partitioner.UnicastPartitioner
-/** Partition messages by applying group by function first.
-  *
- * @param groupBy
- * First apply message with groupBy function, then pick the hashCode of the 
output to do the partitioning.
- * You must define hashCode() for output type of groupBy function.
+
+/**
+ * Partition messages by applying group by function first.
  *
  * For example:
+ * {{{
  * case class People(name: String, gender: String)
  *
  * object Test{
@@ -34,9 +33,13 @@ import io.gearpump.partitioner.UnicastPartitioner
  *   val groupBy: (People => String) = people => people.gender
  *   val partitioner = GroupByPartitioner(groupBy)
  * }
+ * }}}
+ *
+ * @param groupBy First apply message with groupBy function, then pick the 
hashCode of the output
+ *   to do the partitioning. You must define hashCode() for output type of 
groupBy function.
  */
 class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends 
UnicastPartitioner {
-  override def getPartition(msg : Message, partitionNum : Int, 
currentPartitionId: Int) : Int = {
+  override def getPartition(msg: Message, partitionNum: Int, 
currentPartitionId: Int): Int = {
     val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode()
     (hashCode & Integer.MAX_VALUE) % partitionNum
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
index 400c9b1..f916124 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,25 @@
 
 package io.gearpump.streaming.dsl.plan
 
+import scala.collection.TraversableOnce
+
 import akka.actor.ActorSystem
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.source.DataSource
-import io.gearpump.streaming.{Processor, Constants}
-import io.gearpump.streaming.dsl.op._
-import io.gearpump.streaming.task.{StartTime, TaskContext, Task}
+import org.slf4j.Logger
+
 import io.gearpump._
 import io.gearpump.cluster.UserConfig
-import Constants._
-import Processor.DefaultProcessor
-import OpTranslator._
+import io.gearpump.streaming.Constants._
+import io.gearpump.streaming.Processor
+import io.gearpump.streaming.Processor.DefaultProcessor
+import io.gearpump.streaming.dsl.op._
+import io.gearpump.streaming.dsl.plan.OpTranslator._
+import io.gearpump.streaming.sink.DataSink
+import io.gearpump.streaming.source.DataSource
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 import io.gearpump.util.LogUtil
-import org.slf4j.Logger
-
-import scala.collection.TraversableOnce
 
 /**
- * Translate a OP to a TaskDescription
+ * Translates a OP to a TaskDescription
  */
 class OpTranslator extends java.io.Serializable {
   val LOG: Logger = LogUtil.getLogger(getClass)
@@ -55,7 +56,7 @@ class OpTranslator extends java.io.Serializable {
             Processor[SourceTask[Object, Object]](parallism,
               description = description + "." + func.description,
               userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
-          case groupby@ GroupByOp(_, parallism, description, _) =>
+          case groupby@GroupByOp(_, parallism, description, _) =>
             Processor[GroupByTask[Object, Object, Object]](parallism,
               description = description + "." + func.description,
               userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, 
groupby))
@@ -79,6 +80,8 @@ class OpTranslator extends java.io.Serializable {
         Processor[TransformTask[Object, Object]](1,
           description = func.description,
           taskConf = userConfig)
+      case chain: OpChain =>
+        throw new RuntimeException("Not supposed to be called!")
     }
   }
 
@@ -87,8 +90,12 @@ class OpTranslator extends java.io.Serializable {
     val totalFunction = ops.foldLeft(func) { (fun, op) =>
 
       val opFunction = op match {
-        case flatmap: FlatMapOp[Object, Object] => new 
FlatMapFunction(flatmap.fun, flatmap.description)
-        case reduce: ReduceOp[Object] => new ReduceFunction(reduce.fun, 
reduce.description)
+        case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
+          new FlatMapFunction(flatmap.fun, flatmap.description)
+        case reduce: ReduceOp[Object @unchecked] =>
+          new ReduceFunction(reduce.fun, reduce.description)
+        case _ =>
+          throw new RuntimeException("Not supposed to be called!")
       }
       fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
     }
@@ -107,18 +114,22 @@ object OpTranslator {
     def description: String
   }
 
-  class DummyInputFunction[T] extends SingleInputFunction[T, T]{
-    override def andThen[OUTER](other: SingleInputFunction[T, OUTER]): 
SingleInputFunction[T, OUTER] = {
+  class DummyInputFunction[T] extends SingleInputFunction[T, T] {
+    override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
+      : SingleInputFunction[T, OUTER] = {
       other
     }
 
-    //should never be called
-    override def process(value: T) = None
+    // Should never be called
+    override def process(value: T): TraversableOnce[T] = None
 
     override def description: String = ""
   }
 
-  class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE], 
second: SingleInputFunction[MIDDLE, OUT]) extends SingleInputFunction[IN, OUT] {
+  class AndThen[IN, MIDDLE, OUT](
+      first: SingleInputFunction[IN, MIDDLE], second: 
SingleInputFunction[MIDDLE, OUT])
+    extends SingleInputFunction[IN, OUT] {
+
     override def process(value: IN): TraversableOnce[OUT] = {
       first.process(value).flatMap(second.process(_))
     }
@@ -130,7 +141,9 @@ object OpTranslator {
     }
   }
 
-  class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], 
descriptionMessage: String) extends SingleInputFunction[IN, OUT] {
+  class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], 
descriptionMessage: String)
+    extends SingleInputFunction[IN, OUT] {
+
     override def process(value: IN): TraversableOnce[OUT] = {
       fun(value)
     }
@@ -140,7 +153,9 @@ object OpTranslator {
     }
   }
 
-  class ReduceFunction[T](fun: (T, T)=>T, descriptionMessage: String) extends 
SingleInputFunction[T, T] {
+  class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
+    extends SingleInputFunction[T, T] {
+
     private var state: Any = null
 
     override def process(value: T): TraversableOnce[T] = {
@@ -155,10 +170,13 @@ object OpTranslator {
     override def description: String = descriptionMessage
   }
 
-  class GroupByTask[IN, GROUP, OUT](groupBy: IN => GROUP, taskContext: 
TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
+  class GroupByTask[IN, GROUP, OUT](
+      groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
 
     def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[GroupByOp[IN, 
GROUP]](GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun,
+      this(userConf.getValue[GroupByOp[IN, GROUP]](
+        GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun,
         taskContext, userConf)
     }
 
@@ -172,24 +190,29 @@ object OpTranslator {
 
       val group = groupBy(msg.msg.asInstanceOf[IN])
       if (!groups.contains(group)) {
-        val operator = userConf.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR).get
+        val operator =
+          userConf.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR).get
         groups += group -> operator
       }
 
       val operator = groups(group)
 
-      operator.process(msg.msg.asInstanceOf[IN]).foreach{msg =>
+      operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
         taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
       }
     }
   }
 
-  class SourceTask[T, OUT](source: DataSource, operator: 
Option[SingleInputFunction[T, OUT]], taskContext: TaskContext, userConf: 
UserConfig) extends Task(taskContext, userConf) {
+  class SourceTask[T, OUT](
+      source: DataSource, operator: Option[SingleInputFunction[T, OUT]], 
taskContext: TaskContext,
+      userConf: UserConfig)
+    extends Task(taskContext, userConf) {
 
     def this(taskContext: TaskContext, userConf: UserConfig) = {
       this(
         
userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get,
-        userConf.getValue[SingleInputFunction[T, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(taskContext.system),
+        userConf.getValue[SingleInputFunction[T, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(
+          taskContext.system),
         taskContext, userConf)
     }
 
@@ -200,7 +223,7 @@ object OpTranslator {
 
     override def onNext(msg: Message): Unit = {
       val time = System.currentTimeMillis()
-      //Todo: determine the batch size
+      // TODO: determine the batch size
       source.read(1).foreach(msg => {
         operator match {
           case Some(operator) =>
@@ -219,15 +242,18 @@ object OpTranslator {
       self ! Message("next", System.currentTimeMillis())
     }
 
-    override def onStop() = {
+    override def onStop(): Unit = {
       source.close()
     }
   }
 
-  class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]], 
taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, 
userConf) {
+  class TransformTask[IN, OUT](
+      operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
+      userConf: UserConfig) extends Task(taskContext, userConf) {
 
     def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
+      this(userConf.getValue[SingleInputFunction[IN, OUT]](
+        GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, 
userConf)
     }
 
     override def onStart(startTime: StartTime): Unit = {
@@ -238,7 +264,7 @@ object OpTranslator {
 
       operator match {
         case Some(operator) =>
-          operator.process(msg.msg.asInstanceOf[IN]).foreach{ msg =>
+          operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
             taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
           }
         case None =>
@@ -247,9 +273,12 @@ object OpTranslator {
     }
   }
 
-  class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: 
UserConfig) extends Task(taskContext, userConf) {
+  class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: 
UserConfig)
+    extends Task(taskContext, userConf) {
+
     def this(taskContext: TaskContext, userConf: UserConfig) = {
-      
this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get,
 taskContext, userConf)
+      
this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get,
+        taskContext, userConf)
     }
 
     override def onStart(startTime: StartTime): Unit = {
@@ -260,7 +289,7 @@ object OpTranslator {
       dataSink.write(msg)
     }
 
-    override def onStop() = {
+    override def onStop(): Unit = {
       dataSink.close()
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala 
b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
index ee0818d..aafd8d3 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,35 +19,38 @@
 package io.gearpump.streaming.dsl.plan
 
 import akka.actor.ActorSystem
+
+import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, 
Partitioner}
 import io.gearpump.streaming.Processor
 import io.gearpump.streaming.dsl.op._
 import io.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import io.gearpump.streaming.task.Task
-import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, 
Partitioner}
 import io.gearpump.util.Graph
 
 class Planner {
 
   /*
-   * Convert Dag[Op] to Dag[TaskDescription] so that we can run it easily.
+   * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of 
the low
+   * level Graph API.
    */
-  def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem): 
Graph[Processor[_ <: Task], _ <: Partitioner] = {
+  def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
+    : Graph[Processor[_ <: Task], _ <: Partitioner] = {
 
     val opTranslator = new OpTranslator()
 
     val newDag = optimize(dag)
-    newDag.mapEdge {(node1, edge, node2) =>
+    newDag.mapEdge { (node1, edge, node2) =>
       edge match {
         case Shuffle =>
           node2.head match {
-            case groupBy: GroupByOp[Any, Any] =>
+            case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
               new GroupByPartitioner(groupBy.fun)
             case _ => new HashPartitioner
           }
         case Direct =>
           new CoLocationPartitioner
       }
-    }.mapVertex {opChain =>
+    }.mapVertex { opChain =>
       opTranslator.translate(opChain)
     }
   }
@@ -65,11 +68,12 @@ class Planner {
     newGraph
   }
 
-  private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: 
OpChain): Graph[OpChain, OpEdge] = {
+  private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: 
OpChain)
+    : Graph[OpChain, OpEdge] = {
     if (dag.outDegreeOf(node1) == 1 &&
       dag.inDegreeOf(node2) == 1 &&
-      // for processor node, we don't allow it to merge with downstream 
operators
-      !node1.head.isInstanceOf[ProcessorOp[_<:Task]]) {
+      // For processor node, we don't allow it to merge with downstream 
operators
+      !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
       val (_, edge, _) = dag.outgoingEdgesOf(node1)(0)
       if (edge == Direct) {
         val opList = OpChain(node1.ops ++ node2.ops)
@@ -82,7 +86,7 @@ class Planner {
           dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3)
         }
 
-        //remove the old vertex
+        // Remove the old vertex
         dag.removeVertex(node1)
         dag.removeVertex(node2)
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala 
b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
index cbce65b..48007d5 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/executor/Executor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,17 +19,21 @@
 package io.gearpump.streaming.executor
 
 import java.lang.management.ManagementFactory
+import scala.concurrent.duration._
 
 import akka.actor.SupervisorStrategy.Resume
 import akka.actor._
 import com.typesafe.config.Config
-import io.gearpump.WorkerId
+import org.apache.commons.lang.exception.ExceptionUtils
+import org.slf4j.Logger
+
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.cluster.{ClusterConfig, ExecutorContext, UserConfig}
 import io.gearpump.metrics.Metrics.ReportMetrics
 import io.gearpump.metrics.{JvmMetricsSet, Metrics, MetricsReporterService}
 import io.gearpump.serializer.SerializationFramework
 import io.gearpump.streaming.AppMasterToExecutor.{MsgLostException, 
TasksChanged, TasksLaunched, _}
-import io.gearpump.streaming.ExecutorToAppMaster.{UnRegisterTask, MessageLoss, 
RegisterExecutor, RegisterTask}
+import io.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
RegisterExecutor, RegisterTask, UnRegisterTask}
 import io.gearpump.streaming.ProcessorId
 import io.gearpump.streaming.executor.Executor._
 import io.gearpump.streaming.executor.TaskLauncher.TaskArgument
@@ -37,11 +41,6 @@ import io.gearpump.streaming.task.{Subscriber, TaskId}
 import io.gearpump.transport.{Express, HostPort}
 import io.gearpump.util.Constants._
 import io.gearpump.util.{ActorUtil, Constants, LogUtil, TimeOutScheduler}
-import org.apache.commons.lang.exception.ExceptionUtils
-import org.slf4j.Logger
-
-import scala.concurrent.duration._
-import scala.language.postfixOps
 
 /**
  * Executor is child of AppMaster.
@@ -64,18 +63,20 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
 
   private val LOG: Logger = LogUtil.getLogger(getClass, executor = executorId, 
app = appId)
 
-  implicit val timeOut = FUTURE_TIMEOUT
+  private implicit val timeOut = FUTURE_TIMEOUT
   private val address = ActorUtil.getFullPath(context.system, self.path)
   private val systemConfig = context.system.settings.config
   private val serializerPool = getSerializerPool()
   private val taskDispatcher = 
systemConfig.getString(Constants.GEARPUMP_TASK_DISPATCHER)
 
   private var state = State.ACTIVE
-  private var transitionStart = 0L // state transition start, in unix time
-  private var transitionEnd = 0L // state transition end, in unix time
+  // State transition start, in unix time
+  private var transitionStart = 0L
+  // State transition end, in unix time
+  private var transitionEnd = 0L
   private val transitWarningThreshold = 5000 // ms,
 
-  // start health check Ticks
+  // Starts health check Ticks
   self ! HealthCheck
 
   LOG.info(s"Executor $executorId has been started, start to register 
itself...")
@@ -92,31 +93,34 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
   val metricsEnabled = systemConfig.getBoolean(GEARPUMP_METRIC_ENABLED)
 
   if (metricsEnabled) {
-    // register jvm metrics
+    // Registers jvm metrics
     Metrics(context.system).register(new 
JvmMetricsSet(s"app$appId.executor$executorId"))
 
-    val metricsReportService = context.actorOf(Props(new 
MetricsReporterService(Metrics(context.system))))
+    val metricsReportService = context.actorOf(Props(new 
MetricsReporterService(
+      Metrics(context.system))))
     appMaster.tell(ReportMetrics, metricsReportService)
   }
 
   private val NOT_INITIALIZED = -1
-  def receive : Receive = applicationReady(dagVersion = NOT_INITIALIZED)
+  def receive: Receive = applicationReady(dagVersion = NOT_INITIALIZED)
 
   private def getTaskId(actorRef: ActorRef): Option[TaskId] = {
     tasks.find(_._2 == actorRef).map(_._1)
   }
 
   override val supervisorStrategy =
-    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
+    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
       case _: MsgLostException =>
         val taskId = getTaskId(sender)
-        val cause =  s"We got MessageLossException from task 
${getTaskId(sender)}, replaying application..."
+        val cause = s"We got MessageLossException from task 
${getTaskId(sender)}, " +
+          s"replaying application..."
         LOG.error(cause)
-        taskId.foreach(appMaster ! MessageLoss(executorId, _,  cause))
+        taskId.foreach(appMaster ! MessageLoss(executorId, _, cause))
         Resume
       case ex: Throwable =>
         val taskId = getTaskId(sender)
-        val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will 
treat it as MessageLoss, so that the system will replay all lost message"
+        val errorMsg = s"We got ${ex.getClass.getName} from $taskId, we will 
treat it as" +
+          s" MessageLoss, so that the system will replay all lost message"
         LOG.error(errorMsg, ex)
         val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex)
         taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg))
@@ -129,22 +133,27 @@ class Executor(executorContext: ExecutorContext, userConf 
: UserConfig, launcher
 
   private def assertVersion(expectVersion: Int, version: Int, clue: Any): Unit 
= {
     if (expectVersion != version) {
-      val errorMessage = s"Version mismatch: we expect dag version 
$expectVersion, but get $version; clue: $clue"
+      val errorMessage = s"Version mismatch: we expect dag version 
$expectVersion, " +
+        s"but get $version; clue: $clue"
       LOG.error(errorMessage)
       throw new DagVersionMismatchException(errorMessage)
     }
   }
 
-  def dynamicDagPhase1(dagVersion: Int, launched: List[TaskId], changed: 
List[ChangeTask], registered: List[TaskId]): Receive = {
+  def dynamicDagPhase1(
+      dagVersion: Int, launched: List[TaskId], changed: List[ChangeTask], 
registered: List[TaskId])
+    : Receive = {
     state = State.DYNAMIC_DAG_PHASE1
     box({
-      case launch@LaunchTasks(taskIds, version, processorDescription, 
subscribers: List[Subscriber]) => {
+      case launch@LaunchTasks(taskIds, version, processorDescription,
+      subscribers: List[Subscriber]) => {
         assertVersion(dagVersion, version, clue = launch)
 
         LOG.info(s"Launching Task $taskIds for app: $appId")
         val taskArgument = TaskArgument(version, processorDescription, 
subscribers)
         taskIds.foreach(taskArgumentStore.add(_, taskArgument))
-        val newAdded = launcher.launch(taskIds, taskArgument, context, 
serializerPool, taskDispatcher)
+        val newAdded = launcher.launch(taskIds, taskArgument, context, 
serializerPool,
+          taskDispatcher)
         newAdded.foreach { newAddedTask =>
           context.watch(newAddedTask._2)
         }
@@ -160,12 +169,14 @@ class Executor(executorContext: ExecutorContext, userConf 
: UserConfig, launcher
         val newChangedTasks = taskIds.map { taskId =>
           for (taskArgument <- taskArgumentStore.get(dagVersion, taskId)) {
             val processorDescription = 
taskArgument.processorDescription.copy(life = life)
-            taskArgumentStore.add(taskId, TaskArgument(dagVersion, 
processorDescription, subscribers))
+            taskArgumentStore.add(taskId, TaskArgument(dagVersion, 
processorDescription,
+              subscribers))
           }
           ChangeTask(taskId, dagVersion, life, subscribers)
         }
         sender ! TasksChanged(taskIds)
-        context.become(dynamicDagPhase1(dagVersion, launched, changed ++ 
newChangedTasks, registered))
+        context.become(dynamicDagPhase1(dagVersion, launched, changed ++ 
newChangedTasks,
+          registered))
 
       case locations@TaskLocationsReady(taskLocations, version) =>
         LOG.info(s"TaskLocations Ready...")
@@ -174,7 +185,9 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
         // Check whether all tasks has been registered.
         if ((launched.toSet -- registered.toSet).isEmpty) {
           // Confirm all tasks has been registered.
-          val result = taskLocations.locations.filter(location => 
!location._1.equals(express.localHost)).flatMap { kv =>
+          val result = taskLocations.locations.filter {
+            location => !location._1.equals(express.localHost)
+          }.flatMap { kv =>
             val (host, taskIdList) = kv
             taskIdList.map(taskId => (TaskId.toLong(taskId), host))
           }
@@ -189,14 +202,17 @@ class Executor(executorContext: ExecutorContext, userConf 
: UserConfig, launcher
           }
           context.become(dynamicDagPhase2(dagVersion, launched, changed))
         } else {
-          LOG.error("Inconsistency between AppMaser and Executor! AppMaster 
thinks DynamicDag transition is ready, " +
-            "while Executor have not get all tasks registered, that task will 
not be functional...")
-          //reject TaskLocations...
+          LOG.error("Inconsistency between AppMaster and Executor! AppMaster 
thinks DynamicDag " +
+            "transition is ready, while Executor have not get all tasks 
registered, " +
+            "that task will not be functional...")
+
+          // Reject TaskLocations...
           val missedTasks = (launched.toSet -- registered.toSet).toList
-          val errorMsg = "We have not received TaskRegistered for following 
tasks: " + missedTasks.mkString(", ")
+          val errorMsg = "We have not received TaskRegistered for following 
tasks: " +
+            missedTasks.mkString(", ")
           LOG.error(errorMsg)
           sender ! TaskLocationsRejected(dagVersion, executorId, errorMsg, 
null)
-          // stay with current status...
+          // Stays with current status...
         }
 
       case confirm: TaskRegistered =>
@@ -205,10 +221,11 @@ class Executor(executorContext: ExecutorContext, userConf 
: UserConfig, launcher
             tasks += confirm.taskId -> actorRef
             actorRef forward confirm
         }
-        context.become(dynamicDagPhase1(dagVersion, launched, changed, 
registered :+ confirm.taskId))
+        context.become(dynamicDagPhase1(dagVersion, launched, changed,
+          registered :+ confirm.taskId))
 
       case rejected: TaskRejected =>
-        // means this task shoud not exists...
+        // Means this task should not exist...
         tasks.get(rejected.taskId).foreach(_ ! PoisonPill)
         tasks -= rejected.taskId
         LOG.error(s"Task ${rejected.taskId} is rejected by AppMaster, shutting 
down it...")
@@ -218,7 +235,8 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
     })
   }
 
-  def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: 
List[ChangeTask]): Receive = {
+  def dynamicDagPhase2(dagVersion: Int, launched: List[TaskId], changed: 
List[ChangeTask])
+    : Receive = {
     LOG.info("Transit to dynamic Dag Phase2")
     state = State.DYNAMIC_DAG_PHASE2
     box {
@@ -240,29 +258,36 @@ class Executor(executorContext: ExecutorContext, userConf 
: UserConfig, launcher
     transitionEnd = System.currentTimeMillis()
 
     if (dagVersion != NOT_INITIALIZED) {
-      LOG.info("Transit to state Application Ready. This transition takes " + 
(transitionEnd - transitionStart) + " milliseconds")
+      LOG.info("Transit to state Application Ready. This transition takes " +
+        (transitionEnd - transitionStart) + " milliseconds")
     }
     box {
       case start: StartDynamicDag =>
         LOG.info("received StartDynamicDag")
         if (start.dagVersion > dagVersion) {
           transitionStart = System.currentTimeMillis()
-          LOG.info(s"received $start, Executor transit to dag version: 
${start.dagVersion} from current version $dagVersion")
-          context.become(dynamicDagPhase1(start.dagVersion, 
List.empty[TaskId], List.empty[ChangeTask], List.empty[TaskId]))
+          LOG.info(s"received $start, Executor transit to dag version: 
${start.dagVersion} from " +
+            s"current version $dagVersion")
+          context.become(dynamicDagPhase1(start.dagVersion, List.empty[TaskId],
+            List.empty[ChangeTask], List.empty[TaskId]))
         }
       case launch: LaunchTasks =>
         if (launch.dagVersion > dagVersion) {
           transitionStart = System.currentTimeMillis()
-          LOG.info(s"received $launch, Executor transit to dag version: 
${launch.dagVersion} from current version $dagVersion")
-          context.become(dynamicDagPhase1(launch.dagVersion, 
List.empty[TaskId], List.empty[ChangeTask], List.empty[TaskId]))
+          LOG.info(s"received $launch, Executor transit to dag " +
+            s"version: ${launch.dagVersion} from current version $dagVersion")
+          context.become(dynamicDagPhase1(launch.dagVersion, 
List.empty[TaskId],
+            List.empty[ChangeTask], List.empty[TaskId]))
           self forward launch
         }
 
       case change: ChangeTasks =>
         if (change.dagVersion > dagVersion) {
           transitionStart = System.currentTimeMillis()
-          LOG.info(s"received $change, Executor transit to dag version: 
${change.dagVersion} from current version $dagVersion")
-          context.become(dynamicDagPhase1(change.dagVersion, 
List.empty[TaskId], List.empty[ChangeTask], List.empty[TaskId]))
+          LOG.info(s"received $change, Executor transit to dag version: 
${change.dagVersion} from" +
+            s" current version $dagVersion")
+          context.become(dynamicDagPhase1(change.dagVersion, 
List.empty[TaskId],
+            List.empty[ChangeTask], List.empty[TaskId]))
           self forward change
         }
 
@@ -274,8 +299,8 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
         }
         tasks -= taskId
 
-      case unRegister @ UnRegisterTask(taskId, _) =>
-        // send UnRegisterTask to AppMaster
+      case unRegister@UnRegisterTask(taskId, _) =>
+        // Sends UnRegisterTask to AppMaster
         appMaster ! unRegister
     }
   }
@@ -289,14 +314,15 @@ class Executor(executorContext: ExecutorContext, userConf 
: UserConfig, launcher
             val newNeedRestart = needRestart :+ taskId
             val newRemain = remain - 1
             if (newRemain == 0) {
-              val newRestarted = newNeedRestart.map{ taskId_ =>
+              val newRestarted = newNeedRestart.map { taskId_ =>
                 val taskActor = launchTask(taskId_, 
taskArgumentStore.get(dagVersion, taskId_).get)
                 context.watch(taskActor)
                 taskId_ -> taskActor
               }.toMap
 
               tasks = newRestarted
-              context.become(dynamicDagPhase1(dagVersion, newNeedRestart, 
List.empty[ChangeTask], List.empty[TaskId]))
+              context.become(dynamicDagPhase1(dagVersion, newNeedRestart, 
List.empty[ChangeTask],
+                List.empty[TaskId]))
             } else {
               context.become(restartingTasks(dagVersion, newRemain, 
newNeedRestart))
             }
@@ -308,7 +334,8 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
   val terminationWatch: Receive = {
     case Terminated(actor) =>
       if (actor.compareTo(appMaster) == 0) {
-        LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, 
shutting down current executor $appId, $executorId")
+        LOG.info(s"AppMaster ${appMaster.path.toString} is terminated, 
shutting down current " +
+          s"executor $appId, $executorId")
         context.stop(self)
       } else {
         self ! TaskStopped(actor)
@@ -320,7 +347,8 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
       LOG.info(s"Executor received restart tasks")
       val tasksToRestart = tasks.keys.count(taskArgumentStore.get(dagVersion, 
_).nonEmpty)
       express.remoteAddressMap.send(Map.empty[Long, HostPort])
-      context.become(restartingTasks(dagVersion, remain = tasksToRestart, 
needRestart = List.empty[TaskId]))
+      context.become(restartingTasks(dagVersion, remain = tasksToRestart,
+        needRestart = List.empty[TaskId]))
 
       tasks.values.foreach {
         case task: ActorRef => task ! PoisonPill
@@ -329,7 +357,7 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
 
   def executorService: Receive = terminationWatch orElse onRestartTasks orElse 
{
     case taskChanged: TaskChanged =>
-      //skip
+      // Skip
     case get: GetExecutorSummary =>
       val logFile = LogUtil.applicationLogDir(systemConfig)
       val processorTasks = 
tasks.keySet.groupBy(_.processorId).mapValues(_.toList).view.force
@@ -346,7 +374,7 @@ class Executor(executorContext: ExecutorContext, userConf : 
UserConfig, launcher
     case query: QueryExecutorConfig =>
       sender ! 
ExecutorConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
     case HealthCheck =>
-      context.system.scheduler.scheduleOnce(3 second)(HealthCheck)
+      context.system.scheduler.scheduleOnce(3.second)(HealthCheck)
       if (state != State.ACTIVE && (transitionEnd - transitionStart) > 
transitWarningThreshold) {
         LOG.error(s"Executor status: " + state +
           s", it takes too long(${transitionEnd - transitionStart}) to do 
transition")
@@ -392,17 +420,18 @@ object Executor {
     }
 
     /**
-     * when the new DAG is successfully deployed, then we should remove 
obsolete TaskArgument of old DAG.
+     * When the new DAG is successfully deployed, then we should remove 
obsolete
+     * TaskArgument of old DAG.
      */
-    def removeObsoleteVersion: Unit = {
-      store = store.map{ kv =>
+    def removeObsoleteVersion(): Unit = {
+      store = store.map { kv =>
         val (k, list) = kv
         (k, list.take(1))
       }
     }
 
     def removeNewerVersion(currentVersion: Int): Unit = {
-      store = store.map{ kv =>
+      store = store.map { kv =>
         val (k, list) = kv
         (k, list.filter(_.dagVersion <= currentVersion))
       }
@@ -412,18 +441,20 @@ object Executor {
   case class TaskStopped(task: ActorRef)
 
   case class ExecutorSummary(
-    id: Int,
-    workerId: WorkerId,
-    actorPath: String,
-    logFile: String,
-    status: String,
-    taskCount: Int,
-    tasks: Map[ProcessorId, List[TaskId]],
-    jvmName: String
+      id: Int,
+      workerId: WorkerId,
+      actorPath: String,
+      logFile: String,
+      status: String,
+      taskCount: Int,
+      tasks: Map[ProcessorId, List[TaskId]],
+      jvmName: String
   )
 
   object ExecutorSummary {
-    def empty: ExecutorSummary = ExecutorSummary(0, WorkerId.unspecified, "", 
"", "", 1, null, jvmName = "")
+    def empty: ExecutorSummary = {
+      ExecutorSummary(0, WorkerId.unspecified, "", "", "", 1, null, jvmName = 
"")
+    }
   }
 
   case class GetExecutorSummary(executorId: Int)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
index e485d56..c40aa5f 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/executor/ExecutorRestartPolicy.scala
@@ -15,18 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.gearpump.streaming.executor
 
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.RestartPolicy
+package io.gearpump.streaming.executor
 
 import scala.collection.immutable
 import scala.concurrent.duration.Duration
 
+import io.gearpump.streaming.task.TaskId
+import io.gearpump.util.RestartPolicy
+
 /**
- * @param maxNrOfRetries the number of times a executor is allowed to be 
restarted, negative value means no limit,
- *   if the limit is exceeded the policy will not allow to restart the executor
- * @param withinTimeRange duration of the time window for maxNrOfRetries, 
Duration.Inf means no window
+ *
+ * Controls how many retries to recover failed executors.
+ *
+ * @param maxNrOfRetries the number of times an executor is allowed to be 
restarted,
+ *                       negative value means no limit, if the limit is 
exceeded the policy
+ *                       will not allow to restart the executor
+ * @param withinTimeRange duration of the time window for maxNrOfRetries, 
Duration.Inf
+ *                        means no window
  */
 class ExecutorRestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
   private var executorToTaskIds = Map.empty[Int, Set[TaskId]]
@@ -45,12 +51,14 @@ class ExecutorRestartPolicy(maxNrOfRetries: Int, 
withinTimeRange: Duration) {
     executorToTaskIds.get(executorId).map { taskIds =>
       taskIds.foreach { taskId =>
         taskRestartPolocies.get(taskId).map { policy =>
-          if(!policy.allowRestart) {
+          if (!policy.allowRestart) {
+            // scalastyle:off return
             return false
+            // scalastyle:on return
           }
         }
       }
     }
     true
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala 
b/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala
index b742889..d377ea4 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/executor/TaskLauncher.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.streaming.executor
 
 import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
+
 import io.gearpump.cluster.{ExecutorContext, UserConfig}
 import io.gearpump.serializer.SerializationFramework
 import io.gearpump.streaming.ProcessorDescription
@@ -28,8 +28,11 @@ import io.gearpump.streaming.task._
 import io.gearpump.streaming.util.ActorPathUtil
 
 trait ITaskLauncher {
+
+  /** Launch a list of task actors */
   def launch(taskIds: List[TaskId], argument: TaskArgument,
-      context: ActorRefFactory, serializer: SerializationFramework, 
dispatcher: String): Map[TaskId, ActorRef]
+      context: ActorRefFactory, serializer: SerializationFramework, 
dispatcher: String)
+    : Map[TaskId, ActorRef]
 }
 
 class TaskLauncher(
@@ -41,8 +44,10 @@ class TaskLauncher(
     taskActorClass: Class[_ <: Actor])
   extends ITaskLauncher{
 
-  override def launch(taskIds: List[TaskId], argument: TaskArgument,
-      context: ActorRefFactory, serializer: SerializationFramework, 
dispatcher: String): Map[TaskId, ActorRef] = {
+  override def launch(
+      taskIds: List[TaskId], argument: TaskArgument,
+      context: ActorRefFactory, serializer: SerializationFramework, 
dispatcher: String)
+    : Map[TaskId, ActorRef] = {
     import argument.{processorDescription, subscribers}
 
     val taskConf = userConf.withConfig(processorDescription.taskConf)
@@ -57,8 +62,8 @@ class TaskLauncher(
     var tasks = Map.empty[TaskId, ActorRef]
     taskIds.foreach { taskId =>
       val task = new TaskWrapper(taskId, taskClass, taskContext, taskConf)
-      val taskActor = context.actorOf(Props(taskActorClass, taskId, 
taskContext, userConf, task, serializer).
-        withDispatcher(dispatcher), ActorPathUtil.taskActorName(taskId))
+      val taskActor = context.actorOf(Props(taskActorClass, taskId, 
taskContext, userConf, task,
+        serializer).withDispatcher(dispatcher), 
ActorPathUtil.taskActorName(taskId))
       tasks += taskId -> taskActor
     }
     tasks
@@ -67,7 +72,9 @@ class TaskLauncher(
 
 object TaskLauncher {
 
-  case class TaskArgument(dagVersion: Int, processorDescription: 
ProcessorDescription, subscribers: List[Subscriber])
+  case class TaskArgument(
+      dagVersion: Int, processorDescription: ProcessorDescription,
+      subscribers: List[Subscriber])
 
   def apply(executorContext: ExecutorContext, userConf: UserConfig): 
TaskLauncher = {
     import executorContext.{appId, appMaster, appName, executorId}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
index f92970c..8be2e72 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/metrics/ProcessorAggregator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,6 +21,7 @@ package io.gearpump.streaming.metrics
 import java.util
 
 import com.typesafe.config.Config
+
 import io.gearpump.TimeStamp
 import io.gearpump.cluster.ClientToMaster.ReadOption
 import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
@@ -32,24 +33,20 @@ import 
io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
 
 /**
  *
- * [[ProcessorAggregator]] does aggregation after grouping by these three 
attributes:
- * 1. processorId
- * 2. time section(represented as a index integer)
- * 3. metricName(like sendThroughput)
- *
- *
- * It assumes for each [[HistoryMetricsItem]], the name follow the format
- *   app[appId].processor[processorId].task[taskId].[metricName]
+ * Does aggregation on metrics after grouping by these three attributes:
+ *  1. processorId
+ *  2. time section(represented as a index integer)
+ *  3. metricName(like sendThroughput)
  *
+ * It assumes that for each 
[[io.gearpump.cluster.MasterToClient.HistoryMetricsItem]], the name
+ * follows the format 
app(appId).processor(processorId).task(taskId).(metricName)
  *
  * It parses the name to get processorId and metricName. If the parsing fails, 
then current
- * [[HistoryMetricsItem]] will be skipped.
- *
- *
- * This class is optimized for performance.
+ * [[io.gearpump.cluster.MasterToClient.HistoryMetricsItem]] will be skipped.
  *
+ * NOTE: this class is optimized for performance.
  */
-class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends 
MetricsAggregator{
+class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends 
MetricsAggregator {
 
   def this(config: Config) = {
     this(HistoryMetricsConfig(config))
@@ -58,9 +55,8 @@ class ProcessorAggregator(historyMetricConfig: 
HistoryMetricsConfig) extends Met
   private val aggregatorFactory: AggregatorFactory = new AggregatorFactory()
 
   /**
-   * Accept options:
+   * Accepts options:
    * key: "readOption", value: one of "readLatest", "readRecent", "readHistory"
-   *
    */
   override def aggregate(options: Map[String, String],
       inputs: Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = {
@@ -68,14 +64,15 @@ class ProcessorAggregator(historyMetricConfig: 
HistoryMetricsConfig) extends Met
     aggregate(readOption, inputs, System.currentTimeMillis())
   }
 
-  def aggregate(readOption: ReadOption.ReadOption, inputs: 
Iterator[HistoryMetricsItem], now: TimeStamp):
-      List[HistoryMetricsItem] = {
+  def aggregate(
+      readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], 
now: TimeStamp)
+    : List[HistoryMetricsItem] = {
     val (start, end, interval) = getTimeRange(readOption, now)
     val timeSlotsCount = ((end - start - 1) / interval + 1).toInt
     val map = new MultiLayerMap[Aggregator](timeSlotsCount)
 
     val taskIdentity = new TaskIdentity(0, null)
-    while(inputs.hasNext) {
+    while (inputs.hasNext) {
       val item = inputs.next()
 
       if (item.value.isInstanceOf[Meter] || 
item.value.isInstanceOf[Histogram]) {
@@ -97,7 +94,7 @@ class ProcessorAggregator(historyMetricConfig: 
HistoryMetricsConfig) extends Met
     val result = new Array[HistoryMetricsItem](map.size)
     val iterator = map.valueIterator
     var index = 0
-    while(iterator.hasNext()) {
+    while (iterator.hasNext()) {
       val op = iterator.next()
       result(index) = op.result
       index += 1
@@ -106,8 +103,9 @@ class ProcessorAggregator(historyMetricConfig: 
HistoryMetricsConfig) extends Met
     result.toList
   }
 
-  // return (start, end, interval)
-  private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp): 
(TimeStamp, TimeStamp, TimeStamp) = {
+  // Returns (start, end, interval)
+  private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp)
+    : (TimeStamp, TimeStamp, TimeStamp) = {
     readOption match {
       case ReadOption.ReadRecent =>
         val end = now
@@ -120,7 +118,7 @@ class ProcessorAggregator(historyMetricConfig: 
HistoryMetricsConfig) extends Met
         val interval = historyMetricConfig.retainHistoryDataIntervalMs
         (floor(start, interval), floor(end, interval), interval)
       case _ =>
-        // all data points are aggregated together.
+        // All data points are aggregated together.
         (0L, Long.MaxValue, Long.MaxValue)
     }
   }
@@ -132,7 +130,7 @@ class ProcessorAggregator(historyMetricConfig: 
HistoryMetricsConfig) extends Met
     (value / interval) * interval
   }
 
-  // returns "app0.processor0:sendThroughput" as the group Id.
+  // Returns "app0.processor0:sendThroughput" as the group Id.
   private def parseName(name: String, result: TaskIdentity): Boolean = {
     val taskIndex = name.indexOf(TASK_TAG)
     if (taskIndex > 0) {
@@ -157,20 +155,16 @@ object ProcessorAggregator {
 
   private val TASK_TAG = ".task"
 
-
   private class TaskIdentity(var task: Short, var group: String)
 
-
   /**
    *
    * MultiLayerMap has multiple layers. For each layer, there
    * is a hashMap.
    *
-   * To access a value with [[get]], user need to specify first layer Id, then 
key.
-   *
+   * To access a value with get, users need to specify the layer Id first, then the key.
    *
    * This class is optimized for performance.
-   *
    */
   class MultiLayerMap[Value](layers: Int) {
 
@@ -197,11 +191,11 @@ object ProcessorAggregator {
     }
 
     def size: Int = _size
-    
+
     def valueIterator: util.Iterator[Value] = {
       val iterators = new Array[util.Iterator[Value]](layers)
       var layer = 0
-      while(layer < layers) {
+      while (layer < layers) {
         iterators(layer) = map(layer).values().iterator()
         layer += 1
       }
@@ -213,7 +207,7 @@ object ProcessorAggregator {
       val map = new Array[java.util.HashMap[String, Value]](layers)
       var index = 0
       val length = map.length
-      while(index < length) {
+      while (index < length) {
         map(index) = new java.util.HashMap[String, Value]()
         index += 1
       }
@@ -291,11 +285,11 @@ object ProcessorAggregator {
 
     override def result: HistoryMetricsItem = {
       HistoryMetricsItem(startTime, Meter(name, count, meanRate,
-          m1, rateUnit))
+        m1, rateUnit))
     }
   }
 
-  class AggregatorFactory{
+  class AggregatorFactory {
     def create(item: HistoryMetricsItem, name: String): Aggregator = {
       item.value match {
         case meter: Meter => new MeterAggregator(name)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
index a9bc167..dbf79ec 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/metrics/TaskFilterAggregator.scala
@@ -1,28 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump.streaming.metrics
 
+import scala.collection.mutable.ListBuffer
+import scala.util.{Failure, Success, Try}
+
 import com.typesafe.config.Config
+
 import io.gearpump.cluster.ClientToMaster.ReadOption
 import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
 import io.gearpump.metrics.MetricsAggregator
-import io.gearpump.util.{LogUtil, Constants}
-
-import scala.collection.mutable.ListBuffer
-import scala.util.{Failure, Success, Try}
+import io.gearpump.util.{Constants, LogUtil}
 
 /**
- * It filters the latest metrics data by specifying a
+ * Filters the latest metrics data by specifying a
  * processor Id range, and taskId range.
  */
-class TaskFilterAggregator (maxLimit: Int) extends MetricsAggregator {
+class TaskFilterAggregator(maxLimit: Int) extends MetricsAggregator {
 
-  import TaskFilterAggregator._
+  import io.gearpump.streaming.metrics.TaskFilterAggregator._
 
   def this(config: Config) = {
     this(config.getInt(Constants.GEARPUMP_METRICS_MAX_LIMIT))
   }
-  override def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = {
+  override def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem])
+    : List[HistoryMetricsItem] = {
+
     if (options.get(ReadOption.Key) != Some(ReadOption.ReadLatest)) {
-      // return empty set
+      // Returns empty set
       List.empty[HistoryMetricsItem]
     } else {
       val parsed = Options.parse(options)
@@ -34,7 +55,8 @@ class TaskFilterAggregator (maxLimit: Int) extends 
MetricsAggregator {
     }
   }
 
-  def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem]): 
List[HistoryMetricsItem] = {
+  def aggregate(options: Options, inputs: Iterator[HistoryMetricsItem])
+    : List[HistoryMetricsItem] = {
 
     val result = new ListBuffer[HistoryMetricsItem]
     val effectiveLimit = Math.min(options.limit, maxLimit)
@@ -42,7 +64,7 @@ class TaskFilterAggregator (maxLimit: Int) extends 
MetricsAggregator {
 
     val taskIdentity = new TaskIdentity(0, 0)
 
-    while(inputs.hasNext && count < effectiveLimit) {
+    while (inputs.hasNext && count < effectiveLimit) {
       val item = inputs.next()
       if (parseName(item.value.name, taskIdentity)) {
         if (taskIdentity.processor >= options.startProcessor &&
@@ -57,15 +79,17 @@ class TaskFilterAggregator (maxLimit: Int) extends 
MetricsAggregator {
     result.toList
   }
 
-  // Assume the name format is: "app0.processor0.task0:sendThroughput"
-  // return (processorId, taskId)
-  // return true if success
+  // Assume the name format is: "app0.processor0.task0:sendThroughput", returns
+  // (processorId, taskId)
+  //
+  // Returns true on success
   private def parseName(name: String, result: TaskIdentity): Boolean = {
     val processorStart = name.indexOf(PROCESSOR_TAG)
     if (processorStart != -1) {
       val taskStart = name.indexOf(TASK_TAG, processorStart + 1)
       if (taskStart != -1) {
-        val processorId = name.substring(processorStart, 
taskStart).substring(PROCESSOR_TAG.length).toInt
+        val processorId = name.substring(processorStart, 
taskStart).substring(PROCESSOR_TAG.length)
+          .toInt
         result.processor = processorId
         val taskEnd = name.indexOf(":", taskStart + 1)
         if (taskEnd != -1) {
@@ -84,7 +108,7 @@ class TaskFilterAggregator (maxLimit: Int) extends 
MetricsAggregator {
   }
 }
 
-object TaskFilterAggregator{
+object TaskFilterAggregator {
   val StartTask = "startTask"
   val EndTask = "endTask"
   val StartProcessor = "startProcessor"
@@ -96,7 +120,8 @@ object TaskFilterAggregator{
 
   private class TaskIdentity(var processor: Int, var task: Int)
 
-  case class Options(limit: Int, startTask: Int, endTask: Int, startProcessor: 
Int, endProcessor: Int)
+  case class Options(
+      limit: Int, startTask: Int, endTask: Int, startProcessor: Int, 
endProcessor: Int)
 
   private val LOG = LogUtil.getLogger(getClass)
 
@@ -107,7 +132,7 @@ object TaskFilterAggregator{
     }
 
     def parse(options: Map[String, String]): Options = {
-      //do sanity check
+      // Do sanity check
       val optionTry = Try {
         val startTask = options.get(StartTask).map(_.toInt).getOrElse(0)
         val endTask = 
options.get(EndTask).map(_.toInt).getOrElse(Integer.MAX_VALUE)
@@ -120,7 +145,8 @@ object TaskFilterAggregator{
       optionTry match {
         case Success(options) => options
         case Failure(ex) =>
-          LOG.error("Failed to parse the options in TaskFilterAggregator. 
Error msg: " + ex.getMessage)
+          LOG.error("Failed to parse the options in TaskFilterAggregator. 
Error msg: " +
+            ex.getMessage)
           null
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/package.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/package.scala 
b/streaming/src/main/scala/io/gearpump/streaming/package.scala
index d255522..95d51f0 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/package.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/package.scala
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump
 
 package object streaming {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala 
b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala
index 3e062ed..b036619 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSink.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,12 +22,10 @@ import io.gearpump.Message
 import io.gearpump.streaming.task.TaskContext
 
 /**
- * interface to implement custom data sink
- * where result of a DAG is typically written
- *
+ * Interface to implement custom data sink where result of a DAG is typically 
written
  * a DataSink could be a data store like HBase or simply a console
  *
- * an example would be like
+ * An example would be like:
  * {{{
  *  class ConsoleSink extends DataSink[String] {
  *
@@ -41,26 +39,26 @@ import io.gearpump.streaming.task.TaskContext
  *  }
  * }}}
  *
- * subclass is required to be serializable
+ * Subclass is required to be serializable
  */
 trait DataSink extends java.io.Serializable {
 
   /**
-   * open connection to data sink
+   * Opens connection to data sink
    * invoked at onStart() method of [[io.gearpump.streaming.task.Task]]
    * @param context is the task context at runtime
    */
   def open(context: TaskContext): Unit
 
   /**
-   * write message into data sink
+   * Writes message into data sink
    * invoked at onNext() method of [[io.gearpump.streaming.task.Task]]
    * @param message wraps data to be written out
    */
   def write(message: Message): Unit
 
   /**
-   * close connection to data sink
+   * Closes connection to data sink
    * invoked at onClose() method of [[io.gearpump.streaming.task.Task]]
    */
   def close(): Unit

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala 
b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala
index e83b40d..d753cc2 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkProcessor.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,11 +19,12 @@
 package io.gearpump.streaming.sink
 
 import akka.actor.ActorSystem
-import io.gearpump.streaming.Processor
+
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.Processor
 
 /**
- * utility that helps user to create a DAG ending in [[DataSink]]
+ * Utility that helps users create a DAG ending in [[DataSink]]
  * user should pass in a [[DataSink]].
  *
  * here is an example to build a DAG that does word count and write to 
KafkaSink
@@ -36,10 +37,12 @@ import io.gearpump.cluster.UserConfig
  * }}}
  */
 object DataSinkProcessor {
-  def apply(dataSink: DataSink,
-            parallelism: Int,
-            description: String = "",
-            taskConf: UserConfig = UserConfig.empty)(implicit system: 
ActorSystem): Processor[DataSinkTask] = {
+  def apply(
+      dataSink: DataSink,
+      parallelism: Int,
+      description: String = "",
+      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
+    : Processor[DataSinkTask] = {
     Processor[DataSinkTask](parallelism, description = description,
       taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala 
b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala
index 95a6718..7436617 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/sink/DataSinkTask.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,19 @@
 
 package io.gearpump.streaming.sink
 
-import io.gearpump.streaming.task.{Task, TaskContext, StartTime}
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 
 object DataSinkTask {
   val DATA_SINK = "data_sink"
 }
 
 /**
- * general task that runs any [[DataSink]]
+ * General task that runs any [[DataSink]]
  */
-class DataSinkTask (context: TaskContext, conf: UserConfig) extends 
Task(context, conf) {
-  import DataSinkTask._
+class DataSinkTask(context: TaskContext, conf: UserConfig) extends 
Task(context, conf) {
+  import io.gearpump.streaming.sink.DataSinkTask._
 
   private val sink = conf.getValue[DataSink](DATA_SINK).get
 
@@ -47,5 +47,4 @@ class DataSinkTask (context: TaskContext, conf: UserConfig) 
extends Task(context
     LOG.info("closing data sink...")
     sink.close()
   }
-
 }

Reply via email to