Repository: incubator-gearpump Updated Branches: refs/heads/master 8aac07b86 -> 198366360
[GEARPUMP-369] Fix configuring number of executors Author: manuzhang <[email protected]> Closes #242 from manuzhang/fix_config_executor_num. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/19836636 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/19836636 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/19836636 Branch: refs/heads/master Commit: 19836636062145f47dcb3fcd31402b9dd8c1139b Parents: 8aac07b Author: manuzhang <[email protected]> Authored: Thu Apr 5 22:57:49 2018 +0800 Committer: manuzhang <[email protected]> Committed: Thu Apr 5 22:58:10 2018 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/client/ClientContext.scala | 19 ++++++++++++------- .../gearpump/cluster/main/AppSubmitter.scala | 7 +++++-- .../experiments/storm/main/GearpumpNimbus.scala | 2 +- 3 files changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/19836636/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala index 4840120..db286a0 100755 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala @@ -75,22 +75,27 @@ class ClientContext protected(config: Config, sys: ActorSystem, _master: ActorRe submit(app, jar, getExecutorNum) } - def submit(app: Application, jar: String, executorNum: Int): RunningApplication = { + def submit(app: Application, jar: String, executorNum: Option[Int]): RunningApplication = { val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX)) - val submissionConfig = getSubmissionConfig(config) - .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum)) + val submissionConfig = getSubmissionConfig(config, executorNum) val appDescription = AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig) val appJar = Option(jar).map(loadFile) submitApplication(SubmitApplication(appDescription, appJar)) } - private def getExecutorNum: Int = { - Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1) + private def getExecutorNum: Option[Int] = { + Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).toOption } - private def getSubmissionConfig(config: Config): Config = { - ClusterConfig.filterOutDefaultConfig(config) + private def getSubmissionConfig(config: Config, executorNum: Option[Int]): Config = { + val conf = ClusterConfig.filterOutDefaultConfig(config) + executorNum match { + case Some(n) => + conf.withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(n)) + case None => + conf + } } def listApps: AppMastersData = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/19836636/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala index 79f31eb..508448f 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala @@ -37,7 +37,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser { defaultValue = Some("")), "jar" -> CLIOption("<application>.jar", required = true), "executors" -> CLIOption[Int]("number of executor to launch", required = false, - defaultValue = Some(1)), + defaultValue = None), "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)), // For document purpose only, OPTION_CONFIG option is not used here. @@ -59,7 +59,10 @@ object AppSubmitter extends AkkaApp with ArgumentsParser { // Set jar path to be submitted to cluster System.setProperty(Constants.GEARPUMP_APP_JAR, jar) - System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString) + if (config.exists("executors")) { + System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, + config.getInt("executors").toString) + } val namePrefix = config.getString("namePrefix") if (namePrefix.nonEmpty) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/19836636/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala index 4a438d7..c66159e 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -155,7 +155,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe implicit val system = clientContext.system val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf) val stormConfig = gearpumpStormTopology.getStormConfig - val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1) + val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS) val processorGraph = GraphBuilder.build(gearpumpStormTopology) val config = UserConfig.empty .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
