Revert "fix #1895, make GearpumpNimbus a standalone service"

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/b8f8bb12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/b8f8bb12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/b8f8bb12

Branch: refs/heads/master
Commit: b8f8bb12bd0fe51d45fc63962579ef5b71f7cbe1
Parents: 7f48c12
Author: ManuZhang <[email protected]>
Authored: Tue Feb 2 07:54:53 2016 +0800
Committer: ManuZhang <[email protected]>
Committed: Tue Feb 2 07:54:53 2016 +0800

----------------------------------------------------------------------
 docs/dev-storm.md                               |  40 +--
 .../gearpump/experiments/storm/Commands.scala   |  36 +++
 .../storm/GearpumpThriftServer.scala            | 143 +++++++++++
 .../experiments/storm/StormRunner.scala         | 137 ++++++++--
 .../experiments/storm/main/GearpumpNimbus.scala | 247 -------------------
 .../storm/main/GearpumpStormClient.scala        |  70 ------
 .../storm/topology/GearpumpStormTopology.scala  |  52 +++-
 .../experiments/storm/GearpumpNimbusSpec.scala  |  68 +++++
 .../storm/GearpumpThriftServerSpec.scala        |  48 ++++
 .../experiments/storm/StormRunnerSpec.scala     |  58 +++++
 .../topology/GearpumpStormTopologySpec.scala    |  20 +-
 .../integrationtest/storm/StormClient.scala     |  14 +-
 project/Pack.scala                              |   2 +-
 .../io/gearpump/services/MasterService.scala    |   2 +-
 14 files changed, 537 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/docs/dev-storm.md
----------------------------------------------------------------------
diff --git a/docs/dev-storm.md b/docs/dev-storm.md
index a22ed6c..a05c4ed 100644
--- a/docs/dev-storm.md
+++ b/docs/dev-storm.md
@@ -77,43 +77,23 @@ This section shows how to run an existing Storm jar in a local Gearpump cluster.
 1. launch a local cluster
   
    ```
-   bin/local
+   ./target/pack/bin/local
    ```
 
-2. start a local Gearpump Nimbus server 
-
-   Users need server's thrift port to submit topologies later. The thrift port is written to a yaml config file set with `-output` option. 
-   Users can provide an existing config file where only `nimbus.thrift.port` is overwritten. If not provided, a new file `app.yaml` is created with the config.
-
+2. submit a topology from storm-starter. 
    ```
-   bin/storm nimbus -output [conf <custom yaml config>]
+   bin/storm -verbose -config storm.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation 
    ```
-   
-3. submit Storm applications
-  
-   Users can either submit Storm applications through command line or UI. 
-   
-   a. submit Storm applications through command line
-
-
-     ```
-     bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation 
-     ```
   
-     Users are able to configure their applications through following options
+   Users are able to configure their applications through following options
    
-       * `jar` - set the path of a Storm application jar
-       * `config` - submit the custom configuration file generated when launching Nimbus
+     * `jar` - set the path of a storm application jar
+     * `config` - submit a customized storm configuration file     
   
-   b. submit Storm application through UI
+   That's it. Check the dashboard and you should see data flowing through your topology. 
    
-      1. Click on the "Create" button on the applications page on UI. 
-      2. Click on the "Submit Storm Application" item in the pull down menu.
-      3. In the popup console, upload the Storm application jar and the configuration file generated when launching Nimbus,
-         and fill in `storm.starter.ExclamationTopology exclamation` as arguments.
-      4. Click on the "Submit" button   
+   *Note that submission from UI is not supported yet*. 
 
-   Either way, check the dashboard and you should see data flowing through your topology. 
   
 ## How is it different from running on Storm
 
@@ -165,11 +145,11 @@ Gearpump has flow control between tasks such that [sender cannot flood receiver]
 All Storm configurations are respected with the following priority order 
 
 ```
-defaults.yaml < custom file config < application config < component config
+defaults.yaml < storm.yaml < application config < component config < custom user config
 ```
 
 where
 
 * application config is submit from Storm application along with the topology 
 * component config is set in spout / bolt with `getComponentConfiguration`
-* custom file config is specified with the `-config` option when submitting Storm application from command line or uploaded from UI
\ No newline at end of file
+* custom user config is specified with the `-config` option when submitting Storm application from command line
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
new file mode 100644
index 0000000..1d5b903
--- /dev/null
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm
+
+import backtype.storm.generated.{KillOptions, StormTopology, SubmitOptions}
+
+object Commands {
+
+  case class Submit(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions)
+
+  case class AppSubmitted(name: String, appId: Int)
+
+  case class Kill(name: String, option: KillOptions)
+
+  case class AppKilled(name: String, appId: Int)
+
+  case object GetClusterInfo
+
+  case class GetTopology(id: String)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
new file mode 100644
index 0000000..640c20c
--- /dev/null
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm
+
+import java.nio.ByteBuffer
+import java.util.concurrent.TimeUnit
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import akka.actor.ActorRef
+import backtype.storm.Config
+import backtype.storm.generated._
+import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
+import backtype.storm.utils.Utils
+import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _}
+import io.gearpump.experiments.storm.util.StormUtil
+import io.gearpump.util.ActorUtil.askActor
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import scala.language.postfixOps
+
+object GearpumpThriftServer {
+  val THRIFT_PORT = StormUtil.getThriftPort
+  implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS)
+
+  private def createServer(handler: ActorRef): ThriftServer = {
+    val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(handler))
+    val connectionType = ThriftConnectionType.NIMBUS
+    val config = Utils.readDefaultConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
+    config.put(Config.NIMBUS_THRIFT_PORT, s"$THRIFT_PORT")
+    new ThriftServer(config, processor, connectionType)
+  }
+
+  def apply(handler: ActorRef): GearpumpThriftServer = {
+    new GearpumpThriftServer(createServer(handler))
+  }
+
+  class GearpumpNimbus(handler: ActorRef) extends Nimbus.Iface {
+
+    override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = {
+      val ask = askActor(handler, Submit(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE)))
+      Await.result(ask, 30 seconds)
+    }
+
+    override def killTopologyWithOpts(name: String, options: KillOptions): Unit = {
+      Await.result(askActor(handler,Kill(name, options)), 10 seconds)
+    }
+
+    override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
+      Await.result(askActor(handler,Submit(name, uploadedJarLocation, jsonConf, topology, options)), 10 seconds)
+    }
+
+    override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
+    }
+
+    override def getNimbusConf: String = {
+      throw new UnsupportedOperationException
+    }
+
+    override def getTopology(id: String): StormTopology = {
+      Await.result(askActor[StormTopology](handler, GetTopology(id)), 10 seconds)
+    }
+
+    override def getTopologyConf(id: String): String = {
+      throw new UnsupportedOperationException
+    }
+
+    override def beginFileDownload(file: String): String = {
+      throw new UnsupportedOperationException
+    }
+
+    override def getUserTopology(id: String): StormTopology = getTopology(id)
+
+    override def activate(name: String): Unit = {
+      throw new UnsupportedOperationException
+    }
+
+    override def rebalance(name: String, options: RebalanceOptions): Unit = {
+      throw new UnsupportedOperationException
+    }
+
+    override def deactivate(name: String): Unit = {
+      throw new UnsupportedOperationException
+    }
+
+    override def getTopologyInfo(id: String): TopologyInfo = {
+      throw new UnsupportedOperationException
+    }
+
+    override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = {
+      throw new UnsupportedOperationException
+    }
+
+    override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions())
+
+    override def downloadChunk(id: String): ByteBuffer = {
+      throw new UnsupportedOperationException
+    }
+
+    override def beginFileUpload(): String = {
+      "local thrift server"
+    }
+
+    override def getClusterInfo: ClusterSummary = {
+      Await.result(askActor[ClusterSummary](handler, GetClusterInfo), 10 seconds)
+    }
+
+    override def finishFileUpload(location: String): Unit = {
+    }
+
+    override def uploadNewCredentials(s: String, credentials: Credentials): Unit = {
+      throw new UnsupportedOperationException
+    }
+  }
+}
+
+class GearpumpThriftServer(server: ThriftServer) extends Thread {
+
+  override def run(): Unit = {
+    server.serve()
+  }
+
+  def close(): Unit = {
+    server.stop()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
index dd8d782..84d9460 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala
@@ -18,36 +18,131 @@
 
 package io.gearpump.experiments.storm
 
-import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient}
-import io.gearpump.util.LogUtil
-import org.slf4j.Logger
+import java.io.File
+import java.util.{Map => JMap}
 
-object StormRunner {
-  private val LOG: Logger = LogUtil.getLogger(getClass)
+import akka.actor.{Actor, ActorSystem, Props}
+import backtype.storm.Config
+import backtype.storm.generated.{ClusterSummary, StormTopology, SupervisorSummary, TopologySummary}
+import com.typesafe.config.ConfigValueFactory
+import io.gearpump.cluster.UserConfig
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _}
+import io.gearpump.experiments.storm.topology.GearpumpStormTopology
+import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants}
+import io.gearpump.streaming.StreamApplication
+import io.gearpump.util.Constants._
+import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util}
 
-  private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient)
+import scala.collection.JavaConverters._
 
-  private def usage: Unit = {
-    println(commands)
-    val keys = commands.keys.toList.sorted
-    Console.err.println("Usage: " + "<" + keys.mkString("|") + ">")
+object StormRunner extends AkkaApp with ArgumentsParser {
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "jar" -> CLIOption[String]("<storm jar>", required = true),
+    "config" -> CLIOption[String]("<storm config path>", required = false),
+    "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)))
+
+  override val remainArgs = Array("topology_name")
+
+  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
+
+    val akkaConf = updateConfig(inputAkkaConf)
+    val config = parse(args)
+
+    val verbose = config.getBoolean("verbose")
+    if (verbose) {
+      LogUtil.verboseLogToConsole
+    }
+
+    val jar = config.getString("jar")
+    val stormConfig = config.getString("config")
+    val topology = config.remainArgs(0)
+    val stormArgs = config.remainArgs.drop(1)
+
+    val system = ActorSystem("storm", akkaConf)
+    val clientContext = new ClientContext(akkaConf, system, null)
+
+    val gearpumpNimbus = system.actorOf(Props(new Handler(clientContext, jar, stormConfig)))
+    val thriftServer = GearpumpThriftServer(gearpumpNimbus)
+    thriftServer.start()
+
+    val stormOptions = Array("-Dstorm.options=" +
+      s"${Config.NIMBUS_HOST}=127.0.0.1,${Config.NIMBUS_THRIFT_PORT}=${GearpumpThriftServer.THRIFT_PORT}",
+      "-Dstorm.jar=" + jar,
+      s"-D${PREFER_IPV4}=true"
+    )
+
+    val classPath = Array(System.getProperty("java.class.path"), jar)
+    val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
+
+    // wait till the process exit
+    val exit = process.exitValue()
+
+    thriftServer.close()
+    clientContext.close()
+    system.shutdown()
+    system.awaitTermination()
+
+    if (exit != 0) {
+      throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}")
+    }
   }
 
-  private def executeCommand(command : String, commandArgs : Array[String]): Unit = {
-    if (!commands.contains(command)) {
-      usage
+  import Constants._
+  private def updateConfig(config: Config): Config = {
+    val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
+    val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
+    val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
+
+    val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath))
+      .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath))
+
+    if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
+      val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
+      updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
     } else {
-      commands(command).main(commandArgs)
+      updated
     }
   }
 
-  def main(args: Array[String]) = {
-    if (args.length == 0) {
-      usage
-    } else {
-      val command = args(0)
-      val commandArgs = args.drop(1)
-      executeCommand(command, commandArgs)
+
+  class Handler(clientContext: ClientContext, jar: String, fileConfig: String) extends Actor {
+    private var applications = Map.empty[String, Int]
+    private var topologies = Map.empty[String, StormTopology]
+    private val LOG = LogUtil.getLogger(classOf[Handler])
+
+    implicit val system = context.system
+
+    def receive: Receive = {
+      case Kill(name, option) =>
+        topologies -= name
+        clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found")))
+        val appId = applications(name)
+        applications -= name
+        LOG.info(s"Killed topology $name")
+        sender ! AppKilled(name, appId)
+      case Submit(name, uploadedJarLocation, appConfig, topology, option) =>
+        topologies += name -> topology
+
+        val gearpumpStormTopology = GearpumpStormTopology(topology, appConfig, fileConfig)
+        val stormConfig = gearpumpStormTopology.getStormConfig
+        val processorGraph = GraphBuilder.build(gearpumpStormTopology)
+        val config = UserConfig.empty
+            .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
+            .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
+        val app = StreamApplication(name, processorGraph, config)
+        val appId = clientContext.submit(app, jar)
+        applications += name -> appId
+        LOG.info(s"Storm Application $appId submitted")
+        sender ! AppSubmitted(name, appId)
+      case GetClusterInfo =>
+        val topologySummaryList = topologies.map { case (name, _) =>
+          new TopologySummary(name, name, 0, 0, 0, 0, "")
+        }.toSeq
+        sender ! new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
+      case GetTopology(id) =>
+        sender ! topologies(id)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
deleted file mode 100644
index 83328b4..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.main
-
-import java.io.{File, FileOutputStream, FileWriter}
-import java.nio.ByteBuffer
-import java.nio.channels.{Channels, WritableByteChannel}
-import java.util.{Map => JMap, UUID}
-
-import akka.actor.ActorSystem
-import backtype.storm.Config
-import backtype.storm.generated._
-import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer}
-import backtype.storm.utils.TimeCacheMap.ExpiredCallback
-import backtype.storm.utils.{TimeCacheMap, Utils}
-import com.typesafe.config.ConfigValueFactory
-import io.gearpump.cluster.{MasterToAppMaster, UserConfig}
-import io.gearpump.cluster.client.ClientContext
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.experiments.storm.topology.GearpumpStormTopology
-import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil}
-import io.gearpump.streaming.StreamApplication
-import io.gearpump.util.{FileUtils, AkkaApp, Constants, LogUtil}
-import org.apache.storm.shade.org.json.simple.JSONValue
-import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
-import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
-import org.slf4j.Logger
-
-import scala.collection.JavaConverters._
-
-object GearpumpNimbus extends AkkaApp with ArgumentsParser {
-  private val THRIFT_PORT = StormUtil.getThriftPort
-  private val OUTPUT = "output"
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml"))
-  )
-
-  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-    val parsed = parse(args)
-    val output = parsed.getString(OUTPUT)
-    val akkaConf = updateClientConfig(inputAkkaConf)
-    val system = ActorSystem("storm", akkaConf)
-    val clientContext = new ClientContext(akkaConf, system, null)
-    val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]]
-    val thriftConf: JMap[String, String] = Map(
-      Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME),
-      Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava
-    updateOutputStormConfig(thriftConf, output)
-    stormConf.putAll(thriftConf)
-    val thriftServer = createServer(clientContext, stormConf)
-    thriftServer.serve()
-    system.awaitTermination()
-  }
-
-  private def createServer(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = {
-    val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, stormConf))
-    val connectionType = ThriftConnectionType.NIMBUS
-    new ThriftServer(stormConf, processor, connectionType)
-  }
-
-  private def updateOutputStormConfig(conf: JMap[String, String], output: String): Unit = {
-    // read existing config
-    val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]]
-    outputConfig.putAll(conf)
-    val yaml = new Yaml(new SafeConstructor)
-    val writer = new FileWriter(new File(output))
-    yaml.dump(outputConfig, writer)
-  }
-
-  import Constants._
-  private def updateClientConfig(config: Config): Config = {
-    val storm = s"<${GEARPUMP_HOME}>/lib/storm/*"
-    val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH)
-    val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH)
-
-    val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath))
-        .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath))
-
-    if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) {
-      val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK))
-      updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig)
-    } else {
-      updated
-    }
-  }
-
-}
-
-class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface {
-  private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus])
-  private var applications = Map.empty[String, Int]
-  private var topologies = Map.empty[String, (StormTopology, JMap[AnyRef, AnyRef])]
-  private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get
-  private val expiredCallback = new ExpiredCallback[String, WritableByteChannel] {
-    override def expire(k: String, v: WritableByteChannel): Unit = {
-      v.close()
-    }
-  }
-  private val fileCacheMap = new TimeCacheMap[String, WritableByteChannel](expireSeconds, expiredCallback)
-
-  override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = {
-    submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE))
-  }
-
-  override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = {
-
-    implicit val system = clientContext.system
-    val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
-    val stormConfig = gearpumpStormTopology.getStormConfig
-    val processorGraph = GraphBuilder.build(gearpumpStormTopology)
-    val config = UserConfig.empty
-          .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
-          .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
-    val app = StreamApplication(name, processorGraph, config)
-    LOG.info(s"jar file uploaded to $uploadedJarLocation")
-    val appId = clientContext.submit(app, uploadedJarLocation)
-    applications += name -> appId
-    topologies += name -> (topology, stormConfig)
-    LOG.info(s"Storm Application $appId submitted")
-  }
-
-  override def killTopologyWithOpts(name: String, options: KillOptions): Unit = {
-    clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found")))
-    applications -= name
-    topologies -= name
-    LOG.info(s"Killed topology $name")
-  }
-
-  override def getNimbusConf: String = {
-    JSONValue.toJSONString(stormConf)
-  }
-
-  override def getTopology(name: String): StormTopology = {
-    updateApps
-    topologies.getOrElse(name,
-      throw new RuntimeException(s"topology $name not found"))._1
-  }
-
-  override def getTopologyConf(name: String): String = {
-    updateApps
-    JSONValue.toJSONString(topologies.getOrElse(name,
-      throw new RuntimeException(s"topology $name not found"))._2)
-  }
-
-  override def getUserTopology(id: String): StormTopology = getTopology(id)
-
-  override def beginFileUpload(): String = {
-    val location = s"stormjar-${UUID.randomUUID()}.jar"
-    val channel = Channels.newChannel(new FileOutputStream(location))
-    fileCacheMap.put(location, channel)
-    LOG.info(s"Uploading file from client to $location")
-    location
-  }
-
-  override def uploadChunk(location: String, chunk: ByteBuffer): Unit = {
-    if (!fileCacheMap.containsKey(location)) {
-      throw new RuntimeException(s"File for $location does not exist (or timed out)")
-    } else {
-      val channel = fileCacheMap.get(location)
-      channel.write(chunk)
-      fileCacheMap.put(location, channel)
-    }
-  }
-
-  override def finishFileUpload(location: String): Unit = {
-    if (!fileCacheMap.containsKey(location)) {
-      throw new RuntimeException(s"File for $location does not exist (or timed out)")
-    } else {
-      val channel = fileCacheMap.get(location)
-      channel.close()
-      fileCacheMap.remove(location)
-      new File(location).delete()
-    }
-  }
-
-  override def getClusterInfo: ClusterSummary = {
-    updateApps
-    val topologySummaryList = topologies.map { case (name, _) =>
-      new TopologySummary(name, name, 0, 0, 0, 0, "")
-    }.toSeq
-    new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava)
-  }
-
-  override def beginFileDownload(file: String): String = {
-    throw new UnsupportedOperationException
-  }
-
-  override def uploadNewCredentials(s: String, credentials: Credentials): Unit = {
-    throw new UnsupportedOperationException
-  }
-    override def activate(name: String): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def rebalance(name: String, options: RebalanceOptions): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def deactivate(name: String): Unit = {
-    throw new UnsupportedOperationException
-  }
-
-  override def getTopologyInfo(name: String): TopologyInfo = {
-    throw new UnsupportedOperationException
-  }
-
-  override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = {
-    throw new UnsupportedOperationException
-  }
-
-  override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions())
-
-  override def downloadChunk(name: String): ByteBuffer = {
-    throw new UnsupportedOperationException
-  }
-
-  private def updateApps: Unit = {
-    clientContext.listApps.appMasters.foreach { app =>
-      val name = app.appName
-      if (applications.contains(app.appName)) {
-        if (app.status != MasterToAppMaster.AppMasterActive) {
-          applications -= name
-          topologies -= name
-        }
-      }
-    }
-  }
-}
-
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
deleted file mode 100644
index b813ae0..0000000
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.gearpump.experiments.storm.main
-
-import backtype.storm.Config
-import backtype.storm.utils.Utils
-import io.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import io.gearpump.util.Constants._
-import io.gearpump.util.{AkkaApp, LogUtil, Util}
-
-object GearpumpStormClient extends AkkaApp with ArgumentsParser {
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "jar" -> CLIOption[String]("<storm jar>", required = true),
-    "config" -> CLIOption[Int]("<storm config file>", required = true),
-    "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)))
-
-  override def main(inputAkkaConf: Config, args: Array[String]): Unit = {
-    val config = parse(args)
-
-    val verbose = config.getBoolean("verbose")
-    if (verbose) {
-      LogUtil.verboseLogToConsole
-    }
-
-    val jar = config.getString("jar")
-    val stormConfig = config.getString("config")
-    val topology = config.remainArgs(0)
-    val stormArgs = config.remainArgs.drop(1)
-    val stormOptions = Array(
-      s"-Dstorm.options=${getThriftOptions(stormConfig)}",
-      s"-Dstorm.jar=$jar",
-      s"-Dstorm.config.file=$stormConfig",
-      s"-D${PREFER_IPV4}=true"
-    )
-
-    val classPath = Array(s"${System.getProperty(GEARPUMP_HOME)}/lib/storm/*", jar)
-    val process = Util.startProcess(stormOptions, classPath, topology, stormArgs)
-
-    // wait till the process exit
-    val exit = process.exitValue()
-
-    if (exit != 0) {
-      throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}")
-    }
-  }
-
-  private def getThriftOptions(stormConfig: String): String = {
-    val config = Utils.findAndReadConfigFile(stormConfig, true)
-    val host = config.get(Config.NIMBUS_HOST)
-    val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT)
-    s"${Config.NIMBUS_HOST}=$host,${Config.NIMBUS_THRIFT_PORT}=$thriftPort"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
index b88a200..996498b 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala
@@ -18,6 +18,7 @@
 
 package io.gearpump.experiments.storm.topology
 
+import java.io._
 import java.lang.{Iterable => JIterable}
 import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap}
 
@@ -34,6 +35,8 @@ import io.gearpump.experiments.storm.util.StormUtil._
 import io.gearpump.streaming.Processor
 import io.gearpump.streaming.task.Task
 import io.gearpump.util.LogUtil
+import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
+import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor
 import org.slf4j.Logger
 
 import scala.collection.JavaConversions._
@@ -42,18 +45,43 @@ object GearpumpStormTopology {
   private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology])
 
   def apply(
-      name: String,
       topology: StormTopology,
-      appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = {
+      appConfigInJson: String,
+      fileConfig: String)(implicit system: ActorSystem): GearpumpStormTopology = {
     new GearpumpStormTopology(
-      name,
       topology,
       Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]],
-      parseJsonStringToMap(appConfigInJson)
+      parseJsonStringToMap(appConfigInJson),
+      readStormConfig(fileConfig)
     )
 
   }
 
+  /**
+   * @param configFile user provided local config file
+   * @return a config Map loaded from config file
+   */
+  private def readStormConfig(configFile: String): JMap[AnyRef, AnyRef] = {
+    var ret: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef]
+    try {
+      val yaml = new Yaml(new SafeConstructor)
+      val input: InputStream = new FileInputStream(configFile)
+      try {
+        ret = yaml.load(new InputStreamReader(input)).asInstanceOf[JMap[AnyRef, AnyRef]]
+      } catch {
+        case e: IOException =>
+          LOG.warn(s"failed to load config file $configFile")
+      } finally {
+        input.close()
+      }
+    } catch {
+      case e: FileNotFoundException =>
+        LOG.warn(s"failed to find config file $configFile")
+      case t: Throwable =>
+        LOG.error(t.getMessage)
+    }
+    ret
+  }
 }
 
 /**
@@ -63,20 +91,21 @@ object GearpumpStormTopology {
  *   3. provides interface for Gearpump applications to use Storm topology
  *
  * an implicit ActorSystem is required to create Gearpump processors
- * @param name topology name
  * @param topology Storm topology
- * @param sysConfig configs from "defaults.yaml" and custom config file
+ * @param sysConfig configs from "defaults.yaml" and "storm.yaml"
  * @param appConfig config submitted from user application
+ * @param fileConfig custom file config set by user in command line
  */
 private[storm] class GearpumpStormTopology(
-    name: String,
     topology: StormTopology,
     sysConfig: JMap[AnyRef, AnyRef],
-    appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
+    appConfig: JMap[AnyRef, AnyRef],
+    fileConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) {
+  import io.gearpump.experiments.storm.topology.GearpumpStormTopology._
 
   private val spouts = topology.get_spouts()
   private val bolts = topology.get_bolts()
-  private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts))
+  private val stormConfig = mergeConfigs(sysConfig, appConfig, fileConfig, getComponentConfigs(spouts, bolts))
   private val spoutProcessors = spouts.map { case (id, spout) =>
     id -> spoutToProcessor(id, spout, stormConfig.toMap) }.toMap
   private val boltProcessors = bolts.map { case (id, bolt) =>
@@ -85,7 +114,7 @@ private[storm] class GearpumpStormTopology(
 
   /**
    * @return merged Storm config with priority
-   *    defaults.yaml < custom file config < application config < component config
+   *    defaults.yaml < storm.yaml < application config < component config < custom file config
    */
   def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig
 
@@ -110,12 +139,13 @@ private[storm] class GearpumpStormTopology(
   private def mergeConfigs(
       sysConfig: JMap[AnyRef, AnyRef],
       appConfig: JMap[AnyRef, AnyRef],
+      fileConfig: JMap[AnyRef, AnyRef],
       componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = {
     val allConfig = new JHashMap[AnyRef, AnyRef]
     allConfig.putAll(sysConfig)
     allConfig.putAll(appConfig)
     allConfig.putAll(getMergedComponentConfig(componentConfigs, allConfig.toMap))
-    allConfig.put(Config.TOPOLOGY_NAME, name)
+    allConfig.putAll(fileConfig)
     allConfig
   }
 

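Note on the merge order in the hunk above: mergeConfigs applies putAll in
sequence, so a later map overwrites earlier ones on key conflicts, which is why
the custom file config now ends up with the highest priority. A minimal
standalone sketch of that behavior, assuming plain java.util.HashMap layers and
a hypothetical key (MergeOrderSketch, merge and config are illustrative names,
not Gearpump APIs; the component config step is omitted):

```scala
import java.util.{HashMap => JHashMap, Map => JMap}

// Hypothetical stand-in for mergeConfigs; not the Gearpump implementation.
object MergeOrderSketch {

  // Merge config layers from lowest to highest priority.
  def merge(layers: JMap[String, AnyRef]*): JMap[String, AnyRef] = {
    val all = new JHashMap[String, AnyRef]
    // putAll overwrites existing keys, so later layers take precedence
    layers.foreach(layer => all.putAll(layer))
    all
  }

  // Build a single-entry config map.
  def config(key: String, value: String): JMap[String, AnyRef] = {
    val m = new JHashMap[String, AnyRef]
    m.put(key, value)
    m
  }

  def main(args: Array[String]): Unit = {
    val key = "topology.workers" // hypothetical key, for illustration only
    val merged = merge(config(key, "sys"), config(key, "app"), config(key, "file"))
    println(merged.get(key)) // prints "file"
  }
}
```

This mirrors what the updated GearpumpStormTopologySpec checks with topology4:
when the same key is set in sysConfig, appConfig and fileConfig, the file value
is the one returned.
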
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
new file mode 100644
index 0000000..20b63b3
--- /dev/null
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm
+
+import akka.actor.ActorSystem
+import akka.testkit.TestProbe
+import backtype.storm.generated.StormTopology
+import io.gearpump.experiments.storm.util.TopologyUtil
+import io.gearpump.cluster.TestUtil
+import Commands.{GetTopology, Kill, Submit}
+import GearpumpThriftServer.GearpumpNimbus
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, WordSpec}
+
+import scala.concurrent.Future
+
+class GearpumpNimbusSpec extends WordSpec with Matchers with MockitoSugar {
+
+  "GearpumpNimbus" should {
+    "submit and kill topology through ClientContext" in {
+
+      implicit val system = ActorSystem("storm-test", TestUtil.DEFAULT_CONFIG)
+      implicit val dispatcher = system.dispatcher
+
+      val handler = TestProbe()
+      val gearpumpNimbus = new GearpumpNimbus(handler.ref)
+
+      val appId = 0
+      val name = "test"
+      val uploadedJarLocation = "local"
+      val jsonConf = "storm_json_conf"
+      val topology = TopologyUtil.getTestTopology
+
+      Future(gearpumpNimbus.submitTopology(name, uploadedJarLocation, jsonConf, topology))
+      handler.expectMsgType[Submit]
+
+      Future(gearpumpNimbus.getTopology(name))
+      handler.expectMsgType[GetTopology]
+      handler.reply(new StormTopology)
+
+      Future(gearpumpNimbus.getUserTopology(name))
+      handler.expectMsgType[GetTopology]
+      handler.reply(new StormTopology)
+
+      Future(gearpumpNimbus.killTopology(name))
+      handler.expectMsgType[Kill]
+
+      system.shutdown()
+      system.awaitTermination()
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
new file mode 100644
index 0000000..b1736f0
--- /dev/null
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.gearpump.experiments.storm
+
+import backtype.storm.security.auth.ThriftServer
+import org.mockito.Mockito._
+import org.scalatest.WordSpec
+import org.scalatest.mock.MockitoSugar
+
+class GearpumpThriftServerSpec extends WordSpec with MockitoSugar {
+
+  "GearpumpThriftServer" should {
+    "run ThriftServer.serve" in {
+      val tServer = mock[ThriftServer]
+      val thriftServer = new GearpumpThriftServer(tServer)
+
+      thriftServer.run()
+
+      verify(tServer).serve()
+    }
+
+    "stop ThriftServer" in {
+      val tServer = mock[ThriftServer]
+      val thriftServer = new GearpumpThriftServer(tServer)
+
+      thriftServer.close()
+
+      verify(tServer).stop()
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
new file mode 100644
index 0000000..95cf279
--- /dev/null
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package io.gearpump.experiments.storm
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.TestProbe
+import io.gearpump.cluster.TestUtil
+import io.gearpump.cluster.client.ClientContext
+import io.gearpump.experiments.storm.Commands.{AppKilled, AppSubmitted, Kill, Submit}
+import io.gearpump.experiments.storm.StormRunner.Handler
+import io.gearpump.experiments.storm.util.TopologyUtil
+import org.mockito.Matchers._
+import org.mockito.Mockito
+import org.scalatest.{FlatSpec, Matchers}
+
+class StormRunnerSpec extends FlatSpec with Matchers {
+  it should "handle submit/kill correctly"  in {
+    val conf = TestUtil.DEFAULT_CONFIG
+    implicit val system = ActorSystem("storm-test", conf)
+
+    val uploadedJarLocation = "local"
+    val jsonConf = "storm_json_conf"
+    val topology = TopologyUtil.getTestTopology
+
+    implicit val dispatcher = system.dispatcher
+    val clientContext = Mockito.mock(classOf[ClientContext])
+    Mockito.when(clientContext.submit(any(), any())).thenReturn(0)
+    val handler = system.actorOf(Props(new Handler(clientContext, "jar", "user_config")))
+    val client = TestProbe()
+
+    client.send(handler, Submit("app", uploadedJarLocation, jsonConf, topology, null))
+    client.expectMsg(AppSubmitted("app", 0))
+
+
+    client.send(handler, Kill("app", null))
+    client.expectMsg(AppKilled("app", 0))
+
+    system.shutdown()
+    system.awaitTermination()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
index 8f10886..d1c330b 100644
--- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
+++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala
@@ -20,7 +20,6 @@ package io.gearpump.experiments.storm.topology
 
 import java.util.{HashMap => JHashMap, Map => JMap}
 
-import backtype.storm.Config
 import io.gearpump.experiments.storm.processor.StormProcessor
 import io.gearpump.experiments.storm.producer.StormProducer
 import io.gearpump.experiments.storm.util.TopologyUtil
@@ -41,27 +40,28 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
       val sysConfig = newJavaConfig(name, sysVal)
       val appVal = "app"
       val appConfig = newJavaConfig(name, appVal)
+      val fileVal = "file"
+      val fileConfig = newJavaConfig(name, fileVal)
 
       implicit val system = MockUtil.system
-      val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, newEmptyConfig)
-      topology1.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology1"
-      topology1.getStormConfig should not contain name
+      val topology1 = new GearpumpStormTopology(stormTopology, newEmptyConfig, newEmptyConfig, newEmptyConfig)
+      topology1.getStormConfig shouldBe empty
 
-      val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, newEmptyConfig)
-      topology2.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology2"
+      val topology2 = new GearpumpStormTopology(stormTopology, sysConfig, newEmptyConfig, newEmptyConfig)
       topology2.getStormConfig.get(name) shouldBe sysVal
 
-      val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig)
-      topology3.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology3"
+      val topology3 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, newEmptyConfig)
       topology3.getStormConfig.get(name) shouldBe appVal
 
+      val topology4 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, fileConfig)
+      topology4.getStormConfig.get(name) shouldBe fileVal
     }
 
     "create Gearpump processors from Storm topology" in {
       val stormTopology = TopologyUtil.getTestTopology
       implicit val system = MockUtil.system
       val gearpumpStormTopology =
-        GearpumpStormTopology("app", stormTopology, null)
+        GearpumpStormTopology(stormTopology, null, "")
       val processors = gearpumpStormTopology.getProcessors
       stormTopology.get_spouts().foreach { case (spoutId, _) =>
         val processor = processors(spoutId)
@@ -80,7 +80,7 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar
       implicit val system = MockUtil.system
       val sysConfig = new JHashMap[AnyRef, AnyRef]
       val gearpumpStormTopology =
-        GearpumpStormTopology("app", stormTopology, null)
+        GearpumpStormTopology(stormTopology, null, "")
       val targets0 = gearpumpStormTopology.getTargets("1")
       targets0 should contain key "3"
       targets0 should contain key "4"

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
index 15acf4e..ed792f4 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala
@@ -20,6 +20,7 @@ package io.gearpump.integrationtest.storm
 
 
 import backtype.storm.utils.{Utils, DRPCClient}
+import io.gearpump.experiments.storm.GearpumpThriftServer
 import io.gearpump.integrationtest.Docker
 import io.gearpump.integrationtest.minicluster.BaseContainer
 import org.apache.log4j.Logger
@@ -29,28 +30,23 @@ class StormClient(masterAddrs: Seq[(String, Int)]) {
 
   private val LOG = Logger.getLogger(getClass)
   private val STORM_HOST = "storm0"
-  private val STORM_NIMBUS = "/opt/start storm nimbus"
-  private val STORM_APP = "/opt/start storm app"
+  private val STORM_CMD = "/opt/start storm"
   private val STORM_DRPC = "storm-drpc"
   private val CONFIG_FILE = "storm.yaml"
+  private val NIMBUS_THRIFT_PORT = GearpumpThriftServer.THRIFT_PORT
   private val DRPC_PORT = 3772
   private val DRPC_INVOCATIONS_PORT = 3773
 
   private val container = new BaseContainer(STORM_HOST, STORM_DRPC, masterAddrs,
-    tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT))
+    tunnelPorts = Set(NIMBUS_THRIFT_PORT, DRPC_PORT, DRPC_INVOCATIONS_PORT))
 
   def start(): Unit = {
     container.createAndStart()
-    startNimbus
-  }
-
-  private def startNimbus: String = {
-    Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE")
   }
 
   def submitStormApp(jar: String, mainClass: String, args: String = ""): Int = {
     try {
-      Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_APP -config $CONFIG_FILE " +
+      Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_CMD -config $CONFIG_FILE " +
         s"-jar $jar $mainClass $args").split("\n")
         .filter(_.contains("The application id is ")).head.split(" ").last.toInt
     } catch {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/project/Pack.scala
----------------------------------------------------------------------
diff --git a/project/Pack.scala b/project/Pack.scala
index a3852c4..b242b36 100644
--- a/project/Pack.scala
+++ b/project/Pack.scala
@@ -59,7 +59,7 @@ object Pack extends sbt.Build {
             "worker" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-DlogFilename=worker", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
             "services" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
             "yarnclient" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"),
-            "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}")
+            "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost")
           ),
           packLibDir := Map(
             "lib" -> new ProjectsToPack(core.id, streaming.id),

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index aa90726..e2b2c47 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -221,7 +221,7 @@ object MasterService {
    */
   def submitStormApp(jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = {
     submitAndDeleteTempFiles(
-      "io.gearpump.experiments.storm.main.GearpumpStormClient",
+      "io.gearpump.experiments.storm.StormRunner",
       argsArray = spaceSeparatedArgumentsToArray(args),
       fileMap = Map("jar" -> jar, "config" -> stormConf).filter(_._2.isDefined).mapValues(_.get),
       classPath = getStormApplicationClassPath,
