http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala
index c25acb1..c8a44f6 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/dsl/StreamSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,18 @@
 
 package io.gearpump.streaming.dsl
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.{Either, Left, Right}
+
 import akka.actor._
-import io.gearpump.streaming.task.{StartTime, TaskContext}
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
 import io.gearpump.Message
-import io.gearpump.cluster.{TestUtil, UserConfig}
 import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.{TestUtil, UserConfig}
 import io.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
 import io.gearpump.streaming.dsl.StreamSpec.Join
 import io.gearpump.streaming.dsl.partitioner.GroupByPartitioner
@@ -30,23 +37,18 @@ import io.gearpump.streaming.dsl.plan.OpTranslator._
 import io.gearpump.streaming.task.{StartTime, Task, TaskContext}
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
 
-import scala.util.{Either, Left, Right}
-
-class StreamSpec  extends FlatSpec with Matchers with BeforeAndAfterAll  with 
MockitoSugar {
+class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with 
MockitoSugar {
 
   implicit var system: ActorSystem = null
 
-  override def beforeAll: Unit = {
-    system = ActorSystem("test",  TestUtil.DEFAULT_CONFIG)
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
   }
 
   override def afterAll(): Unit = {
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   it should "translate the DSL to a DAG" in {
@@ -55,7 +57,7 @@ class StreamSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll  with Mo
 
     val app = StreamApp("dsl", context)
 
-    val data  =
+    val data =
       """
         five  four three  two    one
         five  four three  two
@@ -75,7 +77,9 @@ class StreamSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll  with Mo
 
     val appDescription = app.plan
 
-    val dagTopology = 
appDescription.dag.mapVertex(_.taskClass).mapEdge((node1, edge, node2) => 
edge.partitionerFactory.partitioner.getClass.getName)
+    val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { 
(node1, edge, node2) =>
+      edge.partitionerFactory.partitioner.getClass.getName
+    }
     val expectedDagTopology = getExpectedDagTopology
 
     
assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet))
@@ -102,14 +106,14 @@ class StreamSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll  with Mo
 
 object StreamSpec {
 
-  class Join(taskContext : TaskContext, userConf : UserConfig) extends 
Task(taskContext, userConf) {
+  class Join(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
 
     var query: String = null
     override def onStart(startTime: StartTime): Unit = {}
 
     override def onNext(msg: Message): Unit = {
       msg.msg match {
-        case Left(wordCount: (String, Int)) =>
+        case Left(wordCount: (String @unchecked, Int @unchecked)) =>
           if (query != null && wordCount._1 == query) {
             taskContext.output(new Message(wordCount))
           }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
index fa2c8cf..03dd242 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
 
 package io.gearpump.streaming.dsl.partitioner
 
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.Message
 import io.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
-import org.scalatest.{BeforeAndAfterAll, Matchers, FlatSpec}
 
-class GroupByPartitionerSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll {
+class GroupByPartitionerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
   it should "use the outpout of groupBy function to do partition" in {
     val mark = People("Mark", "male")
     val tom = People("Tom", "male")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
index 13af9c2..62a0f95 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,15 @@
 
 package io.gearpump.streaming.dsl.plan
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.ActorSystem
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest._
+
 import io.gearpump.Message
 import io.gearpump.cluster.{TestUtil, UserConfig}
 import io.gearpump.streaming.Constants._
@@ -26,29 +34,25 @@ import io.gearpump.streaming.MockUtil
 import io.gearpump.streaming.dsl.CollectionDataSource
 import io.gearpump.streaming.dsl.plan.OpTranslator._
 import io.gearpump.streaming.task.StartTime
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest._
 
-class OpTranslatorSpec  extends FlatSpec with Matchers with BeforeAndAfterAll {
+class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
   "andThen" should "chain multiple single input function" in {
     val dummy = new DummyInputFunction[String]
     val split = new FlatMapFunction[String, String](line => line.split("\\s"), 
"split")
 
-    val filter = new FlatMapFunction[String, String](word => if (word.isEmpty) 
None else Some(word), "filter")
+    val filter = new FlatMapFunction[String, String](word =>
+      if (word.isEmpty) None else Some(word), "filter")
 
     val map = new FlatMapFunction[String, Int](word => Some(1), "map")
 
-    val sum = new ReduceFunction[Int]({ (left, right) => left + right}, "sum")
+    val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
 
     val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum)
 
-
     assert(all.description == "split.filter.map.sum")
 
-    val data  =
+    val data =
       """
       five  four three  two    one
       five  four three  two
@@ -67,22 +71,25 @@ class OpTranslatorSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val conf = UserConfig.empty
     val data = "one two three".split("\\s")
 
-    //source with no transformer
-    val source = new SourceTask[String, String](new 
CollectionDataSource[String](data), None, taskContext, conf)
+    // Source with no transformer
+    val source = new SourceTask[String, String](new 
CollectionDataSource[String](data), None,
+      taskContext, conf)
     source.onStart(StartTime(0))
     source.onNext(Message("next"))
     verify(taskContext, times(1)).output(anyObject())
 
-    //source with transformer
+    // Source with transformer
     val anotherTaskContext = MockUtil.mockTaskContext
     val double = new FlatMapFunction[String, String](word => List(word, word), 
"double")
-    val another = new SourceTask(new CollectionDataSource[String](data), 
Some(double), anotherTaskContext, conf)
+    val another = new SourceTask(new CollectionDataSource[String](data), 
Some(double),
+      anotherTaskContext, conf)
     another.onStart(StartTime(0))
     another.onNext(Message("next"))
     verify(anotherTaskContext, times(2)).output(anyObject())
   }
 
-  "GroupByTask" should "group input by groupBy Function and apply attached 
operator for each group" in {
+  "GroupByTask" should "group input by groupBy Function and " +
+    "apply attached operator for each group" in {
 
     val data = "1 2  2  3 3  3"
 
@@ -92,32 +99,33 @@ class OpTranslatorSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll {
       left + right
     }, "concat")
 
-    implicit val system = ActorSystem("test",  TestUtil.DEFAULT_CONFIG)
-    val config = UserConfig.empty.withValue[SingleInputFunction[String, 
String]](GEARPUMP_STREAMING_OPERATOR, concat)
+    implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+    val config = UserConfig.empty.withValue[SingleInputFunction[String, 
String]](
+    GEARPUMP_STREAMING_OPERATOR, concat)
 
     val taskContext = MockUtil.mockTaskContext
 
     val task = new GroupByTask[String, String, String](input => input, 
taskContext, config)
     task.onStart(StartTime(0))
 
-    val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]);
+    val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
 
     data.split("\\s+").foreach { word =>
       task.onNext(Message(word))
     }
-    verify(taskContext, times(6)).output(peopleCaptor.capture());
+    verify(taskContext, times(6)).output(peopleCaptor.capture())
 
     import scala.collection.JavaConverters._
 
     val values = peopleCaptor.getAllValues().asScala.map(input => 
input.msg.asInstanceOf[String])
     assert(values.mkString(",") == "1,2,22,3,33,333")
-    system.shutdown
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   "MergeTask" should "accept two stream and apply the attached operator" in {
 
-    //source with transformer
+    // Source with transformer
     val taskContext = MockUtil.mockTaskContext
     val conf = UserConfig.empty
     val double = new FlatMapFunction[String, String](word => List(word, word), 
"double")
@@ -126,7 +134,7 @@ class OpTranslatorSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll {
 
     val data = "1 2  2  3 3  3".split("\\s+")
 
-    data.foreach{input =>
+    data.foreach { input =>
       task.onNext(Message(input))
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
index 151a188..1d94776 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/executor/ExecutorSpec.scala
@@ -1,27 +1,34 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package io.gearpump.streaming.executor
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
-import io.gearpump.WorkerId
+import org.mockito.Matchers._
+import org.mockito.Mockito.{times, _}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.appmaster.WorkerInfo
 import io.gearpump.cluster.scheduler.Resource
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.cluster.{ExecutorContext, TestUtil, UserConfig}
 import io.gearpump.streaming.AppMasterToExecutor._
 import io.gearpump.streaming.ExecutorToAppMaster.RegisterTask
@@ -30,12 +37,6 @@ import 
io.gearpump.streaming.executor.TaskLauncherSpec.MockTask
 import io.gearpump.streaming.task.{Subscriber, TaskId}
 import io.gearpump.streaming.{LifeTime, ProcessorDescription}
 import io.gearpump.transport.HostPort
-import org.mockito.Matchers._
-import org.mockito.Mockito.{times, _}
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import scala.language.postfixOps
-
 
 class ExecutorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   val appId = 0
@@ -51,22 +52,25 @@ class ExecutorSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
   }
 
   override def afterAll(): Unit = {
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   it should "call launcher to launch task" in {
     val worker = TestProbe()
     val workerInfo = WorkerInfo(workerId, worker.ref)
-    val executorContext = ExecutorContext(executorId, workerInfo, appId, 
"app", appMaster.ref, Resource(2))
+    val executorContext = ExecutorContext(executorId, workerInfo, appId, "app",
+      appMaster.ref, Resource(2))
     val taskLauncher = mock(classOf[ITaskLauncher])
     val executor = system.actorOf(Props(new Executor(executorContext, 
userConf, taskLauncher)))
-    val processor = ProcessorDescription(id = 0, taskClass = 
classOf[MockTask].getName, parallelism = 2)
+    val processor = ProcessorDescription(id = 0, taskClass = 
classOf[MockTask].getName,
+      parallelism = 2)
     val taskIds = List(TaskId(0, 0), TaskId(0, 1))
-    val launchTasks = LaunchTasks(taskIds, dagVersion = 0,  processor, 
List.empty[Subscriber])
+    val launchTasks = LaunchTasks(taskIds, dagVersion = 0, processor, 
List.empty[Subscriber])
 
     val task = TestProbe()
-    when(taskLauncher.launch(any(), any(), any(), any(), 
any())).thenReturn(taskIds.map((_, task.ref)).toMap)
+    when(taskLauncher.launch(any(), any(), any(), any(), any()))
+      .thenReturn(taskIds.map((_, task.ref)).toMap)
 
     val client = TestProbe()
     client.send(executor, launchTasks)
@@ -91,7 +95,8 @@ class ExecutorSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     task.expectMsgType[StartTask]
     task.expectMsgType[StartTask]
 
-    val changeTasks = ChangeTasks(taskIds, dagVersion = 1, life = LifeTime(0, 
Long.MaxValue), List.empty[Subscriber])
+    val changeTasks = ChangeTasks(taskIds, dagVersion = 1, life = LifeTime(0, 
Long.MaxValue),
+      List.empty[Subscriber])
 
     client.send(executor, changeTasks)
     client.expectMsgType[TasksChanged]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
index d220e60..c97ae4c 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskArgumentStoreSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,13 @@
  */
 package io.gearpump.streaming.executor
 
-import io.gearpump.streaming.executor.TaskLauncher.TaskArgument
 import org.scalatest._
-import io.gearpump.streaming.executor.Executor.{TaskArgumentStore}
-import scala.language.postfixOps
+
+import io.gearpump.streaming.executor.Executor.TaskArgumentStore
+import io.gearpump.streaming.executor.TaskLauncher.TaskArgument
 import io.gearpump.streaming.task.TaskId
 
-class TaskArgumentStoreSpec  extends FlatSpec with Matchers with 
BeforeAndAfterEach {
+class TaskArgumentStoreSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach {
   it should "retain all history of taskArgument" in {
     val version0 = TaskArgument(0, null, null)
     val version2 = version0.copy(dagVersion = 2)
@@ -32,15 +32,14 @@ class TaskArgumentStoreSpec  extends FlatSpec with Matchers 
with BeforeAndAfterE
     store.add(task, version0)
     store.add(task, version2)
 
-    // we should return a version which is same or older than expected version
+    // Should return a version which is same or older than expected version
     assert(store.get(dagVersion = 1, task) == Some(version0))
     assert(store.get(dagVersion = 0, task) == Some(version0))
     assert(store.get(dagVersion = 2, task) == Some(version2))
 
-    store.removeObsoleteVersion
+    store.removeObsoleteVersion()
     assert(store.get(dagVersion = 1, task) == None)
     assert(store.get(dagVersion = 0, task) == None)
     assert(store.get(dagVersion = 2, task) == Some(version2))
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala
index f9decf5..c135c5b 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/executor/TaskLauncherSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,19 +17,21 @@
  */
 package io.gearpump.streaming.executor
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{Actor, ActorSystem}
 import akka.testkit.TestProbe
+import org.scalatest._
+
 import io.gearpump.cluster.{TestUtil, UserConfig}
 import io.gearpump.serializer.SerializationFramework
 import io.gearpump.streaming.ProcessorDescription
 import io.gearpump.streaming.executor.TaskLauncher.TaskArgument
 import io.gearpump.streaming.executor.TaskLauncherSpec.{MockTask, 
MockTaskActor}
 import io.gearpump.streaming.task.{Task, TaskContext, TaskContextData, TaskId, 
TaskWrapper}
-import org.scalatest._
-
-import scala.language.postfixOps
 
-class TaskLauncherSpec  extends FlatSpec with Matchers with BeforeAndAfterAll {
+class TaskLauncherSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   val appId = 0
   val executorId = 0
   var appMaster: TestProbe = null
@@ -42,17 +44,20 @@ class TaskLauncherSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll {
   }
 
   override def afterAll(): Unit = {
-    system.shutdown()
-    system.awaitTermination()
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   it should "able to launch tasks" in {
-    val launcher = new TaskLauncher(appId, "app", executorId, appMaster.ref, 
userConf, classOf[MockTaskActor])
+    val launcher = new TaskLauncher(appId, "app", executorId, appMaster.ref,
+      userConf, classOf[MockTaskActor])
     val taskIds = List(TaskId(0, 0), TaskId(0, 1))
-    val processor = ProcessorDescription(id = 0, taskClass = 
classOf[MockTask].getName, parallelism = 2)
+    val processor = ProcessorDescription(id = 0, taskClass = 
classOf[MockTask].getName,
+      parallelism = 2)
     val argument = TaskArgument(0, processor, null)
 
-    val tasks = launcher.launch(taskIds, argument, system, null, 
"gearpump.shared-thread-pool-dispatcher")
+    val tasks = launcher.launch(taskIds, argument, system, null,
+      "gearpump.shared-thread-pool-dispatcher")
     tasks.keys.toSet shouldBe taskIds.toSet
   }
 }
@@ -67,6 +72,7 @@ object TaskLauncherSpec {
     def receive: Receive = null
   }
 
-  class MockTask(taskContext : TaskContext, userConf : UserConfig) extends 
Task(taskContext, userConf) {
+  class MockTask(taskContext: TaskContext, userConf: UserConfig)
+    extends Task(taskContext, userConf) {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
index c147a4b..da1ca9f 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/metrics/ProcessorAggregatorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,26 +18,27 @@
 
 package io.gearpump.streaming.metrics
 
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.scalatest.{FlatSpec, Matchers}
+
 import io.gearpump.cluster.ClientToMaster.ReadOption
 import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
-import io.gearpump.metrics.Metrics.{Gauge, Meter, Histogram}
-import io.gearpump.streaming.metrics.ProcessorAggregator.{AggregatorFactory, 
MeterAggregator, HistogramAggregator, MultiLayerMap}
+import io.gearpump.metrics.Metrics.{Gauge, Histogram, Meter}
+import io.gearpump.streaming.metrics.ProcessorAggregator.{AggregatorFactory, 
HistogramAggregator, MeterAggregator, MultiLayerMap}
 import io.gearpump.streaming.task.TaskId
 import io.gearpump.util.HistoryMetricsService.HistoryMetricsConfig
-import org.scalatest.{Matchers, FlatSpec}
-import scala.collection.JavaConverters._
-import scala.util.Random
 
 class ProcessorAggregatorSpec extends FlatSpec with Matchers {
 
-
   "MultiLayerMap" should "maintain multiple layers HashMap" in {
     val layers = 3
     val map = new MultiLayerMap[String](layers)
 
     assert(map.get(layer = 0, "key") == null)
 
-    // illegal, handle safely
+    // Illegal, handle safely
     assert(map.get(layer = 10, "key") == null)
 
     map.put(layer = 0, "key", "value")
@@ -47,7 +48,7 @@ class ProcessorAggregatorSpec extends FlatSpec with Matchers {
     map.put(layer = 2, "key3", "value3")
     map.put(layer = 2, "key4", "value4")
 
-    // illegal, should be ignored
+    // Illegal, should be ignored
     map.put(layer = 4, "key5", "value5")
 
     assert(map.size == 4)
@@ -69,15 +70,15 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
 
     val result = aggregator.result
 
-    //pick old time as aggregated time
+    // Picks old time as aggregated time
     assert(result.time == olderTime)
 
-    // do average
+    // Does average
     val check = result.value.asInstanceOf[Histogram]
 
     assert(check.mean - expect.mean < 0.01)
     assert(check.stddev - expect.stddev < 0.01)
-    assert(check.median - expect.median< 0.01)
+    assert(check.median - expect.median < 0.01)
     assert(check.p95 - expect.p95 < 0.01)
     assert(check.p99 - expect.p99 < 0.01)
     assert(check.p999 - expect.p999 < 0.01)
@@ -98,13 +99,13 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
 
     val result = aggregator.result
 
-    //pick old time
+    // Picks old time
     assert(result.time == olderTime)
 
-    // do summing
+    // Does summing
     val check = result.value.asInstanceOf[Meter]
 
-    assert(check.count ==  expect.count)
+    assert(check.count == expect.count)
     assert(check.m1 - expect.m1 < 0.01)
     assert(check.meanRate - expect.meanRate < 0.01)
     assert(check.rateUnit == expect.rateUnit)
@@ -124,25 +125,34 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
   }
 
   "ProcessorAggregator" should "aggregate on different read options" in {
-    val hours = 2 // maintain 2 hours history
-    val seconds = 2 // maintain 2 seconds recent data
-    val taskCount = 5 // for each processor
-    val metricCount = 100 // for each task, have metricCount metrics
-    val range = new HistoryMetricsConfig(hours, hours / 2 * 3600 * 1000, 
seconds, seconds / 2 * 1000)
+    val hours = 2 // Maintains 2 hours history
+    val seconds = 2 // Maintains 2 seconds recent data
+    val taskCount = 5 // For each processor
+    val metricCount = 100 // For each task, have metricCount metrics
+    val range = new HistoryMetricsConfig(hours, hours / 2 * 3600 * 1000,
+      seconds, seconds / 2 * 1000)
 
     val aggregator = new ProcessorAggregator(range)
 
     def count(value: Int): Int = value
 
-    def inputs(timeRange: Long) = {
-      (0 until taskCount).map(TaskId(processorId = 0, _)).flatMap(histogram(_, 
"receiveLatency", timeRange, metricCount)).toList ++
-      (0 until taskCount).map(TaskId(processorId = 0, _)).flatMap(histogram(_, 
"processTime", timeRange, metricCount)).toList ++
-      (0 until taskCount).map(TaskId(processorId = 1, _)).flatMap(histogram(_, 
"receiveLatency", timeRange, metricCount)).toList ++
-      (0 until taskCount).map(TaskId(processorId = 1, _)).flatMap(histogram(_, 
"processTime", timeRange, metricCount)).toList ++
-      (0 until taskCount).map(TaskId(processorId = 0, _)).flatMap(meter(_, 
"sendThroughput", timeRange, metricCount)).toList ++
-      (0 until taskCount).map(TaskId(processorId = 0, _)).flatMap(meter(_, 
"receiveThroughput", timeRange, metricCount)).toList ++
-      (0 until taskCount).map(TaskId(processorId = 1, _)).flatMap(meter(_, 
"sendThroughput", timeRange, metricCount)).toList ++
-      (0 until taskCount).map(TaskId(processorId = 1, _)).flatMap(meter(_, 
"receiveThroughput", timeRange, metricCount)).toList
+    def inputs(timeRange: Long): List[HistoryMetricsItem] = {
+      (0 until taskCount).map(TaskId(processorId = 0, _))
+        .flatMap(histogram(_, "receiveLatency", timeRange, 
metricCount)).toList ++
+        (0 until taskCount).map(TaskId(processorId = 0, _))
+          .flatMap(histogram(_, "processTime", timeRange, metricCount)).toList 
++
+        (0 until taskCount).map(TaskId(processorId = 1, _))
+          .flatMap(histogram(_, "receiveLatency", timeRange, 
metricCount)).toList ++
+        (0 until taskCount).map(TaskId(processorId = 1, _))
+          .flatMap(histogram(_, "processTime", timeRange, metricCount)).toList 
++
+        (0 until taskCount).map(TaskId(processorId = 0, _))
+          .flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList 
++
+        (0 until taskCount).map(TaskId(processorId = 0, _))
+          .flatMap(meter(_, "receiveThroughput", timeRange, 
metricCount)).toList ++
+        (0 until taskCount).map(TaskId(processorId = 1, _))
+          .flatMap(meter(_, "sendThroughput", timeRange, metricCount)).toList 
++
+        (0 until taskCount).map(TaskId(processorId = 1, _))
+          .flatMap(meter(_, "receiveThroughput", timeRange, 
metricCount)).toList
     }
 
     def check(list: List[HistoryMetricsItem], countMap: Map[String, Int]): 
Boolean = {
@@ -150,14 +160,15 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
       nameCount sameElements countMap
     }
 
-    // aggregate on processor and meterNames,
+    // Aggregates on processor and meterNames,
     val input = inputs(Long.MaxValue)
-    val readLatest = aggregator.aggregate(ReadOption.ReadLatest, 
input.iterator, now = Long.MaxValue)
-    assert(readLatest.size == 8) //2 processor * 4 metrics type
+    val readLatest = aggregator.aggregate(ReadOption.ReadLatest,
+      input.iterator, now = Long.MaxValue)
+    assert(readLatest.size == 8) // 2 processor * 4 metrics type
     assert(check(readLatest, Map(
       "app0.processor0:receiveLatency" -> count(1),
       "app0.processor0:processTime" -> count(1),
-      "app0.processor0:sendThroughput"-> count(1),
+      "app0.processor0:sendThroughput" -> count(1),
       "app0.processor0:receiveThroughput" -> count(1),
       "app0.processor1:receiveLatency" -> count(1),
       "app0.processor1:processTime" -> count(1),
@@ -165,13 +176,14 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
       "app0.processor1:receiveThroughput" -> count(1)
     )))
 
-    // aggregate on processor and meterNames and time range,
-    val readRecent = aggregator.aggregate(ReadOption.ReadRecent, 
inputs(seconds * 1000).iterator, now = seconds * 1000)
-    assert(readRecent.size == 16) //2 processor * 4 metrics type * 2 time range
+    // Aggregates on processor and meterNames and time range,
+    val readRecent = aggregator.aggregate(ReadOption.ReadRecent,
+      inputs(seconds * 1000).iterator, now = seconds * 1000)
+    assert(readRecent.size == 16) // 2 processor * 4 metrics type * 2 time 
range
     assert(check(readRecent, Map(
       "app0.processor0:receiveLatency" -> count(2),
       "app0.processor0:processTime" -> count(2),
-      "app0.processor0:sendThroughput"-> count(2),
+      "app0.processor0:sendThroughput" -> count(2),
       "app0.processor0:receiveThroughput" -> count(2),
       "app0.processor1:receiveLatency" -> count(2),
       "app0.processor1:processTime" -> count(2),
@@ -179,13 +191,14 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
       "app0.processor1:receiveThroughput" -> count(2)
     )))
 
-    // aggregate on processor and meterNames and time range,
-    val readHistory = aggregator.aggregate(ReadOption.ReadHistory, 
inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000)
-    assert(readHistory.size == 16) //2 processor * 4 metrics type * 2 time 
ranges
+    // Aggregates on processor and meterNames and time range,
+    val readHistory = aggregator.aggregate(ReadOption.ReadHistory,
+      inputs(hours * 3600 * 1000).iterator, now = hours * 3600 * 1000)
+    assert(readHistory.size == 16) // 2 processor * 4 metrics type * 2 time 
ranges
     assert(check(readHistory, Map(
       "app0.processor0:receiveLatency" -> count(2),
       "app0.processor0:processTime" -> count(2),
-      "app0.processor0:sendThroughput"-> count(2),
+      "app0.processor0:sendThroughput" -> count(2),
       "app0.processor0:receiveThroughput" -> count(2),
       "app0.processor1:receiveLatency" -> count(2),
       "app0.processor1:processTime" -> count(2),
@@ -194,9 +207,11 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
     )))
   }
 
-  private def histogram(taskId: TaskId, metricName: String = "latency", 
timeRange: Long = Long.MaxValue, repeat: Int = 1): List[HistoryMetricsItem] = {
+  private def histogram(
+      taskId: TaskId, metricName: String = "latency", timeRange: Long = 
Long.MaxValue,
+      repeat: Int = 1): List[HistoryMetricsItem] = {
     val random = new Random()
-    (0 until repeat).map {_ =>
+    (0 until repeat).map { _ =>
       new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
         new 
Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:$metricName",
           Math.abs(random.nextDouble()),
@@ -209,7 +224,8 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
     }.toList
   }
 
-  private def meter(taskId: TaskId, metricName: String, timeRange: Long, 
repeat: Int): List[HistoryMetricsItem] = {
+  private def meter(taskId: TaskId, metricName: String, timeRange: Long, 
repeat: Int)
+    : List[HistoryMetricsItem] = {
     val random = new Random()
     (0 until repeat).map { _ =>
       new HistoryMetricsItem(Math.abs(random.nextLong() % timeRange),
@@ -225,19 +241,20 @@ class ProcessorAggregatorSpec extends FlatSpec with 
Matchers {
   "ProcessorAggregator" should "handle smoothly for unsupported metric type 
and " +
     "error formatted metric name" in {
     val invalid = List(
-      // unsupported metric type
+      // Unsupported metric type
       HistoryMetricsItem(0, new Gauge("app0.processor0.task0:gauge", 100)),
 
-      //wrong format: should be app0.processor0.task0:throughput
+      // Wrong format: should be app0.processor0.task0:throughput
       HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 
0, 0, ""))
     )
 
     val valid = histogram(TaskId(0, 0), repeat = 10)
 
     val aggregator = new ProcessorAggregator(new HistoryMetricsConfig(0, 0, 0, 
0))
-    val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ 
invalid).toIterator, now = Long.MaxValue)
+    val result = aggregator.aggregate(ReadOption.ReadLatest, (valid ++ 
invalid).toIterator,
+      now = Long.MaxValue)
 
-    // for one taskId, will only use one data point.
+    // For one taskId, will only use one data point.
     assert(result.size == 1)
     assert(result.head.value.name == "app0.processor0:latency")
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
index 7924acb..f654eb9 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/metrics/TaskFilterAggregatorSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,26 +18,27 @@
 
 package io.gearpump.streaming.metrics
 
+import scala.util.Random
+
+import org.scalatest.{FlatSpec, Matchers}
+
 import io.gearpump.cluster.MasterToClient.HistoryMetricsItem
-import io.gearpump.metrics.Metrics.{Meter, Gauge, Histogram}
+import io.gearpump.metrics.Metrics.{Histogram, Meter}
 import io.gearpump.streaming.metrics.TaskFilterAggregator.Options
-import io.gearpump.streaming.task.{StartTime, TaskId}
-import org.scalatest.{Matchers, FlatSpec}
-
-import scala.util.Random
+import io.gearpump.streaming.task.TaskId
 
-class TaskFilterAggregatorSpec  extends FlatSpec with Matchers {
+class TaskFilterAggregatorSpec extends FlatSpec with Matchers {
 
   def metric(taskId: TaskId): HistoryMetricsItem = {
     val random = new Random()
     new HistoryMetricsItem(Math.abs(random.nextLong()),
       new 
Histogram(s"app0.processor${taskId.processorId}.task${taskId.index}:latency",
-        0, 0,0,0,0,0))
+        0, 0, 0, 0, 0, 0))
   }
 
   it should "filter data on processor range, task range combination" in {
-    val inputs = (0 until 10).flatMap{processor =>
-      (0 until 10).map{ task =>
+    val inputs = (0 until 10).flatMap { processor =>
+      (0 until 10).map { task =>
         metric(TaskId(processor, task))
       }
     }.toList
@@ -45,17 +46,17 @@ class TaskFilterAggregatorSpec  extends FlatSpec with 
Matchers {
     val globalLimit = 10
     val aggregator = new TaskFilterAggregator(globalLimit)
 
-    // limit not met, return all matches in this matrix
+    // Limit not met, return all matches in this matrix
     var options = new Options(limit = 20, startTask = 3, endTask = 6,
       startProcessor = 3, endProcessor = 6)
     assert(aggregator.aggregate(options, inputs.iterator).size == 9)
 
-    // user limit reached
+    // User limit reached
     options = new Options(limit = 3, startTask = 3, endTask = 5,
       startProcessor = 3, endProcessor = 5)
     assert(aggregator.aggregate(options, inputs.iterator).size == 3)
 
-    // global limit reached
+    // Global limit reached
     options = new Options(limit = 20, startTask = 3, endTask = 8,
       startProcessor = 3, endProcessor = 8)
     assert(aggregator.aggregate(options, inputs.iterator).size == globalLimit)
@@ -67,8 +68,8 @@ class TaskFilterAggregatorSpec  extends FlatSpec with 
Matchers {
   }
 
   it should "skip wrong format metrics" in {
-    val invalid = List{
-      //wrong format: should be app0.processor0.task0:throughput
+    val invalid = List {
+      // Wrong format: should be app0.processor0.task0:throughput
       HistoryMetricsItem(0, new Meter("app0.processor0.task0/throughput", 100, 
0, 0, ""))
     }
     val options = Options.acceptAll

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
index 10bc48e..752a8f4 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/source/DefaultTimeStampFilterSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,11 @@
 
 package io.gearpump.streaming.source
 
-import io.gearpump.{TimeStamp, Message}
 import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import io.gearpump.{Message, TimeStamp}
 
 class DefaultTimeStampFilterSpec extends PropSpec with PropertyChecks with 
Matchers {
   property("DefaultTimeStampFilter should filter message against give 
timestamp") {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
index 483200f..ea670f6 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,14 +18,15 @@
 
 package io.gearpump.streaming.state.impl
 
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.transaction.api.CheckpointStore
-import org.mockito.{Matchers => MockitoMatchers}
 import org.mockito.Mockito._
+import org.mockito.{Matchers => MockitoMatchers}
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import io.gearpump.TimeStamp
+import io.gearpump.streaming.transaction.api.CheckpointStore
 
 class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
 
@@ -56,7 +57,7 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
     }
   }
 
-   property("CheckpointManager should close CheckpointStore") {
+  property("CheckpointManager should close CheckpointStore") {
     forAll(checkpointIntervalGen) {
       (checkpointInterval: Long) =>
         val checkpointStore = mock[CheckpointStore]
@@ -85,5 +86,4 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
         checkpointManager.getCheckpointTime shouldBe empty
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
index 86996ab..4bbad3b 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/InMemoryCheckpointStoreSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala
index e420a86..b20b666 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/NonWindowStateSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,15 +18,16 @@
 
 package io.gearpump.streaming.state.impl
 
-import io.gearpump.TimeStamp
-import io.gearpump.streaming.state.api.{Monoid, Serializer}
+import scala.util.Success
+
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import scala.util.Success
+import io.gearpump.TimeStamp
+import io.gearpump.streaming.state.api.{Monoid, Serializer}
 
 class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
 
@@ -89,7 +90,6 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with
         state.left shouldBe plus
         state.right shouldBe zero
         state.get shouldBe Some(plus)
-
     }
   }
 
@@ -129,5 +129,4 @@ class NonWindowStateSpec extends PropSpec with 
PropertyChecks with Matchers with
         state.get shouldBe Some(plus)
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala
index f0489a3..e12ec9b 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
 
 package io.gearpump.streaming.state.impl
 
-import io.gearpump.TimeStamp
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
-import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import io.gearpump.TimeStamp
 
 class WindowSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
 
@@ -43,10 +44,10 @@ class WindowSpec extends PropSpec with PropertyChecks with 
Matchers with Mockito
     forAll(timestampGen, windowSizeGen, windowStepGen) {
       (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
         val window = new Window(windowSize, windowStep)
-        window.range shouldBe (0L, windowSize)
+        window.range shouldBe(0L, windowSize)
 
         window.slideOneStep()
-        window.range shouldBe (windowStep, windowSize + windowStep)
+        window.range shouldBe(windowStep, windowSize + windowStep)
 
         window.slideTo(timestamp)
         val (startTime, endTime) = window.range

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala
index dd54d41..bc79ff6 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/WindowStateSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,17 +18,18 @@
 
 package io.gearpump.streaming.state.impl
 
-import io.gearpump._
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.api.{Serializer, Group}
+import scala.collection.immutable.TreeMap
+import scala.util.Success
+
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import scala.collection.immutable.TreeMap
-import scala.util.Success
+import io.gearpump._
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.state.api.{Group, Serializer}
 
 class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
 
@@ -91,7 +92,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
       state.right shouldBe zero
       state.get shouldBe Some(zero)
 
-      val start =  checkpointTime - 1
+      val start = checkpointTime - 1
       val end = checkpointTime + 1
       val size = end - start
       val step = 1L
@@ -130,11 +131,10 @@ class WindowStateSpec extends PropSpec with 
PropertyChecks with Matchers with Mo
       val right = mock[AnyRef]
       val plus = mock[AnyRef]
 
-
       when(group.zero).thenReturn(zero, zero)
       val state = new WindowState[AnyRef](group, serializer, taskContext, 
window)
 
-      val start =  checkpointTime - 1
+      val start = checkpointTime - 1
       val end = checkpointTime + 1
       val size = end - start
       val step = 1L
@@ -147,8 +147,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
       when(group.plus(left, zero)).thenReturn(left, Nil: _*)
       when(taskContext.upstreamMinClock).thenReturn(0L)
 
-      // time < checkpointTime
-      // update left in current window
+      // Time < checkpointTime
+      // Update left in current window
       state.setNextCheckpointTime(checkpointTime)
       state.update(start, left)
 
@@ -166,8 +166,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
       when(group.plus(left, right)).thenReturn(plus, Nil: _*)
       when(taskContext.upstreamMinClock).thenReturn(0L)
 
-      // time >= checkpointTime
-      // update right in current window
+      // Time >= checkpointTime
+      // Update right in current window
       state.setNextCheckpointTime(checkpointTime)
       state.update(checkpointTime, right)
 
@@ -176,10 +176,9 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
       state.right shouldBe right
       state.get shouldBe Some(plus)
       state.getIntervalStates(start, end) shouldBe
-          TreeMap(Interval(start, start + step) -> left, Interval(start + 
step, end) -> right)
-
+        TreeMap(Interval(start, start + step) -> left, Interval(start + step, 
end) -> right)
 
-      // slide window
+      // Slides window forward
       when(window.range).thenReturn((start, end), (start + step, end + step))
       when(window.shouldSlide).thenReturn(true)
       when(taskContext.upstreamMinClock).thenReturn(checkpointTime)
@@ -197,10 +196,10 @@ class WindowStateSpec extends PropSpec with 
PropertyChecks with Matchers with Mo
       state.right shouldBe plus
       state.get shouldBe Some(plus)
       state.getIntervalStates(start, end + step) shouldBe
-          TreeMap(
-            Interval(start, start + step) -> left,
-            Interval(start + step, end) -> right,
-            Interval(end, end + step) -> right)
+        TreeMap(
+          Interval(start, start + step) -> left,
+          Interval(start + step, end) -> right,
+          Interval(end, end + step) -> right)
     }
   }
 
@@ -232,7 +231,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
       timestamp / windowStep * windowStep should (be <= interval.startTime)
       (timestamp - windowSize) / windowStep * windowStep should (be <= 
interval.startTime)
       (timestamp / windowStep + 1) * windowStep should (be >= interval.endTime)
-      ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize 
should (be >= interval.endTime)
+      ((timestamp - windowSize) / windowStep + 1) * windowStep + windowSize 
should
+        (be >= interval.endTime)
       checkpointTime should (be <= interval.startTime or be >= 
interval.endTime)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
index 5099466..60baa74 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/storage/InMemoryAppStoreOnMasterSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,17 +17,16 @@
  */
 package io.gearpump.streaming.storage
 
-import io.gearpump.streaming.StreamingTestUtil
-import io.gearpump.cluster.{MasterHarness, MiniCluster, TestUtil}
-import io.gearpump.streaming.StreamingTestUtil
-import io.gearpump.util.Constants
-import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
-
 import scala.concurrent.Await
 import scala.concurrent.duration._
-import scala.language.postfixOps
 
-class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterAll{
+import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
+
+import io.gearpump.cluster.{MasterHarness, MiniCluster}
+import io.gearpump.streaming.StreamingTestUtil
+import io.gearpump.util.Constants
+
+class InMemoryAppStoreOnMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterAll {
   implicit val timeout = Constants.FUTURE_TIMEOUT
   implicit val dispatcher = MasterHarness.cachedPool
 
@@ -45,14 +44,18 @@ class InMemoryAppStoreOnMasterSpec extends WordSpec with 
Matchers with BeforeAnd
       store.put("Int_type", 1024)
       store.put("Tuple2_type", ("element1", 1024))
 
-      val future1 = store.get("String_type").map { value => 
value.asInstanceOf[String] should be("this is a string")}
-      val future2 = store.get("Int_type").map { value => 
value.asInstanceOf[Int] should be(1024)}
-      val future3 = store.get("Tuple2_type").map { value => 
value.asInstanceOf[(String, Int)] should be(("element1", 1024))}
-      val future4 = store.get("key").map { value => value.asInstanceOf[Object] 
should be(null)}
-      Await.result(future1, 15 seconds)
-      Await.result(future2, 15 seconds)
-      Await.result(future3, 15 seconds)
-      Await.result(future4, 15 seconds)
+      val future1 = store.get("String_type").map { value =>
+        value.asInstanceOf[String] should be("this is a string")
+      }
+      val future2 = store.get("Int_type").map { value => 
value.asInstanceOf[Int] should be(1024) }
+      val future3 = store.get("Tuple2_type").map { value =>
+        value.asInstanceOf[(String, Int)] should be(("element1", 1024))
+      }
+      val future4 = store.get("key").map { value => value.asInstanceOf[Object] 
should be(null) }
+      Await.result(future1, 15.seconds)
+      Await.result(future2, 15.seconds)
+      Await.result(future3, 15.seconds)
+      Await.result(future4, 15.seconds)
       miniCluster.shutDown
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala
index ffc6df1..ffb343e 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriberSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,29 +17,32 @@
  */
 package io.gearpump.streaming.task
 
+import org.scalatest.{FlatSpec, Matchers}
+
 import io.gearpump.partitioner.{HashPartitioner, Partitioner}
 import io.gearpump.streaming.task.SubscriberSpec.TestTask
 import io.gearpump.streaming.{DAG, ProcessorDescription}
 import io.gearpump.util.Graph
 import io.gearpump.util.Graph._
-import org.scalatest.{FlatSpec, Matchers}
 
-class SubscriberSpec  extends FlatSpec with Matchers {
+class SubscriberSpec extends FlatSpec with Matchers {
   "Subscriber.of" should "return all subscriber for a processor" in {
 
     val sourceProcessorId = 0
-    val task1 = ProcessorDescription(id = sourceProcessorId, taskClass = 
classOf[TestTask].getName, parallelism = 1)
+    val task1 = ProcessorDescription(id = sourceProcessorId, taskClass =
+      classOf[TestTask].getName, parallelism = 1)
     val task2 = ProcessorDescription(id = 1, taskClass = 
classOf[TestTask].getName, parallelism = 1)
     val task3 = ProcessorDescription(id = 2, taskClass = 
classOf[TestTask].getName, parallelism = 1)
     val partitioner = Partitioner[HashPartitioner]
-    val dag = DAG(Graph(task1 ~ partitioner ~> task2, task1 ~ partitioner ~> 
task3, task2 ~ partitioner ~> task3))
-
+    val dag = DAG(Graph(task1 ~ partitioner ~> task2, task1 ~ partitioner ~> 
task3,
+      task2 ~ partitioner ~> task3))
 
     val subscribers = Subscriber.of(sourceProcessorId, dag)
     assert(subscribers.size == 2)
 
     assert(subscribers.toSet ==
-      Set(Subscriber(1, partitioner, task2.parallelism, task2.life), 
Subscriber(2, partitioner, task3.parallelism, task3.life)))
+      Set(Subscriber(1, partitioner, task2.parallelism, task2.life), 
Subscriber(2, partitioner,
+        task3.parallelism, task3.life)))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
index ecad47b..57f4b8c 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/task/SubscriptionSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,16 +20,15 @@ package io.gearpump.streaming.task
 
 import java.util.Random
 
+import org.mockito.Mockito._
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{FlatSpec, Matchers}
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
-import io.gearpump.partitioner.{Partitioner, HashPartitioner}
-import io.gearpump.streaming.{LifeTime, ProcessorDescription}
+import io.gearpump.partitioner.{HashPartitioner, Partitioner}
 import io.gearpump.streaming.task.SubscriptionSpec.NextTask
-import org.scalatest.{FlatSpec}
-
-import org.mockito.Mockito._
-import org.scalatest.{Matchers}
-import org.scalatest.mock.MockitoSugar
+import io.gearpump.streaming.{LifeTime, ProcessorDescription}
 
 class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
   val appId = 0
@@ -41,16 +40,18 @@ class SubscriptionSpec extends FlatSpec with Matchers with 
MockitoSugar {
   val partitioner = Partitioner[HashPartitioner]
 
   val parallism = 2
-  val downstreamProcessor = ProcessorDescription(downstreamProcessorId, 
classOf[NextTask].getName, parallism)
-  val subscriber = Subscriber(downstreamProcessorId, partitioner, 
downstreamProcessor.parallelism, downstreamProcessor.life)
+  val downstreamProcessor = ProcessorDescription(downstreamProcessorId, 
classOf[NextTask].getName,
+    parallism)
+  val subscriber = Subscriber(downstreamProcessorId, partitioner, 
downstreamProcessor.parallelism,
+    downstreamProcessor.life)
 
   private def prepare: (Subscription, ExpressTransport) = {
     val transport = mock[ExpressTransport]
     val subscription = new Subscription(appId, executorId, taskId, subscriber, 
session, transport)
-    subscription.start
+    subscription.start()
 
     val expectedAckRequest = InitialAckRequest(taskId, session)
-    verify(transport, times(1)).transport(expectedAckRequest, TaskId(1,0), 
TaskId(1, 1))
+    verify(transport, times(1)).transport(expectedAckRequest, TaskId(1, 0), 
TaskId(1, 1))
 
     (subscription, transport)
   }
@@ -67,41 +68,40 @@ class SubscriptionSpec extends FlatSpec with Matchers with 
MockitoSugar {
     val msg1 = new Message("1", timestamp = 70)
     subscription.sendMessage(msg1)
 
-    verify(transport, times(1)).transport(msg1, TaskId(1,1))
+    verify(transport, times(1)).transport(msg1, TaskId(1, 1))
     assert(subscription.minClock == 70)
 
     val msg2 = new Message("0", timestamp = 50)
     subscription.sendMessage(msg2)
-    verify(transport, times(1)).transport(msg2, TaskId(1,0))
+    verify(transport, times(1)).transport(msg2, TaskId(1, 0))
 
     // minClock has been set to smaller one
     assert(subscription.minClock == 50)
 
     val initialMinClock = subscription.minClock
 
-    //ack initial AckRequest(0)
+    // Acks initial AckRequest(0)
     subscription.receiveAck(Ack(TaskId(1, 1), 0, 0, session))
     subscription.receiveAck(Ack(TaskId(1, 0), 0, 0, session))
 
-    //send 100 messages
-    100 until 200 foreach {clock =>
+    // Sends 100 messages
+    100 until 200 foreach { clock =>
       subscription.sendMessage(Message("1", clock))
       subscription.sendMessage(Message("2", clock))
     }
 
-    // ack not received, minClock no change
+    // Ack not received, minClock no change
     assert(subscription.minClock == initialMinClock)
 
     subscription.receiveAck(Ack(TaskId(1, 1), 100, 100, session))
     subscription.receiveAck(Ack(TaskId(1, 0), 100, 100, session))
 
-    // ack received, minClock changed
+    // Ack received, minClock changed
     assert(subscription.minClock > initialMinClock)
 
-
-    // we expect to receive two ackRequest for two downstream tasks
+    // Expects to receive two ackRequest for two downstream tasks
     val ackRequestForTask0 = AckRequest(taskId, 200, session)
-    verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1,0))
+    verify(transport, times(1)).transport(ackRequestForTask0, TaskId(1, 0))
 
     val ackRequestForTask1 = AckRequest(taskId, 200, session)
     verify(transport, times(1)).transport(ackRequestForTask1, TaskId(1, 1))
@@ -109,13 +109,12 @@ class SubscriptionSpec extends FlatSpec with Matchers 
with MockitoSugar {
 
   it should "disallow more message sending if there is no ack back" in {
     val (subscription, transport) = prepare
-    //send 100 messages
-    0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach {clock =>
+    // send 100 messages
+    0 until (Subscription.MAX_PENDING_MESSAGE_COUNT * 2 + 1) foreach { clock =>
       subscription.sendMessage(Message(randomMessage, clock))
     }
 
     assert(subscription.allowSendingMoreMessages() == false)
-
   }
 
   it should "report minClock as Long.MaxValue when there is no pending 
message" in {
@@ -128,19 +127,16 @@ class SubscriptionSpec extends FlatSpec with Matchers 
with MockitoSugar {
   }
 
   private def randomMessage: String = new Random().nextInt.toString
-
 }
 
 object SubscriptionSpec {
 
+  class NextTask(taskContext: TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
 
-  class NextTask(taskContext : TaskContext, conf: UserConfig) extends 
Task(taskContext, conf) {
-    import taskContext.{output, self}
-
-    override def onStart(startTime : StartTime) : Unit = {
+    override def onStart(startTime: StartTime): Unit = {
     }
 
-    override def onNext(msg : Message) : Unit = {
+    override def onNext(msg: Message): Unit = {
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala 
b/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala
index 275cdb0..a48f887 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/task/TaskActorSpec.scala
@@ -1,44 +1,46 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package io.gearpump.streaming.task
 
 import akka.actor.{ExtendedActorSystem, Props}
 import akka.testkit._
-import com.typesafe.config.ConfigFactory
+import com.typesafe.config.{Config, ConfigFactory}
+import org.mockito.Mockito.{mock, times, verify, when}
+import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
+
 import io.gearpump.Message
 import io.gearpump.cluster.{MasterHarness, TestUtil, UserConfig}
 import io.gearpump.partitioner.{HashPartitioner, Partitioner}
 import io.gearpump.serializer.{FastKryoSerializer, SerializationFramework}
-import io.gearpump.streaming.AppMasterToExecutor.{StartTask, TaskRegistered, 
ChangeTask, MsgLostException, TaskChanged}
+import io.gearpump.streaming.AppMasterToExecutor.{ChangeTask, 
MsgLostException, StartTask, TaskChanged, TaskRegistered}
 import io.gearpump.streaming.task.TaskActorSpec.TestTask
 import io.gearpump.streaming.{DAG, LifeTime, ProcessorDescription}
 import io.gearpump.util.Graph._
 import io.gearpump.util.{Graph, Util}
-import org.mockito.Mockito.{mock, times, verify, when}
-import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec}
-
 
 class TaskActorSpec extends WordSpec with Matchers with BeforeAndAfterEach 
with MasterHarness {
-  override def config = ConfigFactory.parseString(
-    """ akka.loggers = ["akka.testkit.TestEventListener"]
-      | akka.test.filter-leeway = 20000
-    """.stripMargin).
-    withFallback(TestUtil.DEFAULT_CONFIG)
+  protected override def config: Config = {
+    ConfigFactory.parseString(
+      """ akka.loggers = ["akka.testkit.TestEventListener"]
+        | akka.test.filter-leeway = 20000
+      """.stripMargin).
+      withFallback(TestUtil.DEFAULT_CONFIG)
+  }
 
   val appId = 0
   val task1 = ProcessorDescription(id = 0, taskClass = 
classOf[TestTask].getName, parallelism = 1)
@@ -54,7 +56,7 @@ class TaskActorSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
 
   var mockSerializerPool: SerializationFramework = null
 
-  override def beforeEach() = {
+  override def beforeEach(): Unit = {
     startActorSystem()
     mockMaster = TestProbe()(getActorSystem)
 
@@ -71,8 +73,10 @@ class TaskActorSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
   "TaskActor" should {
     "register itself to AppMaster when started" in {
       val mockTask = mock(classOf[TaskWrapper])
-      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, 
taskContext1, UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
-      testActor ! TaskRegistered(taskId1, 0, Util.randInt)
+      val testActor = TestActorRef[TaskActor](Props(
+        new TaskActor(taskId1, taskContext1, UserConfig.empty,
+          mockTask, mockSerializerPool)))(getActorSystem)
+      testActor ! TaskRegistered(taskId1, 0, Util.randInt())
       testActor ! StartTask(taskId1)
 
       implicit val system = getActorSystem
@@ -84,8 +88,9 @@ class TaskActorSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
 
     "respond to ChangeTask" in {
       val mockTask = mock(classOf[TaskWrapper])
-      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, 
taskContext1, UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
-      testActor ! TaskRegistered(taskId1, 0, Util.randInt)
+      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, 
taskContext1,
+      UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
+      testActor ! TaskRegistered(taskId1, 0, Util.randInt())
       testActor ! StartTask(taskId1)
       mockMaster.expectMsgType[GetUpstreamMinClock]
 
@@ -97,8 +102,9 @@ class TaskActorSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
       val mockTask = mock(classOf[TaskWrapper])
       val msg = Message("test")
 
-      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, 
taskContext1, UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
-      testActor.tell(TaskRegistered(taskId1, 0, Util.randInt), mockMaster.ref)
+      val testActor = TestActorRef[TaskActor](Props(new TaskActor(taskId1, 
taskContext1,
+      UserConfig.empty, mockTask, mockSerializerPool)))(getActorSystem)
+      testActor.tell(TaskRegistered(taskId1, 0, Util.randInt()), 
mockMaster.ref)
       testActor.tell(StartTask(taskId1), mockMaster.ref)
 
       testActor.tell(msg, testActor)
@@ -107,7 +113,7 @@ class TaskActorSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
     }
   }
 
-  override def afterEach() = {
+  override def afterEach(): Unit = {
     shutdownActorSystem()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/unmanagedlibs/README.md
----------------------------------------------------------------------
diff --git a/unmanagedlibs/README.md b/unmanagedlibs/README.md
index 23a5895..7c51d2c 100644
--- a/unmanagedlibs/README.md
+++ b/unmanagedlibs/README.md
@@ -1,5 +1,3 @@
-This folder contains the unmanaged dependency files that couldn't be achieved 
from public repositories.
+This folder contains the unmanaged dependency libraries. We will copy the jars 
under directory {scala-version}/ to out/target/pack/lib/
 
-Related issue: https://github.com/gearpump/gearpump/issues/1816
-
-We built a new akka-actor artifact to replace the flawed one.
\ No newline at end of file
+For example, jars under 2.11/ will be copied to output/target/pack/lib when 
building for scala 2.11.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/version.sbt
----------------------------------------------------------------------
diff --git a/version.sbt b/version.sbt
index e74ced4..7b23e02 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 version in ThisBuild := "0.8.1-SNAPSHOT"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/yarnconf/README.md
----------------------------------------------------------------------
diff --git a/yarnconf/README.md b/yarnconf/README.md
index 38fa818..87b6b3d 100644
--- a/yarnconf/README.md
+++ b/yarnconf/README.md
@@ -1,8 +1,10 @@
-### Put YARN configuration files under classpath
+This directory contains YARN configurations that you want to use when 
launching Gearpump over YARN.
+
+### How to put YARN configuration files under classpath?
   Before calling "yarnclient launch", make sure you have put all YARN 
configuration 
   files under classpath. Typically, you can just copy all files under 
   $HADOOP_HOME/etc/hadoop from one of the YARN Cluster machine to  
"conf/yarnconf" 
   of gearpump.
 
-NOTE: The "conf/yarnconf" is only effecive when you run "yarnclient". When you 
-run other commands like "gear", "conf/yarnconf" will not be addded into 
classpath.
\ No newline at end of file
+NOTE: The "conf/yarnconf" is only effective when you run "yarnclient". When 
you 
+run other commands like "gear", "conf/yarnconf" will not be added into 
classpath.
\ No newline at end of file

Reply via email to