http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/views/landing/breadcrumbs.js
----------------------------------------------------------------------
diff --git a/services/dashboard/views/landing/breadcrumbs.js b/services/dashboard/views/landing/breadcrumbs.js
index 56ed685..52ab8c2 100644
--- a/services/dashboard/views/landing/breadcrumbs.js
+++ b/services/dashboard/views/landing/breadcrumbs.js
@@ -5,7 +5,7 @@
 
 angular.module('dashboard')
 
-  .directive('breadcrumbs', function() {
+  .directive('breadcrumbs', function () {
     'use strict';
 
     return {
@@ -13,9 +13,9 @@ angular.module('dashboard')
       templateUrl: 'views/landing/breadcrumbs.html',
       replace: true,
       scope: {},
-      controller: ['$scope', function($scope) {
+      controller: ['$scope', function ($scope) {
 
-        $scope.$on('$stateChangeSuccess', function() {
+        $scope.$on('$stateChangeSuccess', function () {
           $scope.breadcrumbs = buildBreadcrumbs();
         });
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/views/landing/header.html
----------------------------------------------------------------------
diff --git a/services/dashboard/views/landing/header.html b/services/dashboard/views/landing/header.html
index ddec73d..2f2c27a 100644
--- a/services/dashboard/views/landing/header.html
+++ b/services/dashboard/views/landing/header.html
@@ -35,7 +35,6 @@
       </li>
     </ul>
 
-
     <ul class="dropdown nav navbar-nav navbar-right">
       <li role="placeholder"></li>
       <li>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/views/landing/header.js
----------------------------------------------------------------------
diff --git a/services/dashboard/views/landing/header.js b/services/dashboard/views/landing/header.js
index 00f55a4..617dfda 100644
--- a/services/dashboard/views/landing/header.js
+++ b/services/dashboard/views/landing/header.js
@@ -5,7 +5,7 @@
 
 angular.module('dashboard')
 
-  .directive('header', function() {
+  .directive('header', function () {
     'use strict';
 
     return {
@@ -13,7 +13,7 @@ angular.module('dashboard')
       templateUrl: 'views/landing/header.html',
       replace: true,
       scope: {},
-      controller: ['$scope', '$cookies', 'restapi', 'conf', function($scope, $cookies, restapi, conf) {
+      controller: ['$scope', '$cookies', 'restapi', 'conf', function ($scope, $cookies, restapi, conf) {
         $scope.clusterMenuItems = [
           {text: 'Master', pathPatt: 'master', icon: 'fa fa-laptop'},
           {text: 'Workers', pathPatt: 'workers', icon: 'fa fa-server'}
@@ -28,7 +28,7 @@ angular.module('dashboard')
         ];
 
         $scope.version = 'beta';
-        restapi.serviceVersion(function(version) {
+        restapi.serviceVersion(function (version) {
           $scope.version = version;
         });
       }]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/views/service_unreachable_notice.html
----------------------------------------------------------------------
diff --git a/services/dashboard/views/service_unreachable_notice.html b/services/dashboard/views/service_unreachable_notice.html
index 916124c..1835403 100644
--- a/services/dashboard/views/service_unreachable_notice.html
+++ b/services/dashboard/views/service_unreachable_notice.html
@@ -6,7 +6,8 @@
           The server is unreachable</h4>
       </div>
       <div class="modal-body">
-        <h5 style="line-height: 150%; margin-top: 0">Sorry, we couldn't complete the request. Please check your network connectivity or
+        <h5 style="line-height: 150%; margin-top: 0">Sorry, we couldn't complete the request. Please
+          check your network connectivity or
           contact the administrator for assistance.</h5>
       </div>
     </div>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/widgets/metrics_period_switcher.js
----------------------------------------------------------------------
diff --git a/services/dashboard/widgets/metrics_period_switcher.js b/services/dashboard/widgets/metrics_period_switcher.js
index b40aa23..de9b62c 100644
--- a/services/dashboard/widgets/metrics_period_switcher.js
+++ b/services/dashboard/widgets/metrics_period_switcher.js
@@ -5,7 +5,7 @@
 
 angular.module('dashboard')
 
-  .directive('metricsPeriodSwitcher', function() {
+  .directive('metricsPeriodSwitcher', function () {
     'use strict';
 
     return {
@@ -15,7 +15,7 @@ angular.module('dashboard')
         pastHours: '=',
         viewCurrent: '='
       },
-      link: function(scope) {
+      link: function (scope) {
         'use strict';
 
         scope.options = {
@@ -23,7 +23,7 @@ angular.module('dashboard')
           hist: 'Past %d Hours'.replace('%d', scope.pastHours)
         };
         scope.value = scope.viewCurrent ? 'current' : 'hist';
-        scope.$watch('value', function(value) {
+        scope.$watch('value', function (value) {
           scope.viewCurrent = String(value) === 'current';
         });
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/dashboard/widgets/radio_group.js
----------------------------------------------------------------------
diff --git a/services/dashboard/widgets/radio_group.js b/services/dashboard/widgets/radio_group.js
index aa5427d..cc46788 100644
--- a/services/dashboard/widgets/radio_group.js
+++ b/services/dashboard/widgets/radio_group.js
@@ -5,7 +5,7 @@
 
 angular.module('dashboard')
 
-  .directive('radioGroup', function() {
+  .directive('radioGroup', function () {
     'use strict';
 
     return {
@@ -16,11 +16,11 @@ angular.module('dashboard')
         ngModel: '=',
         options: '='
       },
-      link: function(scope, elem, attrs) {
+      link: function (scope, elem, attrs) {
         'use strict';
 
         scope.buttonWidth = attrs.buttonWidth || '96px';
-        scope.toggle = function(value) {
+        scope.toggle = function (value) {
           scope.ngModel = value;
         };
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala
----------------------------------------------------------------------
diff --git a/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala b/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala
index 51e6f0d..3d80c4e 100644
--- a/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala
+++ b/services/js/src/main/scala/io/gearpump/dashboard/DashboardApp.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
index aa88d13..eb6531f 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/AdminService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,32 +18,34 @@
 
 package io.gearpump.services
 
-import akka.actor.{ActorSystem}
+import akka.actor.ActorSystem
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
-import akka.stream.{Materializer}
-import io.gearpump.util.{Constants, Util}
+import akka.stream.Materializer
+
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
+
 /**
  * AdminService is for cluster-wide managements. it is not related with
  * specific application.
  *
  * For example:
- * Security management: Add user, remove user.
- * Configuration management: Change configurations.
- * Machine management: Add worker machines, remove worker machines, and add masters.
+ *  - Security management: Add user, remove user.
+ *  - Configuration management: Change configurations.
+ *  - Machine management: Add worker machines, remove worker machines, and add masters.
  */
 
 // TODO: Add YARN resource manager capacities to add/remove machines.
 class AdminService(override val system: ActorSystem)
   extends BasicService {
 
-  override def prefix = Neutral
+  protected override def prefix = Neutral
 
-  override def doRoute(implicit mat: Materializer) = {
+  protected override def doRoute(implicit mat: Materializer) = {
     path("terminate") {
       post {
-        system.shutdown()
+        system.terminate()
         complete(StatusCodes.NotFound)
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
index 30ec061..060e780 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/AppMasterService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,12 +18,16 @@
 
 package io.gearpump.services
 
-import akka.actor.{ActorSystem, ActorRef}
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.model.{FormData, Multipart}
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server.Route
 import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
-import akka.stream.{Materializer, ActorMaterializer}
+import akka.stream.Materializer
+import upickle.default.{read, write}
+
 import io.gearpump.cluster.AppMasterToMaster.{AppMasterSummary, GeneralAppMasterSummary}
 import io.gearpump.cluster.ClientToMaster._
 import io.gearpump.cluster.ClusterConfig
@@ -31,6 +35,8 @@ import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetail
 import io.gearpump.cluster.MasterToClient._
 import io.gearpump.jarstore.JarStoreService
 import io.gearpump.services.AppMasterService.Status
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
 import io.gearpump.streaming.AppMasterToMaster.StallingTasks
 import io.gearpump.streaming.appmaster.DagManager._
 import io.gearpump.streaming.appmaster.StreamAppMasterSummary
@@ -38,9 +44,6 @@ import io.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary,
 import io.gearpump.util.ActorUtil.{askActor, askAppMaster}
 import io.gearpump.util.FileDirective._
 import io.gearpump.util.{Constants, Util}
-import upickle.default.{read, write}
-import io.gearpump.services.util.UpickleUtil._
-import scala.util.{Failure, Success, Try}
 
 /**
  * Management service for AppMaster
@@ -52,40 +55,42 @@ class AppMasterService(val master: ActorRef,
   private val systemConfig = system.settings.config
   private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
 
-  override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) { appId =>
-    path("dynamicdag") {
-      parameters(ParamMagnet("args")) { args: String =>
-        def replaceProcessor(dagOperation: DAGOperation): Route = {
-          onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) {
-            case Success(value) =>
-              complete(write(value))
-            case Failure(ex) =>
-              failWith(ex)
+  protected override def doRoute(implicit mat: Materializer) = pathPrefix("appmaster" / IntNumber) {
+    appId => {
+      path("dynamicdag") {
+        parameters(ParamMagnet("args")) { args: String =>
+          def replaceProcessor(dagOperation: DAGOperation): Route = {
+            onComplete(askAppMaster[DAGOperationResult](master, appId, dagOperation)) {
+              case Success(value) =>
+                complete(write(value))
+              case Failure(ex) =>
+                failWith(ex)
+            }
           }
-        }
 
-        val msg = java.net.URLDecoder.decode(args)
-        val dagOperation = read[DAGOperation](msg)
-        (post & entity(as[Multipart.FormData])) { _ =>
-        uploadFile { form =>
-          val jar = form.getFile("jar").map(_.file)
+          val msg = java.net.URLDecoder.decode(args, "UTF-8")
+          val dagOperation = read[DAGOperation](msg)
+          (post & entity(as[Multipart.FormData])) { _ =>
+          uploadFile { form =>
+            val jar = form.getFile("jar").map(_.file)
 
-          if (jar.nonEmpty) {
-            dagOperation match {
-              case replace: ReplaceProcessor =>
-                val description = replace.newProcessorDescription.copy(jar = Util.uploadJar(jar.get, jarStore))
-                val dagOperationWithJar = replace.copy(newProcessorDescription = description)
-                replaceProcessor(dagOperationWithJar)
+            if (jar.nonEmpty) {
+              dagOperation match {
+                case replace: ReplaceProcessor =>
+                  val description = replace.newProcessorDescription.copy(jar =
+                    Util.uploadJar(jar.get, jarStore))
+                  val dagOperationWithJar = replace.copy(newProcessorDescription = description)
+                  replaceProcessor(dagOperationWithJar)
+              }
+            } else {
+              replaceProcessor(dagOperation)
             }
-          } else {
-            replaceProcessor(dagOperation)
           }
+        } ~ (post & entity(as[FormData])) { _ =>
+          replaceProcessor(dagOperation)
         }
-      } ~ (post & entity(as[FormData])) { _ =>
-        replaceProcessor(dagOperation)
-      }
-      }
-    } ~
+        }
+      } ~
       path("stallingtasks") {
         onComplete(askAppMaster[StallingTasks](master, appId, GetStallingTasks(appId))) {
           case Success(value) =>
@@ -124,23 +129,25 @@ class AppMasterService(val master: ActorRef,
           val executorId = Integer.parseInt(executorIdString)
           onComplete(askAppMaster[ExecutorConfig](master, appId, QueryExecutorConfig(executorId))) {
             case Success(value) =>
-              val config = Option(value.config).map(ClusterConfig.render(_, concise)).getOrElse("{}")
+              val config = Option(value.config).map(ClusterConfig.render(_, concise))
+                .getOrElse("{}")
               complete(config)
             case Failure(ex) =>
               failWith(ex)
           }
         } ~
-        pathEnd {
-          get {
-            val executorId = Integer.parseInt(executorIdString)
-            onComplete(askAppMaster[ExecutorSummary](master, appId, GetExecutorSummary(executorId))) {
+          pathEnd {
+            get {
+              val executorId = Integer.parseInt(executorIdString)
+              onComplete(askAppMaster[ExecutorSummary](master, appId,
+            GetExecutorSummary(executorId))) {
               case Success(value) =>
                 complete(write(value))
               case Failure(ex) =>
                 failWith(ex)
+              }
             }
           }
-        }
       } ~
       path("metrics" / RestPath) { path =>
         parameterMap { optionMap =>
@@ -210,6 +217,7 @@ class AppMasterService(val master: ActorRef,
           }
         }
       }
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala b/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala
index 079afc3..75f033b 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/BasicService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,28 +18,31 @@
 
 package io.gearpump.services
 
-import akka.actor.{ActorSystem}
+import scala.concurrent.ExecutionContext
+
+import akka.actor.ActorSystem
 import akka.http.scaladsl.model.headers.CacheDirectives.{`max-age`, `no-cache`}
 import akka.http.scaladsl.model.headers.`Cache-Control`
 import akka.http.scaladsl.server.Directives._
-import akka.http.scaladsl.server.{Route}
-import akka.stream.{Materializer}
-import io.gearpump.util.{LogUtil, Constants}
+import akka.http.scaladsl.server.Route
+import akka.stream.Materializer
 
-import scala.concurrent.ExecutionContext
+import io.gearpump.util.{Constants, LogUtil}
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
 
 trait RouteService {
   def route: Route
 }
 
 /**
- * Wrap the cache behavior, and some common utils.
+ * Wraps the cache behavior, and some common utils.
  */
-trait BasicService extends RouteService{
+trait BasicService extends RouteService {
 
   implicit def system: ActorSystem
 
-  implicit def timeout = Constants.FUTURE_TIMEOUT
+  implicit def timeout: akka.util.Timeout = Constants.FUTURE_TIMEOUT
 
   implicit def ec: ExecutionContext = system.dispatcher
 
@@ -53,7 +56,7 @@ trait BasicService extends RouteService{
   private val noCacheHeader = `Cache-Control`(`no-cache`, `max-age`(0L))
 
   def route: Route = encodeResponse {
-    extractMaterializer {implicit mat =>
+    extractMaterializer { implicit mat =>
       rawPathPrefix(prefix) {
         if (cache) {
           doRoute(mat)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index 91f25fe..6ca0f98 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,13 +16,15 @@
  * limitations under the License.
  */
 
-
 package io.gearpump.services
 
 import java.io.{File, IOException}
-import java.nio.file.Files
 import java.nio.charset.StandardCharsets.UTF_8
-import java.nio.file.StandardOpenOption.{WRITE, APPEND}
+import java.nio.file.Files
+import java.nio.file.StandardOpenOption.{APPEND, WRITE}
+import scala.collection.JavaConverters._
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.server.Directives._
@@ -30,6 +32,7 @@ import akka.http.scaladsl.server.directives.ParameterDirectives.ParamMagnet
 import akka.http.scaladsl.unmarshalling.Unmarshaller._
 import akka.stream.Materializer
 import com.typesafe.config.Config
+
 import io.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, GetMasterData, GetWorkerData, MasterData, WorkerData}
 import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, QueryMasterConfig, ReadOption}
 import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, WorkerList}
@@ -38,18 +41,16 @@ import io.gearpump.cluster.client.ClientContext
 import io.gearpump.cluster.worker.WorkerSummary
 import io.gearpump.cluster.{ClusterConfig, UserConfig}
 import io.gearpump.jarstore.JarStoreService
-import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
 import io.gearpump.partitioner.{PartitionerByClassName, PartitionerDescription}
+import io.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
 import io.gearpump.streaming.{ProcessorDescription, ProcessorId, StreamApplication}
 import io.gearpump.util.ActorUtil._
 import io.gearpump.util.FileDirective._
 import io.gearpump.util.{Constants, Graph, Util}
-import io.gearpump.services.util.UpickleUtil._
-
-import scala.collection.JavaConversions._
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
 
+/** Manages service for master node */
 class MasterService(val master: ActorRef,
     val jarStore: JarStoreService, override val system: ActorSystem)
   extends BasicService {
@@ -59,7 +60,7 @@ class MasterService(val master: ActorRef,
   private val systemConfig = system.settings.config
   private val concise = systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
 
-  override def doRoute(implicit mat: Materializer) = pathPrefix("master") {
+  protected override def doRoute(implicit mat: Materializer) = pathPrefix("master") {
     pathEnd {
       get {
         onComplete(askActor[MasterData](master, GetMasterData)) {
@@ -76,16 +77,17 @@ class MasterService(val master: ActorRef,
       }
     } ~
     path("workerlist") {
-      def future = askActor[WorkerList](master, GetAllWorkers).flatMap { workerList =>
-        val workers = workerList.workers
-        val workerDataList = List.empty[WorkerSummary]
-
-        Future.fold(workers.map { workerId =>
-          askWorker[WorkerData](master, workerId, GetWorkerData(workerId))
-        })(workerDataList) { (workerDataList, workerData) =>
-          workerDataList :+ workerData.workerDescription
+      def future: Future[List[WorkerSummary]] = askActor[WorkerList](master, GetAllWorkers)
+        .flatMap { workerList =>
+          val workers = workerList.workers
+          val workerDataList = List.empty[WorkerSummary]
+
+          Future.fold(workers.map { workerId =>
+            askWorker[WorkerData](master, workerId, GetWorkerData(workerId))
+          })(workerDataList) { (workerDataList, workerData) =>
+            workerDataList :+ workerData.workerDescription
+          }
         }
-      }
       onComplete(future) {
         case Success(result: List[WorkerSummary]) => complete(write(result))
         case Failure(ex) => failWith(ex)
@@ -221,9 +223,11 @@ object MasterService {
   case class Status(success: Boolean, reason: String = null)
 
   /**
-   * Submit Native Application.
+   * Submits Native Application.
    */
-  def submitGearApp(jar: Option[File], executorNum: Int, args: String, systemConfig: Config, userConfigFile: Option[File]): Boolean = {
+  def submitGearApp(
+      jar: Option[File], executorNum: Int, args: String,
+      systemConfig: Config, userConfigFile: Option[File]): Boolean = {
     submitAndDeleteTempFiles(
       "io.gearpump.cluster.main.AppSubmitter",
       argsArray = Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args),
@@ -235,9 +239,10 @@ object MasterService {
   }
 
   /**
-   * Submit Storm application.
+   * Submits Storm application.
    */
-  def submitStormApp(jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = {
+  def submitStormApp(
+      jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = {
     submitAndDeleteTempFiles(
       "io.gearpump.experiments.storm.main.GearpumpStormClient",
       argsArray = spaceSeparatedArgumentsToArray(args),
@@ -248,9 +253,10 @@ object MasterService {
     )
   }
 
-  private def submitAndDeleteTempFiles(mainClass: String, argsArray: Array[String], fileMap: Map[String, File],
-                                       classPath: Array[String], systemConfig: Config,
-                                       userConfigFile: Option[File] = None): Boolean = {
+  private def submitAndDeleteTempFiles(
+      mainClass: String, argsArray: Array[String], fileMap: Map[String, File],
+      classPath: Array[String], systemConfig: Config,
+      userConfigFile: Option[File] = None): Boolean = {
     try {
       val jar = fileMap.get("jar")
       if (jar.isEmpty) {
@@ -279,7 +285,7 @@ object MasterService {
   }
 
   /**
-   * Return Java options for gearpump cluster
+   * Returns Java options for gearpump cluster
    */
   private def clusterOptions(systemConfig: Config, userConfigFile: Option[File]): Array[String] = {
     var options = Array(
@@ -288,7 +294,8 @@ object MasterService {
       s"-D${Constants.PREFER_IPV4}=true"
     )
 
-    val masters = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).flatMap(Util.parseHostList)
+    val masters = systemConfig.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+      .toList.flatMap(Util.parseHostList)
     options ++= masters.zipWithIndex.map { case (master, index) =>
       s"-D${Constants.GEARPUMP_CLUSTER_MASTERS}.$index=${master.host}:${master.port}"
     }.toArray[String]
@@ -311,7 +318,7 @@ object MasterService {
   }
 
   /**
-   * Return a space separated arguments as an array.
+   * Returns a space separated arguments as an array.
    */
   private def spaceSeparatedArgumentsToArray(str: String): Array[String] = {
     str.split(" +").filter(_.nonEmpty)
@@ -335,9 +342,9 @@ object MasterService {
     )
   }
 
-  case class SubmitApplicationRequest (
-    appName: String,
-    processors: Map[ProcessorId, ProcessorDescription],
-    dag: Graph[Int, String],
-    userconfig: UserConfig)
+  case class SubmitApplicationRequest(
+      appName: String,
+      processors: Map[ProcessorId, ProcessorDescription],
+      dag: Graph[Int, String],
+      userconfig: UserConfig)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
index a44e0a9..d0fea30 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/RestServices.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,25 @@
 
 package io.gearpump.services
 
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
 import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.model.StatusCodes._
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server.{Route, _}
 import akka.stream.ActorMaterializer
 import akka.util.Timeout
-import io.gearpump.jarstore.JarStoreService
-import io.gearpump.util.{Constants, LogUtil}
 import org.apache.commons.lang.exception.ExceptionUtils
 
-import scala.concurrent.Await
-import scala.concurrent.duration._
+import io.gearpump.jarstore.JarStoreService
+import io.gearpump.util.{Constants, LogUtil}
+// NOTE: This cannot be removed!!!
+import io.gearpump.services.util.UpickleUtil._
 
-class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem) extends RouteService {
+/** Contains all REST API service endpoints */
+class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem)
+  extends RouteService {
 
   implicit val timeout = Constants.FUTURE_TIMEOUT
 
@@ -42,9 +47,11 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem
 
   private val LOG = LogUtil.getLogger(getClass)
 
-  private val securityEnabled = config.getBoolean(Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
+  private val securityEnabled = config.getBoolean(
+    Constants.GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED)
 
-  private val supervisorPath = system.settings.config.getString(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
+  private val supervisorPath = system.settings.config.getString(
+    Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH)
 
   private val myExceptionHandler: ExceptionHandler = ExceptionHandler {
     case ex: Throwable => {
@@ -55,7 +62,7 @@ class RestServices(master: ActorRef, mat: ActorMaterializer, system: ActorSystem
     }
   }
 
-  // make sure staticRoute is the final one, as it will try to lookup resource in local path
+  // Makes sure staticRoute is the final one, as it will try to lookup resource in local path
   // if there is no match in previous routes
   private val static = new StaticService(system, supervisorPath).route
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
index a5bbe2f..b1abf62 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/SecurityService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,54 +18,57 @@
 
 package io.gearpump.services
 
-import akka.actor.{ActorSystem}
-import akka.http.scaladsl.model.{Uri, StatusCodes, RemoteAddress}
-import akka.http.scaladsl.model.headers.{HttpCookiePair, HttpCookie, 
HttpChallenge}
-import 
akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsRejected, 
CredentialsMissing}
-import akka.http.scaladsl.server._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.model.headers.{HttpChallenge, HttpCookie, 
HttpCookiePair}
+import akka.http.scaladsl.model.{RemoteAddress, StatusCodes, Uri}
+import 
akka.http.scaladsl.server.AuthenticationFailedRejection.{CredentialsMissing, 
CredentialsRejected}
 import akka.http.scaladsl.server.Directives._
+import akka.http.scaladsl.server._
 import akka.http.scaladsl.server.directives.FormFieldDirectives.FieldMagnet
 import akka.stream.Materializer
-import com.softwaremill.session.ClientSessionManagerMagnet._
-import com.softwaremill.session.SessionDirectives._
-import com.softwaremill.session._
 import com.typesafe.config.Config
+import com.softwaremill.session.SessionDirectives._
+import com.softwaremill.session.SessionOptions._
+import com.softwaremill.session.{MultiValueSessionSerializer, SessionConfig, 
SessionManager}
+import upickle.default.write
+
+import io.gearpump.security.{Authenticator => BaseAuthenticator}
 import io.gearpump.services.SecurityService.{User, UserSession}
 import io.gearpump.services.security.oauth2.OAuth2Authenticator
 import io.gearpump.util.{Constants, LogUtil}
-import upickle.default.{write}
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
-import io.gearpump.security.{Authenticator => BaseAuthenticator}
-import scala.concurrent.{ExecutionContext, Future}
-import scala.util.{Failure, Success}
-
 
 /**
- * When user cannot be authenticated, will reject with 401 
AuthenticationFailedRejection
- * When user can be authenticated, but are not authorized to access certail 
resource, will
- * return a 405 AuthorizationFailedRejection.
- *
- * When web UI frontend receive 401, it should redirect the UI to login page.
- * When web UI receive 405,it should display errors like
- * "current user is not authorized to access this resource."
+ * Security authentication endpoint.
  *
- * The Authenticator used is pluggable, the current Authenticator is resolved 
by looking up config path
- * [[Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]].
+ * - When user cannot be authenticated, will reject with 401 
AuthenticationFailedRejection
+ * - When user can be authenticated, but is not authorized to access a certain 
resource, will
+ *   return a 405 AuthorizationFailedRejection.
+ * - When web UI frontend receives 401, it should redirect the UI to login page.
+ * - When web UI receives 405, it should display errors like
+ *   "current user is not authorized to access this resource."
  *
- * see [[BaseAuthenticator]] to find more info on custom Authenticator.
+ * The Authenticator used is pluggable, the current Authenticator is resolved 
by looking up
+ * config path [[io.gearpump.util.Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS]].
  *
+ * See [[io.gearpump.security.Authenticator]] to find more info on custom 
Authenticator.
  */
 class SecurityService(inner: RouteService, implicit val system: ActorSystem) 
extends RouteService {
 
   // Use scheme "GearpumpBasic" to avoid popping up web browser native 
authentication box.
-  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = 
"gearpump", params = Map.empty)
+  private val challenge = HttpChallenge(scheme = "GearpumpBasic", realm = 
"gearpump",
+    params = Map.empty)
 
   val LOG = LogUtil.getLogger(getClass, "AUDIT")
 
   private val config = system.settings.config
   private val sessionConfig = SessionConfig.fromConfig(config)
-  private implicit val sessionManager = new 
SessionManager[UserSession](sessionConfig)
-  private val magnet = 
ClientSessionManagerMagnet.forSessionManager[UserSession, Unit](Unit)
+  private implicit val sessionManager: SessionManager[UserSession] =
+    new SessionManager[UserSession](sessionConfig)
 
   private val authenticator = {
     val clazz = 
Class.forName(config.getString(Constants.GEARPUMP_UI_AUTHENTICATOR_CLASS))
@@ -74,7 +77,7 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
     authenticator
   }
 
-  private def configToMap(config : Config, path: String) = {
+  private def configToMap(config: Config, path: String) = {
     import scala.collection.JavaConverters._
     config.getConfig(path).root.unwrapped.asScala.toMap map { case (k, v) => k 
-> v.toString }
   }
@@ -91,8 +94,9 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
     }
   }
 
-  private def authenticate(user: String, pass: String)(implicit ec: 
ExecutionContext): Future[Option[UserSession]] = {
-    authenticator.authenticate(user, pass, ec).map{ result =>
+  private def authenticate(user: String, pass: String)(implicit ec: 
ExecutionContext)
+  : Future[Option[UserSession]] = {
+    authenticator.authenticate(user, pass, ec).map { result =>
       if (result.authenticated) {
         Some(UserSession(user, result.permissionLevel))
       } else {
@@ -110,7 +114,7 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
   }
 
   private def requireAuthentication(inner: UserSession => Route): Route = {
-    optionalSession(magnet) { sessionOption =>
+    optionalSession(oneOff, usingCookiesOrHeaders) { sessionOption =>
       sessionOption match {
         case Some(session) => {
           inner(session)
@@ -122,10 +126,12 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
   }
 
   private def login(session: UserSession, ip: String, redirectToRoot: Boolean 
= false): Route = {
-    setSession(session) {
+    setSession(oneOff, usingCookies, session) {
       val user = session.user
-      val maxAgeMs = 1000 * 
sessionConfig.clientSessionMaxAgeSeconds.getOrElse(24 * 3600L) // default 1 day
-      setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = 
Some("/"), maxAge = Some(maxAgeMs))) {
+      // Default: 1 day
+      val maxAgeMs = 1000 * sessionConfig.sessionMaxAgeSeconds.getOrElse(24 * 
3600L)
+      setCookie(HttpCookie.fromPair(HttpCookiePair("username", user), path = 
Some("/"),
+        maxAge = Some(maxAgeMs))) {
         LOG.info(s"user $user login from $ip")
         if (redirectToRoot) {
           redirect(Uri("/"), StatusCodes.TemporaryRedirect)
@@ -137,7 +143,7 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
   }
 
   private def logout(user: UserSession, ip: String): Route = {
-    invalidateSession(magnet) { ctx =>
+    invalidateSession(oneOff, usingCookies) { ctx =>
       LOG.info(s"user ${user.user} logout from $ip")
       ctx.complete(write(new User(user.user)))
     }
@@ -145,13 +151,13 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
 
   // Only admin are able to access operation like post/delete/put
   private def requireAuthorization(user: UserSession, route: => Route): Route 
= {
-    // valid user
+    // Valid user
     if (user.permissionLevel >= BaseAuthenticator.User.permissionLevel) {
       route
     } else {
-      // possibly a guest or not authenticated.
+      // Possibly a guest or not authenticated.
       (put | delete | post) {
-        // reject with 405 authorization error
+        // Reject with 405 authorization error
         reject(AuthorizationFailedRejection)
       } ~
       get {
@@ -171,8 +177,8 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
     extractExecutionContext{implicit ec: ExecutionContext =>
     extractMaterializer{implicit mat: Materializer =>
     (extractClientIP | unknownIp) { ip =>
-     pathPrefix("login") {
-       pathEndOrSingleSlash {
+      pathPrefix("login") {
+        pathEndOrSingleSlash {
           get {
             getFromResource("login/login.html")
           } ~
@@ -181,7 +187,7 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
             formField(FieldMagnet('username.as[String])) {user: String =>
               formFields(FieldMagnet('password.as[String])) {pass: String =>
                 val result = authenticate(user, pass)
-                onSuccess(result){
+                onSuccess(result) {
                   case Some(session) =>
                     login(session, ip.toString)
                   case None =>
@@ -192,7 +198,7 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
           }
         } ~
         path ("oauth2" / "providers") {
-          // respond with a list of OAuth2 providers.
+          // Responds with a list of OAuth2 providers.
           complete(write(oauth2Providers))
         } ~
         // Support OAUTH Authentication
@@ -207,7 +213,7 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
 
             def loginWithOAuth2Parameters(parameters: Map[String, String]): 
Route = {
               val result = oauthService.authenticate(parameters)
-              onComplete(result){
+              onComplete(result) {
                 case Success(session) =>
                   login(session, ip.toString, redirectToRoot = true)
                 case Failure(ex) => {
@@ -218,7 +224,7 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
             }
 
             path ("authorize") {
-              // redirect to OAuth2 service provider for authorization.
+              // Redirects to OAuth2 service provider for authorization.
               redirect(Uri(oauthService.getAuthorizationUrl), 
StatusCodes.TemporaryRedirect)
             } ~
             path ("accesstoken") {
@@ -255,17 +261,28 @@ class SecurityService(inner: RouteService, implicit val 
system: ActorSystem) ext
 
 object SecurityService {
 
-  val SESSION_MANAGER_KEY = "akka.http.session.serverSecret"
+  val SESSION_MANAGER_KEY = "akka.http.session.server-secret"
 
   case class UserSession(user: String, permissionLevel: Int)
 
   object UserSession {
-    implicit def serializer: SessionSerializer[UserSession] = new 
ToMapSessionSerializer[UserSession] {
-      private val User = "user"
-      private val PermissionLevel = "permissionLevel"
 
-      override def serializeToMap(t: UserSession) = Map(User -> t.user, 
PermissionLevel->t.permissionLevel.toString)
-      override def deserializeFromMap(m: Map[String, String]) = 
UserSession(m(User), m(PermissionLevel).toInt)
+    private val User = "user"
+    private val PermissionLevel = "permissionLevel"
+
+    implicit def serializer: MultiValueSessionSerializer[UserSession] = {
+      new MultiValueSessionSerializer[UserSession](
+        toMap = {t: UserSession =>
+          Map(User -> t.user, PermissionLevel -> t.permissionLevel.toString)
+        },
+        fromMap = {m: Map[String, String] =>
+          if (m.contains(User)) {
+            Try(UserSession(m(User), m(PermissionLevel).toInt))
+          } else {
+            Failure[UserSession](new Exception("Fail to parse session "))
+          }
+        }
+      )
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala 
b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
index 09180c3..fc990a1 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/StaticService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,13 @@
 
 package io.gearpump.services
 
-import akka.actor.{ActorSystem}
+import akka.actor.ActorSystem
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
-import akka.stream.{Materializer}
-import io.gearpump.util.{Constants, Util}
+import akka.stream.Materializer
+
+import io.gearpump.util.Util
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
 
 /**
@@ -33,13 +35,13 @@ class StaticService(override val system: ActorSystem, 
supervisorPath: String)
 
   private val version = Util.version
 
-  override def prefix = Neutral
+  protected override def prefix = Neutral
 
-  override def cache = true
+  override def cache: Boolean = true
 
-  override def doRoute(implicit mat: Materializer) = {
+  protected override def doRoute(implicit mat: Materializer) = {
     path("version") {
-      get {ctx =>
+      get { ctx =>
         ctx.complete(version)
       }
     } ~

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala 
b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
index 5d552d6..c7f19e4 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/SupervisorService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,39 +18,43 @@
 
 package io.gearpump.services
 
+import scala.concurrent.Future
+import scala.util.{Failure, Success}
+
 import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.server.Route
 import akka.stream.Materializer
-import io.gearpump.WorkerId
-import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData, 
GetAllWorkers}
+
+import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
 import io.gearpump.cluster.ClientToMaster._
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.services.SupervisorService.{Path, Status}
 import io.gearpump.util.ActorUtil._
-
-import scala.concurrent.Future
-import scala.util.{Failure, Success}
-import upickle.default.{read, write}
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
 
-class SupervisorService(val master: ActorRef, val supervisor: ActorRef, 
override val system: ActorSystem)
+/** Responsible for adding/removing machines. Typically it delegates to YARN. 
*/
+class SupervisorService(
+    val master: ActorRef, val supervisor: ActorRef, override val system: 
ActorSystem)
   extends BasicService {
 
   import upickle.default.write
 
   /**
-   * TODO: Add additional check to ensure the user have enough authorization 
to add/remove a worker machine
+   * TODO: Add additional check to ensure the user has enough authorization to
+   * add/remove a worker machine
    */
   private def authorize(internal: Route): Route = {
     if (supervisor == null) {
-      failWith(new Exception("API not enabled, cannot find a valid supervisor! 
Please make sure Gearpump is " +
-        "running on top of YARN or other resource managers"))
+      failWith(new Exception("API not enabled, cannot find a valid supervisor! 
" +
+        "Please make sure Gearpump is running on top of YARN or other resource 
managers"))
     } else {
       internal
     }
   }
 
-  override def doRoute(implicit mat: Materializer) = pathPrefix("supervisor") {
+  protected override def doRoute(implicit mat: Materializer) = 
pathPrefix("supervisor") {
     pathEnd {
       get {
         val path = if (supervisor == null) {
@@ -105,7 +109,7 @@ class SupervisorService(val master: ActorRef, val 
supervisor: ActorRef, override
   }
 }
 
-object SupervisorService{
+object SupervisorService {
   case class Status(enabled: Boolean)
 
   case class Path(path: String)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala 
b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
index a1edbe4..78cdb37 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/WorkerService.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,20 +18,23 @@
 
 package io.gearpump.services
 
-import akka.actor.{ActorSystem, ActorRef}
+import scala.util.{Failure, Success}
+
+import akka.actor.{ActorRef, ActorSystem}
 import akka.http.scaladsl.server.Directives._
-import akka.stream.{Materializer}
-import io.gearpump.WorkerId
+import akka.stream.Materializer
+
 import io.gearpump.cluster.AppMasterToMaster.{GetWorkerData, WorkerData}
-import io.gearpump.cluster.ClientToMaster.{ReadOption, QueryHistoryMetrics, 
QueryWorkerConfig}
+import io.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, 
QueryWorkerConfig, ReadOption}
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.MasterToClient.{HistoryMetrics, WorkerConfig}
+import io.gearpump.cluster.worker.WorkerId
 import io.gearpump.util.ActorUtil._
 import io.gearpump.util.Constants
+// NOTE: This cannot be removed!!!
 import io.gearpump.services.util.UpickleUtil._
 
-import scala.util.{Failure, Success}
-
+/** Service to handle worker related queries */
 class WorkerService(val master: ActorRef, override val system: ActorSystem)
   extends BasicService {
 
@@ -39,15 +42,17 @@ class WorkerService(val master: ActorRef, override val 
system: ActorSystem)
   private val systemConfig = system.settings.config
   private val concise = 
systemConfig.getBoolean(Constants.GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE)
 
-  override def doRoute(implicit mat: Materializer) = pathPrefix("worker" / 
Segment) { workerIdString =>
-    pathEnd {
-      val workerId = WorkerId.parse(workerIdString)
-      onComplete(askWorker[WorkerData](master, workerId, 
GetWorkerData(workerId))) {
-        case Success(value: WorkerData) =>
-          complete(write(value.workerDescription))
-        case Failure(ex) => failWith(ex)
+  protected override def doRoute(implicit mat: Materializer) = 
pathPrefix("worker" / Segment) {
+    workerIdString => {
+      pathEnd {
+        val workerId = WorkerId.parse(workerIdString)
+        onComplete(askWorker[WorkerData](master, workerId, 
GetWorkerData(workerId))) {
+          case Success(value: WorkerData) =>
+            complete(write(value.workerDescription))
+          case Failure(ex) => failWith(ex)
+        }
       }
-    } ~
+    }~
     path("config") {
       val workerId = WorkerId.parse(workerIdString)
       onComplete(askWorker[WorkerConfig](master, workerId, 
QueryWorkerConfig(workerId))) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala 
b/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
index 7dc0277..a647a8d 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/main/Services.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,21 +18,26 @@
 
 package io.gearpump.services.main
 
-import akka.actor.{ActorSystem}
+import java.util.Random
+import scala.collection.JavaConverters._
+import scala.concurrent.Await
+
+import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.server.Route
 import akka.stream.ActorMaterializer
 import com.typesafe.config.ConfigValueFactory
+import org.slf4j.Logger
+import sun.misc.BASE64Encoder
+
 import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.main.{Gear, CLIOption, ArgumentsParser}
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption, Gear}
 import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.services.{SecurityService, RestServices}
+import io.gearpump.services.{RestServices, SecurityService}
 import io.gearpump.util.LogUtil.ProcessType
 import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
-import org.slf4j.Logger
-
-import scala.concurrent.Await
 
+/** Command line to start UI server */
 object Services extends AkkaApp with ArgumentsParser {
 
   private val LOG = LogUtil.getLogger(getClass)
@@ -48,11 +53,13 @@ object Services extends AkkaApp with ArgumentsParser {
     ClusterConfig.ui()
   }
 
-  override def help: Unit = {
+  override def help(): Unit = {
+    // scalastyle:off println
     Console.println("UI Server")
+    // scalastyle:on println
   }
 
-  private var killFunction: Option[() =>Unit] = None
+  private var killFunction: Option[() => Unit] = None
 
   override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
 
@@ -69,48 +76,59 @@ object Services extends AkkaApp with ArgumentsParser {
       LogUtil.getLogger(getClass)
     }
 
-
-    import scala.collection.JavaConversions._
     if (argConfig.exists("master")) {
       val master = argConfig.getString("master")
       akkaConf = akkaConf.withValue(Constants.GEARPUMP_CLUSTER_MASTERS,
-        ConfigValueFactory.fromIterable(List(master)))
+        ConfigValueFactory.fromIterable(List(master).asJava))
     }
 
     akkaConf = akkaConf.withValue(Constants.GEARPUMP_SERVICE_SUPERVISOR_PATH,
       ConfigValueFactory.fromAnyRef(argConfig.getString("supervisor")))
-      // Create a random unique secret key for session manager.
+      // Creates a random unique secret key for session manager.
       // All previous stored session token cookies will be invalidated when UI
       // server is restarted.
       .withValue(SecurityService.SESSION_MANAGER_KEY,
-        ConfigValueFactory.fromAnyRef(java.util.UUID.randomUUID().toString()))
+        ConfigValueFactory.fromAnyRef(randomSeverSecret()))
 
-    val masterCluster = 
akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).toList.flatMap(Util.parseHostList)
+    val masterCluster = 
akkaConf.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala
+      .flatMap(Util.parseHostList)
 
-    implicit val system = ActorSystem("services" , akkaConf)
+    implicit val system = ActorSystem("services", akkaConf)
     implicit val executionContext = system.dispatcher
 
     import scala.concurrent.duration._
-    val master = system.actorOf(MasterProxy.props(masterCluster, 1 day), 
s"masterproxy${system.name}")
+    val master = system.actorOf(MasterProxy.props(masterCluster, 1.day),
+      s"masterproxy${system.name}")
     val (host, port) = parseHostPort(system.settings.config)
 
-
     implicit val mat = ActorMaterializer()
     val services = new RestServices(master, mat, system)
 
     val bindFuture = Http().bindAndHandle(Route.handlerFlow(services.route), 
host, port)
-    Await.result(bindFuture, 15 seconds)
+    Await.result(bindFuture, 15.seconds)
 
-    val displayHost = if(host == "0.0.0.0") "127.0.0.1" else host
+    val displayHost = if (host == "0.0.0.0") "127.0.0.1" else host
     LOG.info(s"Please browse to http://$displayHost:$port to see the web UI")
+
+    // scalastyle:off println
     println(s"Please browse to http://$displayHost:$port to see the web UI")
+    // scalastyle:on println
 
-    killFunction = Some{() =>
+    killFunction = Some { () =>
       LOG.info("Shutting down UI Server")
-      system.shutdown()
+      system.terminate()
     }
 
-    system.awaitTermination()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  private def randomSeverSecret(): String = {
+    val random = new Random()
+    val length = 64 // Required
+    val bytes = new Array[Byte](length)
+    random.nextBytes(bytes)
+    val endecoder = new BASE64Encoder()
+    endecoder.encode(bytes)
   }
 
   private def parseHostPort(config: Config): (String, Int) = {
@@ -120,7 +138,8 @@ object Services extends AkkaApp with ArgumentsParser {
   }
 
   // TODO: fix this
-  // Hack around for YARN module, so that we can kill the UI server when 
application is shutting down.
+  // Workaround for the YARN module, so that we can kill the UI server
+  // when application is shutting down.
   def kill(): Unit = {
     if (killFunction.isDefined) {
       killFunction.get.apply()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/package.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/package.scala 
b/services/jvm/src/main/scala/io/gearpump/services/package.scala
index bebcf95..7b38134 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/package.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/package.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
 
b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
index edaefa7..158d406 100644
--- 
a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
+++ 
b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/OAuth2Authenticator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,11 +18,13 @@
 
 package io.gearpump.services.security.oauth2
 
+import scala.concurrent.{ExecutionContext, Future}
+
 import com.typesafe.config.Config
+
 import io.gearpump.services.SecurityService.UserSession
 import io.gearpump.util.Constants
 import io.gearpump.util.Constants._
-import scala.concurrent.{ExecutionContext, Future}
 
 /**
  *
@@ -30,46 +32,47 @@ import scala.concurrent.{ExecutionContext, Future}
  * @see [[https://tools.ietf.org/html/rfc6749]] to find what is OAuth2, and 
how it works.
  *
  * Basically flow for OAuth2 Authentication:
- * 1. User accesses Gearpump UI website, and choose to login with OAuth2 
server.
- * 2. Gearpump UI website redirects user to OAuth2 server domain authorization 
endpoint.
- * 3. End user complete the authorization in the domain of OAuth2 server.
- * 4. OAuth2 server redirects user back to Gearpump UI server.
- * 5. Gearpump UI server verify the tokens and extract credentials from query
- * parameters and form fields.
+ *  1. User accesses Gearpump UI website, and chooses to log in with OAuth2 
server.
+ *  2. Gearpump UI website redirects user to OAuth2 server domain 
authorization endpoint.
+ *  3. End user completes the authorization in the domain of OAuth2 server.
+ *  4. OAuth2 server redirects user back to Gearpump UI server.
+ *  5. Gearpump UI server verifies the tokens and extracts credentials from query
+ *    parameters and form fields.
  *
- * @note '''Thread-safety''' is a MUST requirement. Developer need to ensure 
the sub-class is thread-safe.
- * Sub-class should have a parameterless constructor.
+ * NOTE: '''Thread-safety''' is a MUST requirement. Developers need to ensure 
the sub-class is
+ *      thread-safe. Sub-class should have a parameterless constructor.
  *
- * @note OAuth2 Authenticator requires access of Internet. Please make sure 
HTTP proxy are
+ * NOTE:  OAuth2 Authenticator requires Internet access. Please make sure 
HTTP proxies are
  * set properly if applied.
  *
- * @example Config proxy when UI server is started on Windows:
+ * Example: Config proxy when UI server is started on Windows:
  * {{{
- *   > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 
-Dhttps.proxyHost=xx.com -Dhttps.proxyPort=8088
+ *   > set JAVA_OPTS=-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 
-Dhttps.proxyHost=xx.com
+ *      -Dhttps.proxyPort=8088
  *   > bin\services
  * }}}
  *
- * @example Config proxy when UI server is started on Linux:
+ * Example: Config proxy when UI server is started on Linux:
  * {{{
- *   $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 
-Dhttps.proxyHost=xx.com -Dhttps.proxyPort=8088"
+ *   $ export JAVA_OPTS="-Dhttp.proxyHost=xx.com -Dhttp.proxyPort=8088 
-Dhttps.proxyHost=xx.com
+ *      -Dhttps.proxyPort=8088"
  *   $ bin/services
  * }}}
- *
  */
 trait OAuth2Authenticator {
 
   /**
    * Inits authenticator with config which contains client ID, client secret, 
and etc..
    *
-   * Typically, the client key and client secret is provided by OAuth2 
Authorization server when user
-   * register an application there.
-   * @see [[https://tools.ietf.org/html/rfc6749]] for definition of client, 
client Id,
+   * Typically, the client key and client secret are provided by the OAuth2 
Authorization server
+   * when the user registers an application there.
+   * See [[https://tools.ietf.org/html/rfc6749]] for definition of client, 
client Id,
    * and client secret.
    *
    * See [[https://developer.github.com/v3/oauth/]] for an actual example of 
how Github
    * use client key, and client secret.
    *
-   * @note '''Thread-Safety''': Framework ensures this call is synchronized.
+   * NOTE:  '''Thread-Safety''': Framework ensures this call is synchronized.
    *
    * @param config Client Id, client secret, callback URL and etc..
    * @param executionContext ExecutionContext from hosting environment.
@@ -80,7 +83,7 @@ trait OAuth2Authenticator {
    * Returns the OAuth Authorization URL so for redirection to that address to 
do OAuth2
    * authorization.
    *
-   * @note '''Thread-Safety''': This can be called in a multi-thread 
environment. Developer
+   * NOTE:  '''Thread-Safety''': This can be called in a multi-thread 
environment. Developer
    *      need to ensure thread safety.
    */
   def getAuthorizationUrl: String
@@ -89,9 +92,11 @@ trait OAuth2Authenticator {
    * After authorization, OAuth2 server redirects user back with tokens. This 
verify the
    * tokens, retrieve the profiles, and return [[UserSession]] information.
    *
-   * @note This is an Async call.
-   * @note This call requires external internet access.
-   * @note '''Thread-Safety''': This can be called in a multi-thread 
environment. Developer
+   * NOTE:  This is an Async call.
+   *
+   * NOTE:  This call requires external internet access.
+   *
+   * NOTE:  '''Thread-Safety''': This can be called in a multi-thread 
environment. Developer
    *      need to ensure thread safety.
    *
    * @param parameters HTTP Query and Post parameters, which typically 
contains Authorization code.
@@ -100,7 +105,7 @@ trait OAuth2Authenticator {
   def authenticate(parameters: Map[String, String]): Future[UserSession]
 
   /**
-   * Clean resource
+   * Clean up resource
    */
   def close(): Unit
 }
@@ -116,7 +121,8 @@ object OAuth2Authenticator {
    * @param provider, Name for the OAuth2 Authentication Service.
    * @return Returns null if the OAuth2 Authentication is disabled.
    */
-  def get(config: Config, provider: String, executionContext: 
ExecutionContext): OAuth2Authenticator = {
+  def get(config: Config, provider: String, executionContext: ExecutionContext)
+    : OAuth2Authenticator = {
 
     if (providers.contains(provider)) {
       providers(provider)
@@ -129,7 +135,8 @@ object OAuth2Authenticator {
             providers(provider)
           } else {
             val authenticatorConfig = config.getConfig(path)
-            val authenticatorClass = 
authenticatorConfig.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
+            val authenticatorClass = authenticatorConfig.getString(
+              GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
             val clazz = 
Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass)
             val authenticator = 
clazz.newInstance().asInstanceOf[OAuth2Authenticator]
             authenticator.init(authenticatorConfig, executionContext)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
 
b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
index f0ba400..e422d3f 100644
--- 
a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
+++ 
b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/BaseOAuth2Authenticator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,14 +19,18 @@
 package io.gearpump.services.security.oauth2.impl
 
 import java.util.concurrent.atomic.AtomicBoolean
+import scala.collection.mutable.StringBuilder
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
 
+import com.typesafe.config.Config
 import com.github.scribejava.core.builder.ServiceBuilderAsync
 import com.github.scribejava.core.builder.api.DefaultApi20
 import com.github.scribejava.core.model._
 import com.github.scribejava.core.oauth.OAuth20Service
 import com.github.scribejava.core.utils.OAuthEncoder
 import com.ning.http.client.AsyncHttpClientConfig
-import com.typesafe.config.Config
+
 import io.gearpump.security.Authenticator
 import io.gearpump.services.SecurityService.UserSession
 import io.gearpump.services.security.oauth2.OAuth2Authenticator
@@ -34,14 +38,10 @@ import 
io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi
 import io.gearpump.util.Constants._
 import io.gearpump.util.Util
 
-import scala.collection.mutable.StringBuilder
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
-
 /**
  * Uses Ning AsyncClient to connect to OAuth2 service.
  *
- * @see [[OAuth2Authenticator]] for more API information.
+ * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more 
API information.
  */
 abstract class BaseOAuth2Authenticator extends OAuth2Authenticator {
 
@@ -105,7 +105,6 @@ abstract class BaseOAuth2Authenticator extends 
OAuth2Authenticator {
     oauthService.getAuthorizationUrl()
   }
 
-
   protected def authenticateWithAccessToken(accessToken: OAuth2AccessToken): 
Future[UserSession] = {
     val promise = Promise[UserSession]()
     val request = new OAuthRequestAsync(Verb.GET, protectedResourceUrl, 
oauthService)
@@ -139,7 +138,7 @@ abstract class BaseOAuth2Authenticator extends 
OAuth2Authenticator {
 
       new OAuthAsyncRequestCallback[OAuth2AccessToken] {
         override def onCompleted(accessToken: OAuth2AccessToken): Unit = {
-          authenticateWithAccessToken(accessToken).onComplete{
+          authenticateWithAccessToken(accessToken).onComplete {
             case Success(user) => promise.success(user)
             case Failure(ex) => promise.failure(ex)
           }
@@ -167,9 +166,11 @@ abstract class BaseOAuth2Authenticator extends 
OAuth2Authenticator {
     }
   }
 
-  private def buildOAuth2Service(clientId: String, clientSecret: String, 
callback: String): OAuth20Service = {
-    val state: String = "state" + Util.randInt
-    
ScribeJavaConfig.setForceTypeOfHttpRequests(ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS)
+  private def buildOAuth2Service(clientId: String, clientSecret: String, 
callback: String)
+    : OAuth20Service = {
+    val state: String = "state" + Util.randInt()
+    ScribeJavaConfig.setForceTypeOfHttpRequests(
+      ForceTypeOfHttpRequest.FORCE_ASYNC_ONLY_HTTP_REQUESTS)
     val clientConfig: AsyncHttpClientConfig = new 
AsyncHttpClientConfig.Builder()
       .setMaxConnections(5)
       .setUseProxyProperties(true)
@@ -189,7 +190,6 @@ abstract class BaseOAuth2Authenticator extends 
OAuth2Authenticator {
 
     service
   }
-
 }
 
 object BaseOAuth2Authenticator {
@@ -200,18 +200,21 @@ object BaseOAuth2Authenticator {
     }
 
     def getAuthorizationUrl(config: OAuthConfig): String = {
-      val sb: StringBuilder = new StringBuilder(String.format(authorizeUrl, 
config.getResponseType, config.getApiKey, 
OAuthEncoder.encode(config.getCallback), OAuthEncoder.encode(config.getScope)))
+      val sb: StringBuilder = new StringBuilder(String.format(authorizeUrl,
+        config.getResponseType, config.getApiKey, 
OAuthEncoder.encode(config.getCallback),
+        OAuthEncoder.encode(config.getScope)))
       val state: String = config.getState
       if (state != null) {
         
sb.append('&').append(OAuthConstants.STATE).append('=').append(OAuthEncoder.encode(state))
       }
-      return sb.toString
+      sb.toString
     }
 
     override def createService(config: OAuthConfig): OAuth20Service = {
-      return new OAuth20Service(this, config) {
+      new OAuth20Service(this, config) {
 
-        protected override def createAccessTokenRequest[T <: 
AbstractRequest](code: String, request: T): T = {
+        protected override def createAccessTokenRequest[T <: AbstractRequest](
+            code: String, request: T): T = {
           super.createAccessTokenRequest(code, request)
 
           if (!getConfig.hasGrantType) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
 
b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
index 27b66d2..98e37ea 100644
--- 
a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
+++ 
b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/CloudFoundryUAAOAuth2Authenticator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,44 +18,49 @@
 
 package io.gearpump.services.security.oauth2.impl
 
+import scala.concurrent.{ExecutionContext, Future, Promise}
+
+import com.typesafe.config.Config
 import com.github.scribejava.core.builder.api.DefaultApi20
 import com.github.scribejava.core.model._
 import com.github.scribejava.core.oauth.OAuth20Service
 import com.ning.http.client
 import com.ning.http.client.{AsyncCompletionHandler, AsyncHttpClient}
-import com.typesafe.config.Config
-import io.gearpump.services.SecurityService.UserSession
-import io.gearpump.services.security.oauth2.OAuth2Authenticator
-import 
io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
-import io.gearpump.util.Constants._
 import spray.json.{JsString, _}
 import sun.misc.BASE64Encoder
 
-import scala.concurrent.{ExecutionContext, Promise, Future}
+import io.gearpump.services.SecurityService.UserSession
+import 
io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator.BaseApi20
+import io.gearpump.util.Constants._
 
 /**
  *
  * Does authentication with CloudFoundry UAA service. Currently it only
  * extract the email address of end user.
  *
- * For what is UAA, please see:
- * @see [[https://github.com/cloudfoundry/uaa for information about 
CloudFoundry UAA]]
+ * For what is UAA,
+ * See [[https://github.com/cloudfoundry/uaa for information about 
CloudFoundry UAA]]
  *      (User Account and Authentication Service)
  *
  * Pre-requisite steps to use this Authenticator:
  *
  * Step1: Register your website to UAA with tool uaac.
- * 1) Check tutorial on uaac at 
[[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]]
- * 2) Open a bash shell, set the UAA server by command `uaac target`
- * {{{
- *   uaac target [your uaa server url]
- * }}}
+ *  1) Check tutorial on uaac at
+ *    [[https://docs.cloudfoundry.org/adminguide/uaa-user-management.html]]
+ *
+ *  2) Open a bash shell, set the UAA server by command `uaac target`
+ *    {{{
+ *    uaac target [your uaa server url]
+ *    }}}
+ *
  * NOTE: [your uaa server url] should match the uaahost settings in gear.conf
- * 3) Login in as user admin by
- * {{{
- *    uaac token client get admin -s MyAdminPassword
- * }}}
- * 4) Create a new Application (Client) in UAA,
+ *
+ *  3) Log in as user admin by
+ *     {{{
+ *     uaac token client get admin -s MyAdminPassword
+ *     }}}
+ *
+ *  4) Create a new Application (Client) in UAA,
  * {{{
  *   uaac client add [your_client_id]
  *     --scope "openid cloud_controller.read"
@@ -67,28 +72,34 @@ import scala.concurrent.{ExecutionContext, Promise, Future}
  * }}}
  *
  * Step2: Configure the OAuth2 information in gear.conf
- * 1) Enable OAuth2 authentication by setting 
"gearpump.ui-security.oauth2-authenticator-enabled"
+ *
+ *  1) Enable OAuth2 authentication by setting 
"gearpump.ui-security.oauth2-authenticator-enabled"
  * as true.
- * 2) Navigate to section 
"gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa"
- * 3) Config gear.conf 
"gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section.
+ *
+ *  2) Navigate to section 
"gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa"
+ *
+ *  3) Config gear.conf 
"gearpump.ui-security.oauth2-authenticators.cloudfoundryuaa" section.
  * Please make sure class name, client ID, client Secret, and callback URL are 
set properly.
  *
- * @note The callback URL here should match what you set on CloudFoundry UAA 
in step1.
+ * NOTE:  The callback URL here should match what you set on CloudFoundry UAA 
in step1.
  *
  * Step3: Restart the UI service and try the "social login" button for UAA.
  *
- * @note OAuth requires Internet access, @see [[OAuth2Authenticator]] to find 
tutorials to configure
- *       Internet proxy.
+ * NOTE:  OAuth requires Internet access, see
+ *       [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find 
tutorials to
+ *       configure Internet proxy.
  *
- * @see [[OAuth2Authenticator]] for more background information of OAuth2.
+ * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more 
background
+ *     information of OAuth2.
  */
 class CloudFoundryUAAOAuth2Authenticator extends BaseOAuth2Authenticator {
 
-  import CloudFoundryUAAOAuth2Authenticator._
+  import 
io.gearpump.services.security.oauth2.impl.CloudFoundryUAAOAuth2Authenticator._
 
   private var host: String = null
 
-  protected override def authorizeUrl: String = 
s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
+  protected override def authorizeUrl: String =
+    
s"$host/oauth/authorize?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s"
 
   protected override def accessTokenEndpoint: String = s"$host/oauth/token"
 
@@ -104,7 +115,8 @@ class CloudFoundryUAAOAuth2Authenticator extends 
BaseOAuth2Authenticator {
 
     if (config.getBoolean(ADDITIONAL_AUTHENTICATOR_ENABLED)) {
       val additionalAuthenticatorConfig = 
config.getConfig(ADDITIONAL_AUTHENTICATOR)
-      val authenticatorClass = 
additionalAuthenticatorConfig.getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
+      val authenticatorClass = additionalAuthenticatorConfig
+        .getString(GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS)
       val clazz = 
Thread.currentThread().getContextClassLoader.loadClass(authenticatorClass)
       val authenticator = 
clazz.newInstance().asInstanceOf[AdditionalAuthenticator]
       authenticator.init(additionalAuthenticatorConfig, executionContext)
@@ -117,17 +129,17 @@ class CloudFoundryUAAOAuth2Authenticator extends 
BaseOAuth2Authenticator {
     email.value
   }
 
-
   protected override def oauth2Api(): DefaultApi20 = {
     new CloudFoundryUAAService(authorizeUrl, accessTokenEndpoint)
   }
 
-  protected override def authenticateWithAccessToken(accessToken: 
OAuth2AccessToken): Future[UserSession] = {
+  protected override def authenticateWithAccessToken(accessToken: 
OAuth2AccessToken)
+    : Future[UserSession] = {
 
     implicit val ec: ExecutionContext = executionContext
 
     if (additionalAuthenticator.isDefined) {
-      super.authenticateWithAccessToken(accessToken).flatMap{user =>
+      super.authenticateWithAccessToken(accessToken).flatMap { user =>
         
additionalAuthenticator.get.authenticate(oauthService.getAsyncHttpClient, 
accessToken, user)
       }
     } else {
@@ -152,9 +164,10 @@ object CloudFoundryUAAOAuth2Authenticator {
     }
 
     override def createService(config: OAuthConfig): OAuth20Service = {
-      return new OAuth20Service(this, config) {
+      new OAuth20Service(this, config) {
 
-        protected override def createAccessTokenRequest[T <: 
AbstractRequest](code: String, request: T): T = {
+        protected override def createAccessTokenRequest[T <: AbstractRequest](
+            code: String, request: T): T = {
           val config: OAuthConfig = getConfig()
 
           request.addParameter(OAuthConstants.GRANT_TYPE, 
OAuthConstants.AUTHORIZATION_CODE)
@@ -181,8 +194,10 @@ object CloudFoundryUAAOAuth2Authenticator {
   trait AdditionalAuthenticator {
 
     /**
-     * @param config configurations specifically used for this authenticator.
-     * @param executionContext execution Context to use to run futures.
+     * Initialization
+     *
+     * @param config Configurations specifically used for this authenticator.
+     * @param executionContext Execution Context to use to run futures.
      */
     def init(config: Config, executionContext: ExecutionContext): Unit
 
@@ -192,7 +207,9 @@ object CloudFoundryUAAOAuth2Authenticator {
      * @param user user session returned by previous authenticator
      * @return an updated UserSession
      */
-    def authenticate(asyncClient: AsyncHttpClient, accessToken: 
OAuth2AccessToken, user: UserSession): Future[UserSession]
+    def authenticate(
+        asyncClient: AsyncHttpClient, accessToken: OAuth2AccessToken, user: 
UserSession)
+      : Future[UserSession]
   }
 
   val ORGANIZATION_URL = "organization-url"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
 
b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
index 54e6eef..c2b18fd 100644
--- 
a/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
+++ 
b/services/jvm/src/main/scala/io/gearpump/services/security/oauth2/impl/GoogleOAuth2Authenticator.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,8 +22,6 @@ import 
com.github.scribejava.apis.google.GoogleJsonTokenExtractor
 import com.github.scribejava.core.builder.api.DefaultApi20
 import com.github.scribejava.core.extractors.TokenExtractor
 import com.github.scribejava.core.model._
-import io.gearpump.services.security.oauth2.OAuth2Authenticator
-import 
io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator.AsyncGoogleApi20
 import spray.json._
 
 /**
@@ -34,34 +32,40 @@ import spray.json._
  * Pre-requisite steps to use this Authenticator:
  *
  * Step1: Register your website as an OAuth2 Application on Google
- * 1) Create an application representing your website at 
[[https://console.developers.google.com]]
- * 2) In "API Manager" of your created application, enable API "Google+ API"
- * 3) Create OAuth client ID for this application. In "Credentials" tab of 
"API Manager",
+ *  1) Create an application representing your website at 
[[https://console.developers.google.com]]
+ *
+ *  2) In "API Manager" of your created application, enable API "Google+ API"
+ *
+ *  3) Create OAuth client ID for this application. In "Credentials" tab of 
"API Manager",
  * choose "Create credentials", and then select OAuth client ID. Follow the 
wizard
  * to set callback URL, and generate client ID, and client Secret. Callback 
URL is NOT optional.
  *
  * Step2: Configure the OAuth2 information in gear.conf
- * 1) Enable OAuth2 authentication by setting 
"gearpump.ui-security.oauth2-authenticator-enabled"
+ *
+ *  1) Enable OAuth2 authentication by setting 
"gearpump.ui-security.oauth2-authenticator-enabled"
  * as true.
- * 2) Configure section "gearpump.ui-security.oauth2-authenticators.google". 
Please make sure
+ *
+ *  2) Configure section "gearpump.ui-security.oauth2-authenticators.google". 
Please make sure
  * class name, client ID, client Secret, and callback URL are set properly.
  *
- * @note callback URL set here should match what is configured on Google in 
step1.
+ * NOTE:  callback URL set here should match what is configured on Google in 
step1.
  *
  * Step3: Restart the UI service and try out the Google social login button in 
UI.
  *
- * @note OAuth requires Internet access, @see [[OAuth2Authenticator]] to find 
some helpful tutorials.
+ * NOTE:  OAuth requires Internet access, see
+ *       [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] to find 
some helpful tutorials
  *
- * @note Google use scope to define what data can be fetched by OAuth2. 
Currently we use profile
- * [[https://www.googleapis.com/auth/userinfo.email]]. However, Google may 
change the profile in future.
+ * NOTE:  Google uses scope to define what data can be fetched by OAuth2. 
Currently we use profile
+ *       [[https://www.googleapis.com/auth/userinfo.email]]. However, Google 
may change the profile
+ *       in future.
  *
- * @todo Currently, this doesn't verify the state from Google OAuth2 response.
+ * TODO Currently, this doesn't verify the state from Google OAuth2 response.
  *
- * @see [[OAuth2Authenticator]] for more API information.
+ * See [[io.gearpump.services.security.oauth2.OAuth2Authenticator]] for more 
API information.
  */
 class GoogleOAuth2Authenticator extends BaseOAuth2Authenticator {
 
-  import GoogleOAuth2Authenticator._
+  import io.gearpump.services.security.oauth2.impl.GoogleOAuth2Authenticator._
 
   protected override def authorizeUrl: String = AuthorizeUrl
 
@@ -83,15 +87,17 @@ class GoogleOAuth2Authenticator extends 
BaseOAuth2Authenticator {
 
 object GoogleOAuth2Authenticator {
 
-  import BaseOAuth2Authenticator._
+  import io.gearpump.services.security.oauth2.impl.BaseOAuth2Authenticator._
 
+  // scalastyle:off line.size.limit
   val AuthorizeUrl = 
"https://accounts.google.com/o/oauth2/auth?response_type=%s&client_id=%s&redirect_uri=%s&scope=%s";
+  // scalastyle:on line.size.limit
   val AccessEndpoint = "https://www.googleapis.com/oauth2/v4/token";
   val ResourceUrl = "https://www.googleapis.com/plus/v1/people/me";
   val Scope = "https://www.googleapis.com/auth/userinfo.email";
 
   private class AsyncGoogleApi20(authorizeUrl: String, accessEndpoint: String)
-      extends BaseApi20(authorizeUrl, accessEndpoint) {
+    extends BaseApi20(authorizeUrl, accessEndpoint) {
 
     override def getAccessTokenExtractor: TokenExtractor[OAuth2AccessToken] = {
       GoogleJsonTokenExtractor.instance

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala 
b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
index 0ae4505..f95474e 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/util/UpickleUtil.scala
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,23 @@
 
 package io.gearpump.services.util
 
-import io.gearpump.WorkerId
-import io.gearpump.util.Graph
 import upickle.Js
 
+import io.gearpump.cluster.worker.WorkerId
+import io.gearpump.util.Graph
+
 object UpickleUtil {
 
-  //TODO: upickle cannot infer the reader automatically due to
-  // issue https://github.com/lihaoyi/upickle-pprint/issues/102
-  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = 
upickle.default.Reader[Graph[Int, String]] {
-    case Js.Obj(verties, edges) =>
-      val vertexList = upickle.default.readJs[List[Int]](verties._2)
-      val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2)
-      Graph(vertexList, edgeList)
+  // For implicit type, we need to add EXPLICIT return type, otherwise, 
upickle may NOT infer the
+  // reader type automatically.
+  // See issue https://github.com/lihaoyi/upickle-pprint/issues/102
+  implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = {
+    upickle.default.Reader[Graph[Int, String]] {
+      case Js.Obj(verties, edges) =>
+        val vertexList = upickle.default.readJs[List[Int]](verties._2)
+        val edgeList = upickle.default.readJs[List[(Int, String, 
Int)]](edges._2)
+        Graph(vertexList, edgeList)
+    }
   }
 
   implicit val workerIdReader: upickle.default.Reader[WorkerId] = 
upickle.default.Reader[WorkerId] {

Reply via email to