http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/ActorSystemBooter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorSystemBooter.scala b/core/src/main/scala/org/apache/gearpump/util/ActorSystemBooter.scala
new file mode 100644
index 0000000..674445a
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorSystemBooter.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.util.concurrent.{TimeUnit, TimeoutException}
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.{Failure, Success, Try}
+
+import akka.actor._
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.ClusterConfig
+import org.apache.gearpump.util.LogUtil.ProcessType
+
+/**
+ * ActorSystemBooter start a new JVM process to boot an actor system.
+ * All executors are started by ActorSystemBooter
+ *
+ * It send the system address to "report back actor"
+ */
+class ActorSystemBooter(config: Config) {
+  import org.apache.gearpump.util.ActorSystemBooter._
+
+  def boot(name: String, reportBackActor: String): ActorSystem = {
+    val system = ActorSystem(name, config)
+    // Daemon path: http://{system}@{ip}:{port}/daemon
+    system.actorOf(Props(classOf[Daemon], name, reportBackActor), "daemon")
+    system
+  }
+}
+
+object ActorSystemBooter {
+
+  def apply(config: Config): ActorSystemBooter = new ActorSystemBooter(config)
+
+  def main(args: Array[String]) {
+    val name = args(0)
+    val reportBack = args(1)
+    val config = ClusterConfig.default()
+
+    LogUtil.loadConfiguration(config, ProcessType.APPLICATION)
+
+    val debugPort = Option(System.getProperty(Constants.GEARPUMP_REMOTE_DEBUG_PORT))
+    debugPort.foreach { port =>
+      val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
+      LOG.info("==========================================")
+      LOG.info("Remote debug port: " + port)
+      LOG.info("==========================================")
+    }
+
+    val system = apply(config).boot(name, reportBack)
+
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run(): Unit = {
+        val LOG: Logger = LogUtil.getLogger(ActorSystemBooter.getClass)
+        LOG.info("Maybe we have received a SIGINT signal from parent process, " +
+          "start to cleanup resources....")
+        system.terminate()
+      }
+    })
+
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  case class BindLifeCycle(actor: ActorRef)
+  case class CreateActor(prop: Props, name: String)
+  case class ActorCreated(actor: ActorRef, name: String)
+  case class CreateActorFailed(name: String, reason: Throwable)
+
+  case class RegisterActorSystem(systemPath: String)
+
+  /**
+   * This actor system will watch for parent,
+   * If parent dies, this will also die
+   */
+  case class ActorSystemRegistered(bindLifeWith: ActorRef)
+  case class RegisterActorSystemFailed(reason: Throwable)
+
+  object RegisterActorSystemTimeOut
+
+  class Daemon(val name: String, reportBack: String) extends Actor {
+    val LOG: Logger = LogUtil.getLogger(getClass, context = name)
+
+    val username = Option(System.getProperty(Constants.GEARPUMP_USERNAME)).getOrElse("not_defined")
+    LOG.info(s"RegisterActorSystem to ${reportBack}, current user: $username")
+
+    val reportBackActor = context.actorSelection(reportBack)
+    reportBackActor ! RegisterActorSystem(ActorUtil.getSystemAddress(context.system).toString)
+
+    implicit val executionContext = context.dispatcher
+    val timeout = context.system.scheduler.scheduleOnce(Duration(25, TimeUnit.SECONDS),
+      self, RegisterActorSystemFailed(new TimeoutException))
+
+    context.become(waitForRegisterResult)
+
+    def receive: Receive = null
+
+    def waitForRegisterResult: Receive = {
+      case ActorSystemRegistered(parent) =>
+        timeout.cancel()
+        context.watch(parent)
+        context.become(waitCommand)
+      case RegisterActorSystemFailed(ex) =>
+        LOG.error("RegisterActorSystemFailed", ex)
+        timeout.cancel()
+        context.stop(self)
+    }
+
+    def waitCommand: Receive = {
+      case BindLifeCycle(actor) =>
+        LOG.info(s"ActorSystem $name Binding life cycle with actor: $actor")
+        context.watch(actor)
+      case create@CreateActor(props: Props, name: String) =>
+        LOG.info(s"creating actor $name")
+        val actor = Try(context.actorOf(props, name))
+        actor match {
+          case Success(actor) =>
+            sender ! ActorCreated(actor, name)
+          case Failure(e) =>
+            sender ! CreateActorFailed(props.clazz.getName, e)
+        }
+      case PoisonPill =>
+        context.stop(self)
+      case Terminated(actor) =>
+        LOG.info(s"System $name Watched actor is terminated $actor")
+        context.stop(self)
+    }
+
+    override def postStop(): Unit = {
+      LOG.info(s"ActorSystem $name is shutting down...")
+      context.system.terminate()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
new file mode 100644
index 0000000..b10163f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import org.apache.gearpump.cluster.worker.WorkerId
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import akka.actor.Actor.Receive
+import akka.actor._
+import akka.pattern.ask
+import org.slf4j.Logger
+
+import org.apache.gearpump.cluster.AppMasterToMaster.GetAllWorkers
+import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId}
+import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
+import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ResolveWorkerIdResult}
+import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSystemJvmConfig, StartExecutorSystems}
+import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
+import org.apache.gearpump.transport.HostPort
+
+object ActorUtil {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+
+  def getSystemAddress(system: ActorSystem): Address = {
+    system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
+  }
+
+  def getFullPath(system: ActorSystem, path: ActorPath): String = {
+    path.toStringWithAddress(getSystemAddress(system))
+  }
+
+  def getHostname(actor: ActorRef): String = {
+    val path = actor.path
+    path.address.host.getOrElse("localhost")
+  }
+
+  def defaultMsgHandler(actor: ActorRef): Receive = {
+    case msg: Any =>
+      LOG.error(s"Cannot find a matching message, ${msg.getClass.toString}, forwarded from $actor")
+  }
+
+  def printActorSystemTree(system: ActorSystem): Unit = {
+    val extendedSystem = system.asInstanceOf[ExtendedActorSystem]
+    val clazz = system.getClass
+    val m = clazz.getDeclaredMethod("printTree")
+    m.setAccessible(true)
+    LOG.info(m.invoke(system).asInstanceOf[String])
+  }
+
+  /** Checks whether a actor is child actor by simply examining name */
+  // TODO: fix this, we should also check the path to root besides name
+  def isChildActorPath(parent: ActorRef, child: ActorRef): Boolean = {
+    if (null != child) {
+      parent.path.name == child.path.parent.name
+    } else {
+      false
+    }
+  }
+
+  def actorNameForExecutor(appId: Int, executorId: Int): String = "app" + appId + "-executor" +
+    executorId
+
+  // TODO: Currently we explicitly require the master contacts to be started with this path pattern
+  // akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER
+  def getMasterActorPath(master: HostPort): ActorPath = {
+    import org.apache.gearpump.util.Constants.MASTER
+    ActorPath.fromString(s"akka.tcp://$MASTER@${master.host}:${master.port}/user/$MASTER")
+  }
+
+  def launchExecutorOnEachWorker(master: ActorRef, executorJvmConfig: ExecutorSystemJvmConfig,
+      sender: ActorRef)(implicit executor: scala.concurrent.ExecutionContext): Unit = {
+    implicit val timeout = Constants.FUTURE_TIMEOUT
+
+    (master ? GetAllWorkers).asInstanceOf[Future[WorkerList]].map { list =>
+      val resources = list.workers.map {
+        workerId => ResourceRequest(Resource(1), workerId, relaxation = Relaxation.SPECIFICWORKER)
+      }.toArray
+
+      master.tell(StartExecutorSystems(resources, executorJvmConfig), sender)
+    }
+  }
+
+  def askAppMaster[T](master: ActorRef, appId: Int, msg: Any)(implicit ex: ExecutionContext)
+    : Future[T] = {
+    implicit val timeout = Constants.FUTURE_TIMEOUT
+    val appmaster = askActor[ResolveAppIdResult](master, ResolveAppId(appId)).flatMap { result =>
+      if (result.appMaster.isSuccess) {
+        Future.successful(result.appMaster.get)
+      } else {
+        Future.failed(result.appMaster.failed.get)
+      }
+    }
+    appmaster.flatMap(askActor[T](_, msg))
+  }
+
+  def askWorker[T](master: ActorRef, workerId: WorkerId, msg: Any)(implicit ex: ExecutionContext)
+    : Future[T] = {
+    implicit val timeout = Constants.FUTURE_TIMEOUT
+    val worker = askActor[ResolveWorkerIdResult](master, ResolveWorkerId(workerId))
+      .flatMap { result =>
+        if (result.worker.isSuccess) {
+          Future.successful(result.worker.get)
+        } else {
+          Future.failed(result.worker.failed.get)
+        }
+      }
+    worker.flatMap(askActor[T](_, msg))
+  }
+
+  def askActor[T](actor: ActorRef, msg: Any)(implicit ex: ExecutionContext): Future[T] = {
+    implicit val timeout = Constants.FUTURE_TIMEOUT
+    (actor ? msg).asInstanceOf[Future[T]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/AkkaApp.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/AkkaApp.scala b/core/src/main/scala/org/apache/gearpump/util/AkkaApp.scala
new file mode 100644
index 0000000..6060368
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/AkkaApp.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import scala.util.Try
+
+import org.apache.gearpump.cluster.ClusterConfig
+
+/**
+ * A Main class helper to load Akka configuration automatically.
+ */
+trait AkkaApp {
+
+  type Config = com.typesafe.config.Config
+
+  def main(akkaConf: Config, args: Array[String]): Unit
+
+  def help(): Unit
+
+  protected def akkaConfig: Config = {
+    ClusterConfig.default()
+  }
+
+  def main(args: Array[String]): Unit = {
+    Try {
+      main(akkaConfig, args)
+    }.failed.foreach { ex => help(); throw ex }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Constants.scala b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
new file mode 100644
index 0000000..dba5a1f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/Constants.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.partitioner._
+
+object Constants {
+  val MASTER_WATCHER = "masterwatcher"
+  val SINGLETON_MANAGER = "singleton"
+
+  val MASTER_CONFIG = "gearpump-master"
+  val WORKER_CONFIG = "gearpump-worker"
+  val UI_CONFIG = "gearpump-ui"
+  val LINUX_CONFIG = "gearpump-linux" // linux or Mac
+
+  val MASTER = "master"
+  val WORKER = "worker"
+
+  val GEARPUMP_WORKER_SLOTS = "gearpump.worker.slots"
+  val GEARPUMP_EXECUTOR_PROCESS_LAUNCHER = "gearpump.worker.executor-process-launcher"
+  val GEARPUMP_SCHEDULING_SCHEDULER = "gearpump.scheduling.scheduler-class"
+  val GEARPUMP_SCHEDULING_REQUEST = "gearpump.scheduling.requests"
+  val GEARPUMP_TRANSPORT_SERIALIZER = "gearpump.transport.serializer"
+  val GEARPUMP_SERIALIZER_POOL = "gearpump.serialization-framework"
+  val GEARPUMP_SERIALIZERS = "gearpump.serializers"
+  val GEARPUMP_TASK_DISPATCHER = "gearpump.task-dispatcher"
+  val GEARPUMP_CLUSTER_MASTERS = "gearpump.cluster.masters"
+  val GEARPUMP_MASTERCLIENT_TIMEOUT = "gearpump.masterclient.timeout"
+  val GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS =
+    "gearpump.worker.executor-share-same-jvm-as-worker"
+
+  val GEARPUMP_HOME = "gearpump.home"
+  val GEARPUMP_FULL_SCALA_VERSION = "gearpump.binary-version-with-scala-version"
+  val GEARPUMP_HOSTNAME = "gearpump.hostname"
+  val GEARPUMP_APPMASTER_ARGS = "gearpump.appmaster.vmargs"
+  val GEARPUMP_APPMASTER_EXTRA_CLASSPATH = "gearpump.appmaster.extraClasspath"
+  val GEARPUMP_EXECUTOR_ARGS = "gearpump.executor.vmargs"
+  val GEARPUMP_EXECUTOR_EXTRA_CLASSPATH = "gearpump.executor.extraClasspath"
+  val GEARPUMP_LOG_DAEMON_DIR = "gearpump.log.daemon.dir"
+  val GEARPUMP_LOG_APPLICATION_DIR = "gearpump.log.application.dir"
+  val HADOOP_CONF = "hadoopConf"
+
+  // Id used to identity Master JVM process in low level resource manager like YARN.
+  // In YARN, it means the container Id.
+  val GEARPUMP_MASTER_RESOURCE_MANAGER_CONTAINER_ID =
+    "gearpump.master-resource-manager-container-id"
+
+  // Id used to identity Worker JVM process in low level resource manager like YARN.
+  // In YARN, it means the container Id.
+  val GEARPUMP_WORKER_RESOURCE_MANAGER_CONTAINER_ID =
+    "gearpump.worker-resource-manager-container-id"
+
+  // true or false
+  val GEARPUMP_REMOTE_DEBUG_EXECUTOR_JVM = "gearpump.remote-debug-executor-jvm"
+  val GEARPUMP_REMOTE_DEBUG_PORT = "gearpump.remote-debug-port"
+
+  // Whether to turn on GC log, true or false
+  val GEARPUMP_VERBOSE_GC = "gearpump.verbose-gc"
+
+  // The time out for Future, like ask.
+  // !Important! This global timeout setting will also impact the UI
+  // responsive time if set to too big. Please make sure you have
+  // enough justification to change this global setting, otherwise
+  // please use your local timeout setting instead.
+  val FUTURE_TIMEOUT = akka.util.Timeout(15, TimeUnit.SECONDS)
+
+  val GEARPUMP_START_EXECUTOR_SYSTEM_TIMEOUT_MS = "gearpump.start-executor-system-timeout-ms"
+
+  val APPMASTER_DEFAULT_EXECUTOR_ID = -1
+
+  val NETTY_BUFFER_SIZE = "gearpump.netty.buffer-size"
+  val NETTY_MAX_RETRIES = "gearpump.netty.max-retries"
+  val NETTY_BASE_SLEEP_MS = "gearpump.netty.base-sleep-ms"
+  val NETTY_MAX_SLEEP_MS = "gearpump.netty.max-sleep-ms"
+  val NETTY_MESSAGE_BATCH_SIZE = "gearpump.netty.message-batch-size"
+  val NETTY_FLUSH_CHECK_INTERVAL = "gearpump.netty.flush-check-interval"
+  val NETTY_TCP_HOSTNAME = "akka.remote.netty.tcp.hostname"
+  val NETTY_DISPATCHER = "gearpump.netty.dispatcher"
+
+  val GEARPUMP_USERNAME = "gearpump.username"
+  val GEARPUMP_APPLICATION_ID = "gearpump.applicationId"
+  val GEARPUMP_MASTER_STARTTIME = "gearpump.master.starttime"
+  val GEARPUMP_EXECUTOR_ID = "gearpump.executorId"
+  // Application jar property
+  val GEARPUMP_APP_JAR = "gearpump.app.jar"
+  val GEARPUMP_APP_NAME_PREFIX = "gearpump.app.name.prefix"
+
+  // Where the jar is stored at. It can be a HDFS, or a local disk.
+  val GEARPUMP_APP_JAR_STORE_ROOT_PATH = "gearpump.jarstore.rootpath"
+
+  // Uses java property -Dgearpump.config.file=xxx.conf to set customized configuration
+  // Otherwise application.conf in classpath will be loaded
+  val GEARPUMP_CUSTOM_CONFIG_FILE = "gearpump.config.file"
+
+  // Metrics related
+  val GEARPUMP_METRIC_ENABLED = "gearpump.metrics.enabled"
+  val GEARPUMP_METRIC_SAMPLE_RATE = "gearpump.metrics.sample-rate"
+  val GEARPUMP_METRIC_REPORT_INTERVAL = "gearpump.metrics.report-interval-ms"
+  val GEARPUMP_METRIC_GRAPHITE_HOST = "gearpump.metrics.graphite.host"
+  val GEARPUMP_METRIC_GRAPHITE_PORT = "gearpump.metrics.graphite.port"
+  val GEARPUMP_METRIC_REPORTER = "gearpump.metrics.reporter"
+
+  // Retains at max @RETAIN_HISTORY_HOURS history data
+  val GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS = "gearpump.metrics.retainHistoryData.hours"
+
+  // Time interval between two history data points.
+  val GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS = "gearpump.metrics.retainHistoryData.intervalMs"
+
+  // Retains at max @RETAIN_LATEST_SECONDS recent data points
+  val GEARPUMP_RETAIN_RECENT_DATA_SECONDS = "gearpump.metrics.retainRecentData.seconds"
+
+  // time interval between two recent data points.
+  val GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS = "gearpump.metrics.retainRecentData.intervalMs"
+
+  // AppMaster will max wait this time until it declare the resource cannot be allocated,
+  // and shutdown itself
+  val GEARPUMP_RESOURCE_ALLOCATION_TIMEOUT = "gearpump.resource-allocation-timeout-seconds"
+
+  // Service related
+  val GEARPUMP_SERVICE_HTTP = "gearpump.services.http"
+  val GEARPUMP_SERVICE_HOST = "gearpump.services.host"
+  val GEARPUMP_SERVICE_SUPERVISOR_PATH = "gearpump.services.supervisor-actor-path"
+  val GEARPUMP_SERVICE_RENDER_CONFIG_CONCISE = "gearpump.services.config-render-option-concise"
+
+  // The partitioners provided by Gearpump
+  val BUILTIN_PARTITIONERS = Array(
+    classOf[BroadcastPartitioner],
+    classOf[CoLocationPartitioner],
+    classOf[HashPartitioner],
+    classOf[ShuffleGroupingPartitioner],
+    classOf[ShufflePartitioner])
+
+  // Security related
+  val GEARPUMP_KEYTAB_FILE = "gearpump.keytab.file"
+  val GEARPUMP_KERBEROS_PRINCIPAL = "gearpump.kerberos.principal"
+
+  val GEARPUMP_METRICS_MAX_LIMIT = "gearpump.metrics.akka.max-limit-on-query"
+  val GEARPUMP_METRICS_AGGREGATORS = "gearpump.metrics.akka.metrics-aggregator-class"
+
+  val GEARPUMP_UI_SECURITY = "gearpump.ui-security"
+  val GEARPUMP_UI_SECURITY_AUTHENTICATION_ENABLED = "gearpump.ui-security.authentication-enabled"
+  val GEARPUMP_UI_AUTHENTICATOR_CLASS = "gearpump.ui-security.authenticator"
+  // OAuth Authentication Factory for UI server.
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ENABLED = "gearpump.ui-security.oauth2-authenticator-enabled"
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATORS = "gearpump.ui-security.oauth2-authenticators"
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLASS = "class"
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CALLBACK = "callback"
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_ID = "clientid"
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_CLIENT_SECRET = "clientsecret"
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_DEFAULT_USER_ROLE = "default-userrole"
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_AUTHORIZATION_CODE = "code"
+  val GEARPUMP_UI_OAUTH2_AUTHENTICATOR_ACCESS_TOKEN = "accesstoken"
+
+  val PREFER_IPV4 = "java.net.preferIPv4Stack"
+
+  val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num"
+
+  val AKKA_SCHEDULER_TICK_DURATION = "akka.scheduler.tick-duration"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
new file mode 100644
index 0000000..d13f514
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/FileUtils.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.{File, IOException}
+import java.nio.charset.Charset
+
+import io.gearpump.google.common.io.Files
+
+object FileUtils {
+  private val UTF8 = Charset.forName("UTF-8")
+
+  def write(file: File, str: String): Unit = {
+    Files.write(str, file, UTF8)
+  }
+
+  def read(file: File): String = {
+    Files.asCharSource(file, UTF8).read()
+  }
+
+  def writeByteArrayToFile(file: File, bytes: Array[Byte]): Unit = {
+    Files.write(bytes, file)
+  }
+
+  def readFileToByteArray(file: File): Array[Byte] = {
+    Files.toByteArray(file)
+  }
+
+  /** recursively making all parent directories including itself */
+  def forceMkdir(directory: File): Unit = {
+    if (directory.exists() && directory.isFile) {
+      throw new IOException(s"Failed to create directory ${directory.toString}, it already exist")
+    }
+    Files.createParentDirs(directory)
+    val result = directory.mkdir()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/Graph.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Graph.scala b/core/src/main/scala/org/apache/gearpump/util/Graph.scala
new file mode 100644
index 0000000..6bb58b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/Graph.scala
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.language.implicitConversions
+
+/**
+ * Generic mutable Graph libraries.
+ */
+class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable {
+
+  private val _vertices = mutable.Set.empty[N]
+  private val _edges = mutable.Set.empty[(N, E, N)]
+
+  // This is used to ensure the output of this Graph is always stable
+  // Like method vertices(), or edges()
+  private var _indexs = Map.empty[Any, Int]
+  private var _nextIndex = 0
+  private def nextId: Int = {
+    val result = _nextIndex
+    _nextIndex += 1
+    result
+  }
+
+  private def init(): Unit = {
+    Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex(_))
+    Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge(_))
+  }
+
+  init()
+
+  /**
+   * Add a vertex
+   * Current Graph is changed.
+   */
+  def addVertex(vertex: N): Unit = {
+    val result = _vertices.add(vertex)
+    if (result) {
+      _indexs += vertex -> nextId
+    }
+  }
+
+  /**
+   * Add a edge
+   * Current Graph is changed.
+   */
+  def addEdge(edge: (N, E, N)): Unit = {
+    val result = _edges.add(edge)
+    if (result) {
+      _indexs += edge -> nextId
+    }
+  }
+
+  /**
+   * return all vertices.
+   * The result is stable
+   */
+  def vertices: List[N] = {
+    // Sorts the vertex so that we can keep the order for mapVertex
+    _vertices.toList.sortBy(_indexs(_))
+  }
+
+  /**
+   * out degree
+   */
+  def outDegreeOf(node: N): Int = {
+    edges.count(_._1 == node)
+  }
+
+  /**
+   * in degree
+   */
+  def inDegreeOf(node: N): Int = {
+    edges.count(_._3 == node)
+  }
+
+  /**
+   * out going edges.
+   */
+  def outgoingEdgesOf(node: N): List[(N, E, N)] = {
+    edges.filter(_._1 == node)
+  }
+
+  /**
+   * incoming edges.
+   */
+  def incomingEdgesOf(node: N): List[(N, E, N)] = {
+    edges.filter(_._3 == node)
+  }
+
+  /**
+   * Remove vertex
+   * Current Graph is changed.
+   */
+  def removeVertex(node: N): Unit = {
+    _vertices.remove(node)
+    _indexs -= node
+    val toBeRemoved = incomingEdgesOf(node) ++ outgoingEdgesOf(node)
+    toBeRemoved.foreach(removeEdge(_))
+  }
+
+  /**
+   * Remove edge
+   * Current Graph is changed.
+   */
+  private def removeEdge(edge: (N, E, N)): Unit = {
+    _indexs -= edge
+    _edges.remove(edge)
+  }
+
+  /**
+   * add edge
+   * Current Graph is changed.
+   */
+  def addEdge(node1: N, edge: E, node2: N): Unit = {
+    addVertex(node1)
+    addVertex(node2)
+    addEdge((node1, edge, node2))
+  }
+
+  /**
+   * Map a graph to a new graph, with vertex converted to a new type
+   * Current Graph is not changed.
+   */
+  def mapVertex[NewNode](fun: N => NewNode): Graph[NewNode, E] = {
+    val vertexes = vertices.map(node => (node, fun(node)))
+
+    val vertexMap: Map[N, NewNode] = vertexes.toMap
+
+    val newEdges = edges.map { edge =>
+      (vertexMap(edge._1), edge._2, vertexMap(edge._3))
+    }
+    new Graph(vertexes.map(_._2), newEdges)
+  }
+
+  /**
+   * Map a graph to a new graph, with edge converted to new type
+   * Current graph is not changed.
+   */
+  def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = {
+    val newEdges = edges.map { edge =>
+      (edge._1, fun(edge._1, edge._2, edge._3), edge._3)
+    }
+    new Graph(vertices, newEdges)
+  }
+
+  /**
+   * edges connected to node
+   */
+  def edgesOf(node: N): List[(N, E, N)] = {
+    (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, 
N)].toList.sortBy(_indexs(_))
+  }
+
+  /**
+   * all edges
+   */
+  def edges: List[(N, E, N)] = {
+    _edges.toList.sortBy(_indexs(_))
+  }
+
+  /**
+   * Add another graph
+   * Current graph is changed.
+   */
+  def addGraph(other: Graph[N, E]): Graph[N, E] = {
+    (vertices ++ other.vertices).foreach(addVertex(_))
+    (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3))
+    this
+  }
+
+  /**
+   * clone the graph
+   */
+  def copy: Graph[N, E] = {
+    new Graph(vertices, edges)
+  }
+
+  /**
+   * check empty
+   */
+  def isEmpty: Boolean = {
+    val vertexCount = vertices.size
+    val edgeCount = edges.length
+    if (vertexCount + edgeCount == 0) {
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * sub-graph which contains current node and all neighbour
+   * nodes and edges.
+   */
+  def subGraph(node: N): Graph[N, E] = {
+    val newGraph = Graph.empty[N, E]
+    for (edge <- edgesOf(node)) {
+      newGraph.addEdge(edge._1, edge._2, edge._3)
+    }
+    newGraph
+  }
+
+  /**
+   * replace vertex, the current Graph is mutated.
+   */
+  def replaceVertex(node: N, newNode: N): Graph[N, E] = {
+    for (edge <- incomingEdgesOf(node)) {
+      addEdge(edge._1, edge._2, newNode)
+    }
+
+    for (edge <- outgoingEdgesOf(node)) {
+      addEdge(newNode, edge._2, edge._3)
+    }
+    removeVertex(node)
+    this
+  }
+
+  private def removeZeroInDegree: List[N] = {
+    val toBeRemoved = vertices.filter(inDegreeOf(_) == 0).sortBy(_indexs(_))
+    toBeRemoved.foreach(removeVertex(_))
+    toBeRemoved
+  }
+
+  /**
+   * Return an iterator of vertex in topological order
+   * The node returned by Iterator is stable sorted.
+   */
+  def topologicalOrderIterator: Iterator[N] = {
+    val newGraph = copy
+    var output = List.empty[N]
+
+    while (!newGraph.isEmpty) {
+      output ++= newGraph.removeZeroInDegree
+    }
+    output.iterator
+  }
+
+  /**
+   * Return all circles in graph.
+   *
+   * The reference of this algorithm is:
+   * https://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm
+   */
+  private def findCircles: mutable.MutableList[mutable.MutableList[N]] = {
+    val inStack = mutable.Map.empty[N, Boolean]
+    val stack = mutable.Stack[N]()
+    val indexMap = mutable.Map.empty[N, Int]
+    val lowLink = mutable.Map.empty[N, Int]
+    var index = 0
+
+    val circles = mutable.MutableList.empty[mutable.MutableList[N]]
+
+    def tarjan(node: N): Unit = {
+      indexMap(node) = index
+      lowLink(node) = index
+      index += 1
+      inStack(node) = true
+      stack.push(node)
+
+      outgoingEdgesOf(node).foreach {
+        edge => {
+          if (!indexMap.contains(edge._3)) {
+            tarjan(edge._3)
+            if (lowLink.get(edge._3).get < lowLink.get(node).get) {
+              lowLink(node) = lowLink(edge._3)
+            }
+          } else {
+            if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < 
lowLink.get(node).get)) {
+              lowLink(node) = indexMap(edge._3)
+            }
+          }
+        }
+      }
+
+      if (indexMap.get(node).get == lowLink.get(node).get) {
+        val circle = mutable.MutableList.empty[N]
+        var n = node
+        do {
+          n = stack.pop()
+          inStack(n) = false
+          circle += n
+        } while (n != node)
+        circles += circle
+      }
+    }
+
+    vertices.foreach {
+      node => {
+        if (!indexMap.contains(node)) tarjan(node)
+      }
+    }
+
+    circles
+  }
+
+  /**
+   * Return an iterator of vertex in topological order of graph with circles
+   * The node returned by Iterator is stable sorted.
+   *
+   * The reference of this algorithm is:
+   * http://www.drdobbs.com/database/topological-sorting/184410262
+   */
+  def topologicalOrderWithCirclesIterator: Iterator[N] = {
+    // Condense each strongly connected component into one vertex of a new
+    // graph, so the condensation can be topologically sorted.
+    val circles = findCircles
+    val newGraph = Graph.empty[mutable.MutableList[N], E]
+    circles.foreach {
+      circle => {
+        newGraph.addVertex(circle)
+      }
+    }
+
+    // Connect two components whenever any member of one has an edge to a
+    // member of the other (checked in both directions).
+    for (circle1 <- circles; circle2 <- circles; if circle1 != circle2) yield {
+      for (node1 <- circle1; node2 <- circle2) yield {
+        var edges = outgoingEdgesOf(node1)
+        for (edge <- edges; if edge._3 == node2) yield {
+          newGraph.addEdge(circle1, edge._2, circle2)
+        }
+
+        edges = outgoingEdgesOf(node2)
+        for (edge <- edges; if edge._3 == node1) yield {
+          newGraph.addEdge(circle2, edge._2, circle1)
+        }
+      }
+    }
+
+    // Sort the condensed DAG, then expand each component in stable
+    // (insertion-index) order.
+    val topo = newGraph.topologicalOrderIterator
+    topo.flatMap(_.sortBy(_indexs(_)).iterator)
+  }
+
+  /**
+   * Checks whether there is a loop, by repeatedly stripping zero-in-degree
+   * vertices from a copy: if edges remain but no vertex has in-degree zero,
+   * a cycle must exist. This graph itself is not mutated.
+   */
+  def hasCycle(): Boolean = {
+    @tailrec
+    def detectCycle(graph: Graph[N, E]): Boolean = {
+      if (graph.edges.isEmpty) {
+        false
+      } else if (graph.vertices.nonEmpty && 
!graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
+        true
+      } else {
+        graph.removeZeroInDegree
+        detectCycle(graph)
+      }
+    }
+
+    detectCycle(copy)
+  }
+
+  /**
+   * Check whether there are two edges connecting two nodes.
+   * Edges are grouped by (source, target) pair; the edge values themselves
+   * are ignored when detecting duplicates.
+   */
+  def hasDuplicatedEdge(): Boolean = {
+    edges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1)
+  }
+
+  /**
+   * Generate a level map for each vertex withholding:
+   * {{{
+   * if vertex A -> B, then level(A) -> level(B)
+   * }}}
+   * Level 0 holds the vertices that start with zero in-degree; each removal
+   * round increments the level. Works on a copy, so this graph is not
+   * mutated.
+   *
+   * NOTE(review): like topologicalOrderIterator, this loops forever on a
+   * cyclic graph — confirm callers guarantee acyclic input.
+   */
+  def vertexHierarchyLevelMap(): Map[N, Int] = {
+    val newGraph = copy
+    var output = Map.empty[N, Int]
+    var level = 0
+    while (!newGraph.isEmpty) {
+      output ++= newGraph.removeZeroInDegree.map((_, level)).toMap
+      level += 1
+    }
+    output
+  }
+
+  /** Renders the vertex and edge lists as a readable Map-style string. */
+  override def toString: String = {
+    Map("vertices" -> vertices.mkString(","),
+      "edges" -> edges.mkString(",")).toString()
+  }
+}
+
+object Graph {
+
+  /**
+   * Builds a graph from a variable number of DSL paths.
+   *
+   * Example:
+   *
+   * {{{
+   * Graph(1 ~ 2 ~> 4 ~ 5 ~> 7, 8~9~>55, 11)
+   * Will create a graph with:
+   * nodes:
+   * 1, 4, 7, 8, 55, 11
+   * edge:
+   * 2: (1->4)
+   * 5: (4->7)
+   * 9: (8->55)
+   * }}}
+   */
+  def apply[N, E](elems: Path[_ <: N, _ <: E]*): Graph[N, E] = {
+    val graph = empty[N, E]
+    elems.foreach { path =>
+      path.updategraph(graph)
+    }
+    graph
+  }
+
+  /** Builds a graph directly from vertex and edge lists. */
+  def apply[N, E](vertices: List[N], edges: List[(N, E, N)]): Graph[N, E] = {
+    new Graph(vertices, edges)
+  }
+
+  /** Extractor: decomposes a graph into its (vertices, edges) lists. */
+  def unapply[N, E](graph: Graph[N, E]): Option[(List[N], List[(N, E, N)])] = {
+    Some((graph.vertices, graph.edges))
+  }
+
+  /** Creates a graph with no vertices and no edges. */
+  def empty[N, E]: Graph[N, E] = {
+    new Graph(List.empty[N], List.empty[(N, E, N)])
+  }
+
+  /**
+   * Builder for the `node ~ edge ~> node` DSL; stores an alternating
+   * sequence of nodes (Left) and edge values (Right).
+   */
+  class Path[N, + E](path: List[Either[N, E]]) {
+
+    /** Appends an edge value to the path. */
+    def ~[Edge >: E](edge: Edge): Path[N, Edge] = {
+      new Path(path :+ Right(edge))
+    }
+
+    /** Appends a target node to the path. */
+    def ~>[Node >: N](node: Node): Path[Node, E] = {
+      new Path(path :+ Left(node))
+    }
+
+    /** Shorthand for `this ~ edge ~> node`. */
+    def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, Edge] = {
+      this ~ edge ~> node
+    }
+
+    // Replays the path into the graph: every node becomes a vertex, and the
+    // pending edge value (if any) connects the previous node to the next.
+    private[Graph] def updategraph[Node >: N, Edge >: E](graph: Graph[Node, 
Edge]): Unit = {
+      val nodeEdgePair: Tuple2[Option[N], Option[E]] = (None, None)
+      path.foldLeft(nodeEdgePair) { (pair, either) =>
+        val (lastNode, lastEdge) = pair
+        either match {
+          case Left(node) =>
+            graph.addVertex(node)
+            if (lastNode.isDefined) {
+              // Two consecutive nodes with no edge value in between are
+              // joined by a null edge.
+              graph.addEdge(lastNode.get, 
lastEdge.getOrElse(null.asInstanceOf[Edge]), node)
+            }
+            (Some(node), None)
+          case Right(edge) =>
+            (lastNode, Some(edge))
+        }
+      }
+    }
+  }
+
+  object Path {
+    // Lifts any value into a single-node Path so it can start a DSL chain.
+    implicit def anyToPath[N, E](any: N): Path[N, E] = Node(any)
+  }
+
+  /** A single-node path: the starting point of a DSL expression. */
+  implicit class Node[N, E](self: N) extends Path[N, E](List(Left(self))) {
+
+    override def ~[Edge](edge: Edge): Path[N, Edge] = {
+      new Path(List(Left(self), Right(edge)))
+    }
+
+    override def ~>[Node >: N](node: Node): Path[Node, E] = {
+      new NodeList(List(self, node))
+    }
+
+    override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, 
Edge] = {
+      this ~ edge ~> node
+    }
+  }
+
+  /** A path consisting only of nodes (adjacent nodes get null edges). */
+  class NodeList[N, E](nodes: List[N]) extends Path[N, E](nodes.map(Left(_))) {
+    override def ~[Edge](edge: Edge): Path[N, Edge] = {
+      new Path(nodes.map(Left(_)) :+ Right(edge))
+    }
+
+    override def ~>[Node >: N](node: Node): Path[Node, E] = {
+      new NodeList(nodes :+ node)
+    }
+
+    override def to[Node >: N, Edge >: E](node: Node, edge: Edge): Path[Node, 
Edge] = {
+      this ~ edge ~> node
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala 
b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
new file mode 100644
index 0000000..549c34f
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.util
+import scala.collection.mutable.ListBuffer
+
+import akka.actor.Actor
+import com.typesafe.config.Config
+import org.slf4j.Logger
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, 
ReadOption}
+import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, 
HistoryMetricsItem}
+import org.apache.gearpump.metrics.Metrics._
+import org.apache.gearpump.metrics.MetricsAggregator
+import org.apache.gearpump.util.Constants._
+import org.apache.gearpump.util.HistoryMetricsService.{DummyMetricsAggregator, 
HistoryMetricsConfig, HistoryMetricsStore, SkipAllAggregator}
+
+/**
+ *
+ * Metrics service to serve history metrics data
+ *
+ * For simplicity, HistoryMetricsService will maintain 72 hours coarse-grained 
data
+ * for last 72 hours, and fine-grained data for past 5 min.
+ *
+ * For the coarse-grained data of past 72 hours, one or two sample point will 
be stored
+ * for each hour.
+ *
+ * For fine-grained data in last 5 min, there will be 1 sample point per 15 
seconds.
+ */
+class HistoryMetricsService(name: String, config: HistoryMetricsConfig) 
extends Actor {
+  private val LOG: Logger = LogUtil.getLogger(getClass, name = name)
+  // Metric name -> history store; a store is created lazily on first report.
+  private var metricsStore = Map.empty[String, HistoryMetricsStore]
+  private val systemConfig = context.system.settings.config
+
+  def receive: Receive = metricHandler orElse commandHandler
+  // Accepts reported metrics and appends each to its per-name store.
+  def metricHandler: Receive = {
+    case ReportMetrics =>
+      sender ! DemandMoreMetrics(self)
+    case metrics: MetricType =>
+      val name = metrics.name
+      if (metricsStore.contains(name)) {
+        metricsStore(name).add(metrics)
+      } else {
+        // First time this metric name is seen: create its store, then record.
+        val store = HistoryMetricsStore(name, metrics, config)
+        metricsStore += name -> store
+        store.add(metrics)
+      }
+  }
+
+  // Translates a glob-like pattern (* and ?) into an anchored regex string;
+  // regex metacharacters in the input are escaped first.
+  private def toRegularExpression(input: String): String = {
+    "^" + input.flatMap {
+      case '*' => ".*"
+      case '?' => "."
+      case char if "()[]$^.{}|\\".contains(char) => "\\" + char
+      case other => s"$other"
+    } + ".*$"
+  }
+
+  // Collects items from every store whose name matches the pattern, using
+  // the requested read option (latest / recent / history).
+  private def fetchMetricsHistory(pathPattern: String, readOption: 
ReadOption.ReadOption)
+  : List[HistoryMetricsItem] = {
+
+    val result = new ListBuffer[HistoryMetricsItem]
+
+    val regex = toRegularExpression(pathPattern).r.pattern
+
+    val iter = metricsStore.iterator
+    while (iter.hasNext) {
+      val (name, store) = iter.next()
+
+      val matcher = regex.matcher(name)
+      if (matcher.matches()) {
+        readOption match {
+          case ReadOption.ReadLatest =>
+            result.append(store.readLatest: _*)
+          case ReadOption.ReadRecent =>
+            result.append(store.readRecent: _*)
+          case ReadOption.ReadHistory =>
+            result.append(store.readHistory: _*)
+          case _ =>
+          // Skip all other options.
+        }
+      }
+    }
+    result.toList
+  }
+
+  val dummyAggregator = new DummyMetricsAggregator
+  // Cache of instantiated aggregators, keyed by their class name.
+  private var aggregators: Map[String, MetricsAggregator] = Map.empty[String, 
MetricsAggregator]
+
+  import scala.collection.JavaConverters._
+  // White list of aggregator class names permitted by configuration.
+  private val validAggregators: Set[String] = {
+    val rootConfig = 
systemConfig.getConfig(Constants.GEARPUMP_METRICS_AGGREGATORS).root.unwrapped
+    rootConfig.keySet().asScala.toSet
+  }
+
+  def commandHandler: Receive = {
+    // Path accept syntax ? *, ? will match one char, * will match at least 
one char
+    case QueryHistoryMetrics(inputPath, readOption, aggregatorClazz, options) 
=>
+
+      // Resolve the aggregator: none requested -> pass-through; cached ->
+      // reuse; white-listed -> reflectively construct; otherwise drop all.
+      val aggregator = {
+        if (aggregatorClazz == null || aggregatorClazz.isEmpty) {
+          dummyAggregator
+        } else if (aggregators.contains(aggregatorClazz)) {
+          aggregators(aggregatorClazz)
+        } else if (validAggregators.contains(aggregatorClazz)) {
+          val clazz = Class.forName(aggregatorClazz)
+          val constructor = clazz.getConstructor(classOf[Config])
+          val aggregator = 
constructor.newInstance(systemConfig).asInstanceOf[MetricsAggregator]
+          aggregators += aggregatorClazz -> aggregator
+          aggregator
+        } else {
+          LOG.error(s"Aggregator $aggregatorClazz is not in the white list 
${validAggregators}, " +
+            s"we will drop all messages. Please see config at 
${GEARPUMP_METRICS_AGGREGATORS}")
+          val skipAll = new SkipAllAggregator
+          aggregators += aggregatorClazz -> new SkipAllAggregator
+          skipAll
+        }
+      }
+
+      val metrics = fetchMetricsHistory(inputPath, readOption).iterator
+      sender ! HistoryMetrics(inputPath, aggregator.aggregate(options, 
metrics))
+  }
+}
+
+object HistoryMetricsService {
+
+  // Single-series store used internally by the HistoryMetricsStore impls.
+  trait MetricsStore {
+    def add(inputMetrics: MetricType): Unit
+
+    def read: List[HistoryMetricsItem]
+
+    /**
+     * read latest inserted records
+     * @return
+     */
+    def readLatest: List[HistoryMetricsItem]
+  }
+
+  // Store interface exposing latest / recent / history views of one metric.
+  trait HistoryMetricsStore {
+    def add(inputMetrics: MetricType): Unit
+
+    /**
+     * read latest inserted records
+     * @return
+     */
+    def readLatest: List[HistoryMetricsItem]
+
+    def readRecent: List[HistoryMetricsItem]
+
+    def readHistory: List[HistoryMetricsItem]
+  }
+
+  // No-op store for unsupported metric types: discards adds, returns empty.
+  class DummyHistoryMetricsStore extends HistoryMetricsStore {
+
+    val empty = List.empty[HistoryMetricsItem]
+
+    override def add(inputMetrics: MetricType): Unit = Unit
+
+    override def readRecent: List[HistoryMetricsItem] = empty
+
+    /**
+     * read latest inserted records
+     * @return
+     */
+    override def readLatest: List[HistoryMetricsItem] = empty
+
+    override def readHistory: List[HistoryMetricsItem] = empty
+  }
+
+  object HistoryMetricsStore {
+    // Factory: picks the store implementation by the metric's runtime type.
+    def apply(name: String, metric: MetricType, config: HistoryMetricsConfig)
+      : HistoryMetricsStore = {
+      metric match {
+        case histogram: Histogram => new HistogramMetricsStore(config)
+        case meter: Meter => new MeterMetricsStore(config)
+        case counter: Counter => new CounterMetricsStore(config)
+        case gauge: Gauge => new GaugeMetricsStore(config)
+        case _ => new DummyHistoryMetricsStore // other metrics are not 
supported
+      }
+    }
+  }
+
+  /**
+   * Metrics store to store history data points
+   * For each time point, we will store single data point.
+   *
+   * @param retainCount how many data points to retain, old data will be 
removed
+   * @param retainIntervalMs time interval between two data points.
+   */
+  class SingleValueMetricsStore(retainCount: Int, retainIntervalMs: Long) 
extends MetricsStore {
+
+    private val queue = new util.ArrayDeque[HistoryMetricsItem]()
+    private var latest = List.empty[HistoryMetricsItem]
+
+    // End of the time window we are tracking
+    private var endTime = 0L
+
+    override def add(inputMetrics: MetricType): Unit = {
+      add(inputMetrics, System.currentTimeMillis())
+    }
+
+    // Records one data point; at most one point is kept per
+    // retainIntervalMs window, and the oldest is evicted past retainCount.
+    def add(inputMetrics: MetricType, now: TimeStamp): Unit = {
+
+      val metrics = HistoryMetricsItem(now, inputMetrics)
+      latest = List(metrics)
+
+      if (now >= endTime) {
+        queue.addFirst(metrics)
+        // Align the next window boundary to a multiple of retainIntervalMs.
+        endTime = (now / retainIntervalMs + 1) * retainIntervalMs
+
+        // Removes old data
+        if (queue.size() > retainCount) {
+          queue.removeLast()
+        }
+      }
+    }
+
+    // Returns retained points oldest-first (the deque holds newest-first).
+    def read: List[HistoryMetricsItem] = {
+      val result = new ListBuffer[HistoryMetricsItem]
+      import scala.collection.JavaConverters._
+      queue.iterator().asScala.foreach(result.prepend(_))
+      result.toList
+    }
+
+    override def readLatest: List[HistoryMetricsItem] = {
+      latest
+    }
+  }
+
+  /**
+   * Config for how long to keep history metrics data.
+   *
+   * @param retainHistoryDataHours Retain at max @RETAIN_HISTORY_HOURS history 
data(unit hour)
+   * @param retainHistoryDataIntervalMs time interval between two history data 
points.(unit: ms)
+   * @param retainRecentDataSeconds Retain at max @RETAIN_LATEST_SECONDS
+   *                                recent data points(unit: seconds)
+   * @param retainRecentDataIntervalMs Retain at max @RETAIN_LATEST_SECONDS 
recent
+   *                                   data points(unit: ms)
+   */
+  case class HistoryMetricsConfig(
+      retainHistoryDataHours: Int,
+      retainHistoryDataIntervalMs: Int,
+      retainRecentDataSeconds: Int,
+      retainRecentDataIntervalMs: Int)
+
+  object HistoryMetricsConfig {
+    // Reads the four retention settings from the typesafe Config.
+    def apply(config: Config): HistoryMetricsConfig = {
+      val historyHour = 
config.getInt(GEARPUMP_METRIC_RETAIN_HISTORY_DATA_HOURS)
+      val historyInterval = 
config.getInt(GEARPUMP_RETAIN_HISTORY_DATA_INTERVAL_MS)
+
+      val recentSeconds = config.getInt(GEARPUMP_RETAIN_RECENT_DATA_SECONDS)
+      val recentInterval = 
config.getInt(GEARPUMP_RETAIN_RECENT_DATA_INTERVAL_MS)
+      HistoryMetricsConfig(historyHour, historyInterval, recentSeconds, 
recentInterval)
+    }
+  }
+
+  // Histogram store: coarse-grained history plus fine-grained recent data.
+  class HistogramMetricsStore(config: HistoryMetricsConfig) extends 
HistoryMetricsStore {
+
+    private val history = new SingleValueMetricsStore(
+      config.retainHistoryDataHours * 3600 * 1000 / 
config.retainHistoryDataIntervalMs,
+      config.retainHistoryDataIntervalMs)
+
+    private val recent = new SingleValueMetricsStore(
+      config.retainRecentDataSeconds * 1000 / 
config.retainRecentDataIntervalMs,
+      config.retainRecentDataIntervalMs)
+
+    override def add(inputMetrics: MetricType): Unit = {
+      recent.add(inputMetrics)
+      history.add(inputMetrics)
+    }
+
+    override def readRecent: List[HistoryMetricsItem] = {
+      recent.read
+    }
+
+    override def readHistory: List[HistoryMetricsItem] = {
+      history.read
+    }
+
+    override def readLatest: List[HistoryMetricsItem] = {
+      recent.readLatest
+    }
+  }
+
+  // Meter store: same dual-window retention scheme as HistogramMetricsStore.
+  class MeterMetricsStore(config: HistoryMetricsConfig) extends 
HistoryMetricsStore {
+
+    private val history = new SingleValueMetricsStore(
+      config.retainHistoryDataHours * 3600 * 1000 / 
config.retainHistoryDataIntervalMs,
+      config.retainHistoryDataIntervalMs)
+
+    private val recent = new SingleValueMetricsStore(
+      config.retainRecentDataSeconds * 1000 / 
config.retainRecentDataIntervalMs,
+      config.retainRecentDataIntervalMs)
+
+    override def add(inputMetrics: MetricType): Unit = {
+      recent.add(inputMetrics)
+      history.add(inputMetrics)
+    }
+
+    override def readRecent: List[HistoryMetricsItem] = {
+      recent.read
+    }
+
+    override def readHistory: List[HistoryMetricsItem] = {
+      history.read
+    }
+
+    override def readLatest: List[HistoryMetricsItem] = {
+      recent.readLatest
+    }
+  }
+
+  // Counter store: same dual-window retention scheme as the stores above.
+  class CounterMetricsStore(config: HistoryMetricsConfig) extends 
HistoryMetricsStore {
+
+    private val history = new SingleValueMetricsStore(
+      config.retainHistoryDataHours * 3600 * 1000 / 
config.retainHistoryDataIntervalMs,
+      config.retainHistoryDataIntervalMs)
+
+    private val recent = new SingleValueMetricsStore(
+      config.retainRecentDataSeconds * 1000 / 
config.retainRecentDataIntervalMs,
+      config.retainRecentDataIntervalMs)
+
+    override def add(inputMetrics: MetricType): Unit = {
+      history.add(inputMetrics)
+      recent.add(inputMetrics)
+    }
+
+    override def readRecent: List[HistoryMetricsItem] = {
+      recent.read
+    }
+
+    override def readHistory: List[HistoryMetricsItem] = {
+      history.read
+    }
+
+    override def readLatest: List[HistoryMetricsItem] = {
+      recent.readLatest
+    }
+  }
+
+  // Gauge store: same dual-window retention scheme as the stores above.
+  class GaugeMetricsStore(config: HistoryMetricsConfig) extends 
HistoryMetricsStore {
+
+    // NOTE(review): this comparator (name is a typo of "comparator") is
+    // never referenced in this class — confirm whether max-value sampling
+    // was intended, or remove it.
+    private val compartor = (left: HistoryMetricsItem, right: 
HistoryMetricsItem) =>
+      left.value.asInstanceOf[Gauge].value > 
right.value.asInstanceOf[Gauge].value
+
+    private val history = new SingleValueMetricsStore(
+      config.retainHistoryDataHours * 3600 * 1000 / 
config.retainHistoryDataIntervalMs,
+      config.retainHistoryDataIntervalMs)
+
+    private val recent = new SingleValueMetricsStore(
+      config.retainRecentDataSeconds * 1000 / 
config.retainRecentDataIntervalMs,
+      config.retainRecentDataIntervalMs)
+
+    override def add(inputMetrics: MetricType): Unit = {
+      recent.add(inputMetrics)
+      history.add(inputMetrics)
+    }
+
+    override def readRecent: List[HistoryMetricsItem] = {
+      recent.read
+    }
+
+    override def readHistory: List[HistoryMetricsItem] = {
+      history.read
+    }
+
+    override def readLatest: List[HistoryMetricsItem] = {
+      recent.readLatest
+    }
+  }
+
+  // Pass-through aggregator: returns the input metrics unchanged.
+  class DummyMetricsAggregator extends MetricsAggregator {
+    def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem])
+      : List[HistoryMetricsItem] = {
+      inputs.toList
+    }
+  }
+
+  // Aggregator that drops everything; used for non-white-listed classes.
+  class SkipAllAggregator extends MetricsAggregator {
+    private val empty = List.empty[HistoryMetricsItem]
+    def aggregate(options: Map[String, String], inputs: 
Iterator[HistoryMetricsItem])
+    : List[HistoryMetricsItem] = {
+      empty
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala 
b/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
new file mode 100644
index 0000000..225f796
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/LogUtil.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.File
+import java.net.InetAddress
+import java.util.Properties
+import scala.util.Try
+
+import com.typesafe.config.Config
+import org.apache.log4j.PropertyConfigurator
+import org.slf4j.{Logger, LoggerFactory}
+
+object LogUtil {
+  object ProcessType extends Enumeration {
+    type ProcessType = Value
+    val MASTER, WORKER, LOCAL, APPLICATION, UI = Value
+  }
+
+  /**
+   * Builds a logger whose name encodes the process context (master, worker,
+   * app, executor, task, extra name) so log lines are attributable.
+   * Every context parameter is optional; null values are skipped.
+   */
+  def getLogger[T](
+      clazz: Class[T], context: String = null, master: Any = null, worker: Any = null,
+      executor: Any = null, task: Any = null, app: Any = null, name: String = null): Logger = {
+    var env = ""
+
+    if (null != context) {
+      env += context
+    }
+    if (null != master) {
+      env += "master" + master
+    }
+    if (null != worker) {
+      env += "worker" + worker
+    }
+
+    if (null != app) {
+      env += "app" + app
+    }
+
+    if (null != executor) {
+      env += "exec" + executor
+    }
+    if (null != task) {
+      env += task
+    }
+    if (null != name) {
+      env += name
+    }
+
+    if (!env.isEmpty) {
+      LoggerFactory.getLogger(clazz.getSimpleName + "@" + env)
+    } else {
+      LoggerFactory.getLogger(clazz.getSimpleName)
+    }
+  }
+
+  /** Custom the log file locations by reading config from system properties */
+  def loadConfiguration(config: Config, processType: ProcessType.ProcessType): Unit = {
+    // Set log file name
+    val propName = s"gearpump.${processType.toString.toLowerCase}.log.file"
+    val props = loadConfiguration
+
+    props.setProperty("gearpump.log.file", "${" + propName + "}")
+
+    props.setProperty("JVM_NAME", jvmName)
+
+    processType match {
+      case ProcessType.APPLICATION =>
+        props.setProperty("log4j.rootAppender", "${gearpump.application.logger}")
+        props.setProperty("gearpump.application.log.rootdir",
+          applicationLogDir(config).getAbsolutePath)
+      case _ =>
+        props.setProperty("log4j.rootAppender", "${gearpump.root.logger}")
+        props.setProperty("gearpump.log.dir", daemonLogDir(config).getAbsolutePath)
+    }
+
+    PropertyConfigurator.configure(props)
+  }
+
+  /** Directory that daemon (master/worker) logs are written to. */
+  def daemonLogDir(config: Config): File = {
+    val dir = config.getString(Constants.GEARPUMP_LOG_DAEMON_DIR)
+    new File(dir)
+  }
+
+  /** Routes DEBUG-level logging to the console; useful for troubleshooting. */
+  def verboseLogToConsole(): Unit = {
+    val props = loadConfiguration
+    props.setProperty("log4j.rootLogger", "DEBUG,console")
+    PropertyConfigurator.configure(props)
+  }
+
+  /**
+   * Loads log4j.properties from the classpath; returns empty Properties
+   * when the resource is absent.
+   */
+  def loadConfiguration: Properties = {
+    val props = new Properties()
+    val log4jConfStream = getClass().getClassLoader.getResourceAsStream("log4j.properties")
+    // Fix: close() used to be called unconditionally, throwing NPE when the
+    // resource was missing. Close only when present, and always (finally),
+    // so the stream is not leaked if load() throws.
+    if (log4jConfStream != null) {
+      try {
+        props.load(log4jConfStream)
+      } finally {
+        log4jConfStream.close()
+      }
+    }
+    props
+  }
+
+  /** Name of this JVM as reported by the runtime MXBean (usually pid@host). */
+  private def jvmName: String = {
+    // An unused hostname lookup previously computed here was removed.
+    java.lang.management.ManagementFactory.getRuntimeMXBean().getName()
+  }
+
+  /** Directory that application logs are written to. */
+  def applicationLogDir(config: Config): File = {
+    val appLogDir = config.getString(Constants.GEARPUMP_LOG_APPLICATION_DIR)
+    new File(appLogDir)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/ProcessLogRedirector.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/util/ProcessLogRedirector.scala 
b/core/src/main/scala/org/apache/gearpump/util/ProcessLogRedirector.scala
new file mode 100644
index 0000000..e6d79e4
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/ProcessLogRedirector.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.{Closeable, Flushable}
+import scala.sys.process.ProcessLogger
+
+import org.slf4j.LoggerFactory
+
+/** Redirect the console output to parent process */
+class ProcessLogRedirector extends ProcessLogger with Closeable with Flushable 
with ConsoleOutput {
+  private val LOG = LoggerFactory.getLogger("redirect")
+
+  // We only capture the first 1K chars
+  private final val LENGTH = 1000
+  private var _error: String = ""
+  private var _output: String = ""
+
+  def error: String = _error
+  def output: String = _output
+
+  def out(s: => String): Unit = {
+    if (_output.length <= LENGTH) {
+      _output += "\n" + s
+    }
+    LOG.info(s)
+  }
+  def err(s: => String): Unit = {
+    if (_error.length <= LENGTH) {
+      _error += "\n" + s
+    }
+    LOG.error(s)
+  }
+  def buffer[T](f: => T): T = f
+  def close(): Unit = Unit
+  def flush(): Unit = Unit
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/ReferenceEqual.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ReferenceEqual.scala 
b/core/src/main/scala/org/apache/gearpump/util/ReferenceEqual.scala
new file mode 100644
index 0000000..69e0d4c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/ReferenceEqual.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+/**
+ * Check equal using reference-equal.
+ */
+/**
+ * Check equal using reference-equal.
+ * Mixing in this trait replaces value equality with identity (eq) equality;
+ * hashCode stays the superclass (identity) hash, consistent with equals.
+ */
+trait ReferenceEqual extends AnyRef {
+
+  override def equals(other: Any): Boolean = {
+    this.eq(other.asInstanceOf[AnyRef])
+  }
+
+  override def hashCode(): Int = {
+    super.hashCode()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala 
b/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
new file mode 100644
index 0000000..97d6dd0
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/RestartPolicy.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import scala.concurrent.duration.Duration
+
+import akka.actor.ChildRestartStats
+
+/**
+ * When one executor or task fails, Gearpump will try to start. However, if it 
fails after
+ * multiple retries, then we abort.
+ *
+ * @param maxNrOfRetries The number of times is allowed to be restarted, 
negative value means no
+ *                       limit, if the limit is exceeded the policy will not 
allow to restart
+ * @param withinTimeRange Duration of the time window for maxNrOfRetries.
+ *                        Duration.Inf means no window
+ */
+class RestartPolicy(maxNrOfRetries: Int, withinTimeRange: Duration) {
+  // Reuse Akka's ChildRestartStats for window-based retry accounting.
+  private val status = new ChildRestartStats(null, 0, 0L)
+  // NOTE(review): Duration.Inf.toMillis throws, so the documented
+  // "Duration.Inf means no window" case appears to fail here — confirm.
+  private val retriesWindow = (Some(maxNrOfRetries), 
Some(withinTimeRange.toMillis.toInt))
+
+  // Returns true while one more restart is permitted within the window.
+  def allowRestart: Boolean = {
+    status.requestRestartPermission(retriesWindow)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/RichProcess.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/RichProcess.scala 
b/core/src/main/scala/org/apache/gearpump/util/RichProcess.scala
new file mode 100644
index 0000000..1729d32
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/RichProcess.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import scala.sys.process.Process
+
/** Read access to the console output (stdout/stderr) captured from a process. */
trait ConsoleOutput {
  /** Content the process wrote to standard output. */
  def output: String
  /** Content the process wrote to standard error. */
  def error: String
}

/** Extends Process by providing an additional logger: ConsoleOutput interface. */
class RichProcess(process: Process, _logger: ConsoleOutput) extends Process {
  def exitValue(): Int = process.exitValue()
  def destroy(): Unit = process.destroy()
  // scala.sys.process.Process declares isAlive() since Scala 2.12; without this
  // delegation RichProcess would be abstract and not instantiable there.
  def isAlive(): Boolean = process.isAlive()
  def logger: ConsoleOutput = _logger
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/TimeOutScheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/util/TimeOutScheduler.scala 
b/core/src/main/scala/org/apache/gearpump/util/TimeOutScheduler.scala
new file mode 100644
index 0000000..0e2e27c
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/TimeOutScheduler.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.util.concurrent.TimeUnit
+import scala.concurrent.duration._
+
+import akka.actor.{Actor, ActorRef}
+import akka.pattern.ask
+
/** A helper util to send a message to a remote actor and notify a callback on timeout. */
trait TimeOutScheduler {
  this: Actor =>
  import context.dispatcher

  /**
   * Asks `target` with `msg`; forwards the reply to `self` on success, or runs
   * `timeOutHandler` if the ask fails (which includes the ask timing out after
   * `milliSeconds`).
   */
  def sendMsgWithTimeOutCallBack(
      target: ActorRef, msg: AnyRef, milliSeconds: Long, timeOutHandler: => Unit): Unit = {
    val result = target.ask(msg)(FiniteDuration(milliSeconds, TimeUnit.MILLISECONDS))
    // Future.onSuccess/onFailure are deprecated since Scala 2.12 and removed in
    // 2.13; foreach / failed.foreach behave identically here. Naming the reply
    // also avoids shadowing the `msg` parameter as the original `case msg =>` did.
    result.foreach(reply => self ! reply)
    result.failed.foreach(_ => timeOutHandler)
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/main/scala/org/apache/gearpump/util/Util.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/Util.scala 
b/core/src/main/scala/org/apache/gearpump/util/Util.scala
new file mode 100644
index 0000000..19bd5a8
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/util/Util.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.util
+
+import java.io.{BufferedReader, File, FileInputStream, InputStreamReader}
+import java.net.{ServerSocket, URI}
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import scala.sys.process.Process
+import scala.util.{Failure, Success, Try}
+
+import com.typesafe.config.{Config, ConfigFactory}
+
+import org.apache.gearpump.cluster.AppJar
+import org.apache.gearpump.jarstore.JarStoreService
+import org.apache.gearpump.transport.HostPort
+
object Util {
  val LOG = LogUtil.getLogger(getClass)
  private val defaultUri = new URI("file:///")
  // Application names: letter/underscore first, then letters/digits/underscores;
  // the `+` (not `*`) means a valid name has at least two characters.
  private val appNamePattern = "^[a-zA-Z_][a-zA-Z0-9_]+$".r.pattern

  /** Returns true when `appName` is a legal Gearpump application name. */
  def validApplicationName(appName: String): Boolean = {
    appNamePattern.matcher(appName).matches()
  }

  /** Returns the JVM's current classpath split on the platform path separator. */
  def getCurrentClassPath: Array[String] = {
    System.getProperty("java.class.path").split(File.pathSeparator)
  }

  /**
   * Reads the Gearpump version from the VERSION file under GEARPUMP_HOME.
   * Returns "Unknown-Version" when the file cannot be read.
   */
  def version: String = {
    val home = System.getProperty(Constants.GEARPUMP_HOME)
    val version = Try {
      val versionFile = new FileInputStream(new File(home, "VERSION"))
      // Close the stream in `finally` so it is not leaked when readLine or
      // replace throws; the original closed it only on the success path.
      try {
        val reader = new BufferedReader(new InputStreamReader(versionFile))
        reader.readLine().replace("version:=", "")
      } finally {
        versionFile.close()
      }
    }
    version match {
      case Success(version) =>
        version
      case Failure(ex) =>
        LOG.error("failed to read VERSION file, " + ex.getMessage)
        "Unknown-Version"
    }
  }

  /**
   * Launches a child `java` process running `mainClass`.
   *
   * @param options   JVM options passed before the classpath
   * @param classPath classpath entries for the child process
   * @param mainClass fully-qualified main class to run
   * @param arguments program arguments
   * @return the running process paired with its console-output redirector
   */
  def startProcess(options: Array[String], classPath: Array[String], mainClass: String,
      arguments: Array[String]): RichProcess = {
    val java = System.getProperty("java.home") + "/bin/java"
    val command = List(java) ++ options ++
      List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments
    LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} " +
      s"\n ${options.mkString(" ")}")
    val logger = new ProcessLogRedirector()
    val process = Process(command).run(logger)
    new RichProcess(process, logger)
  }

  /**
   * hostList format: host1:port1,host2:port2,host3:port3...
   * A malformed entry (missing or non-numeric port) throws at parse time.
   */
  def parseHostList(hostList: String): List[HostPort] = {
    hostList.trim.split(",").map { address =>
      val hostAndPort = address.split(":")
      HostPort(hostAndPort(0), hostAndPort(1).toInt)
    }.toList
  }

  /** Resolves a scheme-less local path into an absolute "file://" URI string. */
  def resolvePath(path: String): String = {
    val uri = new URI(path)
    if (uri.getScheme == null && uri.getFragment == null) {
      // Normalize Windows back-slashes so the result is a valid URI path.
      val absolutePath = new File(path).getCanonicalPath.replaceAll("\\\\", "/")
      "file://" + absolutePath
    } else {
      path
    }
  }

  /** Returns true when `path` has no scheme and authority, or uses "file://". */
  def isLocalPath(path: String): Boolean = {
    val uri = new URI(path)
    (uri.getScheme == null && uri.getAuthority == null) ||
      uri.getScheme == defaultUri.getScheme
  }

  /** Returns a uniformly random non-negative Int. */
  def randInt(): Int = {
    // Mask the sign bit instead of Math.abs: Math.abs(Int.MinValue) is still
    // Int.MinValue, so the original could (rarely) return a negative number.
    ThreadLocalRandom.current.nextInt() & Int.MaxValue
  }

  /** Finds a free TCP port by binding an ephemeral server socket and releasing it. */
  def findFreePort(): Try[Int] = {
    Try {
      val socket = new ServerSocket(0)
      // Always release the socket, even if an intermediate call throws.
      try {
        socket.setReuseAddress(true)
        socket.getLocalPort()
      } finally {
        socket.close()
      }
    }
  }

  /** Uploads `jarFile` via the jar store and returns its remote descriptor. */
  def uploadJar(jarFile: File, jarStoreService: JarStoreService): AppJar = {
    val remotePath = jarStoreService.copyFromLocal(jarFile)
    AppJar(jarFile.getName, remotePath)
  }

  /**
   * This util can be used to filter out configuration from specific origin
   *
   * For example, if you want to filter out configuration from reference.conf
   * Then you can use like this:
   *
   * filterOutOrigin(config, "reference.conf")
   */
  import scala.collection.JavaConverters._
  def filterOutOrigin(config: Config, originFile: String): Config = {
    // The accumulator is named `filtered` so it no longer shadows the
    // `config` parameter, as the original fold did.
    config.entrySet().asScala.foldLeft(ConfigFactory.empty()) { (filtered, entry) =>
      val key = entry.getKey
      val value = entry.getValue
      if (value.origin().resource() == originFile) {
        filtered
      } else {
        filtered.withValue(key, value)
      }
    }
  }

  /** JVM arguments and classpath for one JVM role. */
  case class JvmSetting(vmargs: Array[String], classPath: Array[String])

  // NOTE: "appMater" is a typo for "appMaster"; kept as-is because callers
  // reference the field by name (source compatibility).
  case class AppJvmSettings(appMater: JvmSetting, executor: JvmSetting)

  /** Get an effective AppJvmSettings from Config */
  def resolveJvmSetting(conf: Config): AppJvmSettings = {

    import org.apache.gearpump.util.Constants._

    // Every setting is optional: a missing config key yields an empty array.
    val appMasterVMArgs = Try(conf.getString(GEARPUMP_APPMASTER_ARGS).split("\\s+")
      .filter(_.nonEmpty)).toOption
    val executorVMArgs = Try(conf.getString(GEARPUMP_EXECUTOR_ARGS).split("\\s+")
      .filter(_.nonEmpty)).toOption

    val appMasterClassPath = Try(
      conf.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
        .split("[;:]").filter(_.nonEmpty)).toOption

    // NOTE(review): executor classpath splits on File.pathSeparator while the
    // app master splits on "[;:]" — looks intentional, but worth confirming.
    val executorClassPath = Try(
      conf.getString(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
        .split(File.pathSeparator).filter(_.nonEmpty)).toOption

    AppJvmSettings(
      JvmSetting(appMasterVMArgs.getOrElse(Array.empty[String]),
        appMasterClassPath.getOrElse(Array.empty[String])),
      JvmSetting(executorVMArgs.getOrElse(Array.empty[String]),
        executorClassPath.getOrElse(Array.empty[String])))
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/core/src/test/resources/log4j.properties 
b/core/src/test/resources/log4j.properties
index 0faadd9..fb5f594 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -80,7 +80,7 @@ log4j.appender.RFA.MaxBackupIndex=30
 #
 # Hadoop Filesystem Rolling File Appender, similar to RFA but writing to 
Hadoop Filesystem instead of local disk.
 #
-log4j.appender.HDFSRFA=io.gearpump.util.HadoopFSLogAppender
+log4j.appender.HDFSRFA=org.apache.gearpump.util.HadoopFSLogAppender
 log4j.appender.HDFSRFA.File=${gearpump.log.dir}/${gearpump.log.file}
 
 # Logfile size and and 30 backups
@@ -103,7 +103,7 @@ log4j.appender.console.layout.ConversionPattern=[%p] 
[%d{MM/dd/yyyy HH:mm:ss.SSS
 #
 # Application Log Appender
 #
-log4j.appender.ALA=io.gearpump.util.HadoopFSLogAppender
+log4j.appender.ALA=org.apache.gearpump.util.HadoopFSLogAppender
 
log4j.appender.ALA.File=${gearpump.application.log.dir}/${gearpump.application.log.file}
 
 # Logfile size and and 30 backups

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/resources/test.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test.conf 
b/core/src/test/resources/test.conf
index ac18c88..58e5ef1 100644
--- a/core/src/test/resources/test.conf
+++ b/core/src/test/resources/test.conf
@@ -14,7 +14,7 @@ gearpump {
 
   application.executor-num = 1
 
-  worker.executor-process-launcher = 
"io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
+  worker.executor-process-launcher = 
"org.apache.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
 
   cluster {
     masters = []
@@ -22,9 +22,9 @@ gearpump {
 
   streaming.register-task-timeout-ms = 5000
 
-  transport.serializer = "io.gearpump.transport.MockTransportSerializer"
+  transport.serializer = 
"org.apache.gearpump.transport.MockTransportSerializer"
 
-  serialization-framework = 
"io.gearpump.serializer.FastKryoSerializationFramework"
+  serialization-framework = 
"org.apache.gearpump.serializer.FastKryoSerializationFramework"
 }
 
 ## Configurations only visible on Linux or Mac..

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/TestProbeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/TestProbeUtil.scala 
b/core/src/test/scala/io/gearpump/TestProbeUtil.scala
deleted file mode 100644
index e7181db..0000000
--- a/core/src/test/scala/io/gearpump/TestProbeUtil.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump
-
-import scala.language.implicitConversions
-
-import akka.actor.{Actor, Props, Terminated}
-import akka.testkit.TestProbe
-
-object TestProbeUtil {
-  implicit def toProps(probe: TestProbe): Props = {
-    Props(new Actor {
-      val probeRef = probe.ref
-      context.watch(probeRef)
-      def receive: Receive = {
-        case Terminated(probeRef) => context.stop(self)
-        case x => probeRef.forward(x)
-      }
-    })
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala 
b/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala
deleted file mode 100644
index b1ca357..0000000
--- a/core/src/test/scala/io/gearpump/cluster/MasterHarness.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster
-
-import java.io.File
-import java.net.{InetSocketAddress, Socket, SocketTimeoutException, 
URLClassLoader, UnknownHostException}
-import java.util.Properties
-import java.util.concurrent.{Executors, TimeUnit}
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext}
-
-import akka.actor.{Actor, ActorSystem, Address, Props}
-import akka.testkit.TestProbe
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, 
ConfigValueFactory}
-
-import io.gearpump.cluster.MasterHarness.MockMaster
-import io.gearpump.util.Constants._
-import io.gearpump.util.{ActorUtil, FileUtils, LogUtil}
-
-trait MasterHarness {
-  private val LOG = LogUtil.getLogger(getClass)
-
-  implicit val pool = MasterHarness.cachedPool
-
-  private var system: ActorSystem = null
-  private var systemAddress: Address = null
-  private var host: String = null
-  private var port: Int = 0
-  private var masterProperties = new Properties()
-  val PROCESS_BOOT_TIME = Duration(25, TimeUnit.SECONDS)
-
-  def getActorSystem: ActorSystem = system
-  def getHost: String = host
-  def getPort: Int = port
-
-  protected def config: Config
-
-  def startActorSystem(): Unit = {
-    val systemConfig = config
-    system = ActorSystem(MASTER, systemConfig)
-    systemAddress = ActorUtil.getSystemAddress(system)
-    host = systemAddress.host.get
-    port = systemAddress.port.get
-
-    masterProperties.put(s"${GEARPUMP_CLUSTER_MASTERS}.0", 
s"$getHost:$getPort")
-    masterProperties.put(s"${GEARPUMP_HOSTNAME}", s"$getHost")
-
-    LOG.info(s"Actor system is started, $host, $port")
-  }
-
-  def shutdownActorSystem(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-    LOG.info(s"Actor system is stopped, $host, $port")
-  }
-
-  def convertTestConf(host: String, port: Int): File = {
-    val test = ConfigFactory.parseResourcesAnySyntax("test.conf",
-      ConfigParseOptions.defaults.setAllowMissing(true))
-
-    val newConf = test.withValue(GEARPUMP_CLUSTER_MASTERS,
-      ConfigValueFactory.fromAnyRef(Array(s"$host:$port").toList.asJava))
-
-    val confFile = File.createTempFile("main", ".conf")
-    val serialized = newConf.root().render()
-    FileUtils.write(confFile, serialized)
-    confFile
-  }
-
-  def createMockMaster(): TestProbe = {
-    val masterReceiver = TestProbe()(system)
-    val master = system.actorOf(Props(classOf[MockMaster], masterReceiver), 
MASTER)
-    masterReceiver
-  }
-
-  def isPortUsed(host: String, port: Int): Boolean = {
-
-    var isPortUsed = true
-    val socket = new Socket()
-    try {
-      socket.setReuseAddress(true)
-      socket.connect(new InetSocketAddress(host, port), 1000)
-      socket.isConnected
-    } catch {
-      case ex: SocketTimeoutException =>
-        isPortUsed = false
-      case ex: UnknownHostException =>
-        isPortUsed = false
-      case ex: Throwable =>
-        // For other case, we think the port has been occupied.
-        isPortUsed = true
-    } finally {
-      socket.close()
-    }
-    isPortUsed
-  }
-
-  def getContextClassPath: Array[String] = {
-    val contextLoader = Thread.currentThread().getContextClassLoader()
-
-    val urlLoader = if (!contextLoader.isInstanceOf[URLClassLoader]) {
-      contextLoader.getParent.asInstanceOf[URLClassLoader]
-    } else {
-      contextLoader.asInstanceOf[URLClassLoader]
-    }
-
-    val urls = urlLoader.getURLs()
-    val classPath = urls.map { url =>
-      new File(url.getPath()).toString
-    }
-    classPath
-  }
-
-  /**
-   * Remove trailing $
-   */
-  def getMainClassName(mainObj: Any): String = {
-    mainObj.getClass.getName.dropRight(1)
-  }
-
-  def getMasterListOption(): Array[String] = {
-    masterProperties.asScala.toList.map { kv =>
-      s"-D${kv._1}=${kv._2}"
-    }.toArray
-  }
-
-  def masterConfig: Config = {
-    
ConfigFactory.parseProperties(masterProperties).withFallback(system.settings.config)
-  }
-}
-
-object MasterHarness {
-
-  val cachedPool = 
ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())
-
-  class MockMaster(receiver: TestProbe) extends Actor {
-    def receive: Receive = {
-      case msg => {
-        receiver.ref forward msg
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/core/src/test/scala/io/gearpump/cluster/TestUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/io/gearpump/cluster/TestUtil.scala 
b/core/src/test/scala/io/gearpump/cluster/TestUtil.scala
deleted file mode 100644
index dce4902..0000000
--- a/core/src/test/scala/io/gearpump/cluster/TestUtil.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster
-
-import akka.actor._
-
-object TestUtil {
-  val DEFAULT_CONFIG = ClusterConfig.default("test.conf")
-  val MASTER_CONFIG = ClusterConfig.master("test.conf")
-  val UI_CONFIG = ClusterConfig.ui("test.conf")
-
-  class DummyAppMaster(context: AppMasterContext, app: AppDescription) extends 
ApplicationMaster {
-    context.masterProxy !(context, app)
-
-    def receive: Receive = null
-  }
-
-  val dummyApp: AppDescription =
-    AppDescription("dummy", classOf[DummyAppMaster].getName, UserConfig.empty)
-}
\ No newline at end of file

Reply via email to