fix #1996, EmbeddedCluster requires master configuration like ClusterActorRefProvider.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ecf5b468 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ecf5b468 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ecf5b468 Branch: refs/heads/master Commit: ecf5b468e89c6b19b994760c4b784d11ce8eeee6 Parents: a7bbb4c Author: Sean Zhong <[email protected]> Authored: Wed Mar 9 15:24:31 2016 +0800 Committer: Sean Zhong <[email protected]> Committed: Thu Mar 10 10:11:21 2016 +0800 ---------------------------------------------------------------------- conf/gear.conf | 54 +++- core/src/main/resources/geardefault.conf | 279 ++++++++++++------- .../io/gearpump/cluster/ClusterConfig.scala | 10 +- .../main/scala/io/gearpump/util/ActorUtil.scala | 4 - .../main/scala/io/gearpump/util/Constants.scala | 2 +- core/src/test/resources/test.conf | 10 +- .../cluster/embedded/EmbeddedCluster.scala | 89 ++++++ .../gearpump/cluster/local/LocalCluster.scala | 88 ------ .../scala/io/gearpump/cluster/main/Local.scala | 1 - .../examples/wordcountjava/WordCount.java | 20 +- .../examples/wordcount/WordCount.scala | 20 +- .../stream/gearpump/graph/RemoteGraph.scala | 4 +- .../checklist/CommandLineSpec.scala | 10 +- .../io/gearpump/streaming/task/TaskUtil.scala | 3 +- 14 files changed, 373 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/conf/gear.conf ---------------------------------------------------------------------- diff --git a/conf/gear.conf b/conf/gear.conf index 1673cb7..e5898e9 100644 --- a/conf/gear.conf +++ b/conf/gear.conf @@ -3,6 +3,7 @@ ### To use the application, you at least need to change gearpump.cluster to point to right master ######################################### +### Configurations that are shared by all services and all user applications... 
gearpump { ############################## @@ -53,7 +54,9 @@ gearpump { cpu-core-limit-per-executor = 1 } - ## Whether we allow remote debug + ## Whether to enable remote-debug mode. + ## In remote debug mode, every executor process will bind to a free port, and + ## listen for remote jvm debug. remote-debug-executor-jvm = false ### When the resource cannot be allocated in the timeout, then @@ -104,6 +107,40 @@ gearpump { logfile { } + + ## Coarse-grain history metrics, which have a larger timespan but sparse data points + retainHistoryData { + # max hours of history data to retain + # Note: due to implementation limitation(we store all history in memory), + # please don't set this to too big which may exhaust memory. + hours = 72 + + # time interval between two data points for history data (unit: ms) + # Usually this value is set to a big value so that we only store + # coarse-grain data + intervalMs = 3600000 + } + + ## fine-grain recent metrics which just happened, which have a smaller timespan but dense data points + retainRecentData { + + # max seconds of recent data to retain, + # THis is for the fine-grain data + seconds = 300 + + # time interval between two data points for recent data (unit: ms) + intervalMs = 15000 + } + akka { + ### For this reporter, at most we will return max-limit-on-query metric item. + max-limit-on-query = 1000 + + ### Whitelist for Metrics Aggregator class. + ### See class [[MetricsAggregator]] for more information. + metrics-aggregator-class { + ## Format io.gearpump.KeyFullClassName = "" + } + } } ####################################### @@ -124,6 +161,10 @@ gearpump { } + + ### client's timeout (in second) to connect to master and wait for the response + masterclient.timeout = 90 + ### Gearpump has built-in serialization framework using Kryo. 
### User are allowed to use a different serialization framework, like Protobuf ### See [io.gearpump.serializer.FastKryoSerializationFramework] to find how @@ -159,6 +200,10 @@ gearpump { services { host = "0.0.0.0" http = 8090 + supervisor-actor-path = "" + + ## To get a detail config string with origin and comments, change this to false. + config-render-option-concise = true } ## Time out setting to start a new executor system @@ -366,10 +411,9 @@ gearpump-ui { } } - -## Configurations only visible on Windows operation system.. -gearpump-windows { +## Configurations only visible on Linux or Mac. +gearpump-linux { ### On windows, the value must be larger than 10ms, check ### https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204 - akka.scheduler.tick-duration = 10 + akka.scheduler.tick-duration = 1 } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf index d775696..065e5e0 100644 --- a/core/src/main/resources/geardefault.conf +++ b/core/src/main/resources/geardefault.conf @@ -6,6 +6,79 @@ ### Configurations that are shared by all services and all user applications... gearpump { + ############################## + ### Required to change!! + ### You need to set the master cluster address here + ### + ### + ### For example, you may start three master + ### on node1: bin/master -ip node1 -port 3000 + ### on node2: bin/master -ip node2 -port 3000 + ### on node3: bin/master -ip node3 -port 3000 + ### + ### Then you need to set the cluster.masters = ["node1:3000","node2:3000","node3:3000"] + cluster { + masters = [] + } + + ############################## + ### Required to change!! 
+ ### You need to set the actual host name here + ### + hostname = "127.0.0.1" + + + ## The installation folder of gearpump + home = "" + + + + + + serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool" + + + + ## How many slots each worker contains + worker.slots = 1000 + + ## The class responsible for launching the executor process. + ## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to enable CGroup support. + worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher" + + ## To enable worker use cgroup to make resource isolation, + ## set gearpump.worker.executor-process-launcher = "io.gearpump.cluster.worker.CGroupProcessLauncher" + ## + ## Before enable it, you should also make sure: + ## 1. Linux version (>= 2.6.18) + ## 2. Have installed cgroup (check the file's existence:/proc/cgroups) + ## You can get more information from http://gearpump.io + ## + ## For cgroup root, it represents the root node in CGroup's hierarchy. + ## Its full path in local file system is "${cpu_mount_point} + root". + ## The cpu_mount_point is the cpu subsystem's mount point in CGroup. + ## The root dir should be consistent with the part configured in /etc/cgconfig.conf + cgroup { + root = "gearpump" + + ## This config only works when cgroup is enabled. + ## The value means the number of CPU cores per executor can use, -1 means no limitation. + cpu-core-limit-per-executor = 1 + } + + ## Whether to enable remote-debug mode. + ## In remote debug mode, every executor process will bind to a free port, and + ## listen for remote jvm debug. + remote-debug-executor-jvm = false + + ### When the resource cannot be allocated in the timeout, then + ### the appmaster will shutdown itself. 
+ resource-allocation-timeout-seconds = 120 + + ## + ## Executor share same process of worker + worker.executor-share-same-jvm-as-worker = false + ########################### ### Change the dispather for tasks ### If you don't know what this is about, don't change it @@ -13,12 +86,13 @@ gearpump { task-dispatcher = "gearpump.shared-thread-pool-dispatcher" - ## The installation folder of gearpump - home = "" - - ## Time out setting to start a new executor system - ## It takes a bit longer time than expected as a new JVM is created - start-executor-system-timeout-ms = 30000 + ### verbose gc + ### turn on JVM verbose GC + ### We will use -verbose:gc -Xloggc: -XX:+PrintGCDetails + ### -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution + ### -XX:+PrintGCApplicationConcurrentTime + ### -XX:+PrintGCApplicationStoppedTime + verbose-gc = false ########################### ### Metrics setting, @@ -34,10 +108,10 @@ gearpump { ### This config will slightly impact the application's performance, larger rate often brings higher throughput. sample-rate = 1 - report-interval-ms = 15000 + report-interval-ms = 3000 + # reporter = "logfile" # reporter = "graphite" - # reporter = "akka" reporter = "akka" graphite { @@ -85,19 +159,6 @@ gearpump { } } - ## Whether to enable remote-debug mode. - ## In remote debug mode, every executor process will bind to a free port, and - ## listen for remote jvm debug. 
- remote-debug-executor-jvm = false - - ### verbose gc - ### turn on JVM verbose GC - ### We will use -verbose:gc -Xloggc: -XX:+PrintGCDetails - ### -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution - ### -XX:+PrintGCApplicationConcurrentTime - ### -XX:+PrintGCApplicationStoppedTime - verbose-gc = false - ####################################### ### Logging settings ####################################### @@ -107,8 +168,6 @@ gearpump { # The log dir for applications log.application.dir = "logs" - serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool" - serializers { ## Use default serializer for these types "scala.collection.immutable.List" = "" @@ -133,87 +192,22 @@ gearpump { "akka.actor.LocalActorRef" = "" } - ## How many slots each worker contains - worker.slots = 100 - ## The class responsable for launching the executor process. - ## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to enable CGroup support. - worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher" - - ## To enable worker use cgroup to make resource isolation, - ## set gearpump.worker.executor-process-launcher = "io.gearpump.cluster.worker.CGroupProcessLauncher" - ## - ## Before enable it, you should also make sure: - ## 1. Linux version (>= 2.6.18) - ## 2. Have installed cgroup (check the file's existence:/proc/cgroups) - ## You can get more information from http://gearpump.io - ## - ## For cgroup root, it represents the root node in CGroup's hierarchythe. - ## It's full path in local file system is "${cpu_mount_point} + root". - ## The cpu_mount_point is the cpu subsystem's mount poing in CGroup. - ## The root dir should be consistent with the part configured in /etc/cgconfig.conf - cgroup { - root = "gearpump" - - ## This config only works when cgroup is enabled. - ## The value means the number of CPU cores per executor can use, -1 means no limitation. 
- cpu-core-limit-per-executor = 1 - } - - ## - ## Executor share same process of worker - worker.executor-share-same-jvm-as-worker = false - - ################### - ### Appmaster JVM argument configuration - ################### - appmaster { - vmargs = "" - extraClasspath = "" - } - - ################### - ### Executor argument configuration - ### Executor JVM can contains multiple tasks - ################### - executor { - vmargs = "" - extraClasspath = "" - } - - ############################## - ### Required to change!! - ### You need to set the master cluster address here - ### - ### - ### For example, you may start three master - ### on node1: bin/master -ip node1 -port 3000 - ### on node2: bin/master -ip node2 -port 3000 - ### on node3: bin/master -ip node3 -port 3000 - ### - ### Then you need to set the cluster.masters = ["node1:3000","node2:3000","node3:3000"] - cluster { - masters = [] - } - - ############################## - ### Required to change!! - ### You need to set the actual host name here - ### - hostname = "127.0.0.1" - - ### When the resource cannot be allocated in the timeout, then - ### the appmaster will shutdown itself. - resource-allocation-timeout-seconds = 10 ### client's timeout (in second) to connect to master and wait for the response masterclient.timeout = 90 + ### Gearpump has built-in serialization framework using Kryo. + ### User are allowed to use a different serialization framework, like Protobuf + ### See [io.gearpump.serializer.FastKryoSerializationFramework] to find how + ### a custom serialization framework can be defined. 
+ serialization-framework = "io.gearpump.serializer.FastKryoSerializationFramework" + ### Define where the submitted jar file will be stored at ### This path follows the hadoop path schema ### For HDFS, use hdfs://host:port/path/ - ### If you want to store on master nodes, then use local directory, + ### For shared NFS folder, use file:///your_nfs_mapping_directory ### jarstore.rootpath = "jarstore/" will points to relative directory where master is started. ### jarstore.rootpath = "/jarstore/" will points to absolute directory on master server jarstore.rootpath = "jarstore/" @@ -244,6 +238,10 @@ gearpump { config-render-option-concise = true } + ## Time out setting to start a new executor system + ## It takes a bit longer time than expected as a new JVM is created + start-executor-system-timeout-ms = 120000 + ############################################# ## Default Configuration for Gearpump Netty transport layer ## If you don't know what is this about, don't change it @@ -258,6 +256,37 @@ gearpump { dispatcher = "gearpump.shared-thread-pool-dispatcher" } + ################### + ### Appmaster JVM argument configuration + ################### + appmaster { + vmargs = "" + extraClasspath = "" + } + + ################### + ### Executor argument configuration + ### Executor JVM can contains multiple tasks + ################### + executor { + vmargs = "" + extraClasspath = "" + } + + ### Streaming related configuration + streaming { + ## We will timeout the task if it cannot register itself to AppMaster in time. + register-task-timeout-ms = 120000 + + ## ack once after sending ack-once-every-message-count messages. + ack-once-every-message-count = 100 + + ## max pending message per task to task connection. If pending message size is + ## over this, then the flow control will not allow further sending. 
+ ## This value should be bigger than ack-once-every-message-count + max-pending-message-count-per-connection = 1000 + } + ################################## ### Akka Dispatcher configurations ### If you don't know what is this about, don't change it @@ -275,8 +304,45 @@ gearpump { single-thread-dispatcher { type = PinnedDispatcher } + + ########################### + ### Configuration for yarn module + ########################### + yarn { + client { + package-path = "/usr/lib/gearpump/gearpump.zip" + } + + applicationmaster { + ## Memory of YarnAppMaster + command = "$JAVA_HOME/bin/java -Xmx512m" + memory = "512" + vcores = "1" + queue = "default" + } + + master { + ## Memory of master daemon + command = "$JAVA_HOME/bin/java -Xmx512m" + memory = "512" + vcores = "1" + } + + worker { + ## memory of worker daemon + command = "$JAVA_HOME/bin/java -Xmx512m" + containers = "1" + ## This also contains all memory for child executors. + memory = "4096" + vcores = "1" + } + services { + enabled = true + } + } } + ### Configuration only visible to master nodes.. gearpump-master { ## NOTE: Please add akka-related settings in gear.conf to avoid confusion in @@ -289,11 +355,14 @@ gearpump-worker { ## config overriding. } -## Configurations only visible on Windows operation system.. -gearpump-windows { -} -### configurations only visible to UI service... +######################################### +### For log level of Akka class, you need to change both log4j.properties and this entry +######################################### +#akka.loglevel = "INFO" + + +### configurations only visible to UI server. 
gearpump-ui { ## Security related settings gearpump.ui-security { @@ -344,7 +413,9 @@ gearpump-ui { ## ticket: https://github.com/akka/akka/issues/16295 akka.io.tcp.windows-connection-abort-workaround-enabled=off -akka.scheduler.tick-duration = 1 +### On windows, the value must be larger than 10ms, check +### https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204 +akka.scheduler.tick-duration = 10 akka { http { @@ -453,3 +524,11 @@ akka { } } } + + +## Configurations only visible on Linux or Mac. +gearpump-linux { + ### On windows, the value must be larger than 10ms, check + ### https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204 + akka.scheduler.tick-duration = 1 +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala index e2fa155..64d5c42 100644 --- a/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala +++ b/core/src/main/scala/io/gearpump/cluster/ClusterConfig.scala @@ -133,13 +133,15 @@ object ClusterConfig { val all = systemProperties.withFallback(user).withFallback(gear).withFallback(gearDefault) - val windows = all.getConfig(WINDOWS_CONFIG) + val linux = all.getConfig(LINUX_CONFIG) var basic = all.withoutPath(MASTER_CONFIG).withoutPath(WORKER_CONFIG). 
- withoutPath(UI_CONFIG).withoutPath(WINDOWS_CONFIG) + withoutPath(UI_CONFIG).withoutPath(LINUX_CONFIG) - if (akka.util.Helpers.isWindows) { - basic = windows.withFallback(basic) + if (!akka.util.Helpers.isWindows) { + + // Change the akka.scheduler.tick-duration to 1ms for Linux or Mac + basic = linux.withFallback(basic) } val master = replaceHost(all.getConfig(MASTER_CONFIG).withFallback(basic)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/main/scala/io/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/ActorUtil.scala b/core/src/main/scala/io/gearpump/util/ActorUtil.scala index 830d45e..b63733d 100644 --- a/core/src/main/scala/io/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/io/gearpump/util/ActorUtil.scala @@ -71,10 +71,6 @@ object ActorUtil { } } - def loadClass(className: String): Class[_<:Actor] = { - Class.forName(className).asSubclass(classOf[Actor]) - } - def actorNameForExecutor(appId : Int, executorId : Int) = "app" + appId + "-executor" + executorId /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/main/scala/io/gearpump/util/Constants.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala index 5e340e3..7d066e9 100644 --- a/core/src/main/scala/io/gearpump/util/Constants.scala +++ b/core/src/main/scala/io/gearpump/util/Constants.scala @@ -29,7 +29,7 @@ object Constants { val MASTER_CONFIG = "gearpump-master" val WORKER_CONFIG = "gearpump-worker" val UI_CONFIG = "gearpump-ui" - val WINDOWS_CONFIG = "gearpump-windows" + val LINUX_CONFIG = "gearpump-linux" // linux or Mac val MASTER = "master" val WORKER = "worker" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/core/src/test/resources/test.conf 
---------------------------------------------------------------------- diff --git a/core/src/test/resources/test.conf b/core/src/test/resources/test.conf index edc01d9..3deb28b 100644 --- a/core/src/test/resources/test.conf +++ b/core/src/test/resources/test.conf @@ -8,6 +8,10 @@ gearpump { worker.slots = 100 + ### When the resource cannot be allocated in the timeout, then + ### the appmaster will shutdown itself. + resource-allocation-timeout-seconds = 10 + worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher" cluster { @@ -21,11 +25,11 @@ gearpump { serialization-framework = "io.gearpump.serializer.FastKryoSerializationFramework" } -## Configurations only visible on Windows operation system.. -gearpump-windows { +## Configurations only visible on Linux or Mac.. +gearpump-linux { ### On windows, the value must be larger than 10ms, check ### https://github.com/akka/akka/blob/master/akka-actor/src/main/scala/akka/actor/Scheduler.scala#L204 - akka.scheduler.tick-duration = 10 + akka.scheduler.tick-duration = 1 } ### Configuration only visible to worker nodes... http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala new file mode 100644 index 0000000..6be52b2 --- /dev/null +++ b/daemon/src/main/scala/io/gearpump/cluster/embedded/EmbeddedCluster.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.cluster.embedded + +import akka.actor.{ActorRef, ActorSystem, Props} +import com.typesafe.config.{ConfigValueFactory, Config} +import io.gearpump.cluster.ClusterConfig +import io.gearpump.cluster.client.ClientContext +import io.gearpump.cluster.master.{Master => MasterActor} +import io.gearpump.cluster.worker.{Worker => WorkerActor} +import io.gearpump.util.{LogUtil, Constants, Util, ActorUtil} +import io.gearpump.util.Constants.{GEARPUMP_METRIC_ENABLED, GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, MASTER, GEARPUMP_CLUSTER_MASTERS} +import scala.collection.JavaConverters._ + + +/** + * Create a in-process cluster with single worker + */ +class EmbeddedCluster(inputConfig: Config) { + + private val workerCount: Int = 1 + private var _master: ActorRef = null + private var _system: ActorSystem = null + private var _config: Config = null + + private val LOG = LogUtil.getLogger(getClass) + + def start: Unit = { + val port = Util.findFreePort.get + val akkaConf = getConfig(inputConfig, port) + _config = akkaConf + val system = ActorSystem(MASTER, akkaConf) + + val master = system.actorOf(Props[MasterActor], MASTER) + + 0.until(workerCount).foreach { id => + system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) + } + this._master = master + this._system = system + + LOG.info("=================================") + 
LOG.info("Local Cluster is started at: ") + LOG.info(s" 127.0.0.1:$port") + LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") + } + + private def getConfig(inputConfig: Config, port: Int): Config = { + val config = inputConfig. + withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). + withValue(GEARPUMP_CLUSTER_MASTERS, ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)). + withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, ConfigValueFactory.fromAnyRef(true)). + withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)). + withValue("akka.actor.provider", ConfigValueFactory.fromAnyRef("akka.cluster.ClusterActorRefProvider")) + config + } + + def newClientContext: ClientContext = { + ClientContext(_config, _system, _master) + } + + def stop: Unit = { + _system.stop(_master) + _system.shutdown() + _system.awaitTermination() + } +} + +object EmbeddedCluster{ + def apply(): EmbeddedCluster = { + new EmbeddedCluster(ClusterConfig.master()) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/daemon/src/main/scala/io/gearpump/cluster/local/LocalCluster.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/local/LocalCluster.scala b/daemon/src/main/scala/io/gearpump/cluster/local/LocalCluster.scala deleted file mode 100644 index 0276f72..0000000 --- a/daemon/src/main/scala/io/gearpump/cluster/local/LocalCluster.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.cluster.local - -import akka.actor.{ActorRef, ActorSystem, Props} -import com.typesafe.config.{ConfigValueFactory, Config} -import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.master.{Master => MasterActor} -import io.gearpump.cluster.worker.{Worker => WorkerActor} -import io.gearpump.util.{LogUtil, Constants, Util, ActorUtil} -import io.gearpump.util.Constants.{GEARPUMP_METRIC_ENABLED, GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, MASTER, GEARPUMP_CLUSTER_MASTERS} -import scala.collection.JavaConverters._ - - -/** - * Create a in-process cluster with single worker - */ -class LocalCluster(inputConfig: Config) { - - private val workerCount: Int = 1 - private var _master: ActorRef = null - private var _system: ActorSystem = null - private var _config: Config = null - - private val LOG = LogUtil.getLogger(getClass) - - def start: Unit = { - val port = Util.findFreePort.get - val akkaConf = getConfig(inputConfig, port) - _config = akkaConf - val system = ActorSystem(MASTER, akkaConf) - - val master = system.actorOf(Props[MasterActor], MASTER) - - 0.until(workerCount).foreach { id => - system.actorOf(Props(classOf[WorkerActor], master), classOf[WorkerActor].getSimpleName + id) - } - this._master = master - this._system = system - - LOG.info("=================================") - LOG.info("Local Cluster is started at: ") - LOG.info(s" 127.0.0.1:$port") - LOG.info(s"To see UI, run command: services -master 127.0.0.1:$port") - } - - private def 
getConfig(inputConfig: Config, port: Int): Config = { - val config = inputConfig. - withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(port)). - withValue(GEARPUMP_CLUSTER_MASTERS, ConfigValueFactory.fromIterable(List(s"127.0.0.1:$port").asJava)). - withValue(GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, ConfigValueFactory.fromAnyRef(true)). - withValue(GEARPUMP_METRIC_ENABLED, ConfigValueFactory.fromAnyRef(true)) - config - } - - def newClientContext: ClientContext = { - ClientContext(_config, _system, _master) - } - - def stop: Unit = { - _system.stop(_master) - _system.shutdown() - _system.awaitTermination() - } -} - -object LocalCluster{ - def apply(): LocalCluster = { - new LocalCluster(ClusterConfig.master()) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala ---------------------------------------------------------------------- diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala index 3f26538..d6b2479 100644 --- a/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala +++ b/daemon/src/main/scala/io/gearpump/cluster/main/Local.scala @@ -21,7 +21,6 @@ package io.gearpump.cluster.main import akka.actor.{ActorSystem, Props} import com.typesafe.config.ConfigValueFactory import io.gearpump.cluster.ClusterConfig -import io.gearpump.cluster.local.LocalCluster import io.gearpump.cluster.master.{Master => MasterActor} import io.gearpump.cluster.worker.{Worker => WorkerActor} import io.gearpump.util.Constants._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java ---------------------------------------------------------------------- diff --git 
a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java index 05d4e27..9e1e7d5 100644 --- a/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/io/gearpump/streaming/examples/wordcountjava/WordCount.java @@ -22,7 +22,7 @@ import com.typesafe.config.Config; import io.gearpump.cluster.ClusterConfig; import io.gearpump.cluster.UserConfig; import io.gearpump.cluster.client.ClientContext; -import io.gearpump.cluster.local.LocalCluster; +import io.gearpump.cluster.embedded.EmbeddedCluster; import io.gearpump.partitioner.HashPartitioner; import io.gearpump.partitioner.Partitioner; import io.gearpump.streaming.javaapi.Graph; @@ -31,11 +31,11 @@ import io.gearpump.streaming.javaapi.StreamApplication; public class WordCount { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException { main(ClusterConfig.defaultConfig(), args); } - public static void main(Config akkaConf, String[] args) { + public static void main(Config akkaConf, String[] args) throws InterruptedException { // For split task, we config to create two tasks int splitTaskNumber = 2; @@ -58,9 +58,12 @@ public class WordCount { StreamApplication app = new StreamApplication("wordcountJava", conf, graph); - LocalCluster localCluster = null; - if (System.getProperty("DEBUG") != null) { - localCluster = new LocalCluster(akkaConf); + EmbeddedCluster localCluster = null; + + Boolean debugMode = System.getProperty("DEBUG") != null; + + if (debugMode) { + localCluster = new EmbeddedCluster(akkaConf); localCluster.start(); } @@ -75,6 +78,11 @@ public class WordCount { } masterClient.submit(app); + + if (debugMode) { + Thread.sleep(30 * 1000); // sleep for 30 seconds. 
+ } + masterClient.close(); if (localCluster != null) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala index 10faefe..22f8ac6 100644 --- a/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/io/gearpump/streaming/examples/wordcount/WordCount.scala @@ -18,7 +18,7 @@ package io.gearpump.streaming.examples.wordcount -import io.gearpump.cluster.local.LocalCluster +import io.gearpump.cluster.embedded.{EmbeddedCluster} import io.gearpump.streaming.{StreamApplication, Processor} import io.gearpump.cluster.UserConfig import io.gearpump.cluster.client.ClientContext @@ -34,8 +34,10 @@ object WordCount extends AkkaApp with ArgumentsParser { override val options: Array[(String, CLIOption[Any])] = Array( "split" -> CLIOption[Int]("<how many split tasks>", required = false, defaultValue = Some(1)), - "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)) - ) + "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), + "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)), + "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", required = false, defaultValue = Some(30)) + ) def application(config: ParseResult) : StreamApplication = { val splitNum = config.getInt("split") @@ -51,8 +53,11 @@ object WordCount extends AkkaApp with ArgumentsParser { override def main(akkaConf: Config, args: Array[String]): Unit = { val config = parse(args) - val localCluster = 
if (System.getProperty("DEBUG") != null) { - val cluster = new LocalCluster(akkaConf: Config) + val debugMode = config.getBoolean("debug") + val sleepSeconds = config.getInt("sleep") + + val localCluster = if (debugMode) { + val cluster = new EmbeddedCluster(akkaConf: Config) cluster.start Some(cluster) } else { @@ -66,6 +71,11 @@ object WordCount extends AkkaApp with ArgumentsParser { val app = application(config) context.submit(app) + + if (debugMode) { + Thread.sleep(sleepSeconds * 1000) // sleep for 30 seconds for debugging. + } + context.close() localCluster.map(_.stop) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala index 96f1bf3..250c354 100644 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala +++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala @@ -27,7 +27,7 @@ import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient import akka.stream.impl.StreamLayout.Module import io.gearpump.cluster.ClusterConfig import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.local.LocalCluster +import io.gearpump.cluster.embedded.{EmbeddedCluster} import io.gearpump.streaming.{StreamApplication, ProcessorId} import io.gearpump.util.Graph @@ -49,7 +49,7 @@ object RemoteGraph { */ class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer { private val local = if (useInProcessCluster) { - val cluster = LocalCluster() + val cluster = EmbeddedCluster() cluster.start Some(cluster) } else { 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala index 5831656..1c9fdfd 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala @@ -86,6 +86,14 @@ class CommandLineSpec extends TestSpecBase { success shouldBe false } + "the EmbededCluster can be used as embedded cluster in process" in { + // setup + val args = "-debug true -sleep 10" + val appId = expectSubmitAppSuccess(wordCountJar, args) + var success = commandLineClient.killApp(appId) + success shouldBe true + } + "should fail when attempting to kill a non-exist application" in { // setup val freeAppId = getNextAvailableAppId @@ -110,7 +118,7 @@ class CommandLineSpec extends TestSpecBase { commandLineClient.listApps().length + 1 } - private def expectSubmitAppSuccess(jar: String): Int = { + private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = { val appId = commandLineClient.submitApp(jar) appId should not equal -1 appId http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ecf5b468/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala index 79cf442..c6564ff 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/task/TaskUtil.scala @@ -26,6 +26,7 
@@ object TaskUtil { * @return resolved class */ def loadClass(className: String): Class[_<:Task] = { - Class.forName(className).asSubclass(classOf[Task]) + val loader = Thread.currentThread().getContextClassLoader() + loader.loadClass(className).asSubclass(classOf[Task]) } }
