http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/Context.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Context.scala 
b/core/src/main/scala/io/gearpump/transport/netty/Context.scala
index fac190e..9a9ee29 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/Context.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/Context.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -20,30 +20,26 @@ package io.gearpump.transport.netty
 
 import java.io.Closeable
 import java.util.concurrent._
+import scala.collection.JavaConverters._
 
 import akka.actor.{ActorRef, ActorSystem, Props}
 import com.typesafe.config.Config
-import io.gearpump.transport.netty.Server.ServerPipelineFactory
-import io.gearpump.transport.{ActorLookupById, HostPort}
-import io.gearpump.util.{Constants, LogUtil}
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
 import org.slf4j.Logger
 
-import scala.collection.JavaConversions._
-import scala.language.implicitConversions
+import io.gearpump.transport.netty.Server.ServerPipelineFactory
+import io.gearpump.transport.{ActorLookupById, HostPort}
+import io.gearpump.util.{Constants, LogUtil}
 
 object Context {
   private final val LOG: Logger = LogUtil.getLogger(getClass)
-
-  implicit def toCloseable(fun : () => Any)  = new Closeable {
-    override def close = fun()
-  }
 }
 
-class Context(system : ActorSystem, conf: NettyConfig) extends IContext {
-import io.gearpump.transport.netty.Context._
+/** Netty Context */
+class Context(system: ActorSystem, conf: NettyConfig) extends IContext {
+  import io.gearpump.transport.netty.Context._
 
-  def this(system : ActorSystem, conf : Config) {
+  def this(system: ActorSystem, conf: Config) {
     this(system, new NettyConfig(conf))
   }
 
@@ -54,52 +50,64 @@ import io.gearpump.transport.netty.Context._
   private lazy val clientChannelFactory: NioClientSocketChannelFactory = {
     val bossFactory: ThreadFactory = new NettyRenameThreadFactory("client" + 
"-boss")
     val workerFactory: ThreadFactory = new NettyRenameThreadFactory("client" + 
"-worker")
-    val channelFactory = new 
NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), 
Executors.newCachedThreadPool(workerFactory), maxWorkers)
-    closeHandler.add { ()=>
-
-      LOG.info("Closing all client resources....")
-      channelFactory.releaseExternalResources
-    }
+    val channelFactory =
+      new NioClientSocketChannelFactory(
+        Executors.newCachedThreadPool(bossFactory),
+        Executors.newCachedThreadPool(workerFactory), maxWorkers)
+
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
+        LOG.info("Closing all client resources....")
+        channelFactory.releaseExternalResources
+      }
+    })
     channelFactory
   }
 
-
-  def bind(name: String, lookupActor : ActorLookupById, deserializeFlag : 
Boolean = true, inputPort: Int = 0): Int = {
-    //TODO: whether we should expose it as application config?
-    val server = system.actorOf(Props(classOf[Server], name, conf, 
lookupActor, deserializeFlag).withDispatcher(nettyDispatcher), name)
+  def bind(
+      name: String, lookupActor : ActorLookupById, deserializeFlag : Boolean = 
true,
+      inputPort: Int = 0): Int = {
+    // TODO: whether we should expose it as application config?
+    val server = system.actorOf(Props(classOf[Server], name, conf, lookupActor,
+      deserializeFlag).withDispatcher(nettyDispatcher), name)
     val (port, channel) = NettyUtil.newNettyServer(name,
       new ServerPipelineFactory(server, conf), 5242880, inputPort)
     val factory = channel.getFactory
-    closeHandler.add{ () =>
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
         system.stop(server)
         channel.close()
-
         LOG.info("Closing all server resources....")
         factory.releaseExternalResources
       }
+    })
     port
   }
 
-  def connect(hostPort : HostPort) : ActorRef = {
-    val client = system.actorOf(Props(classOf[Client], conf, 
clientChannelFactory, hostPort).withDispatcher(nettyDispatcher))
-    closeHandler.add { () =>
+  def connect(hostPort: HostPort): ActorRef = {
+    val client = system.actorOf(Props(classOf[Client], conf, 
clientChannelFactory, hostPort)
+      .withDispatcher(nettyDispatcher))
+    closeHandler.add(new Closeable {
+      override def close(): Unit = {
+        LOG.info("closing Client actor....")
+        system.stop(client)
+      }
+    })
 
-      LOG.info("closing Client actor....")
-      system.stop(client)
-    }
     client
   }
 
   /**
    * terminate this context
    */
-  def close {
+  def close(): Unit = {
 
-    LOG.info(s"Context.term, cleanup resources...., we have 
${closeHandler.size()} items to close...")
+    LOG.info(s"Context.term, cleanup resources...., " +
+      s"we have ${closeHandler.size()} items to close...")
 
-    // clean up resource in reverse order so that client actor can be cleaned
+    // Cleans up resources in reverse order so that client actor can be cleaned
     // before clientChannelFactory
-    closeHandler.iterator().toArray.reverse.foreach(_.close())
+    closeHandler.iterator().asScala.toList.reverse.foreach(_.close())
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala 
b/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
index e1a5141..56b2f7c 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/IContext.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,24 +19,23 @@
 package io.gearpump.transport.netty
 
 import akka.actor.ActorRef
-import io.gearpump.transport.{HostPort, ActorLookupById}
+
 import io.gearpump.transport.{ActorLookupById, HostPort}
 
 trait IContext {
 
   /**
-   * TODO: remove deserializeFlag from interface
+   * Create a Netty server connection.
    */
-  def bind(name: String, lookupActor : ActorLookupById, deserializeFlag : 
Boolean, port: Int) : Int
+  def bind(name: String, lookupActor: ActorLookupById, deserializeFlag: 
Boolean, port: Int): Int
 
   /**
-   * connect to a remote host
-   * return a ActorRef which you can send message TaskMessage to..
+   * Create a Netty client actor
    */
   def connect(hostPort: HostPort): ActorRef
 
   /**
    * Close resource for this context
    */
-  def close
+  def close()
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala 
b/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
index ab76d9f..a62eff5 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/NettyConfig.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,6 +19,7 @@
 package io.gearpump.transport.netty
 
 import com.typesafe.config.Config
+
 import io.gearpump.util.Constants
 
 class NettyConfig(conf: Config) {
@@ -30,6 +31,9 @@ class NettyConfig(conf: Config) {
   val messageBatchSize = conf.getInt(Constants.NETTY_MESSAGE_BATCH_SIZE)
   val flushCheckInterval = conf.getInt(Constants.NETTY_FLUSH_CHECK_INTERVAL)
 
-  def newTransportSerializer = 
Class.forName(conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER)).
-    newInstance().asInstanceOf[ITransportMessageSerializer]
+  def newTransportSerializer: ITransportMessageSerializer = {
+    Class.forName(
+      conf.getString(Constants.GEARPUMP_TRANSPORT_SERIALIZER))
+      .newInstance().asInstanceOf[ITransportMessageSerializer]
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala 
b/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
index 40dc9f0..3e746af 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/NettyUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -27,18 +27,25 @@ import org.jboss.netty.channel.{Channel, ChannelFactory, 
ChannelPipelineFactory}
 
 object NettyUtil {
 
-  def newNettyServer(name: String, pipelineFactory: ChannelPipelineFactory, 
buffer_size: Int, inputPort: Int = 0): (Int, Channel) = {
+  def newNettyServer(
+      name: String,
+      pipelineFactory: ChannelPipelineFactory,
+      buffer_size: Int,
+      inputPort: Int = 0): (Int, Channel) = {
     val bossFactory: ThreadFactory = new NettyRenameThreadFactory(name + 
"-boss")
     val workerFactory: ThreadFactory = new NettyRenameThreadFactory(name + 
"-worker")
-    val factory = new 
NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), 
Executors.newCachedThreadPool(workerFactory), 1)
+    val factory = new 
NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+      Executors.newCachedThreadPool(workerFactory), 1)
 
     val bootstrap = createServerBootStrap(factory, pipelineFactory, 
buffer_size)
     val channel: Channel = bootstrap.bind(new InetSocketAddress(inputPort))
-    val port = 
channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort();
+    val port = 
channel.getLocalAddress().asInstanceOf[InetSocketAddress].getPort()
     (port, channel)
   }
 
-  def createServerBootStrap(factory: ChannelFactory, pipelineFactory: 
ChannelPipelineFactory, buffer_size: Int) = {
+  def createServerBootStrap(
+      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, 
buffer_size: Int)
+    : ServerBootstrap = {
     val bootstrap = new ServerBootstrap(factory)
     bootstrap.setOption("child.tcpNoDelay", true)
     bootstrap.setOption("child.receiveBufferSize", buffer_size)
@@ -47,7 +54,9 @@ object NettyUtil {
     bootstrap
   }
 
-  def createClientBootStrap(factory: ChannelFactory, pipelineFactory: 
ChannelPipelineFactory, buffer_size: Int) = {
+  def createClientBootStrap(
+      factory: ChannelFactory, pipelineFactory: ChannelPipelineFactory, 
buffer_size: Int)
+    : ClientBootstrap = {
     val bootstrap = new ClientBootstrap(factory)
     bootstrap.setOption("tcpNoDelay", true)
     bootstrap.setOption("sendBufferSize", buffer_size)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/transport/netty/Server.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/transport/netty/Server.scala 
b/core/src/main/scala/io/gearpump/transport/netty/Server.scala
index dde0861..9a9d79b 100644
--- a/core/src/main/scala/io/gearpump/transport/netty/Server.scala
+++ b/core/src/main/scala/io/gearpump/transport/netty/Server.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,32 +19,36 @@
 package io.gearpump.transport.netty
 
 import java.util
+import scala.collection.JavaConverters._
+import scala.collection.immutable.IntMap
+import scala.concurrent.Future
 
 import akka.actor.{Actor, ActorContext, ActorRef, ExtendedActorSystem}
-import io.gearpump.transport.ActorLookupById
-import io.gearpump.util.{LogUtil, AkkaHelper}
 import org.jboss.netty.channel._
 import org.jboss.netty.channel.group.{ChannelGroup, DefaultChannelGroup}
 import org.slf4j.Logger
 
-import scala.collection.JavaConversions._
-import scala.collection.immutable.{IntMap, LongMap}
-import scala.concurrent.Future
+import io.gearpump.transport.ActorLookupById
+import io.gearpump.util.{AkkaHelper, LogUtil}
 
-class Server(name: String, conf: NettyConfig, lookupActor : ActorLookupById, 
deserializeFlag : Boolean) extends Actor {
-  private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = 
name)
+/** Netty server actor, message received will be forwarded to the target on the 
address line. */
+class Server(
+    name: String, conf: NettyConfig, lookupActor: ActorLookupById, 
deserializeFlag: Boolean)
+  extends Actor {
 
+  private[netty] final val LOG: Logger = LogUtil.getLogger(getClass, context = 
name)
   import io.gearpump.transport.netty.Server._
 
   val allChannels: ChannelGroup = new DefaultChannelGroup("gearpump-server")
 
   val system = context.system.asInstanceOf[ExtendedActorSystem]
 
-  def receive = msgHandler orElse channelManager
-  //As we will only transfer TaskId on the wire, this object will translate 
taskId to or from ActorRef
+  def receive: Receive = msgHandler orElse channelManager
+  // As we will only transfer TaskId on the wire,
+  // this object will translate taskId to or from ActorRef
   private val taskIdActorRefTranslation = new 
TaskIdActorRefTranslation(context)
 
-  def channelManager : Receive = {
+  def channelManager: Receive = {
     case AddChannel(channel) => allChannels.add(channel)
     case CloseChannel(channel) =>
       import context.dispatcher
@@ -54,9 +58,9 @@ class Server(name: String, conf: NettyConfig, lookupActor : 
ActorLookupById, des
       }
   }
 
-  def msgHandler : Receive = {
+  def msgHandler: Receive = {
     case MsgBatch(msgs) =>
-      msgs.groupBy(_.targetTask()).foreach { taskBatch =>
+      msgs.asScala.groupBy(_.targetTask()).foreach { taskBatch =>
         val (taskId, taskMessages) = taskBatch
         val actor = lookupActor.lookupLocalActor(taskId)
 
@@ -69,18 +73,13 @@ class Server(name: String, conf: NettyConfig, lookupActor : 
ActorLookupById, des
       }
   }
 
-  override def postStop() = {
+  override def postStop(): Unit = {
     allChannels.close.awaitUninterruptibly
   }
 }
 
 object Server {
 
-  // Create a 1-1 mapping fake ActorRef for task
-  // The path is fake, don't use the ActorRef directly.
-  // As we must use actorFor() which is deprecated,
-  // according to the advice https://issues.scala-lang.org/browse/SI-7934,
-  // use a helper object to bypass this deprecation warning.
   class ServerPipelineFactory(server: ActorRef, conf: NettyConfig) extends 
ChannelPipelineFactory {
     def getPipeline: ChannelPipeline = {
       val pipeline: ChannelPipeline = Channels.pipeline
@@ -114,8 +113,9 @@ object Server {
   class TaskIdActorRefTranslation(context: ActorContext) {
     private var taskIdtoActorRef = IntMap.empty[ActorRef]
 
-    def translateToActorRef(sessionId : Int): ActorRef = {
-      if(!taskIdtoActorRef.contains(sessionId)){
+    /** 1-1 mapping from session id to fake ActorRef */
+    def translateToActorRef(sessionId: Int): ActorRef = {
+      if (!taskIdtoActorRef.contains(sessionId)) {
 
         // A fake ActorRef for performance optimization.
         val actorRef = AkkaHelper.actorFor(context.system, 
s"/session#$sessionId")
@@ -123,13 +123,12 @@ object Server {
       }
       taskIdtoActorRef.get(sessionId).get
     }
-
   }
 
   case class AddChannel(channel: Channel)
 
   case class CloseChannel(channel: Channel)
 
-  case class MsgBatch(messages: Iterable[TaskMessage])
+  case class MsgBatch(messages: java.lang.Iterable[TaskMessage])
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala 
b/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
index 71fda90..25a34d9 100644
--- a/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
+++ b/core/src/main/scala/io/gearpump/util/ActorSystemBooter.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,16 +18,17 @@
 
 package io.gearpump.util
 
-import java.util.concurrent.{TimeoutException, TimeUnit}
+import java.util.concurrent.{TimeUnit, TimeoutException}
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
 
 import akka.actor._
 import com.typesafe.config.Config
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.util.LogUtil.ProcessType
 import org.slf4j.Logger
 
-import scala.concurrent.duration.Duration
-import scala.util.{Failure, Success, Try}
+import io.gearpump.cluster.ClusterConfig
+import io.gearpump.util.LogUtil.ProcessType
 
 /**
  * ActorSystemBooter start a new JVM process to boot an actor system.
@@ -35,23 +36,22 @@ import scala.util.{Failure, Success, Try}
  *
  * It send the system address to "report back actor"
  */
-
-class ActorSystemBooter(config : Config) {
+class ActorSystemBooter(config: Config) {
   import io.gearpump.util.ActorSystemBooter._
 
-  def boot(name : String, reportBackActor : String) : ActorSystem = {
+  def boot(name: String, reportBackActor: String): ActorSystem = {
     val system = ActorSystem(name, config)
-    // daemon path: http://{system}@{ip}:{port}/daemon
+    // Daemon path: http://{system}@{ip}:{port}/daemon
     system.actorOf(Props(classOf[Daemon], name, reportBackActor), "daemon")
     system
   }
 }
 
-object ActorSystemBooter  {
+object ActorSystemBooter {
 
-  def apply(config : Config) : ActorSystemBooter = new 
ActorSystemBooter(config)
+  def apply(config: Config): ActorSystemBooter = new ActorSystemBooter(config)
 
-  def main (args: Array[String]) {
+  def main(args: Array[String]) {
     val name = args(0)
     val reportBack = args(1)
     val config = ClusterConfig.default()
@@ -59,7 +59,7 @@ object ActorSystemBooter  {
     LogUtil.loadConfiguration(config, ProcessType.APPLICATION)
 
     val debugPort = 
Option(System.getProperty(Constants.GEARPUMP_REMOTE_DEBUG_PORT))
-    debugPort.foreach{ port =>
+    debugPort.foreach { port =>
       val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
       LOG.info("==========================================")
       LOG.info("Remote debug port: " + port)
@@ -69,22 +69,23 @@ object ActorSystemBooter  {
     val system = apply(config).boot(name, reportBack)
 
     Runtime.getRuntime().addShutdownHook(new Thread() {
-      override def run() : Unit = {
+      override def run(): Unit = {
         val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
-        LOG.info("Maybe we have received a SIGINT signal from parent process, 
start to cleanup resources....")
-        system.shutdown()
+        LOG.info("Maybe we have received a SIGINT signal from parent process, 
" +
+          "start to cleanup resources....")
+        system.terminate()
       }
-    });
+    })
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
   }
 
-  case class BindLifeCycle(actor : ActorRef)
-  case class CreateActor(prop : Props, name : String)
-  case class ActorCreated(actor : ActorRef, name : String)
-  case class CreateActorFailed(name : String, reason: Throwable)
+  case class BindLifeCycle(actor: ActorRef)
+  case class CreateActor(prop: Props, name: String)
+  case class ActorCreated(actor: ActorRef, name: String)
+  case class CreateActorFailed(name: String, reason: Throwable)
 
-  case class RegisterActorSystem(systemPath : String)
+  case class RegisterActorSystem(systemPath: String)
 
   /**
    * This actor system will watch for parent,
@@ -95,23 +96,24 @@ object ActorSystemBooter  {
 
   object RegisterActorSystemTimeOut
 
-  class Daemon(val name : String, reportBack : String) extends Actor {
+  class Daemon(val name: String, reportBack: String) extends Actor {
     val LOG: Logger = LogUtil.getLogger(getClass, context = name)
 
     val username = 
Option(System.getProperty(Constants.GEARPUMP_USERNAME)).getOrElse("not_defined")
     LOG.info(s"RegisterActorSystem to ${reportBack}, current user: $username")
-    
+
     val reportBackActor = context.actorSelection(reportBack)
     reportBackActor ! 
RegisterActorSystem(ActorUtil.getSystemAddress(context.system).toString)
 
     implicit val executionContext = context.dispatcher
-    val timeout = context.system.scheduler.scheduleOnce(Duration(25, 
TimeUnit.SECONDS), self, RegisterActorSystemFailed(new TimeoutException))
+    val timeout = context.system.scheduler.scheduleOnce(Duration(25, 
TimeUnit.SECONDS),
+      self, RegisterActorSystemFailed(new TimeoutException))
 
     context.become(waitForRegisterResult)
 
-    def receive : Receive = null
+    def receive: Receive = null
 
-    def waitForRegisterResult : Receive = {
+    def waitForRegisterResult: Receive = {
       case ActorSystemRegistered(parent) =>
         timeout.cancel()
         context.watch(parent)
@@ -122,11 +124,11 @@ object ActorSystemBooter  {
         context.stop(self)
     }
 
-    def waitCommand : Receive = {
+    def waitCommand: Receive = {
       case BindLifeCycle(actor) =>
         LOG.info(s"ActorSystem $name Binding life cycle with actor: $actor")
         context.watch(actor)
-      case create @ CreateActor(props : Props, name : String) =>
+      case create@CreateActor(props: Props, name: String) =>
         LOG.info(s"creating actor $name")
         val actor = Try(context.actorOf(props, name))
         actor match {
@@ -142,9 +144,9 @@ object ActorSystemBooter  {
         context.stop(self)
     }
 
-    override def postStop : Unit = {
+    override def postStop(): Unit = {
       LOG.info(s"ActorSystem $name is shutting down...")
-      context.system.shutdown()
+      context.system.terminate()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala 
b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
index cc701d8..d5f48a7 100644
--- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,43 +18,44 @@
 
 package io.gearpump.util
 
+import scala.concurrent.{ExecutionContext, Future}
+
 import akka.actor.Actor.Receive
 import akka.actor._
 import akka.pattern.ask
-import io.gearpump.WorkerId
+import org.slf4j.Logger
+
 import io.gearpump.cluster.AppMasterToMaster.GetAllWorkers
 import io.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
 import io.gearpump.cluster.MasterToAppMaster.WorkerList
 import io.gearpump.cluster.MasterToClient.{ResolveAppIdResult, 
ResolveWorkerIdResult}
 import 
io.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, 
StartExecutorSystems}
 import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.transport.HostPort
-import org.slf4j.Logger
-
-import scala.concurrent.{ExecutionContext, Future}
 
 object ActorUtil {
-   private val LOG: Logger = LogUtil.getLogger(getClass)
+  private val LOG: Logger = LogUtil.getLogger(getClass)
 
-  def getSystemAddress(system : ActorSystem) : Address = {
+  def getSystemAddress(system: ActorSystem): Address = {
     system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
   }
 
-  def getFullPath(system : ActorSystem, path : ActorPath): String = {
+  def getFullPath(system: ActorSystem, path: ActorPath): String = {
     path.toStringWithAddress(getSystemAddress(system))
   }
 
-  def getHostname(actor : ActorRef): String = {
+  def getHostname(actor: ActorRef): String = {
     val path = actor.path
     path.address.host.getOrElse("localhost")
   }
 
-  def defaultMsgHandler(actor : ActorRef) : Receive = {
-    case msg : Any =>
-    LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, 
forwarded from $actor")
+  def defaultMsgHandler(actor: ActorRef): Receive = {
+    case msg: Any =>
+      LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, 
forwarded from $actor")
   }
 
-  def printActorSystemTree(system : ActorSystem) : Unit = {
+  def printActorSystemTree(system: ActorSystem): Unit = {
     val extendedSystem = system.asInstanceOf[ExtendedActorSystem]
     val clazz = system.getClass
     val m = clazz.getDeclaredMethod("printTree")
@@ -62,9 +63,9 @@ object ActorUtil {
     LOG.info(m.invoke(system).asInstanceOf[String])
   }
 
-  // Check whether a actor is child actor by simply examining name
-  //TODO: fix this, we should also check the path to root besides name
-  def isChildActorPath(parent : ActorRef, child : ActorRef) : Boolean = {
+  /** Checks whether an actor is a child actor by simply examining the name */
+  // TODO: fix this, we should also check the path to root besides name
+  def isChildActorPath(parent: ActorRef, child: ActorRef): Boolean = {
     if (null != child) {
       parent.path.name == child.path.parent.name
     } else {
@@ -72,21 +73,18 @@ object ActorUtil {
     }
   }
 
-  def actorNameForExecutor(appId : Int, executorId : Int) = "app" + appId + 
"-executor" + executorId
+  def actorNameForExecutor(appId: Int, executorId: Int): String = "app" + 
appId + "-executor" +
+    executorId
 
-  /**
-   * TODO:
-   * Currently we explicitly require the master contacts to be started with 
this path pattern
-   * 'akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER'
-   *
-   */
+  // TODO: Currently we explicitly require the master contacts to be started 
with this path pattern
+  // akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER
   def getMasterActorPath(master: HostPort): ActorPath = {
-    import Constants.MASTER
+    import io.gearpump.util.Constants.MASTER
     
ActorPath.fromString(s"akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER")
   }
 
   def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: 
ExecutorSystemJvmConfig,
-    sender: ActorRef)(implicit executor : scala.concurrent.ExecutionContext) = 
{
+      sender: ActorRef)(implicit executor: scala.concurrent.ExecutionContext): 
Unit = {
     implicit val timeout = Constants.FUTURE_TIMEOUT
 
     (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list =>
@@ -98,10 +96,10 @@ object ActorUtil {
     }
   }
 
-
-  def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: 
ExecutionContext): Future[T] = {
+  def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: 
ExecutionContext)
+    : Future[T] = {
     implicit val timeout = Constants.FUTURE_TIMEOUT
-    val appmaster =  askActor[ResolveAppIdResult](master, 
ResolveAppId(appId)).flatMap { result =>
+    val appmaster = askActor[ResolveAppIdResult](master, 
ResolveAppId(appId)).flatMap { result =>
       if (result.appMaster.isSuccess) {
         Future.successful(result.appMaster.get)
       } else {
@@ -111,15 +109,17 @@ object ActorUtil {
     appmaster.flatMap(askActor[T](_, msg))
   }
 
-  def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit 
ex: ExecutionContext): Future[T] = {
+  def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit 
ex: ExecutionContext)
+    : Future[T] = {
     implicit val timeout = Constants.FUTURE_TIMEOUT
-    val worker =  askActor[ResolveWorkerIdResult](master, 
ResolveWorkerId(workerId)).flatMap { result =>
-      if (result.worker.isSuccess) {
-        Future.successful(result.worker.get)
-      } else {
-        Future.failed(result.worker.failed.get)
+    val worker = askActor[ResolveWorkerIdResult](master, 
ResolveWorkerId(workerId))
+      .flatMap { result =>
+        if (result.worker.isSuccess) {
+          Future.successful(result.worker.get)
+        } else {
+          Future.failed(result.worker.failed.get)
+        }
       }
-    }
     worker.flatMap(askActor[T](_, msg))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/AkkaApp.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/AkkaApp.scala 
b/core/src/main/scala/io/gearpump/util/AkkaApp.scala
index aee02d5..2b0bf61 100644
--- a/core/src/main/scala/io/gearpump/util/AkkaApp.scala
+++ b/core/src/main/scala/io/gearpump/util/AkkaApp.scala
@@ -1,9 +1,26 @@
-package io.gearpump.util
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import io.gearpump.cluster.ClusterConfig
+package io.gearpump.util
 
 import scala.util.Try
 
+import io.gearpump.cluster.ClusterConfig
 
 /**
  * A Main class helper to load Akka configuration automatically.
@@ -14,7 +31,7 @@ trait AkkaApp {
 
   def main(akkaConf: Config, args: Array[String]): Unit
 
-  def help: Unit
+  def help(): Unit
 
   protected def akkaConfig: Config = {
     ClusterConfig.default()
@@ -23,6 +40,6 @@ trait AkkaApp {
   def main(args: Array[String]): Unit = {
     Try {
       main(akkaConfig, args)
-    }.failed.foreach{ex => help; throw ex}
+    }.failed.foreach { ex => help(); throw ex }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala 
b/core/src/main/scala/io/gearpump/util/Constants.scala
index a4fb545..65b7023 100644
--- a/core/src/main/scala/io/gearpump/util/Constants.scala
+++ b/core/src/main/scala/io/gearpump/util/Constants.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -44,7 +44,8 @@ object Constants {
   val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher"
   val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters"
   val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout"
-  val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS = 
"gearpump.worker.executor-share-same-jvm-as-worker"
+  val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS =
+    "gearpump.worker.executor-share-same-jvm-as-worker"
 
   val GEARPUMP_HOME = "gearpump.home"
   val GEARPUMP_FULL_SCALA_VERSION = 
"gearpump.binary-version-with-scala-version"
@@ -57,23 +58,24 @@ object Constants {
   val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir"
   val HADOOP_CONF = "hadoopConf"
 
-
   // Id used to identity Master JVM process in low level resource manager like 
YARN.
   // In YARN, it means the container Id.
-  val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID = 
"gearpump.master-resource-manager-container-id"
+  val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID =
+    "gearpump.master-resource-manager-container-id"
 
   // Id used to identity Worker JVM process in low level resource manager like 
YARN.
   // In YARN, it means the container Id.
-  val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID = 
"gearpump.worker-resource-manager-container-id"
+  val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID =
+    "gearpump.worker-resource-manager-container-id"
 
   // true or false
   val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm"
   val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port"
 
-  // whether turn on GC log, true or false
+  // Whether to turn on GC log, true or false
   val GEARPUMP_VERBOSE_GC = "gearpump.verbose-gc"
 
-  // the time out for Future, like ask.
+  // The time out for Future, like ask.
   // !Important! This global timeout setting will also impact the UI
   // responsive time if set to too big. Please make sure you have
   // enough justification to change this global setting, otherwise
@@ -101,15 +103,14 @@ object Constants {
   val GEARPUMP_APP_JAR = "gearpump.app.jar"
   val GEARPUMP_APP_NAME_PREFIX = "gearpump.app.name.prefix"
 
-  // where the jar is stored at. It can be a HDFS, or a local disk.
+  // Where the jar is stored at. It can be a HDFS, or a local disk.
   val GEARPUMP_APP_JAR_STORE_ROOT_PATH = "gearpump.jarstore.rootpath"
 
-  // Use java property -Dgearpump.config.file=xxx.conf to set customized 
configuration
+  // Uses java property -Dgearpump.config.file=xxx.conf to set customized 
configuration
   // Otherwise application.conf in classpath will be loaded
   val GEARPUMP_CUSTOM_CONFIG_FILE = "gearpump.config.file"
 
-
-  //Metrics related
+  // Metrics related
   val GEARPUMP_METRIC_ENABLED = "gearpump.metrics.enabled"
   val GEARPUMP_METRIC_SAMPLE_RATE = "gearpump.metrics.sample-rate"
   val GEARPUMP_METRIC_REPORT_INTERVAL = "gearpump.metrics.report-interval-ms"
@@ -117,13 +118,13 @@ object Constants {
   val GEARPUMP_METRIC_GRAPHITE_PORT = "gearpump.metrics.graphite.port"
   val GEARPUMP_METRIC_REPORTER = "gearpump.metrics.reporter"
 
-  // we will retain at max @RETAIN_HISTORY_HOURS history data
+  // Retains at max @RETAIN_HISTORY_HOURS history data
   val GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS = 
"gearpump.metrics.retainHistoryData.hours"
 
-  // time interval between two history data points.
+  // Time interval between two history data points.
   val GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS = 
"gearpump.metrics.retainHistoryData.intervalMs"
 
-  // we will retain at max @RETAIN_LATEST_SECONDS recent data points
+  // Retains at max @RETAIN_LATEST_SECONDS recent data points
   val GEARPUMP_RETAIN_RECENT_DATA_SECONDS = 
"gearpump.metrics.retainRecentData.seconds"
 
   // time interval between two recent data points.
@@ -133,13 +134,13 @@ object Constants {
   // and shutdown itself
   val GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT = 
"gearpump.resource-allocation-timeout-seconds"
 
-  //Service related
+  // Service related
   val GEARPUMP_SERVICE_HTTP = "gearpump.services.http"
   val GEARPUMP_SERVICE_HOST = "gearpump.services.host"
   val GEARPUMP_SERVICE_SUPERVISOR_PATH = 
"gearpump.services.supervisor-actor-path"
   val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = 
"gearpump.services.config-render-option-concise"
 
-  //The partitioners provided by Gearpump
+  // The partitioners provided by Gearpump
   val BUILTIN_PARTITIONERS = Array(
     classOf[BroadcastPartitioner],
     classOf[CoLocationPartitioner],
@@ -147,11 +148,10 @@ object Constants {
     classOf[ShuffleGroupingPartitioner],
     classOf[ShufflePartitioner])
 
-  //Security related
+  // Security related
   val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file"
   val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal"
 
-
   val GEARPUMP_METRICS_MAX_LIMIT = "gearpump.metrics.akka.max-limit-on-query"
   val GEARPUMP_METRICS_AGGREGATORS = 
"gearpump.metrics.akka.metrics-aggregator-class"
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/FileUtils.scala 
b/core/src/main/scala/io/gearpump/util/FileUtils.scala
index b0e8446..1561587 100644
--- a/core/src/main/scala/io/gearpump/util/FileUtils.scala
+++ b/core/src/main/scala/io/gearpump/util/FileUtils.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,9 +18,10 @@
 
 package io.gearpump.util
 
+import java.io.{File, IOException}
 import java.nio.charset.Charset
+
 import io.gearpump.google.common.io.Files
-import java.io.{IOException, File}
 
 object FileUtils {
   private val UTF8 = Charset.forName("UTF-8")
@@ -41,7 +42,7 @@ object FileUtils {
     Files.toByteArray(file)
   }
 
-  // recursively making all parent directories including itself
+  /** Recursively creates the given directory, including all of its parent directories. */
   def forceMkdir(directory: File): Unit = {
     if (directory.exists() && directory.isFile) {
       throw new IOException(s"Failed to create directory 
${directory.toString}, it already exist")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/Graph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Graph.scala 
b/core/src/main/scala/io/gearpump/util/Graph.scala
index 6bff9da..8c34329 100644
--- a/core/src/main/scala/io/gearpump/util/Graph.scala
+++ b/core/src/main/scala/io/gearpump/util/Graph.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,14 +17,14 @@
  */
 
 package io.gearpump.util
+import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.language.implicitConversions
 
 /**
  * Generic mutable Graph libraries.
- *
  */
-class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends 
Serializable{
+class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends 
Serializable {
 
   private val _vertices = mutable.Set.empty[N]
   private val _edges = mutable.Set.empty[(N, E, N)]
@@ -50,7 +50,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, 
N)]) extends Serial
    * Add a vertex
    * Current Graph is changed.
    */
-  def addVertex(vertex : N): Unit = {
+  def addVertex(vertex: N): Unit = {
     val result = _vertices.add(vertex)
     if (result) {
       _indexs += vertex -> nextId
@@ -73,14 +73,14 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
    * The result is stable
    */
   def vertices: List[N] = {
-    // sort the vertex so that we can keep the order for mapVertex
+    // Sorts the vertex so that we can keep the order for mapVertex
     _vertices.toList.sortBy(_indexs(_))
   }
 
   /**
    * out degree
    */
-  def outDegreeOf(node : N): Int = {
+  def outDegreeOf(node: N): Int = {
     edges.count(_._1 == node)
   }
 
@@ -94,7 +94,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, 
N)]) extends Serial
   /**
    * out going edges.
    */
-  def outgoingEdgesOf(node : N): List[(N, E, N)]  = {
+  def outgoingEdgesOf(node: N): List[(N, E, N)] = {
     edges.filter(_._1 == node)
   }
 
@@ -129,7 +129,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
    * add edge
    * Current Graph is changed.
    */
-  def addEdge(node1 : N, edge: E, node2: N): Unit = {
+  def addEdge(node1: N, edge: E, node2: N): Unit = {
     addVertex(node1)
     addVertex(node2)
     addEdge((node1, edge, node2))
@@ -155,7 +155,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
    * Current graph is not changed.
    */
   def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = {
-    val newEdges =  edges.map {edge =>
+    val newEdges = edges.map { edge =>
       (edge._1, fun(edge._1, edge._2, edge._3), edge._3)
     }
     new Graph(vertices, newEdges)
@@ -164,7 +164,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
   /**
    * edges connected to node
    */
-  def edgesOf(node : N): List[(N, E, N)] = {
+  def edgesOf(node: N): List[(N, E, N)] = {
     (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, 
N)].toList.sortBy(_indexs(_))
   }
 
@@ -179,9 +179,9 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
    * Add another graph
    * Current graph is changed.
    */
-  def addGraph(other : Graph[N, E]) : Graph[N, E] = {
+  def addGraph(other: Graph[N, E]): Graph[N, E] = {
     (vertices ++ other.vertices).foreach(addVertex(_))
-    (edges ++ other.edges).foreach(edge =>addEdge(edge._1, edge._2, edge._3))
+    (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3))
     this
   }
 
@@ -247,7 +247,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
     val newGraph = copy
     var output = List.empty[N]
 
-    while(!newGraph.isEmpty) {
+    while (!newGraph.isEmpty) {
       output ++= newGraph.removeZeroInDegree
     }
     output.iterator
@@ -255,6 +255,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
 
   /**
    * Return all circles in graph.
+   *
    * The reference of this algorithm is:
    * 
https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
    */
@@ -278,11 +279,13 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
         edge => {
           if (!indexMap.contains(edge._3)) {
             tarjan(edge._3)
-            if (lowLink.get(edge._3).get < lowLink.get(node).get)
+            if (lowLink.get(edge._3).get < lowLink.get(node).get) {
               lowLink(node) = lowLink(edge._3)
+            }
           } else {
-            if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < 
lowLink.get(node).get))
+            if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < 
lowLink.get(node).get)) {
               lowLink(node) = indexMap(edge._3)
+            }
           }
         }
       }
@@ -311,6 +314,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
   /**
    * Return an iterator of vertex in topological order of graph with circles
    * The node returned by Iterator is stable sorted.
+   *
    * The reference of this algorithm is:
    * http://www.drdobbs.com/database/topological-sorting/184410262
    */
@@ -345,10 +349,11 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
    * check whether there is a loop
    */
   def hasCycle(): Boolean = {
-    @annotation.tailrec def detectCycle(graph: Graph[N, E]): Boolean = {
-      if(graph.edges.isEmpty) {
+    @tailrec
+    def detectCycle(graph: Graph[N, E]): Boolean = {
+      if (graph.edges.isEmpty) {
         false
-      } else if(graph.vertices.nonEmpty && 
!graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
+      } else if (graph.vertices.nonEmpty && 
!graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
         true
       } else {
         graph.removeZeroInDegree
@@ -367,23 +372,25 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, 
E, N)]) extends Serial
   }
 
   /**
-   * Generate a level map for each vertex
-   * withholding: if vertex A -> B, then level(A) < level(B)
+   * Generates a level map for each vertex such that:
+   * {{{
+   * if vertex A -> B, then level(A) < level(B)
+   * }}}
    */
   def vertexHierarchyLevelMap(): Map[N, Int] = {
     val newGraph = copy
     var output = Map.empty[N, Int]
     var level = 0
-    while(!newGraph.isEmpty) {
+    while (!newGraph.isEmpty) {
       output ++= newGraph.removeZeroInDegree.map((_, level)).toMap
       level += 1
     }
     output
   }
 
-  override def toString = {
+  override def toString: String = {
     Map("vertices" -> vertices.mkString(","),
-    "edges" -> edges.mkString(",")).toString()
+      "edges" -> edges.mkString(",")).toString()
   }
 }
 
@@ -391,6 +398,8 @@ object Graph {
 
   /**
    * Example:
+   *
+   * {{{
    * Graph(1 ~ 2 ~> 4 ~ 5 ~> 7, 8~9~>55, 11)
    * Will create a graph with:
    * nodes:
@@ -399,17 +408,17 @@ object Graph {
    * 2: (1->4)
    * 5: (4->7)
    * 9: (8->55)
-   *
+   * }}}
    */
   def apply[N, E](elems: Path[_ <: N, _ <: E]*): Graph[N, E] = {
     val graph = empty[N, E]
-    elems.foreach{ path =>
+    elems.foreach { path =>
       path.updategraph(graph)
     }
     graph
   }
 
-  def apply[N , E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = {
+  def apply[N, E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = {
     new Graph(vertices, edges)
   }
 
@@ -417,11 +426,11 @@ object Graph {
     Some((graph.vertices, graph.edges))
   }
 
-  def empty[N, E] = {
+  def empty[N, E]: Graph[N, E] = {
     new Graph(List.empty[N], List.empty[(N, E, N)])
   }
 
-  class Path[N, +E](path: List[Either[N, E]]) {
+  class Path[N, + E](path: List[Either[N, E]]) {
 
     def ~[Edge >: E](edge: Edge): Path[N, Edge] = {
       new Path(path :+ Right(edge))
@@ -458,7 +467,7 @@ object Graph {
   }
 
   implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) {
-    
+
     override def ~[Edge](edge: Edge): Path[N, Edge] = {
       new Path(List(Left(self), Right(edge)))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala 
b/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
index 20a3fe6..7552444 100644
--- a/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
+++ b/core/src/main/scala/io/gearpump/util/HistoryMetricsService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,19 @@
 package io.gearpump.util
 
 import java.util
+import scala.collection.mutable.ListBuffer
 
 import akka.actor.Actor
 import com.typesafe.config.Config
+import org.slf4j.Logger
+
 import io.gearpump.TimeStamp
-import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics}
+import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption}
 import io.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem}
 import io.gearpump.metrics.Metrics._
 import io.gearpump.metrics.MetricsAggregator
 import io.gearpump.util.Constants._
-import io.gearpump.util.HistoryMetricsService.{HistoryMetricsStore, 
SkipAllAggregator, DummyMetricsAggregator, MetricsStore, HistoryMetricsConfig}
-import org.slf4j.Logger
-
-import scala.collection.mutable.ListBuffer
+import io.gearpump.util.HistoryMetricsService.{DummyMetricsAggregator, 
HistoryMetricsConfig, HistoryMetricsStore, SkipAllAggregator}
 
 /**
  *
@@ -44,7 +44,6 @@ import scala.collection.mutable.ListBuffer
  * for each hour.
  *
  * For fine-grained data in last 5 min, there will be 1 sample point per 15 
seconds.
- *
  */
 class HistoryMetricsService(name: String, config: HistoryMetricsConfig) 
extends Actor {
   private val LOG: Logger = LogUtil.getLogger(getClass, name = name)
@@ -75,14 +74,15 @@ class HistoryMetricsService(name: String, config: 
HistoryMetricsConfig) extends
     } + ".*$"
   }
 
-  private def fetchMetricsHistory(pathPattern: String, readOption: 
ReadOption.ReadOption): List[HistoryMetricsItem] = {
+  private def fetchMetricsHistory(pathPattern: String, readOption: 
ReadOption.ReadOption)
+  : List[HistoryMetricsItem] = {
 
     val result = new ListBuffer[HistoryMetricsItem]
 
     val regex = toRegularExpression(pathPattern).r.pattern
 
     val iter = metricsStore.iterator
-    while(iter.hasNext) {
+    while (iter.hasNext) {
       val (name, store) = iter.next()
 
       val matcher = regex.matcher(name)
@@ -95,7 +95,7 @@ class HistoryMetricsService(name: String, config: 
HistoryMetricsConfig) extends
           case ReadOption.ReadHistory =>
             result.append(store.readHistory: _*)
           case _ =>
-          //skip all other options.
+          // Skip all other options.
         }
       }
     }
@@ -106,43 +106,42 @@ class HistoryMetricsService(name: String, config: 
HistoryMetricsConfig) extends
   private var aggregators: Map[String, MetricsAggregator] = Map.empty[String, 
MetricsAggregator]
 
   import scala.collection.JavaConverters._
-  val validAggregators = {
-    
systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped.keySet().asScala.toSet
+  private val validAggregators: Set[String] = {
+    val rootConfig = 
systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped
+    rootConfig.keySet().asScala.toSet
   }
 
   def commandHandler: Receive = {
-    //path accept syntax ? *, ? will match one char, * will match at least one 
char
+    // Path accepts the wildcards ? and *: ? matches one char, * matches at least one char
     case QueryHistoryMetrics(inputPath, readOption, aggregatorClazz, options) 
=>
 
-       val aggregator = {
-         if (aggregatorClazz == null || aggregatorClazz.isEmpty) {
-           dummyAggregator
-         } else if (aggregators.contains(aggregatorClazz)) {
-           aggregators(aggregatorClazz)
-         } else if (validAggregators.contains(aggregatorClazz)) {
-           val clazz = Class.forName(aggregatorClazz)
-           val constructor = clazz.getConstructor(classOf[Config])
-           val aggregator = 
constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator]
-           aggregators += aggregatorClazz -> aggregator
-           aggregator
-         } else {
-           LOG.error(s"Aggregator $aggregatorClazz is not in the white list 
${validAggregators}, we will drop all messages. " +
-             s"Please see config at ${GEARPUMP_METRICS_AGGREGATORS}")
-           val skipAll = new SkipAllAggregator
-           aggregators += aggregatorClazz -> new SkipAllAggregator
-           skipAll
-         }
-       }
-
-      import collection.JavaConversions._
-      val metrics =  fetchMetricsHistory(inputPath, readOption).iterator
+      val aggregator = {
+        if (aggregatorClazz == null || aggregatorClazz.isEmpty) {
+          dummyAggregator
+        } else if (aggregators.contains(aggregatorClazz)) {
+          aggregators(aggregatorClazz)
+        } else if (validAggregators.contains(aggregatorClazz)) {
+          val clazz = Class.forName(aggregatorClazz)
+          val constructor = clazz.getConstructor(classOf[Config])
+          val aggregator = 
constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator]
+          aggregators += aggregatorClazz -> aggregator
+          aggregator
+        } else {
+          LOG.error(s"Aggregator $aggregatorClazz is not in the white list 
${validAggregators}, " +
+            s"we will drop all messages. Please see config at 
${GEARPUMP_METRICS_AGGREGATORS}")
+          val skipAll = new SkipAllAggregator
+          aggregators += aggregatorClazz -> new SkipAllAggregator
+          skipAll
+        }
+      }
+
+      val metrics = fetchMetricsHistory(inputPath, readOption).iterator
       sender ! HistoryMetrics(inputPath, aggregator.aggregate(options, 
metrics))
-   }
+  }
 }
 
 object HistoryMetricsService {
 
-
   trait MetricsStore {
     def add(inputMetrics: MetricType): Unit
 
@@ -169,7 +168,7 @@ object HistoryMetricsService {
     def readHistory: List[HistoryMetricsItem]
   }
 
-  class DummyHistoryMetricsStore extends HistoryMetricsStore{
+  class DummyHistoryMetricsStore extends HistoryMetricsStore {
 
     val empty = List.empty[HistoryMetricsItem]
 
@@ -187,7 +186,8 @@ object HistoryMetricsService {
   }
 
   object HistoryMetricsStore {
-    def apply(name: String, metric: MetricType, config: HistoryMetricsConfig): 
HistoryMetricsStore = {
+    def apply(name: String, metric: MetricType, config: HistoryMetricsConfig)
+      : HistoryMetricsStore = {
       metric match {
         case histogram: Histogram => new HistogramMetricsStore(config)
         case meter: Meter => new MeterMetricsStore(config)
@@ -199,18 +199,18 @@ object HistoryMetricsService {
   }
 
   /**
-   ** Metrics store to store history data points
+   * Metrics store to store history data points
    * For each time point, we will store single data point.
    *
    * @param retainCount how many data points to retain, old data will be 
removed
    * @param retainIntervalMs time interval between two data points.
    */
-  class SingleValueMetricsStore (retainCount: Int, retainIntervalMs: Long) 
extends MetricsStore{
+  class SingleValueMetricsStore(retainCount: Int, retainIntervalMs: Long) 
extends MetricsStore {
 
-    private val queue =  new util.ArrayDeque[HistoryMetricsItem]()
+    private val queue = new util.ArrayDeque[HistoryMetricsItem]()
     private var latest = List.empty[HistoryMetricsItem]
 
-    // end of the time window we are tracking
+    // End of the time window we are tracking
     private var endTime = 0L
 
     override def add(inputMetrics: MetricType): Unit = {
@@ -226,7 +226,7 @@ object HistoryMetricsService {
         queue.addFirst(metrics)
         endTime = (now / retainIntervalMs + 1) * retainIntervalMs
 
-        // remove old data
+        // Removes old data
         if (queue.size() > retainCount) {
           queue.removeLast()
         }
@@ -235,8 +235,8 @@ object HistoryMetricsService {
 
     def read: List[HistoryMetricsItem] = {
       val result = new ListBuffer[HistoryMetricsItem]
-      import scala.collection.JavaConversions.asScalaIterator
-      queue.iterator().foreach(result.prepend(_))
+      import scala.collection.JavaConverters._
+      queue.iterator().asScala.foreach(result.prepend(_))
       result.toList
     }
 
@@ -246,11 +246,14 @@ object HistoryMetricsService {
   }
 
   /**
+   * Config for how long to keep history metrics data.
    *
    * @param retainHistoryDataHours Retain at max @RETAIN_HISTORY_HOURS history 
data(unit hour)
    * @param retainHistoryDataIntervalMs time interval between two history data 
points.(unit: ms)
-   * @param retainRecentDataSeconds Retain at max @RETAIN_LATEST_SECONDS 
recent data points(unit: seconds)
-   * @param retainRecentDataIntervalMs Retain at max @RETAIN_LATEST_SECONDS 
recent data points(unit: ms)
+   * @param retainRecentDataSeconds Retain at max @RETAIN_LATEST_SECONDS
+   *                                recent data points(unit: seconds)
+   * @param retainRecentDataIntervalMs time interval between two recent
+   *                                   data points (unit: ms)
    */
   case class HistoryMetricsConfig(
       retainHistoryDataHours: Int,
@@ -385,14 +388,17 @@ object HistoryMetricsService {
   }
 
   class DummyMetricsAggregator extends MetricsAggregator {
-    def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = {
-      import scala.collection.JavaConverters._
+    def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem])
+      : List[HistoryMetricsItem] = {
       inputs.toList
     }
   }
 
   class SkipAllAggregator extends MetricsAggregator {
     private val empty = List.empty[HistoryMetricsItem]
-    def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem]): List[HistoryMetricsItem] = empty
+    def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem])
+    : List[HistoryMetricsItem] = {
+      empty
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/LogUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/LogUtil.scala 
b/core/src/main/scala/io/gearpump/util/LogUtil.scala
index 5fa3136..1669129 100644
--- a/core/src/main/scala/io/gearpump/util/LogUtil.scala
+++ b/core/src/main/scala/io/gearpump/util/LogUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,27 +16,26 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.util
 
 import java.io.File
 import java.net.InetAddress
 import java.util.Properties
+import scala.util.Try
 
 import com.typesafe.config.Config
 import org.apache.log4j.PropertyConfigurator
 import org.slf4j.{Logger, LoggerFactory}
 
-import scala.util.Try
-
 object LogUtil {
   object ProcessType extends Enumeration {
     type ProcessType = Value
     val MASTER, WORKER, LOCAL, APPLICATION, UI = Value
   }
 
-  def getLogger[T](clazz : Class[T], context : String = null, master : Any = 
null, worker : Any = null, executor : Any = null, task : Any = null, app : Any 
= null, name: String = null) : Logger = {
-
+  def getLogger[T](
+      clazz: Class[T], context: String = null, master: Any = null, worker: Any 
= null,
+      executor: Any = null, task: Any = null, app: Any = null, name: String = 
null): Logger = {
     var env = ""
 
     if (null != context) {
@@ -70,8 +69,9 @@ object LogUtil {
     }
   }
 
-  def loadConfiguration(config : Config, processType : 
ProcessType.ProcessType) : Unit = {
-    //set log file name
+  /** Customizes the log file locations by reading config from system properties */
+  def loadConfiguration(config: Config, processType: ProcessType.ProcessType): 
Unit = {
+    // Set log file name
     val propName = s"gearpump.${processType.toString.toLowerCase}.log.file"
     val props = loadConfiguration
 
@@ -82,7 +82,8 @@ object LogUtil {
     processType match {
       case ProcessType.APPLICATION =>
         props.setProperty("log4j.rootAppender", 
"${gearpump.application.logger}")
-        props.setProperty("gearpump.application.log.rootdir", 
applicationLogDir(config).getAbsolutePath)
+        props.setProperty("gearpump.application.log.rootdir",
+          applicationLogDir(config).getAbsolutePath)
       case _ =>
         props.setProperty("log4j.rootAppender", "${gearpump.root.logger}")
         props.setProperty("gearpump.log.dir", 
daemonLogDir(config).getAbsolutePath)
@@ -96,7 +97,7 @@ object LogUtil {
     new File(dir)
   }
 
-  def verboseLogToConsole: Unit = {
+  def verboseLogToConsole(): Unit = {
     val props = loadConfiguration
     props.setProperty("log4j.rootLogger", "DEBUG,console")
     PropertyConfigurator.configure(props)
@@ -105,14 +106,14 @@ object LogUtil {
   def loadConfiguration: Properties = {
     val props = new Properties()
     val log4jConfStream = 
getClass().getClassLoader.getResourceAsStream("log4j.properties")
-    if(log4jConfStream!=null) {
+    if (log4jConfStream != null) {
       props.load(log4jConfStream)
     }
     log4jConfStream.close()
     props
   }
 
-  private def jvmName : String = {
+  private def jvmName: String = {
     val hostname = Try(InetAddress.getLocalHost.getHostName).getOrElse("local")
     java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala 
b/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
index 13f9ea8..0b843f3 100644
--- a/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
+++ b/core/src/main/scala/io/gearpump/util/ProcessLogRedirector.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,11 +19,11 @@
 package io.gearpump.util
 
 import java.io.{Closeable, Flushable}
+import scala.sys.process.ProcessLogger
 
 import org.slf4j.LoggerFactory
 
-import scala.sys.process.ProcessLogger
-
+/** Redirect the console output to parent process */
 class ProcessLogRedirector extends ProcessLogger with Closeable with Flushable 
with ConsoleOutput {
   private val LOG = LoggerFactory.getLogger("redirect")
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala 
b/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
index 1142bdc..f6c7a2b 100644
--- a/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
+++ b/core/src/main/scala/io/gearpump/util/ReferenceEqual.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,13 +18,16 @@
 
 package io.gearpump.util
 
-
 /**
  * Check equal using reference-equal.
  */
 trait ReferenceEqual extends AnyRef {
 
-  override def equals(other : Any) : Boolean = {
+  override def equals(other: Any): Boolean = {
     this.eq(other.asInstanceOf[AnyRef])
   }
+
+  override def hashCode(): Int = {
+    super.hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala 
b/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
index 787b508..245cb1b 100644
--- a/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
+++ b/core/src/main/scala/io/gearpump/util/RestartPolicy.scala
@@ -15,18 +15,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package io.gearpump.util
 
-import akka.actor.{ChildRestartStats, ActorRef}
+package io.gearpump.util
 
 import scala.concurrent.duration.Duration
 
+import akka.actor.ChildRestartStats
+
 /**
- * @param maxNrOfRetries the number of times is allowed to be restarted, 
negative value means no limit,
- *   if the limit is exceeded the policy will not allow to restart
- * @param withinTimeRange duration of the time window for maxNrOfRetries, 
Duration.Inf means no window
+ * When an executor or task fails, Gearpump will try to restart it. However, if it 
fails after
+ * multiple retries, then we abort.
+ *
+ * @param maxNrOfRetries The number of times a restart is allowed. A 
negative value means no
+ *                       limit. If the limit is exceeded, the policy will not 
allow a restart
+ * @param withinTimeRange Duration of the time window for maxNrOfRetries.
+ *                        Duration.Inf means no window
  */
-class RestartPolicy (maxNrOfRetries: Int, withinTimeRange: Duration) {
+class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
   private val status = new ChildRestartStats(null, 0, 0L)
   private val retriesWindow = (Some(maxNrOfRetries), 
Some(withinTimeRange.toMillis.toInt))
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/RichProcess.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/RichProcess.scala 
b/core/src/main/scala/io/gearpump/util/RichProcess.scala
index bfe0607..ab5611f 100644
--- a/core/src/main/scala/io/gearpump/util/RichProcess.scala
+++ b/core/src/main/scala/io/gearpump/util/RichProcess.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -25,7 +25,9 @@ trait ConsoleOutput {
   def error: String
 }
 
-class RichProcess(process: Process, val logger: ConsoleOutput) extends Process 
{
-  def exitValue() : scala.Int = process.exitValue()
-  def destroy() : scala.Unit = process.destroy()
+/** Extends Process by providing an additional logger: ConsoleOutput interface. 
*/
+class RichProcess(process: Process, _logger: ConsoleOutput) extends Process {
+  def exitValue(): scala.Int = process.exitValue()
+  def destroy(): scala.Unit = process.destroy()
+  def logger: ConsoleOutput = _logger
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala 
b/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
index 1464b21..64b920c 100644
--- a/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
+++ b/core/src/main/scala/io/gearpump/util/TimeOutScheduler.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,20 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.util
 
 import java.util.concurrent.TimeUnit
+import scala.concurrent.duration._
 
 import akka.actor.{Actor, ActorRef}
 import akka.pattern.ask
 
-import scala.concurrent.duration._
-
+/** A helper util to send a message to a remote actor and invoke a callback on 
timeout */
 trait TimeOutScheduler {
   this: Actor =>
   import context.dispatcher
 
-  def sendMsgWithTimeOutCallBack(target: ActorRef, msg: AnyRef, milliSeconds: 
Long, timeOutHandler: => Unit): Unit = {
+  def sendMsgWithTimeOutCallBack(
+      target: ActorRef, msg: AnyRef, milliSeconds: Long, timeOutHandler: => 
Unit): Unit = {
     val result = target.ask(msg)(FiniteDuration(milliSeconds, 
TimeUnit.MILLISECONDS))
     result onSuccess {
       case msg =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/main/scala/io/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Util.scala 
b/core/src/main/scala/io/gearpump/util/Util.scala
index 96eddfd..8ed9bb3 100644
--- a/core/src/main/scala/io/gearpump/util/Util.scala
+++ b/core/src/main/scala/io/gearpump/util/Util.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,18 +18,18 @@
 
 package io.gearpump.util
 
-import java.io.{DataInputStream, FileInputStream, File}
-import java.net.{URL, URI, ServerSocket}
-import java.util.jar.Manifest
-import com.typesafe.config.{ConfigFactory, Config}
-import io.gearpump.cluster.AppJar
-import io.gearpump.jarstore.{FilePath, JarStoreService}
-import io.gearpump.transport.HostPort
-
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.net.{ServerSocket, URI}
 import scala.concurrent.forkjoin.ThreadLocalRandom
-import scala.sys.process.{ProcessLogger, Process}
+import scala.sys.process.Process
 import scala.util.{Failure, Success, Try}
 
+import com.typesafe.config.{Config, ConfigFactory}
+
+import io.gearpump.cluster.AppJar
+import io.gearpump.jarstore.JarStoreService
+import io.gearpump.transport.HostPort
+
 object Util {
   val LOG = LogUtil.getLogger(getClass)
   private val defaultUri = new URI("file:///")
@@ -39,7 +39,7 @@ object Util {
     appNamePattern.matcher(appName).matches()
   }
 
-  def getCurrentClassPath : Array[String] = {
+  def getCurrentClassPath: Array[String] = {
     val classpath = System.getProperty("java.class.path")
     val classpathList = classpath.split(File.pathSeparator)
     classpathList
@@ -48,8 +48,9 @@ object Util {
   def version: String = {
     val home = System.getProperty(Constants.GEARPUMP_HOME)
     val version = Try {
-      val versionFile = new DataInputStream(new FileInputStream(new File(home, 
"VERSION")))
-      val version = versionFile.readLine().replace("version:=", "")
+      val versionFile = new FileInputStream(new File(home, "VERSION"))
+      val reader = new BufferedReader(new InputStreamReader(versionFile))
+      val version = reader.readLine().replace("version:=", "")
       versionFile.close()
       version
     }
@@ -62,11 +63,13 @@ object Util {
     }
   }
 
-  def startProcess(options : Array[String], classPath : Array[String], 
mainClass : String,
-                   arguments : Array[String]) : RichProcess = {
+  def startProcess(options: Array[String], classPath: Array[String], 
mainClass: String,
+      arguments: Array[String]): RichProcess = {
     val java = System.getProperty("java.home") + "/bin/java"
-    val command = List(java) ++ options ++ List("-cp", 
classPath.mkString(File.pathSeparator), mainClass) ++ arguments
-    LOG.info(s"Starting executor process java $mainClass 
${arguments.mkString(" ")} \n ${options.mkString(" ")}")
+    val command = List(java) ++ options ++
+      List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ 
arguments
+    LOG.info(s"Starting executor process java $mainClass 
${arguments.mkString(" ")} " +
+      s"\n ${options.mkString(" ")}")
     val logger = new ProcessLogRedirector()
     val process = Process(command).run(logger)
     new RichProcess(process, logger)
@@ -75,7 +78,7 @@ object Util {
   /**
    * hostList format: host1:port1,host2:port2,host3:port3...
    */
-  def parseHostList(hostList : String) : List[HostPort] = {
+  def parseHostList(hostList: String): List[HostPort] = {
     val masters = hostList.trim.split(",").map { address =>
       val hostAndPort = address.split(":")
       HostPort(hostAndPort(0), hostAndPort(1).toInt)
@@ -85,7 +88,7 @@ object Util {
 
   def resolvePath(path: String): String = {
     val uri = new URI(path)
-    if(uri.getScheme == null && uri.getFragment == null) {
+    if (uri.getScheme == null && uri.getFragment == null) {
       val absolutePath = new File(path).getCanonicalPath.replaceAll("\\\\", 
"/")
       "file://" + absolutePath
     } else {
@@ -106,16 +109,16 @@ object Util {
     }
   }
 
-  def randInt: Int = {
+  def randInt(): Int = {
     Math.abs(ThreadLocalRandom.current.nextInt())
   }
 
-  def findFreePort: Try[Int] = {
+  def findFreePort(): Try[Int] = {
     Try {
-      val socket = new ServerSocket(0);
-      socket.setReuseAddress(true);
-      val port = socket.getLocalPort();
-      socket.close;
+      val socket = new ServerSocket(0)
+      socket.setReuseAddress(true)
+      val port = socket.getLocalPort()
+      socket.close
       port
     }
   }
@@ -132,7 +135,6 @@ object Util {
    * Then you can use like this:
    *
    * filterOutOrigin(config, "reference.conf")
-   *
    */
   import scala.collection.JavaConverters._
   def filterOutOrigin(config: Config, originFile: String): Config = {
@@ -148,16 +150,19 @@ object Util {
     }
   }
 
-  case class JvmSetting(vmargs : Array[String], classPath : Array[String])
+  case class JvmSetting(vmargs: Array[String], classPath: Array[String])
 
-  case class AppJvmSettings(appMater : JvmSetting, executor : JvmSetting)
+  case class AppJvmSettings(appMater: JvmSetting, executor: JvmSetting)
 
-  def resolveJvmSetting(conf : Config) : AppJvmSettings = {
+  /** Get an effective AppJvmSettings from Config */
+  def resolveJvmSetting(conf: Config): AppJvmSettings = {
 
-    import Constants._
+    import io.gearpump.util.Constants._
 
-    val appMasterVMArgs = 
Try(conf.getString(GEARPUMP_APPMASTER_ARGS).split("\\s+").filter(_.nonEmpty)).toOption
-    val executorVMArgs = 
Try(conf.getString(GEARPUMP_EXECUTOR_ARGS).split("\\s+").filter(_.nonEmpty)).toOption
+    val appMasterVMArgs = 
Try(conf.getString(GEARPUMP_APPMASTER_ARGS).split("\\s+")
+      .filter(_.nonEmpty)).toOption
+    val executorVMArgs = 
Try(conf.getString(GEARPUMP_EXECUTOR_ARGS).split("\\s+")
+      .filter(_.nonEmpty)).toOption
 
     val appMasterClassPath = Try(
       conf.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
@@ -169,7 +174,7 @@ object Util {
 
     AppJvmSettings(
       JvmSetting(appMasterVMArgs.getOrElse(Array.empty[String]),
-        appMasterClassPath.getOrElse(Array.empty[String]) ),
+        appMasterClassPath.getOrElse(Array.empty[String])),
       JvmSetting(executorVMArgs
         .getOrElse(Array.empty[String]), 
executorClassPath.getOrElse(Array.empty[String])))
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/log4j.properties 
b/core/src/test/resources/log4j.properties
index 870a69f..0faadd9 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -7,7 +7,7 @@
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
 #
-#     http://www.apache.org/licenses/LICENSE-2.0
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -50,7 +50,6 @@ log4j.rootLogger=INFO,console
 # Logging Threshold
 log4j.threshhold=ALL
 
-
 # =====================================================================
 # Appenders
 # =====================================================================
@@ -115,4 +114,3 @@ log4j.appender.ALA.layout=org.apache.log4j.PatternLayout
 #log4j.appender.ALA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n
 log4j.appender.ALA.layout.ConversionPattern=[%p] [%d{MM/dd/yyyy HH:mm:ss.SSS}] 
[%c{1}] %m%n
 
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/resources/test.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test.conf 
b/core/src/test/resources/test.conf
index 324e8bd..ac18c88 100644
--- a/core/src/test/resources/test.conf
+++ b/core/src/test/resources/test.conf
@@ -97,7 +97,6 @@ gearpump-ui {
   }
 }
 
-
 akka {
   logger-startup-timeout = 30s
 
@@ -111,7 +110,6 @@ akka {
       mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
     }
     default-dispatcher {
-      mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
       throughput = 10
       fork-join-executor {
         parallelism-factor = 2
@@ -145,7 +143,6 @@ akka {
     default-remote-dispatcher {
       throughput = 5
       type = Dispatcher
-      mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
       executor = "fork-join-executor"
       fork-join-executor {
         parallelism-min = 1

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/TestProbeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/TestProbeUtil.scala 
b/core/src/test/scala/io/gearpump/TestProbeUtil.scala
index 9b808dd..e7181db 100644
--- a/core/src/test/scala/io/gearpump/TestProbeUtil.scala
+++ b/core/src/test/scala/io/gearpump/TestProbeUtil.scala
@@ -18,11 +18,13 @@
 
 package io.gearpump
 
-import akka.actor.{Terminated, Actor, Props}
+import scala.language.implicitConversions
+
+import akka.actor.{Actor, Props, Terminated}
 import akka.testkit.TestProbe
 
 object TestProbeUtil {
-  implicit def toProps(probe: TestProbe) = {
+  implicit def toProps(probe: TestProbe): Props = {
     Props(new Actor {
       val probeRef = probe.ref
       context.watch(probeRef)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala 
b/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala
index 21d145a..b1ca357 100644
--- a/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala
+++ b/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,24 +19,20 @@
 package io.gearpump.cluster
 
 import java.io.File
-import io.gearpump.util.{Constants, ActorUtil}
-import io.gearpump.util.{FileUtils}
-import java.net.{UnknownHostException, SocketTimeoutException, Socket, 
InetSocketAddress, ServerSocket, URLClassLoader}
+import java.net.{InetSocketAddress, Socket, SocketTimeoutException, 
URLClassLoader, UnknownHostException}
 import java.util.Properties
 import java.util.concurrent.{Executors, TimeUnit}
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext}
 
 import akka.actor.{Actor, ActorSystem, Address, Props}
 import akka.testkit.TestProbe
 import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, 
ConfigValueFactory}
+
 import io.gearpump.cluster.MasterHarness.MockMaster
 import io.gearpump.util.Constants._
-import io.gearpump.util.{Constants, LogUtil, ActorUtil, Util}
-
-import scala.collection.JavaConverters._
-import scala.concurrent.ExecutionContext
-import scala.concurrent.duration.Duration
-import scala.sys.process.Process
-import scala.util.Try
+import io.gearpump.util.{ActorUtil, FileUtils, LogUtil}
 
 trait MasterHarness {
   private val LOG = LogUtil.getLogger(getClass)
@@ -54,7 +50,7 @@ trait MasterHarness {
   def getHost: String = host
   def getPort: Int = port
 
-  def config : Config
+  protected def config: Config
 
   def startActorSystem(): Unit = {
     val systemConfig = config
@@ -69,13 +65,13 @@ trait MasterHarness {
     LOG.info(s"Actor system is started, $host, $port")
   }
 
-  def shutdownActorSystem():Unit = {
-    system.shutdown()
-    system.awaitTermination
+  def shutdownActorSystem(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
     LOG.info(s"Actor system is stopped, $host, $port")
   }
 
-  def convertTestConf(host : String, port : Int) : File = {
+  def convertTestConf(host: String, port: Int): File = {
     val test = ConfigFactory.parseResourcesAnySyntax("test.conf",
       ConfigParseOptions.defaults.setAllowMissing(true))
 
@@ -88,13 +84,13 @@ trait MasterHarness {
     confFile
   }
 
-  def createMockMaster() : TestProbe = {
+  def createMockMaster(): TestProbe = {
     val masterReceiver = TestProbe()(system)
     val master = system.actorOf(Props(classOf[MockMaster], masterReceiver), 
MASTER)
     masterReceiver
   }
 
-  def isPortUsed(host : String, port : Int) : Boolean = {
+  def isPortUsed(host: String, port: Int): Boolean = {
 
     var isPortUsed = true
     val socket = new Socket()
@@ -103,12 +99,12 @@ trait MasterHarness {
       socket.connect(new InetSocketAddress(host, port), 1000)
       socket.isConnected
     } catch {
-      case ex: SocketTimeoutException  =>
+      case ex: SocketTimeoutException =>
         isPortUsed = false
       case ex: UnknownHostException =>
         isPortUsed = false
       case ex: Throwable =>
-        // for other case, we think the port is listened
+        // For other cases, we assume the port has been occupied.
         isPortUsed = true
     } finally {
       socket.close()
@@ -116,7 +112,7 @@ trait MasterHarness {
     isPortUsed
   }
 
-  def getContextClassPath : Array[String] = {
+  def getContextClassPath: Array[String] = {
     val contextLoader = Thread.currentThread().getContextClassLoader()
 
     val urlLoader = if (!contextLoader.isInstanceOf[URLClassLoader]) {
@@ -135,19 +131,16 @@ trait MasterHarness {
   /**
    * Remove trailing $
    */
-  def getMainClassName(mainObj : Any) : String = {
+  def getMainClassName(mainObj: Any): String = {
     mainObj.getClass.getName.dropRight(1)
   }
 
-  import Constants._
-
   def getMasterListOption(): Array[String] = {
     masterProperties.asScala.toList.map { kv =>
       s"-D${kv._1}=${kv._2}"
     }.toArray
   }
 
-
   def masterConfig: Config = {
     
ConfigFactory.parseProperties(masterProperties).withFallback(system.settings.config)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/core/src/test/scala/io/gearpump/cluster/TestUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/TestUtil.scala 
b/core/src/test/scala/io/gearpump/cluster/TestUtil.scala
index e491fc7..dce4902 100644
--- a/core/src/test/scala/io/gearpump/cluster/TestUtil.scala
+++ b/core/src/test/scala/io/gearpump/cluster/TestUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package io.gearpump.cluster
 
 import akka.actor._
@@ -25,10 +26,11 @@ object TestUtil {
   val UI_CONFIG = ClusterConfig.ui("test.conf")
 
   class DummyAppMaster(context: AppMasterContext, app: AppDescription) extends 
ApplicationMaster {
-    context.masterProxy ! (context, app)
+    context.masterProxy !(context, app)
 
-    def receive : Receive = null
+    def receive: Receive = null
   }
 
-  val dummyApp : AppDescription = AppDescription("dummy", 
classOf[DummyAppMaster].getName, UserConfig.empty)
+  val dummyApp: AppDescription =
+    AppDescription("dummy", classOf[DummyAppMaster].getName, UserConfig.empty)
 }
\ No newline at end of file

Reply via email to