fix #1996, EmbeddedCluster requires master configuration like 
ClusterActorRefProvider.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ecf5b468
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ecf5b468
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ecf5b468

Branch: refs/heads/master
Commit: ecf5b468e89c6b19b994760c4b784d11ce8eeee6
Parents: a7bbb4c
Author: Sean Zhong <[email protected]>
Authored: Wed Mar 9 15:24:31 2016 +0800
Committer: Sean Zhong <[email protected]>
Committed: Thu Mar 10 10:11:21 2016 +0800

----------------------------------------------------------------------
 conf/gear.conf                                  |  54 +++-
 core/src/main/resources/geardefault.conf        | 279 ++++++++++++-------
 .../io/gearpump/cluster/ClusterConfig.scala     |  10 +-
 .../main/scala/io/gearpump/util/ActorUtil.scala |   4 -
 .../main/scala/io/gearpump/util/Constants.scala |   2 +-
 core/src/test/resources/test.conf               |  10 +-
 .../cluster/embedded/EmbeddedCluster.scala      |  89 ++++++
 .../gearpump/cluster/local/LocalCluster.scala   |  88 ------
 .../scala/io/gearpump/cluster/main/Local.scala  |   1 -
 .../examples/wordcountjava/WordCount.java       |  20 +-
 .../examples/wordcount/WordCount.scala          |  20 +-
 .../stream/gearpump/graph/RemoteGraph.scala     |   4 +-
 .../checklist/CommandLineSpec.scala             |  10 +-
 .../io/gearpump/streaming/task/TaskUtil.scala   |   3 +-
 14 files changed, 373 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/conf/gear.conf
----------------------------------------------------------------------
diff --git a/conf/gear.conf b/conf/gear.conf
index 1673cb7..e5898e9 100644
--- a/conf/gear.conf
+++ b/conf/gear.conf
@@ -3,6 +3,7 @@
 ### To use the application, you at least need to change gearpump.cluster to 
point to right master
 #########################################
 
+### Configurations that are shared by all services and all user applications...
 gearpump {
 
   ##############################
@@ -53,7 +54,9 @@ gearpump {
     cpu-core-limit-per-executor = 1
   }
 
-  ## Whether we allow remote debug
+  ## Whether to enable remote-debug mode.
+  ## In remote debug mode, every executor process will bind to a free port, and
+  ## listen for remote jvm debug.
   remote-debug-executor-jvm = false
 
   ### When the resource cannot be allocated in the timeout, then
@@ -104,6 +107,40 @@ gearpump {
     logfile {
 
     }
+
+    ## Coarse-grain history metrics, which have a larger timespan but sparse 
data points
+    retainHistoryData {
+      # max hours of history data to retain
+      # Note: due to implementation limitation(we store all history in memory),
+      # please don't set this to too big which may exhaust memory.
+      hours = 72
+
+      # time interval between two data points for history data (unit: ms)
+      # Usually this value is set to a big value so that we only store
+      # coarse-grain data
+      intervalMs = 3600000
+    }
+
+    ## fine-grain recent metrics which just happened, which have a smaller 
timespan but dense data points
+    retainRecentData {
+
+      # max seconds of recent data to retain,
+      # This is for the fine-grain data
+      seconds = 300
+
+      # time interval between two data points for recent data (unit: ms)
+      intervalMs = 15000
+    }
+    akka {
+      ### For this reporter, at most we will return max-limit-on-query metric 
items.
+      max-limit-on-query = 1000
+
+      ### Whitelist for Metrics Aggregator class.
+      ### See class [[MetricsAggregator]] for more information.
+      metrics-aggregator-class {
+        ## Format io.gearpump.KeyFullClassName = ""
+      }
+    }
   }
 
   #######################################
@@ -124,6 +161,10 @@ gearpump {
   }
 
 
+
+  ### client's timeout (in second) to connect to master and wait for the 
response
+  masterclient.timeout = 90
+
   ### Gearpump has built-in serialization framework using Kryo.
   ### User are allowed to use a different serialization framework, like 
Protobuf
   ### See [io.gearpump.serializer.FastKryoSerializationFramework] to find how
@@ -159,6 +200,10 @@ gearpump {
   services {
     host = "0.0.0.0"
     http = 8090
+    supervisor-actor-path = ""
+
+    ## To get a detailed config string with origin and comments, change this to 
false.
+    config-render-option-concise = true
   }
 
   ## Time out setting to start a new executor system
@@ -366,10 +411,9 @@ gearpump-ui {
   }
 }
 
-
-## Configurations only visible on Windows operation system..
-gearpump-windows {
+## Configurations only visible on Linux or Mac.
+gearpump-linux {
   ### On windows, the value must be larger than 10ms, check
   ### 
https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204
-  akka.scheduler.tick-duration = 10
+  akka.scheduler.tick-duration = 1
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf 
b/core/src/main/resources/geardefault.conf
index d775696..065e5e0 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -6,6 +6,79 @@
 ### Configurations that are shared by all services and all user applications...
 gearpump {
 
+  ##############################
+  ### Required to change!!
+  ### You need to set the master cluster address here
+  ###
+  ###
+  ### For example, you may start three master
+  ### on node1: bin/master -ip node1 -port 3000
+  ### on node2: bin/master -ip node2 -port 3000
+  ### on node3: bin/master -ip node3 -port 3000
+  ###
+  ### Then you need to set the cluster.masters = 
["node1:3000","node2:3000","node3:3000"]
+  cluster {
+    masters = []
+  }
+
+  ##############################
+  ### Required to change!!
+  ### You need to set the actual host name here
+  ###
+  hostname = "127.0.0.1"
+
+
+  ## The installation folder of gearpump
+  home = ""
+
+
+
+
+
+  serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool"
+
+
+
+  ## How many slots each worker contains
+  worker.slots = 1000
+
+  ## The class responsible for launching the executor process.
+  ## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to 
enable CGroup support.
+  worker.executor-process-launcher = 
"io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
+
+  ## To enable worker use cgroup to make resource isolation,
+  ## set gearpump.worker.executor-process-launcher = 
"io.gearpump.cluster.worker.CGroupProcessLauncher"
+  ##
+  ## Before enable it, you should also make sure:
+  ##   1. Linux version (>= 2.6.18)
+  ##   2. Have installed cgroup (check the file's existence:/proc/cgroups)
+  ## You can get more information from http://gearpump.io
+  ##
+  ## For cgroup root, it represents the root node in CGroup's hierarchy.
+  ## Its full path in local file system is "${cpu_mount_point} + root".
+  ## The cpu_mount_point is the cpu subsystem's mount point in CGroup.
+  ## The root dir should be consistent with the part configured in 
/etc/cgconfig.conf
+  cgroup {
+    root = "gearpump"
+
+    ## This config only works when cgroup is enabled.
+  ## The value is the number of CPU cores each executor can use; -1 means 
no limitation.
+    cpu-core-limit-per-executor = 1
+  }
+
+  ## Whether to enable remote-debug mode.
+  ## In remote debug mode, every executor process will bind to a free port, and
+  ## listen for remote jvm debug.
+  remote-debug-executor-jvm = false
+
+  ### When the resource cannot be allocated in the timeout, then
+  ### the appmaster will shutdown itself.
+  resource-allocation-timeout-seconds = 120
+  
+  ##
+  ## Executor share same process of worker
+  worker.executor-share-same-jvm-as-worker = false
+
   ###########################
   ### Change the dispather for tasks
   ### If you don't know what this is about, don't change it
@@ -13,12 +86,13 @@ gearpump {
 
   task-dispatcher = "gearpump.shared-thread-pool-dispatcher"
 
-  ## The installation folder of gearpump
-  home = ""
-
-  ## Time out setting to start a new executor system
-  ## It takes a bit longer time than expected as a new JVM is created
-  start-executor-system-timeout-ms = 30000
+  ### verbose gc
+  ### turn on JVM verbose GC
+  ### We will use -verbose:gc -Xloggc: -XX:+PrintGCDetails
+  ### -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
+  ### -XX:+PrintGCApplicationConcurrentTime
+  ### -XX:+PrintGCApplicationStoppedTime
+  verbose-gc = false
 
   ###########################
   ### Metrics setting,
@@ -34,10 +108,10 @@ gearpump {
     ### This config will slightly impact the application's performance, larger 
rate often brings higher throughput.
     sample-rate = 1
 
-    report-interval-ms = 15000
+    report-interval-ms = 3000
 
+    # reporter = "logfile"
     # reporter = "graphite"
-    # reporter = "akka"
     reporter = "akka"
 
     graphite {
@@ -85,19 +159,6 @@ gearpump {
     }
   }
 
-  ## Whether to enable remote-debug mode.
-  ## In remote debug mode, every executor process will bind to a free port, and
-  ## listen for remote jvm debug.
-  remote-debug-executor-jvm = false
-
-  ### verbose gc
-  ### turn on JVM verbose GC
-  ### We will use -verbose:gc -Xloggc: -XX:+PrintGCDetails
-  ### -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
-  ### -XX:+PrintGCApplicationConcurrentTime
-  ### -XX:+PrintGCApplicationStoppedTime
-  verbose-gc = false
-
   #######################################
   ### Logging settings
   #######################################
@@ -107,8 +168,6 @@ gearpump {
   # The log dir for applications
   log.application.dir = "logs"
 
-  serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool"
-
   serializers {
     ## Use default serializer for these types
     "scala.collection.immutable.List" = ""
@@ -133,87 +192,22 @@ gearpump {
     "akka.actor.LocalActorRef" = ""
   }
 
-  ## How many slots each worker contains
-  worker.slots = 100
 
-  ## The class responsable for launching the executor process.
-  ## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to 
enable CGroup support.
-  worker.executor-process-launcher = 
"io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
-
-  ## To enable worker use cgroup to make resource isolation,
-  ## set gearpump.worker.executor-process-launcher = 
"io.gearpump.cluster.worker.CGroupProcessLauncher"
-  ##
-  ## Before enable it, you should also make sure:
-  ##   1. Linux version (>= 2.6.18)
-  ##   2. Have installed cgroup (check the file's existence:/proc/cgroups)
-  ## You can get more information from http://gearpump.io
-  ##
-  ## For cgroup root, it represents the root node in CGroup's hierarchythe.
-  ## It's full path in local file system is "${cpu_mount_point} + root".
-  ## The cpu_mount_point is the cpu subsystem's mount poing in CGroup.
-  ## The root dir should be consistent with the part configured in 
/etc/cgconfig.conf
-  cgroup {
-    root = "gearpump"
-
-    ## This config only works when cgroup is enabled.
-    ## The value means the number of CPU cores per executor can use, -1 means 
no limitation.
-    cpu-core-limit-per-executor = 1
-  }
-
-  ##
-  ## Executor share same process of worker
-  worker.executor-share-same-jvm-as-worker = false
-
-  ###################
-  ### Appmaster JVM argument configuration
-  ###################
-  appmaster {
-    vmargs = ""
-    extraClasspath = ""
-  }
-
-  ###################
-  ### Executor argument configuration
-  ### Executor JVM can contains multiple tasks
-  ###################
-  executor {
-    vmargs = ""
-    extraClasspath = ""
-  }
-
-  ##############################
-  ### Required to change!!
-  ### You need to set the master cluster address here
-  ###
-  ###
-  ### For example, you may start three master
-  ### on node1: bin/master -ip node1 -port 3000
-  ### on node2: bin/master -ip node2 -port 3000
-  ### on node3: bin/master -ip node3 -port 3000
-  ###
-  ### Then you need to set the cluster.masters = 
["node1:3000","node2:3000","node3:3000"]
-  cluster {
-    masters = []
-  }
-
-  ##############################
-  ### Required to change!!
-  ### You need to set the actual host name here
-  ###
-  hostname = "127.0.0.1"
-
-  ### When the resource cannot be allocated in the timeout, then
-  ### the appmaster will shutdown itself.
-  resource-allocation-timeout-seconds = 10
 
   ### client's timeout (in second) to connect to master and wait for the 
response
   masterclient.timeout = 90
 
+  ### Gearpump has built-in serialization framework using Kryo.
+  ### Users are allowed to use a different serialization framework, like 
Protobuf
+  ### See [io.gearpump.serializer.FastKryoSerializationFramework] to find how
+  ### a custom serialization framework can be defined.
+  serialization-framework = 
"io.gearpump.serializer.FastKryoSerializationFramework"
+
   ### Define where the submitted jar file will be stored at
 
   ### This path follows the hadoop path schema
   ### For HDFS, use hdfs://host:port/path/
-  ### If you want to store on master nodes, then use local directory,
+  ### For shared NFS folder, use file:///your_nfs_mapping_directory
   ### jarstore.rootpath = "jarstore/" will points to relative directory where 
master is started.
   ### jarstore.rootpath = "/jarstore/" will points to absolute directory on 
master server
   jarstore.rootpath = "jarstore/"
@@ -244,6 +238,10 @@ gearpump {
     config-render-option-concise = true
   }
 
+  ## Time out setting to start a new executor system
+  ## It takes a bit longer time than expected as a new JVM is created
+  start-executor-system-timeout-ms = 120000
+
   #############################################
   ## Default Configuration for Gearpump Netty transport layer
   ## If you don't know what is this about, don't change it
@@ -258,6 +256,37 @@ gearpump {
     dispatcher = "gearpump.shared-thread-pool-dispatcher"
   }
 
+  ###################
+  ### Appmaster JVM argument configuration
+  ###################
+  appmaster {
+    vmargs = ""
+    extraClasspath = ""
+  }
+
+  ###################
+  ### Executor argument configuration
+  ### Executor JVM can contain multiple tasks
+  ###################
+  executor {
+    vmargs = ""
+    extraClasspath = ""
+  }
+
+  ### Streaming related configuration
+  streaming {
+    ## We will timeout the task if it cannot register itself to AppMaster in 
time.
+    register-task-timeout-ms = 120000
+
+    ## ack once after sending ack-once-every-message-count messages.
+    ack-once-every-message-count = 100
+
+    ## max pending message per task to task connection. If pending message 
size is
+    ## over this, then the flow control will not allow further sending.
+    ## This value should be bigger than ack-once-every-message-count
+    max-pending-message-count-per-connection = 1000
+  }
+
   ##################################
   ### Akka Dispatcher configurations
   ### If you don't know what is this about, don't change it
@@ -275,8 +304,45 @@ gearpump {
   single-thread-dispatcher {
     type = PinnedDispatcher
   }
+
+  ###########################
+  ### Configuration for yarn module
+  ###########################
+  yarn {
+    client {
+      package-path = "/usr/lib/gearpump/gearpump.zip"
+    }
+
+    applicationmaster {
+      ## Memory of YarnAppMaster
+      command = "$JAVA_HOME/bin/java -Xmx512m"
+      memory = "512"
+      vcores = "1"
+      queue = "default"
+    }
+
+    master {
+      ## Memory of master daemon
+      command = "$JAVA_HOME/bin/java  -Xmx512m"
+      memory = "512"
+      vcores = "1"
+    }
+
+    worker {
+      ## memory of worker daemon
+      command = "$JAVA_HOME/bin/java  -Xmx512m"
+      containers = "1"
+      ## This also contains all memory for child executors.
+      memory = "4096"
+      vcores = "1"
+    }
+    services {
+      enabled = true
+    }
+  }
 }
 
+
 ### Configuration only visible to master nodes..
 gearpump-master {
   ## NOTE: Please add akka-related settings in gear.conf to avoid confusion in
@@ -289,11 +355,14 @@ gearpump-worker {
   ## config overriding.
 }
 
-## Configurations only visible on Windows operation system..
-gearpump-windows {
-}
 
-### configurations only visible to UI service...
+#########################################
+### For log level of Akka class, you need to change both log4j.properties and 
this entry
+#########################################
+#akka.loglevel = "INFO"
+
+
+### configurations only visible to UI server.
 gearpump-ui {
   ## Security related settings
   gearpump.ui-security {
@@ -344,7 +413,9 @@ gearpump-ui {
 ## ticket: https://github.com/akka/akka/issues/16295
 akka.io.tcp.windows-connection-abort-workaround-enabled=off
 
-akka.scheduler.tick-duration = 1
+### On windows, the value must be larger than 10ms, check
+### 
https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204
+akka.scheduler.tick-duration = 10
 
 akka {
   http {
@@ -453,3 +524,11 @@ akka {
     }
   }
 }
+
+
+## Configurations only visible on Linux or Mac.
+gearpump-linux {
+  ### Linux and Mac can use a 1ms tick; on Windows the value must be at least 10ms, check
+  ### 
https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204
+  akka.scheduler.tick-duration = 1
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala 
b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
index e2fa155..64d5c42 100644
--- a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
+++ b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala
@@ -133,13 +133,15 @@ object ClusterConfig {
 
     val all = 
systemProperties.withFallback(user).withFallback(gear).withFallback(gearDefault)
 
-    val windows = all.getConfig(WINDOWS_CONFIG)
+    val linux = all.getConfig(LINUX_CONFIG)
 
     var basic = all.withoutPath(MASTER_CONFIG).withoutPath(WORKER_CONFIG).
-      withoutPath(UI_CONFIG).withoutPath(WINDOWS_CONFIG)
+      withoutPath(UI_CONFIG).withoutPath(LINUX_CONFIG)
 
-    if (akka.util.Helpers.isWindows) {
-      basic = windows.withFallback(basic)
+    if (!akka.util.Helpers.isWindows) {
+
+      // Change the akka.scheduler.tick-duration to 1ms for Linux or Mac
+      basic = linux.withFallback(basic)
     }
 
     val master = replaceHost(all.getConfig(MASTER_CONFIG).withFallback(basic))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/main/scala/io/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala 
b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
index 830d45e..b63733d 100644
--- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala
@@ -71,10 +71,6 @@ object ActorUtil {
     }
   }
 
-  def loadClass(className: String): Class[_<:Actor] = {
-    Class.forName(className).asSubclass(classOf[Actor])
-  }
-
   def actorNameForExecutor(appId : Int, executorId : Int) = "app" + appId + 
"-executor" + executorId
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/main/scala/io/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala 
b/core/src/main/scala/io/gearpump/util/Constants.scala
index 5e340e3..7d066e9 100644
--- a/core/src/main/scala/io/gearpump/util/Constants.scala
+++ b/core/src/main/scala/io/gearpump/util/Constants.scala
@@ -29,7 +29,7 @@ object Constants {
   val MASTER_CONFIG = "gearpump-master"
   val WORKER_CONFIG = "gearpump-worker"
   val UI_CONFIG = "gearpump-ui"
-  val WINDOWS_CONFIG = "gearpump-windows"
+  val LINUX_CONFIG = "gearpump-linux" // linux or Mac
 
   val MASTER = "master"
   val WORKER = "worker"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/test/resources/test.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test.conf 
b/core/src/test/resources/test.conf
index edc01d9..3deb28b 100644
--- a/core/src/test/resources/test.conf
+++ b/core/src/test/resources/test.conf
@@ -8,6 +8,10 @@ gearpump {
 
   worker.slots = 100
 
+  ### When the resource cannot be allocated in the timeout, then
+  ### the appmaster will shutdown itself.
+  resource-allocation-timeout-seconds = 10
+
   worker.executor-process-launcher = 
"io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
 
   cluster {
@@ -21,11 +25,11 @@ gearpump {
   serialization-framework = 
"io.gearpump.serializer.FastKryoSerializationFramework"
 }
 
-## Configurations only visible on Windows operation system..
-gearpump-windows {
+## Configurations only visible on Linux or Mac..
+gearpump-linux {
   ### On windows, the value must be larger than 10ms, check
   ### 
https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204
-  akka.scheduler.tick-duration = 10
+  akka.scheduler.tick-duration = 1
 }
 
 ### Configuration only visible to worker nodes...

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
----------------------------------------------------------------------
diff --git 
a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala 
b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
new file mode 100644
index 0000000..6be52b2
--- /dev/null
+++ b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.cluster.embedded
+
+import akka.actor.{ActorRef, ActorSystem, Props}
+import com.typesafe.config.{ConfigValueFactory, Config}
+import io.gearpump.cluster.ClusterConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.master.{Master => MasterActor}
+import io.gearpump.cluster.worker.{Worker => WorkerActor}
+import io.gearpump.util.{LogUtil, Constants, Util, ActorUtil}
+import io.gearpump.util.Constants.{GEARPUMP_METRIC_ENABLED, 
GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, MASTER, 
GEARPUMP_CLUSTER_MASTERS}
+import scala.collection.JavaConverters._
+
+
+/**
+ * Create an in-process cluster with a single worker
+ */
+class EmbeddedCluster(inputConfig: Config) {
+
+  private val workerCount: Int = 1
+  private var _master: ActorRef = null
+  private var _system: ActorSystem = null
+  private var _config: Config = null
+
+  private val LOG = LogUtil.getLogger(getClass)
+
+  def start: Unit = {
+    val port = Util.findFreePort.get
+    val akkaConf = getConfig(inputConfig, port)
+    _config = akkaConf
+    val system = ActorSystem(MASTER, akkaConf)
+
+    val master = system.actorOf(Props[MasterActor], MASTER)
+
+    0.until(workerCount).foreach { id =>
+      system.actorOf(Props(classOf[WorkerActor], master), 
classOf[WorkerActor].getSimpleName + id)
+    }
+    this._master = master
+    this._system = system
+
+    LOG.info("=================================")
+    LOG.info("Local Cluster is started at: ")
+    LOG.info(s"                 127.0.0.1:$port")
+    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
+  }
+
+  private def getConfig(inputConfig: Config, port: Int): Config = {
+    val config = inputConfig.
+      withValue("akka.remote.netty.tcp.port", 
ConfigValueFactory.fromAnyRef(port)).
+      withValue(GEARPUMP_CLUSTER_MASTERS, 
ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
+      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, 
ConfigValueFactory.fromAnyRef(true)).
+      withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)).
+      withValue("akka.actor.provider", 
ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider"))
+    config
+  }
+
+  def newClientContext: ClientContext = {
+    ClientContext(_config, _system, _master)
+  }
+
+  def stop: Unit = {
+    _system.stop(_master)
+    _system.shutdown()
+    _system.awaitTermination()
+  }
+}
+
+object EmbeddedCluster{
+  def apply(): EmbeddedCluster = {
+    new EmbeddedCluster(ClusterConfig.master())
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/daemon/src/main/scala/io/gearpump/cluster/local/LocalCluster.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/local/LocalCluster.scala 
b/daemon/src/main/scala/io/gearpump/cluster/local/LocalCluster.scala
deleted file mode 100644
index 0276f72..0000000
--- a/daemon/src/main/scala/io/gearpump/cluster/local/LocalCluster.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.cluster.local
-
-import akka.actor.{ActorRef, ActorSystem, Props}
-import com.typesafe.config.{ConfigValueFactory, Config}
-import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.master.{Master => MasterActor}
-import io.gearpump.cluster.worker.{Worker => WorkerActor}
-import io.gearpump.util.{LogUtil, Constants, Util, ActorUtil}
-import io.gearpump.util.Constants.{GEARPUMP_METRIC_ENABLED, 
GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, MASTER, 
GEARPUMP_CLUSTER_MASTERS}
-import scala.collection.JavaConverters._
-
-
-/**
- * Create a in-process cluster with single worker
- */
-class LocalCluster(inputConfig: Config) {
-
-  private val workerCount: Int = 1
-  private var _master: ActorRef = null
-  private var _system: ActorSystem = null
-  private var _config: Config = null
-
-  private val LOG = LogUtil.getLogger(getClass)
-
-  def start: Unit = {
-    val port = Util.findFreePort.get
-    val akkaConf = getConfig(inputConfig, port)
-    _config = akkaConf
-    val system = ActorSystem(MASTER, akkaConf)
-
-    val master = system.actorOf(Props[MasterActor], MASTER)
-
-    0.until(workerCount).foreach { id =>
-      system.actorOf(Props(classOf[WorkerActor], master), 
classOf[WorkerActor].getSimpleName + id)
-    }
-    this._master = master
-    this._system = system
-
-    LOG.info("=================================")
-    LOG.info("Local Cluster is started at: ")
-    LOG.info(s"                 127.0.0.1:$port")
-    LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port")
-  }
-
-  private def getConfig(inputConfig: Config, port: Int): Config = {
-    val config = inputConfig.
-      withValue("akka.remote.netty.tcp.port", 
ConfigValueFactory.fromAnyRef(port)).
-      withValue(GEARPUMP_CLUSTER_MASTERS, 
ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)).
-      withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, 
ConfigValueFactory.fromAnyRef(true)).
-      withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true))
-    config
-  }
-
-  def newClientContext: ClientContext = {
-    ClientContext(_config, _system, _master)
-  }
-
-  def stop: Unit = {
-    _system.stop(_master)
-    _system.shutdown()
-    _system.awaitTermination()
-  }
-}
-
-object LocalCluster{
-  def apply(): LocalCluster = {
-    new LocalCluster(ClusterConfig.master())
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala 
b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
index 3f26538..d6b2479 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala
@@ -21,7 +21,6 @@ package io.gearpump.cluster.main
 import akka.actor.{ActorSystem, Props}
 import com.typesafe.config.ConfigValueFactory
 import io.gearpump.cluster.ClusterConfig
-import io.gearpump.cluster.local.LocalCluster
 import io.gearpump.cluster.master.{Master => MasterActor}
 import io.gearpump.cluster.worker.{Worker => WorkerActor}
 import io.gearpump.util.Constants._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
 
b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
index 05d4e27..9e1e7d5 100644
--- 
a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
+++ 
b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java
@@ -22,7 +22,7 @@ import com.typesafe.config.Config;
 import io.gearpump.cluster.ClusterConfig;
 import io.gearpump.cluster.UserConfig;
 import io.gearpump.cluster.client.ClientContext;
-import io.gearpump.cluster.local.LocalCluster;
+import io.gearpump.cluster.embedded.EmbeddedCluster;
 import io.gearpump.partitioner.HashPartitioner;
 import io.gearpump.partitioner.Partitioner;
 import io.gearpump.streaming.javaapi.Graph;
@@ -31,11 +31,11 @@ import io.gearpump.streaming.javaapi.StreamApplication;
 
 public class WordCount {
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws InterruptedException {
     main(ClusterConfig.defaultConfig(), args);
   }
 
-  public static void main(Config akkaConf, String[] args) {
+  public static void main(Config akkaConf, String[] args) throws 
InterruptedException {
 
     // For split task, we config to create two tasks
     int splitTaskNumber = 2;
@@ -58,9 +58,12 @@ public class WordCount {
     StreamApplication app = new StreamApplication("wordcountJava", conf, graph);
 
 
-    LocalCluster localCluster = null;
-    if (System.getProperty("DEBUG") != null) {
-      localCluster = new LocalCluster(akkaConf);
+    EmbeddedCluster localCluster = null;
+
+    Boolean debugMode = System.getProperty("DEBUG") != null;
+
+    if (debugMode) {
+      localCluster = new EmbeddedCluster(akkaConf);
       localCluster.start();
     }
 
@@ -75,6 +78,11 @@ public class WordCount {
     }
 
     masterClient.submit(app);
+
+    if (debugMode) {
+      Thread.sleep(30 * 1000); // sleep for 30 seconds.
+    }
+
     masterClient.close();
 
     if (localCluster != null) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
index 10faefe..22f8ac6 100644
--- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -18,7 +18,7 @@
 
 package io.gearpump.streaming.examples.wordcount
 
-import io.gearpump.cluster.local.LocalCluster
+import io.gearpump.cluster.embedded.{EmbeddedCluster}
 import io.gearpump.streaming.{StreamApplication, Processor}
 import io.gearpump.cluster.UserConfig
 import io.gearpump.cluster.client.ClientContext
@@ -34,8 +34,10 @@ object WordCount extends AkkaApp with ArgumentsParser {
 
   override val options: Array[(String, CLIOption[Any])] = Array(
     "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)),
-    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1))
-    )
+    "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)),
+    "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)),
+    "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, defaultValue = Some(30))
+   )
 
   def application(config: ParseResult) : StreamApplication = {
     val splitNum = config.getInt("split")
@@ -51,8 +53,11 @@ object WordCount extends AkkaApp with ArgumentsParser {
   override def main(akkaConf: Config, args: Array[String]): Unit = {
     val config = parse(args)
 
-    val localCluster = if (System.getProperty("DEBUG") != null) {
-      val cluster = new LocalCluster(akkaConf: Config)
+    val debugMode = config.getBoolean("debug")
+    val sleepSeconds = config.getInt("sleep")
+
+    val localCluster = if (debugMode) {
+      val cluster = new EmbeddedCluster(akkaConf: Config)
       cluster.start
       Some(cluster)
     } else {
@@ -66,6 +71,11 @@ object WordCount extends AkkaApp with ArgumentsParser {
 
     val app = application(config)
     context.submit(app)
+
+    if (debugMode) {
+      Thread.sleep(sleepSeconds * 1000) // sleep for 30 seconds for debugging.
+    }
+
     context.close()
     localCluster.map(_.stop)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
index 96f1bf3..250c354 100644
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
@@ -27,7 +27,7 @@ import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient
 import akka.stream.impl.StreamLayout.Module
 import io.gearpump.cluster.ClusterConfig
 import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.local.LocalCluster
+import io.gearpump.cluster.embedded.{EmbeddedCluster}
 import io.gearpump.streaming.{StreamApplication, ProcessorId}
 import io.gearpump.util.Graph
 
@@ -49,7 +49,7 @@ object RemoteGraph {
    */
   class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer {
     private val local = if (useInProcessCluster) {
-      val cluster = LocalCluster()
+      val cluster = EmbeddedCluster()
       cluster.start
       Some(cluster)
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
index 5831656..1c9fdfd 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala
@@ -86,6 +86,14 @@ class CommandLineSpec extends TestSpecBase {
       success shouldBe false
     }
 
+    "the EmbededCluster can be used as embedded cluster in process" in {
+      // setup
+      val args = "-debug true -sleep 10"
+      val appId = expectSubmitAppSuccess(wordCountJar, args)
+      var success = commandLineClient.killApp(appId)
+      success shouldBe true
+    }
+
     "should fail when attempting to kill a non-exist application" in {
       // setup
       val freeAppId = getNextAvailableAppId
@@ -110,7 +118,7 @@ class CommandLineSpec extends TestSpecBase {
     commandLineClient.listApps().length + 1
   }
 
-  private def expectSubmitAppSuccess(jar: String): Int = {
+  private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = {
     val appId = commandLineClient.submitApp(jar)
     appId should not equal -1
     appId

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
index 79cf442..c6564ff 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala
@@ -26,6 +26,7 @@ object TaskUtil {
    * @return resolved class
    */
   def loadClass(className: String): Class[_<:Task] = {
-    Class.forName(className).asSubclass(classOf[Task])
+    val loader = Thread.currentThread().getContextClassLoader()
+    loader.loadClass(className).asSubclass(classOf[Task])
   }
 }


Reply via email to