http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
index 5e11c34..7b863fa 100644
--- 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
+++ 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/Records.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +17,18 @@
  */
 
 package io.gearpump.experiments.yarn.glue
-import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, 
ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, 
Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => 
YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource}
-import org.apache.hadoop.yarn.util.{Records => YarnRecords}
 
+import scala.language.implicitConversions
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId => YarnApplicationId, 
ApplicationReport => YarnApplicationReport, ApplicationSubmissionContext, 
Container => YarnContainer, ContainerId => YarnContainerId, ContainerStatus => 
YarnContainerStatus, NodeId => YarnNodeId, Resource => YarnResource, 
YarnApplicationState}
+import org.apache.hadoop.yarn.util.{Records => YarnRecords}
 
 object Records {
   def newRecord[T](clazz: Class[T]): T = YarnRecords.newRecord(clazz)
 
-  def newAppSubmissionContext = 
YarnRecords.newRecord(classOf[ApplicationSubmissionContext])
+  def newAppSubmissionContext: ApplicationSubmissionContext = {
+    YarnRecords.newRecord(classOf[ApplicationSubmissionContext])
+  }
 
   class ApplicationId(private[glue] val impl: YarnApplicationId) {
     def getId: Int = impl.getId
@@ -38,9 +42,13 @@ object Records {
         false
       }
     }
+
+    override def hashCode(): Int = {
+      impl.hashCode()
+    }
   }
 
-  object ApplicationId{
+  object ApplicationId {
     def newInstance(timestamp: Long, id: Int): ApplicationId = {
       YarnApplicationId.newInstance(timestamp, id)
     }
@@ -53,9 +61,9 @@ object Records {
 
     def getFinishTime: Long = impl.getFinishTime
 
-    def getOriginalTrackingUrl = impl.getOriginalTrackingUrl
+    def getOriginalTrackingUrl: String = impl.getOriginalTrackingUrl
 
-    def getYarnApplicationState = impl.getYarnApplicationState
+    def getYarnApplicationState: YarnApplicationState = 
impl.getYarnApplicationState
 
     override def toString: String = impl.toString
   }
@@ -69,6 +77,10 @@ object Records {
         false
       }
     }
+
+    override def hashCode(): Int = {
+      impl.hashCode()
+    }
   }
 
   object Resource {
@@ -93,6 +105,10 @@ object Records {
         false
       }
     }
+
+    override def hashCode(): Int = {
+      impl.hashCode()
+    }
   }
 
   class ContainerId(private[glue] val impl: YarnContainerId) {
@@ -105,6 +121,10 @@ object Records {
         false
       }
     }
+
+    override def hashCode(): Int = {
+      impl.hashCode()
+    }
   }
 
   object ContainerId {
@@ -123,6 +143,10 @@ object Records {
         false
       }
     }
+
+    override def hashCode(): Int = {
+      impl.hashCode()
+    }
   }
 
   class ContainerStatus(private[glue] val impl: YarnContainerStatus) {
@@ -167,11 +191,13 @@ object Records {
     app.impl
   }
 
-  private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: 
YarnContainerStatus): ContainerStatus = {
+  private[glue] implicit def yarnContainerStatusToContainerStatus(yarn: 
YarnContainerStatus)
+    : ContainerStatus = {
     new ContainerStatus(yarn)
   }
 
-  private[glue] implicit def containerStatusToYarnContainerStatus(app: 
ContainerStatus): YarnContainerStatus = {
+  private[glue] implicit def containerStatusToYarnContainerStatus(app: 
ContainerStatus)
+    : YarnContainerStatus = {
     app.impl
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
index 2719025..db7d5d7 100644
--- 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
+++ 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnClient.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
 
 package io.gearpump.experiments.yarn.glue
 
-import io.gearpump.experiments.yarn.glue.Records._
-import io.gearpump.util.LogUtil
 import org.apache.hadoop.yarn.api.records.YarnApplicationState
 import org.apache.hadoop.yarn.client.api
 
+import io.gearpump.experiments.yarn.glue.Records._
+import io.gearpump.util.LogUtil
+
 /**
  * Adapter for api.YarnClient
  */
@@ -36,7 +37,7 @@ class YarnClient(yarn: YarnConfig) {
   LOG.info("Starting YarnClient...")
 
   def createApplication: ApplicationId = {
-    val app  = client.createApplication()
+    val app = client.createApplication()
     val response = app.getNewApplicationResponse()
     LOG.info("Create application, appId: " + response.getApplicationId())
     response.getApplicationId()
@@ -46,7 +47,9 @@ class YarnClient(yarn: YarnConfig) {
     client.getApplicationReport(appId)
   }
 
-  def submit(name: String, appId: ApplicationId, command: String, resource: 
Resource, queue: String, packagePath: String, configPath: String): 
ApplicationId = {
+  def submit(
+      name: String, appId: ApplicationId, command: String, resource: Resource, 
queue: String,
+      packagePath: String, configPath: String): ApplicationId = {
 
     val appContext = Records.newAppSubmissionContext
     appContext.setApplicationName(name)
@@ -61,13 +64,13 @@ class YarnClient(yarn: YarnConfig) {
     client.submitApplication(appContext)
   }
 
-  def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = 
Long.MaxValue): ApplicationReport = {
-    import YarnApplicationState._
+  def awaitApplication(appId: ApplicationId, timeoutMilliseconds: Long = 
Long.MaxValue)
+    : ApplicationReport = {
+    import org.apache.hadoop.yarn.api.records.YarnApplicationState._
     val terminated = Set(FINISHED, KILLED, FAILED, RUNNING)
     var result: ApplicationReport = null
     var done = false
 
-
     val start = System.currentTimeMillis()
     def timeout: Boolean = {
       val now = System.currentTimeMillis()
@@ -78,7 +81,7 @@ class YarnClient(yarn: YarnConfig) {
       }
     }
 
-    while(!done && !timeout) {
+    while (!done && !timeout) {
       val report = client.getApplicationReport(appId)
       val status = report.getYarnApplicationState
       if (terminated.contains(status)) {
@@ -90,15 +93,13 @@ class YarnClient(yarn: YarnConfig) {
       }
     }
 
-    Console.println()
-
     if (timeout) {
       throw new Exception(s"Launch Application $appId timeout...")
     }
     result
   }
 
-  def stop: Unit = {
+  def stop(): Unit = {
     client.stop()
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
index 688ffed..87f199a 100644
--- 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
+++ 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/YarnConfig.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,8 +18,6 @@
 
 package io.gearpump.experiments.yarn.glue
 
-import java.io.{OutputStream, OutputStreamWriter}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
new file mode 100644
index 0000000..24adbaa
--- /dev/null
+++ 
b/experiments/yarn/src/main/scala/io/gearpump/experiments/yarn/glue/package.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.yarn
+
+
+/**
+ * YARN facade to decouple Gearpump with YARN.
+ */
+package object glue {
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
index 3f1aed1..2a6bf38 100644
--- 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
+++ 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/CommandSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,9 +19,10 @@
 package io.gearpump.experiments.yarn.appmaster
 
 import com.typesafe.config.ConfigFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.TestUtil
 import io.gearpump.transport.HostPort
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 
 class CommandSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   val config = ConfigFactory.parseString(
@@ -68,21 +69,27 @@ class CommandSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val version = "gearpump-0.1"
     val master = MasterCommand(config, version, HostPort("127.0.0.1", 8080))
 
-    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp 
conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH
 -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 
-Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} 
-Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 
-Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR>  
io.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a 
<LOG_DIR>/stderr"
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp 
conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH
 -Dgearpump.cluster.masters.0=127.0.0.1:8080 -Dgearpump.hostname=127.0.0.1 
-Dgearpump.master-resource-manager-container-id={{CONTAINER_ID}} 
-Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 
-Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> 
io.gearpump.cluster.main.Master -ip 127.0.0.1 -port 8080 2>&1 | /usr/bin/tee -a 
<LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
     assert(master.get == expected)
   }
 
   "WorkerCommand" should "create correct command line" in {
     val version = "gearpump-0.1"
     val worker = WorkerCommand(config, version, HostPort("127.0.0.1", 8080), 
"worker-machine")
-    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp 
conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH
 -Dgearpump.cluster.masters.0=127.0.0.1:8080 
-Dgearpump.log.daemon.dir=<LOG_DIR> 
-Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} 
-Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 
-Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine  
io.gearpump.cluster.main.Worker  2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java  -Xmx512m -cp 
conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/*:$CLASSPATH
 -Dgearpump.cluster.masters.0=127.0.0.1:8080 
-Dgearpump.log.daemon.dir=<LOG_DIR> 
-Dgearpump.worker-resource-manager-container-id={{CONTAINER_ID}} 
-Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 
-Dgearpump.log.application.dir=<LOG_DIR> -Dgearpump.hostname=worker-machine 
io.gearpump.cluster.main.Worker  2>&1 | /usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
     assert(worker.get == expected)
   }
 
   "AppMasterCommand" should "create correct command line" in {
     val version = "gearpump-0.1"
     val appmaster = AppMasterCommand(config, version, Array("arg1", "arg2", 
"arg3"))
-    val expected = "$JAVA_HOME/bin/java -Xmx512m -cp 
conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH
 -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 
-Dgearpump.binary-version-with-scala-version=gearpump-0.1 
-Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> 
-Dgearpump.hostname={{NM_HOST}}  
io.gearpump.experiments.yarn.appmaster.YarnAppMaster  arg1 arg2 arg3 2>&1 | 
/usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:off line.size.limit
+    val expected = "$JAVA_HOME/bin/java -Xmx512m -cp 
conf:pack/gearpump-0.1/conf:pack/gearpump-0.1/dashboard:pack/gearpump-0.1/lib/*:pack/gearpump-0.1/lib/daemon/*:pack/gearpump-0.1/lib/services/*:pack/gearpump-0.1/lib/yarn/*:$CLASSPATH
 -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.1 
-Dgearpump.binary-version-with-scala-version=gearpump-0.1 
-Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> 
-Dgearpump.hostname={{NM_HOST}} 
io.gearpump.experiments.yarn.appmaster.YarnAppMaster  arg1 arg2 arg3 2>&1 | 
/usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
     assert(appmaster.get == expected)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
index d8150ac..f8f9fe8 100644
--- 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
+++ 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/UIServiceSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,25 +18,29 @@
 
 package io.gearpump.experiments.yarn.appmaster
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.TestUtil
 import io.gearpump.experiments.yarn.appmaster.UIServiceSpec.{Info, MockUI}
 import io.gearpump.transport.HostPort
 import io.gearpump.util.Constants
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 
-class UIServiceSpec  extends FlatSpec with Matchers with BeforeAndAfterAll {
+class UIServiceSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   implicit var system: ActorSystem = null
 
-  override def beforeAll() = {
+  override def beforeAll(): Unit = {
     system = ActorSystem(getClass.getSimpleName, TestUtil.DEFAULT_CONFIG)
   }
 
-  override def afterAll() = {
-    system.shutdown()
-    system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   it should "start UI server correctly" in {
@@ -51,14 +55,15 @@ class UIServiceSpec  extends FlatSpec with Matchers with 
BeforeAndAfterAll {
 
     val ui = system.actorOf(Props(new MockUI(masters, host, port, probe.ref)))
 
-    probe.expectMsgPF(){
-      case info: Info =>
+    probe.expectMsgPF() {
+      case info: Info => {
         assert(info.masterHost == "127.0.0.1")
         assert(info.masterPort == 3000)
         val conf = ConfigFactory.parseFile(new java.io.File(info.configFile))
         assert(conf.getString(Constants.GEARPUMP_SERVICE_HOST) == host)
         assert(conf.getString(Constants.GEARPUMP_SERVICE_HTTP) == "8091")
         assert(conf.getString(Constants.NETTY_TCP_HOSTNAME) == host)
+      }
     }
 
     system.stop(ui)
@@ -70,9 +75,10 @@ object UIServiceSpec {
   case class Info(supervisor: String, masterHost: String, masterPort: Int, 
configFile: String)
 
   class MockUI(masters: List[HostPort], host: String, port: Int, probe: 
ActorRef)
-      extends UIService(masters, host, port) {
+    extends UIService(masters, host, port) {
 
-    override def launch(supervisor: String, masterHost: String, masterPort: 
Int, configFile: String): Unit = {
+    override def launch(
+        supervisor: String, masterHost: String, masterPort: Int, configFile: 
String): Unit = {
       probe ! Info(supervisor, masterHost, masterPort, configFile)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
index 880c1d0..84d6d37 100644
--- 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
+++ 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/appmaster/YarnAppMasterSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,9 +18,17 @@
 
 package io.gearpump.experiments.yarn.appmaster
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.{Actor, ActorSystem, Props}
 import akka.testkit.{TestActorRef, TestProbe}
 import com.typesafe.config.ConfigFactory
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, 
RemoveWorker}
 import io.gearpump.cluster.TestUtil
 import io.gearpump.experiments.yarn.Constants
@@ -29,11 +37,6 @@ import 
io.gearpump.experiments.yarn.appmaster.YarnAppMasterSpec.UI
 import io.gearpump.experiments.yarn.glue.Records.{Container, Resource, _}
 import io.gearpump.experiments.yarn.glue.{NMClient, RMClient}
 import io.gearpump.transport.HostPort
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
 
 class YarnAppMasterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
@@ -83,24 +86,23 @@ class YarnAppMasterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
   val packagePath = "/user/gearpump/gearpump.zip"
   val configPath = "/user/my/conf"
 
-
-  override def beforeAll() = {
+  override def beforeAll(): Unit = {
     system = ActorSystem("test", config)
   }
 
-  override def afterAll() = {
-    system.shutdown()
-    system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
-  private def startAppMaster: (TestActorRef[_ <: Actor], TestProbe, NMClient, 
RMClient) = {
+  private def startAppMaster(): (TestActorRef[YarnAppMaster], TestProbe, 
NMClient, RMClient) = {
     val rmClient = mock(classOf[RMClient])
     val nmClient = mock(classOf[NMClient])
     val ui = mock(classOf[UIFactory])
     when(ui.props(any[List[HostPort]], anyString, 
anyInt)).thenReturn(Props(new UI))
 
-
-    val appMaster = TestActorRef(Props(new YarnAppMaster(rmClient, nmClient, 
packagePath, configPath, ui)))
+    val appMaster = TestActorRef[YarnAppMaster](Props(
+      new YarnAppMaster(rmClient, nmClient, packagePath, configPath, ui)))
 
     verify(rmClient).start(appMaster)
     verify(nmClient).start(appMaster)
@@ -118,48 +120,50 @@ class YarnAppMasterSpec extends FlatSpec with Matchers 
with BeforeAndAfterAll {
     when(masterContainer.getNodeId).thenReturn(mockNode)
     when(masterContainer.getId).thenReturn(mockId)
 
-    // launch master
+    // Launches master
     appMaster ! ContainersAllocated(List.fill(masterCount)(masterContainer))
-    verify(nmClient, times(masterCount)).launchCommand(any[Container], 
anyString, anyString, anyString)
+    verify(nmClient,
+      times(masterCount)).launchCommand(any[Container], anyString, anyString, 
anyString)
 
-    // master containers started
+    // Master containers started
     (0 until masterCount).foreach(_ => appMaster ! ContainerStarted(mockId))
 
-    //transition to start workers
+    // Transition to start workers
     val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
     verify(rmClient, times(2)).requestContainers(workerResources.capture())
     assert(workerResources.getValue.size == workerCount)
 
-    // launch workers
+    // Launches workers
     val workerContainer = mock(classOf[Container])
     when(workerContainer.getNodeId).thenReturn(mockNode)
     val workerContainerId = 
ContainerId.fromString("container_1449802454214_0034_01_000006")
     when(workerContainer.getId).thenReturn(workerContainerId)
     appMaster ! ContainersAllocated(List.fill(workerCount)(workerContainer))
-    verify(nmClient, times(workerCount + 
masterCount)).launchCommand(any[Container], anyString, anyString, anyString)
+    verify(nmClient, times(workerCount + masterCount))
+      .launchCommand(any[Container], anyString, anyString, anyString)
 
-    // worker containers started
+    // Worker containers started
     (0 until workerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
 
-    // start UI server
+    // Starts UI server
     verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
 
-    //Application Ready...
+    // Application Ready...
     val client = TestProbe()
 
-    // get active config
+    // Gets active config
     appMaster.tell(GetActiveConfig("client"), client.ref)
     client.expectMsgType[ActiveConfig]
 
-    // query version
+    // Queries version
     appMaster.tell(QueryVersion, client.ref)
     client.expectMsgType[Version]
 
-    // query version
+    // Queries cluster info
     appMaster.tell(QueryClusterInfo, client.ref)
     client.expectMsgType[ClusterInfo]
 
-    // add worker
+    // Adds worker
     val newWorkerCount = 2
     appMaster.tell(AddWorker(newWorkerCount), client.ref)
     client.expectMsgType[CommandResult]
@@ -167,18 +171,18 @@ class YarnAppMasterSpec extends FlatSpec with Matchers 
with BeforeAndAfterAll {
     verify(rmClient, times(3)).requestContainers(newWorkerResources.capture())
     assert(newWorkerResources.getValue.size == newWorkerCount)
 
-    // new container allocated
+    // New container allocated
     appMaster ! ContainersAllocated(List.fill(newWorkerCount)(workerContainer))
     verify(nmClient, times(workerCount + masterCount + newWorkerCount)).
       launchCommand(any[Container], anyString, anyString, anyString)
 
-    // new worker containers started
+    // New worker containers started
     (0 until newWorkerCount).foreach(_ => appMaster ! ContainerStarted(mockId))
 
-    // same UI server
+    // Same UI server
     verify(ui, times(1)).props(any[List[HostPort]], anyString, anyInt)
 
-    // remove worker
+    // Removes worker
     appMaster.tell(RemoveWorker(workerContainerId.toString), client.ref)
     client.expectMsgType[CommandResult]
     verify(nmClient).stopContainer(any[ContainerId], any[NodeId])
@@ -187,21 +191,22 @@ class YarnAppMasterSpec extends FlatSpec with Matchers 
with BeforeAndAfterAll {
   }
 
   it should "start master, worker and UI on YARN" in {
-    val (appMaster, client, nmClient, rmClient) = startAppMaster
+    val env = startAppMaster()
+    val (appMaster, client, nmClient, rmClient) = env
 
-    // kill the app
+    // Kills the app
     appMaster.tell(Kill, client.ref)
     client.expectMsgType[CommandResult]
     verify(nmClient, times(1)).stop()
     verify(rmClient, times(1)).shutdownApplication()
-
   }
 
   it should "handle resource manager errors" in {
-    val (appMaster, client, nmClient, rmClient) = startAppMaster
+    val env = startAppMaster()
+    val (appMaster, client, nmClient, rmClient) = env
 
     // on error
-    val ex = new Exception
+    val ex = new Exception("expected resource manager exception")
     appMaster.tell(ResourceManagerException(ex), client.ref)
     verify(nmClient, times(1)).stop()
     verify(rmClient, times(1)).failApplication(ex)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
index c794cae..3bd7f4f 100644
--- 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
+++ 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/LaunchClusterSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,24 +18,25 @@
 
 package io.gearpump.experiments.yarn.client
 
-import java.io.{OutputStream, ByteArrayOutputStream, BufferedOutputStream, 
FileOutputStream, InputStream, ByteArrayInputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, 
OutputStream}
+import java.util.Random
 import java.util.zip.{ZipEntry, ZipOutputStream}
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.Try
 
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.TestUtil
 import io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, 
GetActiveConfig}
+import io.gearpump.experiments.yarn.glue.Records._
 import io.gearpump.experiments.yarn.glue.{FileSystem, YarnClient, YarnConfig}
 import io.gearpump.util.FileUtils
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.mockito.ArgumentCaptor
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import java.util.Random
-import scala.concurrent.Await
-import scala.util.Try
-import io.gearpump.experiments.yarn.glue.Records._
 class LaunchClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
   implicit var system: ActorSystem = null
 
@@ -88,14 +89,13 @@ class LaunchClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
       |}
     """.stripMargin).withFallback(TestUtil.DEFAULT_CONFIG)
 
-
-  override def beforeAll() = {
+  override def beforeAll(): Unit = {
     system = ActorSystem(getClass.getSimpleName, akka)
   }
 
-  override def afterAll() = {
-    system.shutdown()
-    system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   it should "reject non-zip files" in {
@@ -109,7 +109,6 @@ class LaunchClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     assert(Try(launcher.submit("gearpump", packagePath)).isFailure)
   }
 
-
   it should "reject if we cannot find the package file on HDFS" in {
     val yarnConfig = mock(classOf[YarnConfig])
     val yarnClient = mock(classOf[YarnClient])
@@ -145,7 +144,8 @@ class LaunchClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val appMasterResolver = mock(classOf[AppMasterResolver])
 
     val version = "gearpump-0.2"
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, 
appMasterResolver, version)
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
+      appMasterResolver, version)
     val packagePath = "gearpump.zip"
     when(fs.exists(anyString)).thenReturn(true)
 
@@ -161,7 +161,8 @@ class LaunchClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val appMasterResolver = mock(classOf[AppMasterResolver])
 
     val version = "gearpump-0.2"
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, 
appMasterResolver, version)
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient,
+      fs, system, appMasterResolver, version)
     val packagePath = "gearpump.zip"
 
     val out = mock(classOf[OutputStream])
@@ -178,12 +179,15 @@ class LaunchClusterSpec extends FlatSpec with Matchers 
with BeforeAndAfterAll {
     when(yarnClient.createApplication).thenReturn(appId)
     assert(appId == launcher.submit("gearpump", packagePath))
 
-    // 3 config files are uploaded to HDFS, one is akka.conf, one is 
yarn-site.xml, one is log4j.properties.
+    // 3 Config files are uploaded to HDFS, one is akka.conf,
+    // one is yarn-site.xml, one is log4j.properties.
     verify(fs, times(3)).create(anyString)
     verify(out, times(3)).close()
 
-    //val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
-    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp 
conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH
 -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 
-Dgearpump.binary-version-with-scala-version=gearpump-0.2 
-Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> 
-Dgearpump.hostname={{NM_HOST}}  
io.gearpump.experiments.yarn.appmaster.YarnAppMaster  -conf 
/root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | 
/usr/bin/tee -a <LOG_DIR>/stderr"
+    // val workerResources = ArgumentCaptor.forClass(classOf[List[Resource]])
+    // scalastyle:off line.size.limit
+    val expectedCommand = "$JAVA_HOME/bin/java -Xmx512m -cp 
conf:pack/gearpump-0.2/conf:pack/gearpump-0.2/dashboard:pack/gearpump-0.2/lib/*:pack/gearpump-0.2/lib/daemon/*:pack/gearpump-0.2/lib/services/*:pack/gearpump-0.2/lib/yarn/*:$CLASSPATH
 -Dgearpump.home={{LOCAL_DIRS}}/{{CONTAINER_ID}}/pack/gearpump-0.2 
-Dgearpump.binary-version-with-scala-version=gearpump-0.2 
-Dgearpump.log.daemon.dir=<LOG_DIR> -Dgearpump.log.application.dir=<LOG_DIR> 
-Dgearpump.hostname={{NM_HOST}} 
io.gearpump.experiments.yarn.appmaster.YarnAppMaster  -conf 
/root/.gearpump_application_0_0000/conf/ -package gearpump.zip 2>&1 | 
/usr/bin/tee -a <LOG_DIR>/stderr"
+    // scalastyle:on line.size.limit
     verify(yarnClient).submit("gearpump", appId, expectedCommand,
       Resource.newInstance(512, 1), "default",
       "gearpump.zip", "/root/.gearpump_application_0_0000/conf/")
@@ -197,7 +201,8 @@ class LaunchClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val appMaster = TestProbe()
 
     val version = "gearpump-0.2"
-    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system, 
appMasterResolver, version)
+    val launcher = new LaunchCluster(akka, yarnConfig, yarnClient, fs, system,
+      appMasterResolver, version)
     val outputPath = java.io.File.createTempFile("LaunchClusterSpec", ".conf")
 
     when(appMasterResolver.resolve(any[ApplicationId], 
anyInt)).thenReturn(appMaster.ref)
@@ -206,7 +211,7 @@ class LaunchClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     appMaster.reply(ActiveConfig(ConfigFactory.empty()))
 
     import scala.concurrent.duration._
-    val file = Await.result(fileFuture, 30 seconds).asInstanceOf[java.io.File]
+    val file = Await.result(fileFuture, 30.seconds).asInstanceOf[java.io.File]
 
     assert(!FileUtils.read(file).isEmpty)
     file.delete()
@@ -216,10 +221,10 @@ class LaunchClusterSpec extends FlatSpec with Matchers 
with BeforeAndAfterAll {
     val bytes = new ByteArrayOutputStream(1000)
     val zipOut = new ZipOutputStream(bytes)
 
-    // not available on BufferedOutputStream
+    // Not available on BufferedOutputStream
     zipOut.putNextEntry(new ZipEntry(s"$version/README.md"))
     zipOut.write("README".getBytes())
-    // not available on BufferedOutputStream
+    // Not available on BufferedOutputStream
     zipOut.closeEntry()
     zipOut.close()
     new ByteArrayInputStream(bytes.toByteArray)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
----------------------------------------------------------------------
diff --git 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
index 07a07d7..b324ece 100644
--- 
a/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
+++ 
b/experiments/yarn/src/test/scala/io/gearpump/experiments/yarn/client/ManageClusterSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,9 +18,14 @@
 
 package io.gearpump.experiments.yarn.client
 
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
 import com.typesafe.config.ConfigFactory
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
 import io.gearpump.cluster.ClientToMaster.{AddWorker, CommandResult, 
RemoveWorker}
 import io.gearpump.cluster.TestUtil
 import io.gearpump.cluster.main.ParseResult
@@ -28,21 +33,18 @@ import 
io.gearpump.experiments.yarn.appmaster.YarnAppMaster.{ActiveConfig, Clust
 import io.gearpump.experiments.yarn.client.ManageCluster._
 import io.gearpump.experiments.yarn.glue.Records.ApplicationId
 import io.gearpump.util.FileUtils
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-
-import scala.concurrent.Await
 
 class ManageClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
   implicit var system: ActorSystem = null
 
-  override def beforeAll() = {
+  override def beforeAll(): Unit = {
     system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
   }
 
-  override def afterAll() = {
-    system.shutdown()
-    system.awaitTermination()
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
   it should "getConfig from remote Gearpump" in {
@@ -52,11 +54,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
 
     val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
 
-    val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> 
output.toString), Array.empty[String]))
+    val future = manager.command(GET_CONFIG, new ParseResult(Map("output" -> 
output.toString),
+      Array.empty[String]))
     appMaster.expectMsgType[GetActiveConfig]
     appMaster.reply(ActiveConfig(ConfigFactory.empty()))
     import scala.concurrent.duration._
-    Await.result(future, 30 seconds)
+    Await.result(future, 30.seconds)
 
     val content = FileUtils.read(output)
     assert(content.length > 0)
@@ -68,11 +71,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val appMaster = TestProbe()
     val manager = new ManageCluster(appId, appMaster.ref, system)
 
-    val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 
1.toString), Array.empty[String]))
+    val future = manager.command(ADD_WORKER, new ParseResult(Map("count" -> 
1.toString),
+      Array.empty[String]))
     appMaster.expectMsg(AddWorker(1))
     appMaster.reply(CommandResult(true))
     import scala.concurrent.duration._
-    val result = Await.result(future, 30 seconds).asInstanceOf[CommandResult]
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
     assert(result.success)
   }
 
@@ -81,11 +85,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val appMaster = TestProbe()
     val manager = new ManageCluster(appId, appMaster.ref, system)
 
-    val future = manager.command(REMOVE_WORKER, new 
ParseResult(Map("container" -> "1"), Array.empty[String]))
+    val future = manager.command(REMOVE_WORKER, new 
ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
     appMaster.expectMsg(RemoveWorker("1"))
     appMaster.reply(CommandResult(true))
     import scala.concurrent.duration._
-    val result = Await.result(future, 30 seconds).asInstanceOf[CommandResult]
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
     assert(result.success)
   }
 
@@ -93,11 +98,12 @@ class ManageClusterSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val appId = ApplicationId.newInstance(0L, 0)
     val appMaster = TestProbe()
     val manager = new ManageCluster(appId, appMaster.ref, system)
-    val future = manager.command(VERSION, new ParseResult(Map("container" -> 
"1"), Array.empty[String]))
+    val future = manager.command(VERSION, new ParseResult(Map("container" -> 
"1"),
+      Array.empty[String]))
     appMaster.expectMsg(QueryVersion)
     appMaster.reply(Version("version 0.1"))
     import scala.concurrent.duration._
-    val result = Await.result(future, 30 seconds).asInstanceOf[Version]
+    val result = Await.result(future, 30.seconds).asInstanceOf[Version]
     assert(result.version == "version 0.1")
   }
 
@@ -108,11 +114,12 @@ class ManageClusterSpec extends FlatSpec with Matchers 
with BeforeAndAfterAll {
 
     val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
 
-    val future = manager.command(QUERY, new ParseResult(Map.empty[String, 
String], Array.empty[String]))
+    val future = manager.command(QUERY, new ParseResult(Map.empty[String, 
String],
+      Array.empty[String]))
     appMaster.expectMsg(QueryClusterInfo)
     appMaster.reply(ClusterInfo(List("master"), List("worker")))
     import scala.concurrent.duration._
-    val result = Await.result(future, 30 seconds).asInstanceOf[ClusterInfo]
+    val result = Await.result(future, 30.seconds).asInstanceOf[ClusterInfo]
     assert(result.masters.sameElements(List("master")))
     assert(result.workers.sameElements(List("worker")))
   }
@@ -124,11 +131,12 @@ class ManageClusterSpec extends FlatSpec with Matchers 
with BeforeAndAfterAll {
 
     val output = java.io.File.createTempFile("managerClusterSpec", ".conf")
 
-    val future = manager.command(KILL, new ParseResult(Map("container" -> 
"1"), Array.empty[String]))
+    val future = manager.command(KILL, new ParseResult(Map("container" -> "1"),
+      Array.empty[String]))
     appMaster.expectMsg(Kill)
     appMaster.reply(CommandResult(true))
     import scala.concurrent.duration._
-    val result = Await.result(future, 30 seconds).asInstanceOf[CommandResult]
+    val result = Await.result(future, 30.seconds).asInstanceOf[CommandResult]
     assert(result.success)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/README.md
----------------------------------------------------------------------
diff --git a/external/hadoopfs/README.md b/external/hadoopfs/README.md
index 73eee9e..7a9aeef 100644
--- a/external/hadoopfs/README.md
+++ b/external/hadoopfs/README.md
@@ -30,7 +30,7 @@ This interface is used by SequenceFileSink to determinate the 
output format, the
 The HDFS cluster should run on where Gearpump is deployed.
 Suppose HDFS is installed at ```/usr/lib/hadoop2.6``` on every node and you 
already have your application built into a jar file. 
 Then before submitting the application, you need to add Hdfs lib folder and 
conf folder into ```gearpump.executor.extraClasspath``` in 
```conf/gear.conf```, for example 
```/usr/lib/hadoop2.6/share/hadoop/common/*:/usr/lib/hadoop2.6/share/hadoop/common/lib/*:/usr/lib/hadoop2.6/share/hadoop/hdfs/*:/usr/lib/hadoop2.6/share/hadoop/hdfs/lib/*:/usr/lib/hadoop2.6/etc/conf```.
 
-Please note only client side's configuration change is needed. After that, you 
are able to submmit the application.
+Please note only client side's configuration change is needed. After that, you 
are able to submit the application.
 
 ## Working with Secured HDFS
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
index eb03e3c..194c9a5 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,27 +18,30 @@
 
 package io.gearpump.streaming.hadoop
 
-import io.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreWriter, 
HadoopCheckpointStoreReader}
-import io.gearpump.streaming.hadoop.lib.rotation.Rotation
-import io.gearpump.streaming.transaction.api.CheckpointStore
-import io.gearpump.TimeStamp
-import io.gearpump.util.LogUtil
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.slf4j.Logger
 
+import io.gearpump.TimeStamp
+import io.gearpump.streaming.hadoop.lib.rotation.Rotation
+import io.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, 
HadoopCheckpointStoreWriter}
+import io.gearpump.streaming.transaction.api.CheckpointStore
+import io.gearpump.util.LogUtil
+
 object HadoopCheckpointStore {
   val LOG: Logger = LogUtil.getLogger(classOf[HadoopCheckpointStore])
 }
 
-
 /**
- * stores timestamp-checkpoint mapping to Hadoop-compatible filesystem
- * store file layout
- *    timestamp1, index1,
- *    timestamp2, index2,
- *    ...
- *    timestampN, indexN
+ * Stores timestamp-checkpoint mapping to Hadoop-compatible filesystem.
+ *
+ * Store file layout:
+ * {{{
+ * timestamp1, index1,
+ * timestamp2, index2,
+ * ...
+ * timestampN, indexN
+ * }}}
  */
 class HadoopCheckpointStore(
     dir: Path,
@@ -52,12 +55,15 @@ class HadoopCheckpointStore(
   private[hadoop] var curFile: Option[String] = None
   private[hadoop] var curWriter: Option[HadoopCheckpointStoreWriter] = None
   // regex (checkpoints-$startTime-$endTime.store) for complete checkpoint 
file,
-  private val compRegex = """checkpoints-(\d+)-(\d+).store""".r
+  private val compRegex =
+    """checkpoints-(\d+)-(\d+).store""".r
   // regex (checkpoints-$startTime.store) for temporary checkpoint file
-  private val tempRegex = """checkpoints-(\d+).store""".r
+  private val tempRegex =
+    """checkpoints-(\d+).store""".r
 
   /**
-   * persists a pair of timestamp and checkpoint, which
+   * Persists a pair of timestamp and checkpoint, which:
+   *
    *   1. creates a temporary checkpoint file, checkpoints-\$startTime.store, 
if not exist
    *   2. writes out (timestamp, checkpoint) and marks rotation
    *   3. rotates checkpoint file if needed
@@ -70,7 +76,8 @@ class HadoopCheckpointStore(
     if (curWriter.isEmpty) {
       curStartTime = curTime
       curFile = Some(s"checkpoints-$curStartTime.store")
-      curWriter = curFile.map(file => new HadoopCheckpointStoreWriter(new 
Path(dir, file), hadoopConfig))
+      curWriter = curFile.map(file =>
+        new HadoopCheckpointStoreWriter(new Path(dir, file), hadoopConfig))
     }
 
     curWriter.foreach { w =>
@@ -89,7 +96,8 @@ class HadoopCheckpointStore(
   }
 
   /**
-   * recovers checkpoint given timestamp, which
+   * Recovers checkpoint given timestamp, which
+   *  {{{
    *   1. returns None if no store exists
    *   2. searches checkpoint stores for
    *     a. complete store checkpoints-\$startTime-\$endTime.store
@@ -99,6 +107,7 @@ class HadoopCheckpointStore(
    *   3. renames store to checkpoints-\$startTime-\$endTime.store
    *   4. deletes all stores whose name has a startTime larger than timestamp
    *   5. looks for the checkpoint in the found store
+   *   }}}
    */
   override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
     var checkpoint: Option[Array[Byte]] = None
@@ -152,4 +161,4 @@ class HadoopCheckpointStore(
   override def close(): Unit = {
     curWriter.foreach(_.close())
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
index a6c6d9b..5a81ecd 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreFactory.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,13 +20,14 @@ package io.gearpump.streaming.hadoop
 
 import java.io.{ObjectInputStream, ObjectOutputStream}
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.hadoop.lib.HadoopUtil
 import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
 import io.gearpump.streaming.task.TaskContext
-import io.gearpump.streaming.transaction.api.{CheckpointStoreFactory, 
CheckpointStore}
-import io.gearpump.cluster.UserConfig
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import io.gearpump.streaming.transaction.api.{CheckpointStore, 
CheckpointStoreFactory}
 
 object HadoopCheckpointStoreFactory {
   val VERSION = 1
@@ -37,7 +38,7 @@ class HadoopCheckpointStoreFactory(
     @transient private var hadoopConfig: Configuration,
     rotation: Rotation = new FileSizeRotation(128 * Math.pow(2, 20).toLong))
   extends CheckpointStoreFactory {
-  import HadoopCheckpointStoreFactory._
+  import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory._
 
   private def writeObject(out: ObjectOutputStream): Unit = {
     out.defaultWriteObject()
@@ -52,8 +53,9 @@ class HadoopCheckpointStoreFactory(
 
   override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): 
CheckpointStore = {
     import taskContext.{appId, taskId}
-    val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION", 
s"app$appId-task${taskId.processorId}_${taskId.index}")
+    val dirPath = new Path(dir + Path.SEPARATOR + s"v$VERSION",
+      s"app$appId-task${taskId.processorId}_${taskId.index}")
     val fs = HadoopUtil.getFileSystemForPath(dirPath, hadoopConfig)
     new HadoopCheckpointStore(dirPath, fs, hadoopConfig, rotation)
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
index bf4f2ea..a07dbbc 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/SequenceFileSink.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,6 +19,10 @@ package io.gearpump.streaming.hadoop
 
 import java.text.SimpleDateFormat
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hdfs.HdfsConfiguration
+import org.apache.hadoop.io.SequenceFile
+
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.hadoop.lib.HadoopUtil
@@ -26,9 +30,6 @@ import 
io.gearpump.streaming.hadoop.lib.format.{DefaultSequenceFormatter, Output
 import io.gearpump.streaming.hadoop.lib.rotation.{FileSizeRotation, Rotation}
 import io.gearpump.streaming.sink.DataSink
 import io.gearpump.streaming.task.{TaskContext, TaskId}
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hdfs.HdfsConfiguration
-import org.apache.hadoop.io.SequenceFile
 
 class SequenceFileSink(
     userConfig: UserConfig,
@@ -43,10 +44,12 @@ class SequenceFileSink(
   private var appName: String = null
 
   /**
-    * open connection to data sink
-    * invoked at onStart() method of [[Task]]
-    * @param context is the task context at runtime
-    */
+   * Starts connection to data sink
+   *
+   * Invoked at onStart() method of [[io.gearpump.streaming.task.Task]]
+   *
+   * @param context is the task context at runtime
+   */
   override def open(context: TaskContext): Unit = {
     HadoopUtil.login(userConfig, configuration)
     this.appName = context.appName
@@ -55,10 +58,11 @@ class SequenceFileSink(
   }
 
   /**
-    * write message into data sink
-    * invoked at onNext() method of [[Task]]
-    * @param message wraps data to be written out
-    */
+   * Writes message into data sink
+   *
+   * Invoked at onNext() method of [[io.gearpump.streaming.task.Task]]
+   * @param message wraps data to be written out
+   */
   override def write(message: Message): Unit = {
     val key = sequenceFormat.getKey(message)
     val value = sequenceFormat.getValue(message)
@@ -67,7 +71,7 @@ class SequenceFileSink(
     }
     writer.append(key, value)
     rotation.mark(message.timestamp, writer.getLength)
-    if(rotation.shouldRotate){
+    if (rotation.shouldRotate) {
       closeWriter
       this.writer = getNextWriter
       rotation.rotate
@@ -75,15 +79,16 @@ class SequenceFileSink(
   }
 
   /**
-    * close connection to data sink
-    * invoked at onClose() method of [[Task]]
-    */
+   * Closes connection to data sink
+   *
+   * Invoked at onClose() method of [[io.gearpump.streaming.task.Task]]
+   */
   override def close(): Unit = {
-    closeWriter
+    closeWriter()
   }
 
-  private def closeWriter: Unit = {
-    Option(writer).foreach{ w =>
+  private def closeWriter(): Unit = {
+    Option(writer).foreach { w =>
       w.hflush()
       w.close()
     }
@@ -102,4 +107,4 @@ class SequenceFileSink(
     val base = new Path(basePath, 
s"$appName-task${taskId.processorId}_${taskId.index}")
     new Path(base, dateFormat.format(new java.util.Date))
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
index 40ee2f5..52acbac 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,18 +20,19 @@ package io.gearpump.streaming.hadoop.lib
 
 import java.io.EOFException
 
-import io.gearpump.TimeStamp
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import io.gearpump.TimeStamp
+
 class HadoopCheckpointStoreReader(
     path: Path,
     hadoopConfig: Configuration)
- extends Iterator[(TimeStamp, Array[Byte])] {
+  extends Iterator[(TimeStamp, Array[Byte])] {
 
   private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
   private var nextTimeStamp: Option[TimeStamp] = None
-  private var nextData : Option[Array[Byte]] = None
+  private var nextData: Option[Array[Byte]] = None
 
   override def hasNext: Boolean = {
     if (nextTimeStamp.isDefined) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
index f8d99ee..35f2f51 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,10 +18,11 @@
 
 package io.gearpump.streaming.hadoop.lib
 
-import io.gearpump.TimeStamp
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import io.gearpump.TimeStamp
+
 class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
   private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
index f1db219..eb579e4 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/HadoopUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,20 @@ package io.gearpump.streaming.hadoop.lib
 
 import java.io.File
 
-import io.gearpump.cluster.UserConfig
-import io.gearpump.util.{FileUtils, Constants}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.security.UserGroupInformation
 
+import io.gearpump.cluster.UserConfig
+import io.gearpump.util.{Constants, FileUtils}
+
 private[hadoop] object HadoopUtil {
 
   def getOutputStream(path: Path, hadoopConfig: Configuration): 
FSDataOutputStream = {
     val dfs = getFileSystemForPath(path, hadoopConfig)
     val stream: FSDataOutputStream = {
       if (dfs.isFile(path)) {
-          dfs.append(path)
+        dfs.append(path)
       } else {
         dfs.create(path)
       }
@@ -56,12 +57,13 @@ private[hadoop] object HadoopUtil {
   }
 
   def login(userConfig: UserConfig, configuration: Configuration): Unit = {
-    if(UserGroupInformation.isSecurityEnabled) {
+    if (UserGroupInformation.isSecurityEnabled) {
       val principal = 
userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
       val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
-      if(principal.isEmpty || keytabContent.isEmpty) {
+      if (principal.isEmpty || keytabContent.isEmpty) {
         val errorMsg = s"HDFS is security enabled, user should provide 
kerberos principal in " +
-          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file in 
${Constants.GEARPUMP_KEYTAB_FILE}"
+          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} " +
+          s"and keytab file in ${Constants.GEARPUMP_KEYTAB_FILE}"
         throw new Exception(errorMsg)
       }
       val keytabFile = File.createTempFile("login", ".keytab")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
index 8b693d0..d19e71f 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/DefaultSequenceFormatter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
  */
 package io.gearpump.streaming.hadoop.lib.format
 
+import org.apache.hadoop.io.{LongWritable, Text, Writable}
+
 import io.gearpump.Message
-import org.apache.hadoop.io.{LongWritable, Writable, Text}
 
-class DefaultSequenceFormatter extends OutputFormatter{
+class DefaultSequenceFormatter extends OutputFormatter {
   override def getKey(message: Message): Writable = new 
LongWritable(message.timestamp)
 
   override def getValue(message: Message): Writable = new 
Text(message.msg.asInstanceOf[String])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
index bc385f0..fe8e52e 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/format/OutputFormatter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,10 +17,11 @@
  */
 package io.gearpump.streaming.hadoop.lib.format
 
-import io.gearpump.Message
 import org.apache.hadoop.io.Writable
 
-trait OutputFormatter extends Serializable{
+import io.gearpump.Message
+
+trait OutputFormatter extends Serializable {
   def getKeyClass: Class[_ <: Writable]
 
   def getValueClass: Class[_ <: Writable]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
index 104a7b9..cd83ea5 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotation.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,11 +30,8 @@ case class FileSizeRotation(maxBytes: Long) extends Rotation 
{
 
   override def shouldRotate: Boolean = bytesWritten >= maxBytes
 
-  override def rotate: Unit = {
+  override def rotate(): Unit = {
     bytesWritten = 0L
   }
 }
 
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
index c56dd65..e28b222 100644
--- 
a/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
+++ 
b/external/hadoopfs/src/main/scala/io/gearpump/streaming/hadoop/lib/rotation/Rotation.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,5 +23,5 @@ import io.gearpump.TimeStamp
 trait Rotation extends Serializable {
   def mark(timestamp: TimeStamp, offset: Long): Unit
   def shouldRotate: Boolean
-  def rotate: Unit
+  def rotate(): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
 
b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
index db5615d..cc8a5f0 100644
--- 
a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
+++ 
b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/HadoopCheckpointStoreIntegrationSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,6 @@
 
 package io.gearpump.streaming.hadoop
 
-import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.hadoop.lib.HadoopUtil
-import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
-import io.gearpump.streaming.task.TaskId
-import io.gearpump.cluster.UserConfig
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.mockito.Mockito._
@@ -31,7 +26,14 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-class HadoopCheckpointStoreIntegrationSpec extends PropSpec with 
PropertyChecks with MockitoSugar with Matchers {
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.MockUtil
+import io.gearpump.streaming.hadoop.lib.HadoopUtil
+import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import io.gearpump.streaming.task.TaskId
+
+class HadoopCheckpointStoreIntegrationSpec
+  extends PropSpec with PropertyChecks with MockitoSugar with Matchers {
 
   property("HadoopCheckpointStore should persist and recover checkpoints") {
     val fileSizeGen = Gen.chooseNum[Int](100, 1000)
@@ -44,7 +46,8 @@ class HadoopCheckpointStoreIntegrationSpec extends PropSpec 
with PropertyChecks
       when(taskContext.taskId).thenReturn(TaskId(0, 0))
 
       val rootDirName = "test"
-      val rootDir = new Path(rootDirName + Path.SEPARATOR + 
s"v${HadoopCheckpointStoreFactory.VERSION}")
+      val rootDir = new Path(rootDirName + Path.SEPARATOR +
+        s"v${HadoopCheckpointStoreFactory.VERSION}")
       val subDir = new Path(rootDir, "app0-task0_0")
 
       val fs = HadoopUtil.getFileSystemForPath(rootDir, hadoopConfig)
@@ -60,7 +63,7 @@ class HadoopCheckpointStoreIntegrationSpec extends PropSpec 
with PropertyChecks
       val tempFile = new Path(subDir, "checkpoints-0.store")
       fs.exists(tempFile) shouldBe true
 
-      checkpointStore.persist(1L , Array.fill(fileSize)(0.toByte))
+      checkpointStore.persist(1L, Array.fill(fileSize)(0.toByte))
       fs.exists(tempFile) shouldBe false
       fs.exists(new Path(subDir, "checkpoints-0-1.store")) shouldBe true
 
@@ -80,6 +83,4 @@ class HadoopCheckpointStoreIntegrationSpec extends PropSpec 
with PropertyChecks
       fs.close()
     }
   }
-
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
 
b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
index 9e1890e..9b4057c 100644
--- 
a/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
+++ 
b/external/hadoopfs/src/test/scala/io/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,12 @@
 
 package io.gearpump.streaming.hadoop.lib.rotation
 
-import io.gearpump.TimeStamp
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
+import io.gearpump.TimeStamp
+
 class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
 
   val timestampGen = Gen.chooseNum[Long](0L, 1000L)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hbase/README.md
----------------------------------------------------------------------
diff --git a/external/hbase/README.md b/external/hbase/README.md
index d4b8cd0..0e55a5d 100644
--- a/external/hbase/README.md
+++ b/external/hbase/README.md
@@ -10,7 +10,7 @@ The message type that HBaseSink is able to handle including:
  2. Tuple4[Array[Byte], Array[Byte], Array[Byte], Array[Byte]] which means 
(rowKey, columnGroup, columnName, value)
  3. Sequence of type 1 and 2
   
-Suppose there is a DataSource Task will output above-mentitioned messages, you 
can write a simple application then:
+Suppose there is a DataSource Task will output above-mentioned messages, you 
can write a simple application then:
 
 ```scala
 val sink = new HBaseSink(UserConfig.empty, "$tableName")
@@ -25,7 +25,7 @@ val application = StreamApplication("HBase", 
Graph(computation), UserConfig.empt
 The HBase cluster should run on where Gearpump is deployed.
 Suppose HBase is installed at ```/usr/lib/hbase``` on every node and you 
already have your application built into a jar file. 
 Then before submitting the application, you need to add HBase lib folder and 
conf folder into ```gearpump.executor.extraClasspath``` in 
```conf/gear.conf```, for example 
```/usr/lib/hbase/lib/*:/usr/lib/hbase/conf```. 
-Please note only client side's configuration change is needed. After that, you 
are able to submmit the application.
+Please note only client side's configuration change is needed. After that, you 
are able to submit the application.
  
 ## Working with Secured HBASE
 
@@ -49,4 +49,3 @@ val application = StreamApplication("HBase", 
Graph(computation), UserConfig.empt
 
 Note here the keytab file set into config should be a byte array.
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala 
b/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala
index 3503b7f..a477ba6 100644
--- a/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala
+++ b/external/hbase/src/main/scala/io/gearpump/external/hbase/HBaseSink.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,18 +19,21 @@ package io.gearpump.external.hbase
 
 import java.io.{File, ObjectInputStream, ObjectOutputStream}
 
-import io.gearpump.Message
-import io.gearpump.cluster.UserConfig
-import io.gearpump.streaming.sink.DataSink
-import io.gearpump.streaming.task.TaskContext
-import io.gearpump.util.{FileUtils, Constants}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.client.{ConnectionFactory, Connection, Put}
+import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
 import org.apache.hadoop.security.UserGroupInformation
 
-class HBaseSink(userconfig: UserConfig, tableName: String, @transient var 
configuration: Configuration) extends DataSink{
+import io.gearpump.Message
+import io.gearpump.cluster.UserConfig
+import io.gearpump.streaming.sink.DataSink
+import io.gearpump.streaming.task.TaskContext
+import io.gearpump.util.{Constants, FileUtils}
+
+class HBaseSink(
+    userconfig: UserConfig, tableName: String, @transient var configuration: 
Configuration)
+  extends DataSink{
   lazy val connection = HBaseSink.getConnection(userconfig, configuration)
   lazy val table = connection.getTable(TableName.valueOf(tableName))
 
@@ -41,10 +44,13 @@ class HBaseSink(userconfig: UserConfig, tableName: String, 
@transient var config
   }
 
   def insert(rowKey: String, columnGroup: String, columnName: String, value: 
String): Unit = {
-    insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup), 
Bytes.toBytes(columnName), Bytes.toBytes(value))
+    insert(Bytes.toBytes(rowKey), Bytes.toBytes(columnGroup),
+      Bytes.toBytes(columnName), Bytes.toBytes(value))
   }
 
-  def insert(rowKey: Array[Byte], columnGroup: Array[Byte], columnName: 
Array[Byte], value: Array[Byte]): Unit = {
+  def insert(
+      rowKey: Array[Byte], columnGroup: Array[Byte], columnName: Array[Byte], 
value: Array[Byte])
+    : Unit = {
     val put = new Put(rowKey)
     put.addColumn(columnGroup, columnName, value)
     table.put(put)
@@ -54,10 +60,20 @@ class HBaseSink(userconfig: UserConfig, tableName: String, 
@transient var config
     msg match {
       case seq: Seq[Any] =>
         seq.foreach(put)
-      case tuple: (String, String, String, String) =>
-        insert(tuple._1, tuple._2, tuple._3, tuple._4)
-      case tuple: (Array[Byte], Array[Byte], Array[Byte], Array[Byte]) =>
-        insert(tuple._1, tuple._2, tuple._3, tuple._4)
+      case tuple: (_, _, _, _) => {
+        tuple._1 match {
+          case str: String => {
+            insert(tuple._1.asInstanceOf[String], 
tuple._2.asInstanceOf[String],
+              tuple._3.asInstanceOf[String], tuple._4.asInstanceOf[String])
+          }
+          case byteArray: Array[Byte@unchecked] => {
+            insert(tuple._1.asInstanceOf[Array[Byte]], 
tuple._2.asInstanceOf[Array[Byte]],
+              tuple._3.asInstanceOf[Array[Byte]], 
tuple._4.asInstanceOf[Array[Byte]])
+          }
+          case _ =>
+          // Skip
+        }
+      }
     }
   }
 
@@ -93,17 +109,19 @@ object HBaseSink {
     new HBaseSink(userconfig, tableName)
   }
 
-  def apply[T](userconfig: UserConfig, tableName: String, configuration: 
Configuration): HBaseSink = {
+  def apply[T](userconfig: UserConfig, tableName: String, configuration: 
Configuration)
+    : HBaseSink = {
     new HBaseSink(userconfig, tableName, configuration)
   }
 
   private def getConnection(userConfig: UserConfig, configuration: 
Configuration): Connection = {
-    if(UserGroupInformation.isSecurityEnabled) {
+    if (UserGroupInformation.isSecurityEnabled) {
       val principal = 
userConfig.getString(Constants.GEARPUMP_KERBEROS_PRINCIPAL)
       val keytabContent = userConfig.getBytes(Constants.GEARPUMP_KEYTAB_FILE)
-      if(principal.isEmpty || keytabContent.isEmpty) {
+      if (principal.isEmpty || keytabContent.isEmpty) {
         val errorMsg = s"HBase is security enabled, user should provide 
kerberos principal in " +
-          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file in 
${Constants.GEARPUMP_KEYTAB_FILE}"
+          s"${Constants.GEARPUMP_KERBEROS_PRINCIPAL} and keytab file " +
+          s"in ${Constants.GEARPUMP_KEYTAB_FILE}"
         throw new Exception(errorMsg)
       }
       val keytabFile = File.createTempFile("login", ".keytab")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
 
b/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
index 8d4746e..11a20dc 100644
--- 
a/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
+++ 
b/external/hbase/src/main/scala/io/gearpump/external/hbase/dsl/HBaseDSLSink.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,18 +17,25 @@
  */
 package io.gearpump.external.hbase.dsl
 
+import scala.language.implicitConversions
+
 import org.apache.hadoop.conf.Configuration
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.external.hbase.HBaseSink
 import io.gearpump.streaming.dsl.Stream
-import Stream.Sink
+import io.gearpump.streaming.dsl.Stream.Sink
 
+/** Create a HBase DSL Sink */
 class HBaseDSLSink[T](stream: Stream[T]) {
-  def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, 
description: String): Stream[T] = {
+
+  def writeToHbase(userConfig: UserConfig, table: String, parallism: Int, 
description: String)
+    : Stream[T] = {
     stream.sink(HBaseSink[T](userConfig, table), parallism, userConfig, 
description)
   }
 
-  def writeToHbase(userConfig: UserConfig, configuration: Configuration, 
table: String, parallism: Int, description: String): Stream[T] = {
+  def writeToHbase(userConfig: UserConfig, configuration: Configuration, 
table: String,
+      parallism: Int, description: String): Stream[T] = {
     stream.sink(HBaseSink[T](userConfig, table, configuration), parallism, 
userConfig, description)
   }
 }
@@ -37,4 +44,4 @@ object HBaseDSLSink {
   implicit def streamToHBaseDSLSink[T](stream: Stream[T]): HBaseDSLSink[T] = {
     new HBaseDSLSink[T](stream)
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala 
b/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala
index a0fec76..cad6581 100644
--- 
a/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala
+++ 
b/external/hbase/src/test/scala/io/gearpump/external/hbase/HBaseSinkSpec.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,29 +17,25 @@
  */
 package io.gearpump.external.hbase
 
-import org.apache.hadoop.hbase.client.{HTable, Put}
-import org.apache.hadoop.hbase.util.Bytes
-import org.mockito.Mockito
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
 class HBaseSinkSpec extends PropSpec with PropertyChecks with Matchers {
 
-
   property("HBaseSink should insert a row successfully") {
-    /*
-    import Mockito._
-    val htable = Mockito.mock(classOf[HTable])
-    val row = "row"
-    val group = "group"
-    val name = "name"
-    val value = "1.2"
-    val put = new Put(Bytes.toBytes(row))
-    put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value))
-    val hbaseSink = HBaseSink(htable)
-    hbaseSink.insert(put)
-    verify(htable).put(put)
-    */
+
+  //  import Mockito._
+  //  val htable = Mockito.mock(classOf[HTable])
+  //  val row = "row"
+  //  val group = "group"
+  //  val name = "name"
+  //  val value = "1.2"
+  //  val put = new Put(Bytes.toBytes(row))
+  //  put.add(Bytes.toBytes(group), Bytes.toBytes(name), Bytes.toBytes(value))
+  //  val hbaseSink = HBaseSink(htable)
+  //  hbaseSink.insert(put)
+  //  verify(htable).put(put)
+
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala 
b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala
index f947960..b482c7c 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSink.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,12 +20,13 @@ package io.gearpump.streaming.kafka
 
 import java.util.Properties
 
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+
+import io.gearpump.Message
 import io.gearpump.streaming.kafka.lib.KafkaUtil
 import io.gearpump.streaming.sink.DataSink
 import io.gearpump.streaming.task.TaskContext
-import io.gearpump.Message
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.serialization.ByteArraySerializer
 
 /**
  * kafka sink connectors that invokes 
org.apache.kafka.clients.producer.KafkaProducer to send
@@ -33,7 +34,8 @@ import 
org.apache.kafka.common.serialization.ByteArraySerializer
  * @param getProducer is a function to construct a KafkaProducer
  * @param topic is the kafka topic to write to
  */
-class KafkaSink private[kafka](getProducer: () => KafkaProducer[Array[Byte], 
Array[Byte]], topic: String) extends DataSink {
+class KafkaSink private[kafka](
+    getProducer: () => KafkaProducer[Array[Byte], Array[Byte]], topic: String) 
extends DataSink {
 
   /**
    * @param topic producer topic
@@ -55,7 +57,7 @@ class KafkaSink private[kafka](getProducer: () => 
KafkaProducer[Array[Byte], Arr
     this(topic, KafkaUtil.buildProducerConfig(bootstrapServers))
   }
 
-  // lazily construct producer since KafkaProducer is not serializable
+  // Lazily construct producer since KafkaProducer is not serializable
   private lazy val producer = getProducer()
 
   override def open(context: TaskContext): Unit = {}
@@ -63,7 +65,8 @@ class KafkaSink private[kafka](getProducer: () => 
KafkaProducer[Array[Byte], Arr
   override def write(message: Message): Unit = {
     val record = message.msg match {
       case (k, v) =>
-        new ProducerRecord[Array[Byte], Array[Byte]](topic, 
k.asInstanceOf[Array[Byte]], v.asInstanceOf[Array[Byte]])
+        new ProducerRecord[Array[Byte], Array[Byte]](topic, 
k.asInstanceOf[Array[Byte]],
+          v.asInstanceOf[Array[Byte]])
       case v =>
         new ProducerRecord[Array[Byte], Array[Byte]](topic, 
v.asInstanceOf[Array[Byte]])
     }

Reply via email to