Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 8aac07b86 -> 198366360


[GEARPUMP-369] Fix configuring number of executors

Author: manuzhang <[email protected]>

Closes #242 from manuzhang/fix_config_executor_num.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/19836636
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/19836636
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/19836636

Branch: refs/heads/master
Commit: 19836636062145f47dcb3fcd31402b9dd8c1139b
Parents: 8aac07b
Author: manuzhang <[email protected]>
Authored: Thu Apr 5 22:57:49 2018 +0800
Committer: manuzhang <[email protected]>
Committed: Thu Apr 5 22:58:10 2018 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/client/ClientContext.scala  | 19 ++++++++++++-------
 .../gearpump/cluster/main/AppSubmitter.scala     |  7 +++++--
 .../experiments/storm/main/GearpumpNimbus.scala  |  2 +-
 3 files changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/19836636/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
index 4840120..db286a0 100755
--- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala
@@ -75,22 +75,27 @@ class ClientContext protected(config: Config, sys: 
ActorSystem, _master: ActorRe
     submit(app, jar, getExecutorNum)
   }
 
-  def submit(app: Application, jar: String, executorNum: Int): 
RunningApplication = {
+  def submit(app: Application, jar: String, executorNum: Option[Int]): 
RunningApplication = {
     val appName = checkAndAddNamePrefix(app.name, 
System.getProperty(GEARPUMP_APP_NAME_PREFIX))
-    val submissionConfig = getSubmissionConfig(config)
-      .withValue(APPLICATION_EXECUTOR_NUMBER, 
ConfigValueFactory.fromAnyRef(executorNum))
+    val submissionConfig = getSubmissionConfig(config, executorNum)
     val appDescription =
       AppDescription(appName, app.appMaster.getName, app.userConfig, 
submissionConfig)
     val appJar = Option(jar).map(loadFile)
     submitApplication(SubmitApplication(appDescription, appJar))
   }
 
-  private def getExecutorNum: Int = {
-    Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
+  private def getExecutorNum: Option[Int] = {
+    Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).toOption
   }
 
-  private def getSubmissionConfig(config: Config): Config = {
-    ClusterConfig.filterOutDefaultConfig(config)
+  private def getSubmissionConfig(config: Config, executorNum: Option[Int]): 
Config = {
+    val conf = ClusterConfig.filterOutDefaultConfig(config)
+    executorNum match {
+      case Some(n) =>
+        conf.withValue(APPLICATION_EXECUTOR_NUMBER, 
ConfigValueFactory.fromAnyRef(n))
+      case None =>
+        conf
+    }
   }
 
   def listApps: AppMastersData = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/19836636/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
index 79f31eb..508448f 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/main/AppSubmitter.scala
@@ -37,7 +37,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
       defaultValue = Some("")),
     "jar" -> CLIOption("<application>.jar", required = true),
     "executors" -> CLIOption[Int]("number of executor to launch", required = 
false,
-      defaultValue = Some(1)),
+      defaultValue = None),
     "verbose" -> CLIOption("<print verbose log on console>", required = false,
       defaultValue = Some(false)),
     // For document purpose only, OPTION_CONFIG option is not used here.
@@ -59,7 +59,10 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
 
       // Set jar path to be submitted to cluster
       System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
-      System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, 
config.getInt("executors").toString)
+      if (config.exists("executors")) {
+        System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER,
+          config.getInt("executors").toString)
+      }
 
       val namePrefix = config.getString("namePrefix")
       if (namePrefix.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/19836636/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index 4a438d7..c66159e 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -155,7 +155,7 @@ class GearpumpNimbus(clientContext: ClientContext, 
stormConf: JMap[AnyRef, AnyRe
     implicit val system = clientContext.system
     val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
     val stormConfig = gearpumpStormTopology.getStormConfig
-    val workerNum = StormUtil.getInt(stormConfig, 
Config.TOPOLOGY_WORKERS).getOrElse(1)
+    val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS)
     val processorGraph = GraphBuilder.build(gearpumpStormTopology)
     val config = UserConfig.empty
       .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)

Reply via email to