http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
index a898adf..46067ab 100644
--- 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
+++ 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -39,5 +39,4 @@ class KafkaDefaultGrouperSpec extends PropSpec with 
PropertyChecks with Matchers
       }
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
index 2ee7260..ad315fe 100644
--- 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
+++ 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,7 +23,7 @@ import java.util.Properties
 import kafka.admin.AdminUtils
 import kafka.common.KafkaException
 import kafka.server.{KafkaConfig => KafkaServerConfig, KafkaServer}
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{TestUtils, Utils}
 
 trait KafkaServerHarness extends ZookeeperHarness {
   val configs: List[KafkaServerConfig]
@@ -35,8 +35,9 @@ trait KafkaServerHarness extends ZookeeperHarness {
 
   override def setUp() {
     super.setUp
-    if (configs.size <= 0)
+    if (configs.size <= 0) {
       throw new KafkaException("Must supply at least one server config.")
+    }
     brokerList = TestUtils.getBrokerListStrFromConfigs(configs)
     servers = configs.map(TestUtils.createServer(_))
   }
@@ -47,12 +48,14 @@ trait KafkaServerHarness extends ZookeeperHarness {
     super.tearDown
   }
 
-  def createTopicUntilLeaderIsElected(topic: String, partitions: Int, 
replicas: Int, timeout: Long = 10000) = {
+  def createTopicUntilLeaderIsElected(
+      topic: String, partitions: Int, replicas: Int, timeout: Long = 10000)
+    : Map[Int, Option[Int]] = {
     val zkClient = connectZk()
     try {
-      // create topic
+      // Creates topic
       AdminUtils.createTopic(zkClient, topic, partitions, replicas, new 
Properties)
-      // wait until the update metadata request for new topic reaches all 
servers
+      // Waits until the update metadata request for new topic reaches all 
servers
       (0 until partitions).map { case i =>
         TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, timeout)
         i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i, 
timeout)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala
 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala
index a2b25be..45fe760 100644
--- 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala
+++ 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,7 @@
 
 package io.gearpump.streaming.kafka.util
 
-import kafka.utils.{ZKStringSerializer, Utils, TestZKUtils}
+import kafka.utils.{TestZKUtils, Utils, ZKStringSerializer}
 import kafka.zk.EmbeddedZookeeper
 import org.I0Itec.zkclient.ZkClient
 
@@ -29,8 +29,9 @@ trait ZookeeperHarness {
   private var zookeeper: EmbeddedZookeeper = null
 
   def getZookeeper: EmbeddedZookeeper = zookeeper
-  def connectZk = () => new ZkClient(zkConnect, zkSessionTimeout, 
zkConnectionTimeout, ZKStringSerializer)
-
+  def connectZk: () => ZkClient = () => {
+    new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
ZKStringSerializer)
+  }
 
   def setUp() {
     zookeeper = new EmbeddedZookeeper(zkConnect)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala
----------------------------------------------------------------------
diff --git 
a/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala
 
b/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala
index 39e2b24..3199128 100644
--- 
a/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala
+++ 
b/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,7 +18,8 @@
 
 package io.gearpump.streaming.monoid
 
-import com.twitter.algebird.{Monoid => ABMonoid, Group => ABGroup}
+import com.twitter.algebird.{Group => ABGroup, Monoid => ABMonoid}
+
 import io.gearpump.streaming.state.api.{Group, Monoid}
 
 class AlgebirdMonoid[T](monoid: ABMonoid[T]) extends Monoid[T] {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala
----------------------------------------------------------------------
diff --git 
a/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala
 
b/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala
index f2f085a..9578e0a 100644
--- 
a/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala
+++ 
b/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,11 @@
 
 package io.gearpump.streaming.serializer
 
+import scala.util.Try
+
 import com.twitter.chill.KryoInjection
-import io.gearpump.streaming.state.api.Serializer
 
-import scala.util.Try
+import io.gearpump.streaming.state.api.Serializer
 
 class ChillSerializer[T] extends Serializer[T] {
   override def serialize(t: T): Array[Byte] =

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/README.md
----------------------------------------------------------------------
diff --git a/integrationtest/README.md b/integrationtest/README.md
index 95fa6c5..4d85b00 100644
--- a/integrationtest/README.md
+++ b/integrationtest/README.md
@@ -60,7 +60,6 @@ For a not that clean solution, here is the steps:
   ```
 3. Run `sbt it:test`
 
-
 ## Manual test by creating docker cluster manually
 
 To launch a Gearpump cluster manually, you can run the commands as follows.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/resources/log4j.properties 
b/integrationtest/core/src/it/resources/log4j.properties
index f7d0832..e45bd9a 100644
--- a/integrationtest/core/src/it/resources/log4j.properties
+++ b/integrationtest/core/src/it/resources/log4j.properties
@@ -7,7 +7,7 @@
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
index 570e98d..32d8bb7 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,15 +20,13 @@ package io.gearpump.integrationtest
 import io.gearpump.integrationtest.minicluster.MiniCluster
 
 /**
- * Provides an instance of SUT.
- *
- * By default it will instantiate a standalone Gearpump mini cluster.
+ * Provides a mini cluster of Gearpump, which contains one or more masters and
workers.
  */
 object MiniClusterProvider {
 
   private var instance = new MiniCluster
 
-  def get = instance
+  def get: MiniCluster = instance
 
   def set(instance: MiniCluster): MiniCluster = {
     this.instance = instance
@@ -40,5 +38,4 @@ object MiniClusterProvider {
    * test spec will be responsible for cluster creation.
    */
   var managed = false
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
index c07222f..6e4a471 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
  */
 package io.gearpump.integrationtest
 
+import org.scalatest._
+
 import io.gearpump.cluster.MasterToAppMaster
 import io.gearpump.cluster.MasterToAppMaster.AppMasterData
 import io.gearpump.util.LogUtil
-import org.scalatest._
 
 /**
  * The abstract test spec
@@ -55,7 +56,7 @@ trait TestSpecBase
 
   var restartClusterRequired: Boolean = false
 
-  override def beforeEach(td: TestData) = {
+  override def beforeEach(td: TestData): Unit = {
 
     LOGGER.debug(s">### 
=============================================================")
     LOGGER.debug(s">###1 Prepare test: ${td.name}\n")
@@ -67,7 +68,7 @@ trait TestSpecBase
     LOGGER.debug(s">###2 Start test: ${td.name}\n")
   }
 
-  override def afterEach(td: TestData) = {
+  override def afterEach(td: TestData): Unit = {
     LOGGER.debug(s"<### 
=============================================================")
     LOGGER.debug(s"<###3 End test: ${td.name}\n")
 
@@ -88,4 +89,4 @@ trait TestSpecBase
     app.appName shouldEqual expectedAppName
     app
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
index 1c9fdfd..9c0c779 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -124,7 +124,6 @@ class CommandLineSpec extends TestSpecBase {
     appId
   }
 
-
   private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: 
String): Unit = {
     val actual = commandLineClient.queryApp(appId)
     actual should include(s"application: $appId, ")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
index 7c9176a..321b395 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
  */
 package io.gearpump.integrationtest.checklist
 
-import io.gearpump.integrationtest.{Util, TestSpecBase}
-import io.gearpump.integrationtest.kafka._
 import org.scalatest.TestData
 
+import io.gearpump.integrationtest.kafka._
+import io.gearpump.integrationtest.{TestSpecBase, Util}
+
 /**
  * The test spec checks the Kafka datasource connector
  */
@@ -40,7 +41,7 @@ class ConnectorKafkaSpec extends TestSpecBase {
     super.afterAll()
   }
 
-  override def afterEach(test: TestData) = {
+  override def afterEach(test: TestData): Unit = {
     super.afterEach(test)
     if (producer != null) {
       producer.stop()
@@ -68,7 +69,7 @@ class ConnectorKafkaSpec extends TestSpecBase {
 
       // verify
       expectAppIsRunning(appId, "KafkaReadWrite")
-      Util.retryUntil(()=>kafkaCluster.getLatestOffset(sinkTopic) == 
messageNum,
+      Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == 
messageNum,
         "kafka all message written")
     }
   }
@@ -97,23 +98,23 @@ class ConnectorKafkaSpec extends TestSpecBase {
 
       // verify #1
       expectAppIsRunning(appId, "KafkaReadWrite")
-      Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, 
"app running")
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
 
       // verify #2
       val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
       restClient.killExecutor(appId, executorToKill) shouldBe true
-      
Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max 
> executorToKill,
+      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+        .map(_.executorId).max > executorToKill,
         s"executor $executorToKill killed")
 
       // verify #3
       val detector = new MessageLossDetector(producer.lastWriteNum)
       val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
         host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort)
-      Util.retryUntil(()=>{
+      Util.retryUntil(() => {
         kafkaReader.read()
         detector.allReceived
       }, "kafka all message read")
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
index c8c57f5..588e7c7 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,7 @@
  */
 package io.gearpump.integrationtest.checklist
 
-import io.gearpump.integrationtest.{Util, TestSpecBase}
+import io.gearpump.integrationtest.{TestSpecBase, Util}
 import io.gearpump.metrics.Metrics.Meter
 import io.gearpump.streaming._
 import io.gearpump.streaming.appmaster.ProcessorSummary
@@ -50,7 +50,7 @@ class DynamicDagSpec extends TestSpecBase {
       val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
       replaceProcessor(appId, 1, sumTaskClass)
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(()=>{
+      Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
         laterProcessors.size == formerProcessors.size + 1
       }, "new processor successfully added")
@@ -65,7 +65,7 @@ class DynamicDagSpec extends TestSpecBase {
       val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
       replaceProcessor(appId, 0, splitTaskClass)
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(()=>{
+      Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
         laterProcessors.size == formerProcessors.size + 1
       }, "new processor added")
@@ -80,7 +80,7 @@ class DynamicDagSpec extends TestSpecBase {
       val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
       replaceProcessor(appId, 1, sumTaskClass)
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(()=>{
+      Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
         laterProcessors.size == formerProcessors.size + 1
       }, "new processor added")
@@ -88,12 +88,12 @@ class DynamicDagSpec extends TestSpecBase {
 
       val fakeTaskClass = "io.gearpump.streaming.examples.wordcount.Fake"
       replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass)
-      Util.retryUntil(()=>{
+      Util.retryUntil(() => {
         val processorsAfterFailure = 
restClient.queryStreamingAppDetail(appId).processors
         processorsAfterFailure.size == laterProcessors.size
       }, "new processor added")
       val currentClock = restClient.queryStreamingAppDetail(appId).clock
-      Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 
currentClock,
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
currentClock,
         "app clock is advancing")
     }
 
@@ -106,19 +106,18 @@ class DynamicDagSpec extends TestSpecBase {
       val formerProcessors = 
restClient.queryStreamingAppDetail(appId).processors
       replaceProcessor(appId, 1, sumTaskClass)
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(()=>{
+      Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
         laterProcessors.size == formerProcessors.size + 1
       }, "new processor added")
       processorHasThroughput(appId, laterProcessors.keySet.max, 
"receiveThroughput")
 
       restClient.killAppMaster(appId) shouldBe true
-      Util.retryUntil(()=>restClient.queryApp(appId).appMasterPath != 
formerAppMaster,
+      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != 
formerAppMaster,
         "new AppMaster created")
       val processors = restClient.queryStreamingAppDetail(appId).processors
       processors.size shouldEqual laterProcessors.size
     }
-
   }
 
   private def expectSolJarSubmittedWithAppId(): Int = {
@@ -126,7 +125,7 @@ class DynamicDagSpec extends TestSpecBase {
     val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
     success shouldBe true
     expectAppIsRunning(appId, solName)
-    Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, 
"app running")
+    Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, 
"app running")
     appId
   }
 
@@ -147,7 +146,7 @@ class DynamicDagSpec extends TestSpecBase {
   }
 
   private def processorHasThroughput(appId: Int, processorId: Int, metrics: 
String): Unit = {
-    Util.retryUntil(()=>{
+    Util.retryUntil(() => {
       val actual = restClient.queryStreamingAppMetrics(appId, current = false,
         path = "processor" + processorId)
       val throughput = actual.metrics.filter(_.value.name.endsWith(metrics))
@@ -155,5 +154,4 @@ class DynamicDagSpec extends TestSpecBase {
       throughput.forall(_.value.asInstanceOf[Meter].count > 0L)
     }, "new processor has message received")
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
index efee3c6..20d1b12 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,11 @@
  */
 package io.gearpump.integrationtest.checklist
 
-import io.gearpump.integrationtest.Docker._
+import org.apache.log4j.Logger
+
 import io.gearpump.integrationtest.{Docker, TestSpecBase, Util}
 import io.gearpump.streaming._
 import io.gearpump.streaming.appmaster.ProcessorSummary
-import org.apache.log4j.Logger
 
 /**
  * The test spec will perform destructive operations to check the stability
@@ -49,12 +49,13 @@ class ExampleSpec extends TestSpecBase {
 
       def verify(): Boolean = {
         val workerNum = cluster.getWorkerHosts.length
-        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, 
workerNum, args.mkString(" ")).split("\n").
+        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar,
+          workerNum, args.mkString(" ")).split("\n").
           filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
         expectedHostNames.forall(result.contains)
       }
 
-      Util.retryUntil(()=>verify(),
+      Util.retryUntil(() => verify(),
         s"executors started on all expected hosts 
${expectedHostNames.mkString(", ")}")
     }
   }
@@ -66,10 +67,12 @@ class ExampleSpec extends TestSpecBase {
     "can submit immediately after killing a former one" in {
       // setup
       val formerAppId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      val formerSubmissionSuccess =
+        restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       formerSubmissionSuccess shouldBe true
       expectAppIsRunning(formerAppId, wordCountName)
-      
Util.retryUntil(()=>restClient.queryStreamingAppDetail(formerAppId).clock > 0, 
"app running")
+      Util.retryUntil(() =>
+        restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app 
running")
       restClient.killApp(formerAppId)
 
       // exercise
@@ -109,9 +112,9 @@ class ExampleSpec extends TestSpecBase {
       expectAppIsRunning(appId, appName)
 
       // exercise
-      Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, 
"app submitted")
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app submitted")
       val formerClock = restClient.queryStreamingAppDetail(appId).clock
-      Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 
formerClock,
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
formerClock,
         "app clock is advancing")
     }
 
@@ -126,14 +129,14 @@ class ExampleSpec extends TestSpecBase {
       val expectedProcessorId = formerProcessors.size
       val expectedParallelism = processor0.parallelism + 1
       val expectedDescription = processor0.description + "new"
-      val replaceMe = new ProcessorDescription(processor0.id, 
processor0.taskClass, expectedParallelism,
-        description = expectedDescription)
+      val replaceMe = new ProcessorDescription(processor0.id, 
processor0.taskClass,
+        expectedParallelism, description = expectedDescription)
 
       // exercise
       val success = restClient.replaceStreamingAppProcessor(appId, replaceMe)
       success shouldBe true
       var laterProcessors: Map[ProcessorId, ProcessorSummary] = null
-      Util.retryUntil(()=>{
+      Util.retryUntil(() => {
         laterProcessors = restClient.queryStreamingAppDetail(appId).processors
         laterProcessors.size == formerProcessors.size + 1
       }, "new process added")
@@ -142,5 +145,4 @@ class ExampleSpec extends TestSpecBase {
       laterProcessor0.description shouldEqual expectedDescription
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
index 35c31cd..3f70339 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
@@ -1,12 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump.integrationtest.checklist
 
-import io.gearpump.integrationtest.Docker._
+import org.apache.log4j.Logger
+
 import io.gearpump.integrationtest.hadoop.HadoopCluster._
-import io.gearpump.integrationtest.{Util, TestSpecBase}
-import io.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader, 
MessageLossDetector, NumericalDataProducer}
 import io.gearpump.integrationtest.kafka.KafkaCluster._
-import org.apache.log4j.Logger
+import io.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader}
+import io.gearpump.integrationtest.{TestSpecBase, Util}
 
+/**
+ * Checks message delivery consistency, such as at-least-once and exactly-once.
+ */
 class MessageDeliverySpec extends TestSpecBase {
 
   private val LOG = Logger.getLogger(getClass)
@@ -49,7 +70,8 @@ class MessageDeliverySpec extends TestSpecBase {
 
             // verify #1
             expectAppIsRunning(appId, "MessageCount")
-            
Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app 
is running")
+            Util.retryUntil(() => 
restClient.queryStreamingAppDetail(appId).clock > 0,
+              "app is running")
 
             // wait for checkpoint to take place
             Thread.sleep(1000)
@@ -57,17 +79,19 @@ class MessageDeliverySpec extends TestSpecBase {
             LOG.info("Trigger message replay by kill and restart the 
executors")
             val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
             restClient.killExecutor(appId, executorToKill) shouldBe true
-            Util.retryUntil(()=> 
restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill,
-              s"executor $executorToKill killed")
+            Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+              .map(_.executorId).max > executorToKill, s"executor 
$executorToKill killed")
 
             producer.stop()
             val producedNumbers = producer.producedNumbers
-            LOG.info(s"In total, numbers in range[${producedNumbers.start}, 
${producedNumbers.end - 1}] have been written to Kafka")
+            LOG.info(s"In total, numbers in range[${producedNumbers.start}" +
+              s", ${producedNumbers.end - 1}] have been written to Kafka")
 
             // verify #3
             val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic)
 
-            assert(producedNumbers.size == kafkaSourceOffset, "produced 
message should match Kafka queue size")
+            assert(producedNumbers.size == kafkaSourceOffset,
+              "produced message should match Kafka queue size")
 
             LOG.info(s"The Kafka source topic $sourceTopic offset is " + 
kafkaSourceOffset)
 
@@ -83,11 +107,13 @@ class MessageDeliverySpec extends TestSpecBase {
 
             val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
               host = kafkaCluster.advertisedHost, port = 
kafkaCluster.advertisedPort)
-            Util.retryUntil(()=>{
+            Util.retryUntil(() => {
               kafkaReader.read()
-              LOG.info(s"Received message count: 
${detector.latestMessageCount}, expect: ${producedNumbers.size}")
+              LOG.info(s"Received message count: 
${detector.latestMessageCount}, " +
+                s"expect: ${producedNumbers.size}")
               detector.latestMessageCount == producedNumbers.size
-            }, "MessageCountApp calculated message count matches expected in 
case of message replay")
+            }, "MessageCountApp calculated message count matches " +
+              "expected in case of message replay")
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
index 722cb33..f4abde7 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,13 @@
  */
 package io.gearpump.integrationtest.checklist
 
+import scala.concurrent.duration._
+
 import io.gearpump.cluster.MasterToAppMaster
 import io.gearpump.cluster.master.MasterStatus
 import io.gearpump.cluster.worker.WorkerSummary
 import io.gearpump.integrationtest.{TestSpecBase, Util}
 
-import scala.concurrent.duration._
-
 /**
  * The test spec checks REST service usage
  */
@@ -62,7 +62,8 @@ class RestServiceSpec extends TestSpecBase {
     "reject a repeated submission request while the application is running" in 
{
       // setup
       val appId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
+      val formerSubmissionSuccess = restClient.submitApp(wordCountJar,
+        cluster.getWorkerHosts.length)
       formerSubmissionSuccess shouldBe true
       expectAppIsRunning(appId, wordCountName)
 
@@ -77,14 +78,16 @@ class RestServiceSpec extends TestSpecBase {
       success shouldBe false
     }
 
-    "submit a wordcount application with 4 split and 3 sum processors and 
expect parallelism of processors match the given number" in {
+    "submit a wordcount application with 4 split and 3 sum processors and 
expect " +
+      "parallelism of processors match the given number" in {
       // setup
       val splitNum = 4
       val sumNum = 3
       val appId = restClient.getNextAvailableAppId()
 
       // exercise
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length, s"-split $splitNum -sum $sumNum")
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length,
+        s"-split $splitNum -sum $sumNum")
       success shouldBe true
       expectAppIsRunning(appId, wordCountName)
       val processors = restClient.queryStreamingAppDetail(appId).processors
@@ -104,7 +107,8 @@ class RestServiceSpec extends TestSpecBase {
 
       // exercise
       expectMetricsAvailable(
-        restClient.queryStreamingAppMetrics(appId, current = 
true).metrics.nonEmpty, "metrics available")
+        restClient.queryStreamingAppMetrics(appId, current = 
true).metrics.nonEmpty,
+        "metrics available")
       val actual = restClient.queryStreamingAppMetrics(appId, current = true)
       actual.path shouldEqual s"app$appId.processor*"
       actual.metrics.foreach(metric => {
@@ -119,7 +123,8 @@ class RestServiceSpec extends TestSpecBase {
       }, "metrics available")
     }
 
-    "can obtain application corresponding executors' metrics and the metrics 
will keep changing" in {
+    "can obtain application corresponding executors' metrics and " +
+      "the metrics will keep changing" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
       val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length)
@@ -128,7 +133,8 @@ class RestServiceSpec extends TestSpecBase {
 
       // exercise
       expectMetricsAvailable(
-        restClient.queryExecutorMetrics(appId, current = 
true).metrics.nonEmpty, "metrics available")
+        restClient.queryExecutorMetrics(appId, current = 
true).metrics.nonEmpty,
+        "metrics available")
       val actual = restClient.queryExecutorMetrics(appId, current = true)
       actual.path shouldEqual s"app$appId.executor*"
       actual.metrics.foreach(metric => {
@@ -194,7 +200,7 @@ class RestServiceSpec extends TestSpecBase {
 
       // exercise
       var runningWorkers: Array[WorkerSummary] = Array.empty
-      Util.retryUntil(()=>{
+      Util.retryUntil(() => {
         runningWorkers = restClient.listRunningWorkers()
         runningWorkers.length == expectedWorkersCount
       }, "all workers running")
@@ -207,13 +213,14 @@ class RestServiceSpec extends TestSpecBase {
       // setup
       restartClusterRequired = true
       val formerWorkersCount = cluster.getWorkerHosts.length
-      Util.retryUntil(()=>restClient.listRunningWorkers().length == 
formerWorkersCount,
+      Util.retryUntil(() => restClient.listRunningWorkers().length == 
formerWorkersCount,
         "all workers running")
       val workerName = "newWorker"
 
       // exercise
       cluster.addWorkerNode(workerName)
-      Util.retryUntil(()=>restClient.listRunningWorkers().length > 
formerWorkersCount, "new worker added")
+      Util.retryUntil(() => restClient.listRunningWorkers().length > 
formerWorkersCount,
+        "new worker added")
       cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1
       restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1
     }
@@ -252,7 +259,8 @@ class RestServiceSpec extends TestSpecBase {
       restClient.listRunningWorkers().foreach { worker =>
         val workerId = worker.workerId
         expectMetricsAvailable(
-          restClient.queryWorkerMetrics(workerId, current = 
true).metrics.nonEmpty, "metrics available")
+          restClient.queryWorkerMetrics(workerId, current = 
true).metrics.nonEmpty,
+          "metrics available")
         val actual = restClient.queryWorkerMetrics(workerId, current = true)
         actual.path shouldEqual s"worker$workerId"
         actual.metrics.foreach(metric => {
@@ -323,13 +331,14 @@ class RestServiceSpec extends TestSpecBase {
       val originSplitNum = 4
       val originSumNum = 3
       val originAppId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length, s"-split $originSplitNum -sum $originSumNum")
+      val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length,
+        s"-split $originSplitNum -sum $originSumNum")
       success shouldBe true
       expectAppIsRunning(originAppId, wordCountName)
       val originAppDetail = restClient.queryStreamingAppDetail(originAppId)
 
       // exercise
-      Util.retryUntil(()=>restClient.restartApp(originAppId), "app restarted")
+      Util.retryUntil(() => restClient.restartApp(originAppId), "app 
restarted")
       val killedApp = restClient.queryApp(originAppId)
       killedApp.appId shouldEqual originAppId
       killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
@@ -355,7 +364,6 @@ class RestServiceSpec extends TestSpecBase {
   private def expectMetricsAvailable(condition: => Boolean, 
conditionDescription: String): Unit = {
     val config = restClient.queryMasterConfig()
     val reportInterval = 
Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms")
-    Util.retryUntil(()=>condition, conditionDescription, interval = 
reportInterval)
+    Util.retryUntil(() => condition, conditionDescription, interval = 
reportInterval)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
index ffad3ec..b002f70 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,15 +17,16 @@
  */
 package io.gearpump.integrationtest.checklist
 
-import io.gearpump.WorkerId
+import scala.concurrent.duration.Duration
+
 import io.gearpump.cluster.MasterToAppMaster
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.integrationtest.{TestSpecBase, Util}
 import io.gearpump.util.{Constants, LogUtil}
 
-import scala.concurrent.duration.Duration
-
 /**
- * The test spec will perform destructive operations to check the stability
+ * The test spec will perform destructive operations to check the stability. 
Operations
+ * contains shutting-down appmaster, executor, or worker, and etc..
  */
 class StabilitySpec extends TestSpecBase {
 
@@ -36,13 +37,13 @@ class StabilitySpec extends TestSpecBase {
       // setup
       val appId = commandLineClient.submitApp(wordCountJar)
       val formerAppMaster = restClient.queryApp(appId).appMasterPath
-      Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, 
"app running")
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
       ensureClockStoredInMaster()
 
       // exercise
       restClient.killAppMaster(appId) shouldBe true
       // todo: how long master will begin to recover and how much time for the 
recovering?
-      Util.retryUntil(()=>restClient.queryApp(appId).appMasterPath != 
formerAppMaster,
+      Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != 
formerAppMaster,
         "appmaster killed and restarted")
 
       // verify
@@ -56,14 +57,15 @@ class StabilitySpec extends TestSpecBase {
     "will create a new executor and application will replay from the latest 
application clock" in {
       // setup
       val appId = commandLineClient.submitApp(wordCountJar)
-      Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, 
"app running")
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
       val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
       ensureClockStoredInMaster()
 
       // exercise
       restClient.killExecutor(appId, executorToKill) shouldBe true
       // todo: how long appmaster will begin to recover and how much time for 
the recovering?
-      
Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max 
> executorToKill,
+      Util.retryUntil(() => restClient.queryExecutorBrief(appId)
+        .map(_.executorId).max > executorToKill,
         s"executor $executorToKill killed and restarted")
 
       // verify
@@ -85,7 +87,7 @@ class StabilitySpec extends TestSpecBase {
       // setup
       restartClusterRequired = true
       val appId = commandLineClient.submitApp(wordCountJar)
-      Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, 
"app running")
+      Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
 
       val allexecutors = restClient.queryExecutorBrief(appId)
       val maxExecutor = allexecutors.sortBy(_.executorId).last
@@ -93,8 +95,10 @@ class StabilitySpec extends TestSpecBase {
 
       val appMaster = allexecutors.find(_.executorId == 
Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
 
-      LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, 
worker: ${maxExecutor.workerId}")
-      val executorsSharingSameWorker = allexecutors.filter(_.workerId == 
maxExecutor.workerId).map(_.executorId)
+      LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " +
+        s"worker: ${maxExecutor.workerId}")
+      val executorsSharingSameWorker = allexecutors
+        .filter(_.workerId == maxExecutor.workerId).map(_.executorId)
       LOG.info(s"These executors sharing the same worker Id 
${maxExecutor.workerId}," +
         s" ${executorsSharingSameWorker.mkString(",")}")
 
@@ -102,7 +106,8 @@ class StabilitySpec extends TestSpecBase {
       val workerIdToKill = maxExecutor.workerId
       cluster.removeWorkerNode(hostName(workerIdToKill))
 
-      val appMasterKilled = executorsSharingSameWorker.exists(_ == 
Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
+      val appMasterKilled = executorsSharingSameWorker
+        .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID)
 
       def executorsMigrated(): Boolean = {
         val executors = restClient.queryExecutorBrief(appId)
@@ -118,7 +123,7 @@ class StabilitySpec extends TestSpecBase {
         }
       }
 
-      Util.retryUntil(()=> {
+      Util.retryUntil(() => {
         executorsMigrated()
       }, s"new executor created with id > ${maxExecutor.executorId} when 
worker is killed")
 
@@ -144,13 +149,14 @@ class StabilitySpec extends TestSpecBase {
 
       // verify
       val aliveWorkers = cluster.getWorkerHosts
-      Util.retryUntil(()=>aliveWorkers.forall(worker => 
!cluster.nodeIsOnline(worker)),
+      Util.retryUntil(() => aliveWorkers.forall(worker => 
!cluster.nodeIsOnline(worker)),
         "all workers down")
     }
   }
 
   private def ensureClockStoredInMaster(): Unit = {
-    // todo: 5000ms is a fixed sync period in clock service. we wait for 
5000ms to assume the clock is stored
+    // TODO: 5000ms is a fixed sync period in clock service.
+    // we wait for 5000ms to assume the clock is stored
     Thread.sleep(5000)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
index 710cfa7..7d0a672 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,8 +22,8 @@ import io.gearpump.integrationtest.storm.StormClient
 import io.gearpump.integrationtest.{TestSpecBase, Util}
 
 /**
-  * The test spec checks the compatibility of running Storm applications
-  */
+ * The test spec checks the compatibility of running Storm applications
+ */
 class StormCompatibilitySpec extends TestSpecBase {
 
   private lazy val stormClient = {
@@ -58,7 +58,7 @@ class StormCompatibilitySpec extends TestSpecBase {
 
   "Storm over Gearpump" should withStorm {
     stormVersion =>
-     s"support basic topologies ($stormVersion)" in {
+      s"support basic topologies ($stormVersion)" in {
         val stormJar = getStormJar(stormVersion)
         val topologyName = getTopologyName("exclamation", stormVersion)
 
@@ -70,7 +70,7 @@ class StormCompatibilitySpec extends TestSpecBase {
           appName = topologyName)
 
         // verify
-        Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
       }
 
       s"support to run a python version of wordcount ($stormVersion)" in {
@@ -85,7 +85,7 @@ class StormCompatibilitySpec extends TestSpecBase {
           appName = topologyName)
 
         // verify
-        Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
       }
 
       s"support DRPC ($stormVersion)" in {
@@ -102,7 +102,7 @@ class StormCompatibilitySpec extends TestSpecBase {
         val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway)
 
         // verify
-        Util.retryUntil(()=>{
+        Util.retryUntil(() => {
           drpcClient.execute("reach", "notaurl.com") == "0"
         }, "drpc reach == 0")
         drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16"
@@ -121,16 +121,17 @@ class StormCompatibilitySpec extends TestSpecBase {
           appName = topologyName)
 
         // verify
-        Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 
0, "app running")
+        Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock 
> 0, "app running")
       }
 
       s"support at-least-once semantics with Storm's Kafka connector 
($stormVersion)" in {
 
         val stormJar = getStormJar(stormVersion)
         val topologyName = getTopologyName("storm_kafka", stormVersion)
-        val stormKafkaTopology = 
s"io.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
+        val stormKafkaTopology =
+          
s"io.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
 
-        import KafkaCluster._
+        import io.gearpump.integrationtest.kafka.KafkaCluster._
         withKafkaCluster(cluster) {
           kafkaCluster =>
             val sourcePartitionNum = 2
@@ -142,7 +143,7 @@ class StormCompatibilitySpec extends TestSpecBase {
 
             val args = Array("-topologyName", topologyName, "-sourceTopic", 
sourceTopic,
               "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, 
"-brokerList", brokerList,
-              "-spoutNum", s"$sourcePartitionNum",  "-boltNum", 
s"$sinkPartitionNum"
+              "-spoutNum", s"$sourcePartitionNum", "-boltNum", 
s"$sinkPartitionNum"
             )
 
             kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
@@ -156,22 +157,24 @@ class StormCompatibilitySpec extends TestSpecBase {
                 args = args.mkString(" "),
                 appName = topologyName)
 
-              
Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0,  "app 
running")
+              Util.retryUntil(() =>
+                restClient.queryStreamingAppDetail(appId).clock > 0, "app 
running")
 
               // kill executor and verify at-least-once is guaranteed on 
application restart
               val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
               restClient.killExecutor(appId, executorToKill) shouldBe true
-              
Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max 
> executorToKill,
+              Util.retryUntil(() =>
+                restClient.queryExecutorBrief(appId).map(_.executorId).max > 
executorToKill,
                 s"executor $executorToKill killed")
 
               // verify no message loss
               val detector = new
-                      MessageLossDetector(producer.lastWriteNum)
+                  MessageLossDetector(producer.lastWriteNum)
               val kafkaReader = new
-                      SimpleKafkaReader(detector, sinkTopic, host = 
kafkaCluster.advertisedHost,
-                        port = kafkaCluster.advertisedPort)
+                  SimpleKafkaReader(detector, sinkTopic, host = 
kafkaCluster.advertisedHost,
+                    port = kafkaCluster.advertisedPort)
 
-              Util.retryUntil (()=>{
+              Util.retryUntil(() => {
                 kafkaReader.read()
                 detector.allReceived
               }, "all kafka message read")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
index 65d0d4c..4178e66 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,15 @@
  */
 package io.gearpump.integrationtest.suites
 
+import org.scalatest._
+
 import io.gearpump.integrationtest.MiniClusterProvider
 import io.gearpump.integrationtest.checklist._
 import io.gearpump.integrationtest.minicluster.MiniCluster
-import org.scalatest._
 
 /**
- * Launch a Gearpump cluster in standalone mode and run all test specs
+ * Launch a Gearpump cluster in standalone mode and run all test specs. To 
test a specific
+ * test spec, you need to comment out other lines.
  */
 class StandaloneModeSuite extends Suites(
   new CommandLineSpec,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
index 64ef9a7..aee2681 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,7 +17,6 @@
  */
 package io.gearpump.integrationtest
 
-import io.gearpump.integrationtest.ShellExec._
 import org.apache.log4j.Logger
 
 /**
@@ -38,7 +37,6 @@ object Docker {
     ShellExec.exec(s"docker exec $container $command", s"EXEC $container")
   }
 
-
   /**
    * @throws RuntimeException in case retval != 0
    */
@@ -48,7 +46,6 @@ object Docker {
     }
   }
 
-
   final def executeSilently(container: String, command: String): Boolean = {
     trace(container, s"Execute silently $command") {
       doExecuteSilently(container, command)
@@ -119,12 +116,14 @@ object Docker {
   /**
    * @throws RuntimeException in case particular container is created already
    */
-  private def createAndStartContainer(name: String, options: String, command: 
String, image: String): String = {
-    ShellExec.execAndCaptureOutput(s"docker run $options --name $name $image 
$command", s"MAKE $name")
+  private def createAndStartContainer(
+      name: String, options: String, command: String, image: String): String = 
{
+    ShellExec.execAndCaptureOutput(s"docker run $options " +
+      s"--name $name $image $command", s"MAKE $name")
   }
 
   final def killAndRemoveContainer(name: String): Boolean = {
-    trace(name, s"kill and remove container"){
+    trace(name, s"kill and remove container") {
       ShellExec.exec(s"docker rm -f $name", s"STOP $name")
     }
   }
@@ -141,7 +140,8 @@ object Docker {
     ShellExec.execAndCaptureOutput(s"docker inspect $option $container", 
s"EXEC $container")
   }
 
-  final def curl(container: String, url: String, options: Array[String] = 
Array.empty[String]): String = {
+  final def curl(container: String, url: String, options: Array[String] = 
Array.empty[String])
+    : String = {
     trace(container, s"curl $url") {
       doExecute(container, s"curl -s ${options.mkString(" ")} $url")
     }
@@ -172,7 +172,9 @@ object Docker {
   }
 
   private def trace[T](container: String, msg: String)(fun: => T): T = {
-    Console.println()
+    // scalastyle:off println
+    Console.println() // A empty line to let the output looks better.
+    // scalastyle:on println
     LOG.debug(s"Container $container====>> $msg")
     LOG.debug("INPUT==>>")
     val response = fun
@@ -198,8 +200,12 @@ object Docker {
         x.toString
     }
 
-    val preview = if (output.length > PREVIEW_MAX_LENGTH)
-      output.substring(0, PREVIEW_MAX_LENGTH) + "..." else output
+    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+    }
+    else {
+      output
+    }
     preview
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
index 1c01b51..9045efe 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,15 +17,14 @@
  */
 package io.gearpump.integrationtest
 
-import org.apache.commons.lang.text.{StrMatcher, StrTokenizer}
-import org.apache.log4j.Logger
-import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
-
+import scala.collection.JavaConverters._
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent._
 import scala.concurrent.duration._
 import scala.sys.process._
-import scala.collection.JavaConversions._
+
+import org.apache.log4j.Logger
+import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer
 
 /**
  * The class is used to execute command in a shell
@@ -41,7 +40,7 @@ object ShellExec {
    */
   private def splitQuotedString(str: String): List[String] = {
     val splitter = new QuotedStringTokenizer(str, " \t\n\r")
-    splitter.asInstanceOf[java.util.Enumeration[String]].toList
+    splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList
   }
 
   def exec(command: String, sender: String, timeout: Duration = 
PROCESS_TIMEOUT): Boolean = {
@@ -49,7 +48,8 @@ object ShellExec {
 
     val p = splitQuotedString(command).run()
     val f = Future(blocking(p.exitValue())) // wrap in Future
-    val retval = try {
+    val retval = {
+      try {
         Await.result(f, timeout)
       } catch {
         case _: TimeoutException =>
@@ -57,12 +57,13 @@ object ShellExec {
           p.destroy()
           p.exitValue()
       }
-
+    }
     LOG.debug(s"$sender <= exit $retval")
     retval == 0
   }
 
-  def execAndCaptureOutput(command: String, sender: String, timeout: Duration 
= PROCESS_TIMEOUT): String = {
+  def execAndCaptureOutput(command: String, sender: String, timeout: Duration 
= PROCESS_TIMEOUT)
+    : String = {
     LOG.debug(s"$sender => `$command`")
 
     val buf = new StringBuilder
@@ -70,17 +71,22 @@ object ShellExec {
       (e: String) => buf.append(e).append("\n"))
     val p = splitQuotedString(command).run(processLogger)
     val f = Future(blocking(p.exitValue())) // wrap in Future
-    val retval = try {
+    val retval = {
+      try {
         Await.result(f, timeout)
       } catch {
         case _: TimeoutException =>
           p.destroy()
           p.exitValue()
       }
+    }
     val output = buf.toString().trim
     val PREVIEW_MAX_LENGTH = 200
-    val preview = if (output.length > PREVIEW_MAX_LENGTH)
-      output.substring(0, PREVIEW_MAX_LENGTH) + "..." else output
+    val preview = if (output.length > PREVIEW_MAX_LENGTH) {
+      output.substring(0, PREVIEW_MAX_LENGTH) + "..."
+    } else {
+      output
+    }
 
     LOG.debug(s"$sender <= `$preview` exit $retval")
     if (retval != 0) {
@@ -89,5 +95,4 @@ object ShellExec {
     }
     output
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
index d4dc1ee..71429b2 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,11 +17,11 @@
  */
 package io.gearpump.integrationtest
 
-import org.apache.log4j.Logger
-
 import scala.concurrent.duration._
 import scala.util.{Failure, Success, Try}
 
+import org.apache.log4j.Logger
+
 object Util {
 
   private val LOG = Logger.getLogger(getClass)
@@ -40,8 +40,9 @@ object Util {
     }
   }
 
-  def retryUntil(condition: ()=> Boolean, conditionDescription: String, 
maxTries: Int = 15,
-                 interval: Duration = 10.seconds): Unit = {
+  def retryUntil(
+      condition: () => Boolean, conditionDescription: String, maxTries: Int = 
15,
+      interval: Duration = 10.seconds): Unit = {
     var met = false
     var tries = 0
 
@@ -56,7 +57,8 @@ object Util {
       tries += 1
 
       if (!met) {
-        LOG.error(s"Failed due to (false == $conditionDescription), retrying 
for the ${tries} times...")
+        LOG.error(s"Failed due to (false == $conditionDescription), " +
+          s"retrying for the ${tries} times...")
         Thread.sleep(interval.toMillis)
       } else {
         LOG.info(s"Success ($conditionDescription) after ${tries} retries")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
index 2217efe..6d277f0 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
@@ -1,10 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package io.gearpump.integrationtest.hadoop
 
-import io.gearpump.integrationtest.{Util, Docker}
 import org.apache.log4j.Logger
 
+import io.gearpump.integrationtest.{Docker, Util}
+
 object HadoopCluster {
 
+  /** Starts a Hadoop cluster */
   def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
     val hadoopCluster = new HadoopCluster
     try {
@@ -27,10 +47,11 @@ class HadoopCluster {
   def start(): Unit = {
     Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
 
-    Util.retryUntil(()=>isAlive, "Hadoop cluster is alive")
+    Util.retryUntil(() => isAlive, "Hadoop cluster is alive")
     LOG.info("Hadoop cluster is started.")
   }
 
+  // Checks whether the cluster is alive by listing "/"
   private def isAlive: Boolean = {
     Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls 
/")
   }
@@ -43,5 +64,4 @@ class HadoopCluster {
   def shutDown(): Unit = {
     Docker.killAndRemoveContainer(HADOOP_HOST)
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
index 3244d24..862fc58 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,12 +17,14 @@
  */
 package io.gearpump.integrationtest.kafka
 
+import org.apache.log4j.Logger
+
 import io.gearpump.integrationtest.minicluster.MiniCluster
 import io.gearpump.integrationtest.{Docker, Util}
-import org.apache.log4j.Logger
 
 object KafkaCluster {
 
+  /** Starts a Kafka cluster */
   def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): 
Unit = {
     val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
     try {
@@ -34,7 +36,7 @@ object KafkaCluster {
   }
 
   def withDataProducer(topic: String, brokerList: String)
-                      (testCode: NumericalDataProducer => Unit): Unit = {
+    (testCode: NumericalDataProducer => Unit): Unit = {
     val producer = new NumericalDataProducer(topic, brokerList)
     try {
       producer.start()
@@ -43,7 +45,6 @@ object KafkaCluster {
       producer.stop()
     }
   }
-
 }
 
 /**
@@ -67,7 +68,7 @@ class KafkaCluster(val advertisedHost: String, zkChroot: 
String = "") {
         "ZK_CHROOT" -> zkChroot),
       tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT)
     )
-    Util.retryUntil(()=>isAlive, "kafka cluster is alive")
+    Util.retryUntil(() => isAlive, "kafka cluster is alive")
     LOG.debug("kafka cluster is started.")
   }
 
@@ -118,14 +119,17 @@ class KafkaCluster(val advertisedHost: String, zkChroot: 
String = "") {
     kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, 
getBrokerListConnectString)
   }
 
-  private def kafkaListTopics(container: String, kafkaHome: String, 
zookeeperConnectionString: String): String = {
+  private def kafkaListTopics(
+      container: String, kafkaHome: String, zookeeperConnectionString: 
String): String = {
+
     LOG.debug(s"|=> Kafka list topics...")
     Docker.execute(container,
       s"$kafkaHome/bin/kafka-topics.sh" +
         s" --zookeeper $zookeeperConnectionString -list")
   }
 
-  private def kafkaFetchLatestOffset(container: String, topic: String, 
kafkaHome: String, brokersList: String): Int = {
+  private def kafkaFetchLatestOffset(
+      container: String, topic: String, kafkaHome: String, brokersList: 
String): Int = {
     LOG.debug(s"|=> Get latest offset of topic $topic...")
     val output = Docker.execute(container,
       s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" +

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
index d319d6b..03bb9fb 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,11 +19,12 @@ package io.gearpump.integrationtest.kafka
 
 import java.util.Properties
 
-import io.gearpump.streaming.serializer.ChillSerializer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.log4j.Logger
 
+import io.gearpump.streaming.serializer.ChillSerializer
+
 class NumericalDataProducer(topic: String, bootstrapServers: String) {
 
   private val LOG = Logger.getLogger(getClass)
@@ -44,7 +45,7 @@ class NumericalDataProducer(topic: String, bootstrapServers: 
String) {
     producer.close()
   }
 
-  /** How many message we have written in total*/
+  /** How many message we have written in total */
   def producedNumbers: Range = {
     Range(1, lastWriteNum + 1)
   }
@@ -52,7 +53,8 @@ class NumericalDataProducer(topic: String, bootstrapServers: 
String) {
   private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = {
     val properties = new Properties()
     properties.setProperty("bootstrap.servers", bootstrapServers)
-    new KafkaProducer[Array[Byte], Array[Byte]](properties, new 
ByteArraySerializer, new ByteArraySerializer)
+    new KafkaProducer[Array[Byte], Array[Byte]](properties,
+      new ByteArraySerializer, new ByteArraySerializer)
   }
 
   private val produceThread = new Thread(new Runnable {
@@ -71,5 +73,4 @@ class NumericalDataProducer(topic: String, bootstrapServers: 
String) {
       }
     }
   })
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
index efeedae..98aceba 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -39,5 +39,4 @@ class MessageLossDetector(totalNum: Int) extends 
ResultVerifier {
   def received(num: Int): Boolean = {
     bitSets(num)
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
index e2e1dbc..29289d9 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +17,16 @@
  */
 package io.gearpump.integrationtest.kafka
 
-import io.gearpump.streaming.serializer.ChillSerializer
+import scala.util.{Failure, Success}
+
 import kafka.api.FetchRequestBuilder
 import kafka.consumer.SimpleConsumer
 import kafka.utils.Utils
-import scala.util.{Failure, Success}
+
+import io.gearpump.streaming.serializer.ChillSerializer
 
 class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: 
Int = 0,
-  host: String, port: Int) {
+    host: String, port: Int) {
 
   private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
   private val serializer = new ChillSerializer[Int]
@@ -44,5 +46,4 @@ class SimpleKafkaReader(verifier: ResultVerifier, topic: 
String, partition: Int
       }
     }
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
index 8ecd9cd..e5c7cb7 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,18 +17,16 @@
  */
 package io.gearpump.integrationtest.minicluster
 
-import java.io.File
+import scala.sys.process._
 
 import io.gearpump.integrationtest.Docker
 
-import scala.sys.process._
-
 /**
  * A helper to instantiate the base image for different usage.
  */
 class BaseContainer(val host: String, command: String,
-                    masterAddrs: List[(String, Int)],
-                    tunnelPorts: Set[Int] = Set.empty) {
+    masterAddrs: List[(String, Int)],
+    tunnelPorts: Set[Int] = Set.empty) {
 
   private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher"
   private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump"
@@ -59,5 +57,4 @@ class BaseContainer(val host: String, command: String,
   def killAndRemove(): Unit = {
     Docker.killAndRemoveContainer(host)
   }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
index 50988ca..5eebd30 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,9 +17,10 @@
  */
 package io.gearpump.integrationtest.minicluster
 
+import org.apache.log4j.Logger
+
 import io.gearpump.cluster.MasterToAppMaster
 import io.gearpump.integrationtest.Docker
-import org.apache.log4j.Logger
 
 /**
  * A command-line client to operate a Gearpump cluster

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
index 25c211f..bfdecee 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,11 @@
 package io.gearpump.integrationtest.minicluster
 
 import java.io.IOException
+import scala.collection.mutable.ListBuffer
 
-import io.gearpump.cluster.master.MasterNode
-import io.gearpump.integrationtest.{Docker, Util}
 import org.apache.log4j.Logger
 
-import scala.collection.mutable.ListBuffer
+import io.gearpump.integrationtest.{Docker, Util}
 
 /**
  * This class is a test driver for end-to-end integration test.
@@ -53,7 +52,7 @@ class MiniCluster {
   def start(workerNum: Int = 2): Unit = {
 
     // Kill master
-    MASTER_ADDRS.foreach{case (host, _) =>
+    MASTER_ADDRS.foreach { case (host, _) =>
       if (Docker.containerExists(host)) {
         Docker.killAndRemoveContainer(host)
       }
@@ -61,7 +60,7 @@ class MiniCluster {
 
     // Kill existing workers
     workers ++= (0 until workerNum).map("worker" + _)
-    workers.foreach{ worker =>
+    workers.foreach { worker =>
       if (Docker.containerExists(worker)) {
         Docker.killAndRemoveContainer(worker)
       }
@@ -73,7 +72,7 @@ class MiniCluster {
     })
 
     // Start Workers
-    workers.foreach{worker =>
+    workers.foreach { worker =>
       val container = new BaseContainer(worker, "worker", MASTER_ADDRS)
       container.createAndStart()
     }
@@ -94,7 +93,8 @@ class MiniCluster {
       container.createAndStart()
       workers += host
     } else {
-      throw new IOException(s"Cannot add new worker $host, as worker with same 
hostname already exists")
+      throw new IOException(s"Cannot add new worker $host, " +
+        s"as worker with same hostname already exists")
     }
   }
 
@@ -102,7 +102,7 @@ class MiniCluster {
    * @throws RuntimeException if rest client is not authenticated after N 
attempts
    */
   private def expectRestClientAuthenticated(): Unit = {
-    Util.retryUntil(()=>{
+    Util.retryUntil(() => {
       restClient.login()
       LOG.info("rest client has been authenticated")
       true
@@ -113,7 +113,7 @@ class MiniCluster {
    * @throws RuntimeException if service is not available after N attempts
    */
   private def expectClusterAvailable(): Unit = {
-    Util.retryUntil(()=>{
+    Util.retryUntil(() => {
       val response = restClient.queryMaster()
       LOG.info(s"cluster is now available with response: $response.")
       response.aliveFor > 0
@@ -148,8 +148,9 @@ class MiniCluster {
 
   def restart(): Unit = {
     shutDown()
-    Util.retryUntil(()=>
-      !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists), "all 
docker containers killed")
+    Util.retryUntil(() => {
+      !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)
+    }, "all docker containers killed")
     LOG.info("all docker containers have been killed. restarting...")
     start()
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
index b565f41..087f188 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,29 +17,30 @@
  */
 package io.gearpump.integrationtest.minicluster
 
+import scala.reflect.ClassTag
+
 import com.typesafe.config.{Config, ConfigFactory}
-import io.gearpump.WorkerId
-import io.gearpump.cluster.{AppJar, MasterToAppMaster}
+import org.apache.log4j.Logger
+import upickle.Js
+import upickle.default._
+
+import io.gearpump.cluster.AppMasterToMaster.MasterData
 import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData}
 import io.gearpump.cluster.MasterToClient.HistoryMetrics
+import io.gearpump.cluster.master.MasterSummary
+import io.gearpump.cluster.worker.{WorkerId, WorkerSummary}
+import io.gearpump.cluster.{AppJar, MasterToAppMaster}
 import io.gearpump.integrationtest.{Docker, Util}
 import io.gearpump.services.AppMasterService.Status
 import io.gearpump.services.MasterService.{AppSubmissionResult, 
BuiltinPartitioners}
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
 import io.gearpump.streaming.ProcessorDescription
-import io.gearpump.cluster.AppMasterToMaster.MasterData
-import io.gearpump.cluster.master.MasterSummary
-import io.gearpump.cluster.worker.WorkerSummary
 import io.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
 import io.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, 
ReplaceProcessor}
 import io.gearpump.streaming.appmaster.StreamAppMasterSummary
 import io.gearpump.streaming.executor.Executor.ExecutorSummary
 import io.gearpump.util.{Constants, Graph}
-import org.apache.log4j.Logger
-import upickle.Js
-import upickle.default._
-import io.gearpump.services.util.UpickleUtil._
-
-import scala.reflect.ClassTag
 
 /**
  * A REST client to operate a Gearpump cluster
@@ -50,14 +51,16 @@ class RestClient(host: String, port: Int) {
 
   private val cookieFile: String = "cookie.txt"
 
-  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = 
upickle.default.Reader[Graph[Int, String]] {
+  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] =
+    upickle.default.Reader[Graph[Int, String]] {
     case Js.Obj(verties, edges) =>
       val vertexList = upickle.default.readJs[List[Int]](verties._2)
       val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
       Graph(vertexList, edgeList)
   }
 
-  private def decodeAs[T](expr: String)(implicit reader: 
upickle.default.Reader[T], classTag: ClassTag[T]): T = try {
+  private def decodeAs[T](
+      expr: String)(implicit reader: upickle.default.Reader[T], classTag: 
ClassTag[T]): T = try {
     read[T](expr)
   } catch {
     case ex: Throwable =>
@@ -91,7 +94,8 @@ class RestClient(host: String, port: Int) {
     listApps().length + 1
   }
 
-  def submitApp(jar: String, executorNum: Int, args: String = "", config: 
String = ""): Boolean = try {
+  def submitApp(jar: String, executorNum: Int, args: String = "", config: 
String = "")
+    : Boolean = try {
     var endpoint = "master/submitapp"
 
     var options = Seq(s"jar=@$jar")
@@ -130,7 +134,8 @@ class RestClient(host: String, port: Int) {
     decodeAs[StreamAppMasterSummary](resp)
   }
 
-  def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = 
"processor*"): HistoryMetrics = {
+  def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = 
"processor*")
+    : HistoryMetrics = {
     val args = if (current) "?readLatest=true" else ""
     val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args")
     decodeAs[HistoryMetrics](resp)
@@ -196,7 +201,8 @@ class RestClient(host: String, port: Int) {
   def replaceStreamingAppProcessor(appId: Int, replaceMe: 
ProcessorDescription): Boolean = try {
     val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe)
     val args = upickle.default.write(replaceOperation)
-    val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + 
Util.encodeUriComponent(args), CRUD_POST)
+    val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + 
Util.encodeUriComponent(args),
+      CRUD_POST)
     decodeAs[DAGOperationResult](resp)
     true
   } catch {


Reply via email to