fix #1895, make GearpumpNimbus a standalone service
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/502dbae9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/502dbae9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/502dbae9 Branch: refs/heads/master Commit: 502dbae996440cfde7b2b9473a44af9cd1714cc8 Parents: 20c62e5 Author: manuzhang <[email protected]> Authored: Fri Jan 29 16:55:53 2016 +0800 Committer: manuzhang <[email protected]> Committed: Mon Feb 1 16:14:27 2016 +0800 ---------------------------------------------------------------------- docs/dev-storm.md | 40 ++- .../gearpump/experiments/storm/Commands.scala | 36 --- .../storm/GearpumpThriftServer.scala | 143 ----------- .../experiments/storm/StormRunner.scala | 137 ++--------- .../experiments/storm/main/GearpumpNimbus.scala | 245 +++++++++++++++++++ .../storm/main/GearpumpStormClient.scala | 69 ++++++ .../storm/topology/GearpumpStormTopology.scala | 52 +--- .../experiments/storm/GearpumpNimbusSpec.scala | 68 ----- .../storm/GearpumpThriftServerSpec.scala | 48 ---- .../experiments/storm/StormRunnerSpec.scala | 58 ----- .../topology/GearpumpStormTopologySpec.scala | 20 +- .../integrationtest/storm/StormClient.scala | 14 +- project/Pack.scala | 2 +- .../io/gearpump/services/MasterService.scala | 2 +- 14 files changed, 397 insertions(+), 537 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/docs/dev-storm.md ---------------------------------------------------------------------- diff --git a/docs/dev-storm.md b/docs/dev-storm.md index a05c4ed..a22ed6c 100644 --- a/docs/dev-storm.md +++ b/docs/dev-storm.md @@ -77,23 +77,43 @@ This section shows how to run an existing Storm jar in a local Gearpump cluster. 1. launch a local cluster ``` - ./target/pack/bin/local + bin/local ``` -2. submit a topology from storm-starter. +2. 
start a local Gearpump Nimbus server + + Users need the server's Thrift port to submit topologies later. The Thrift port is written to a YAML config file whose path is set with the `-output` option. + Users can provide an existing config file where only `nimbus.thrift.port` is overwritten. If not provided, a new file `app.yaml` is created with the config. + ``` - bin/storm -verbose -config storm.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation + bin/storm nimbus [-output <custom yaml config>] ``` + +3. submit Storm applications + + Users can submit Storm applications either through the command line or through the UI. + + a. submit Storm applications through command line + + + ``` + bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation + ``` - Users are able to configure their applications through following options + Users are able to configure their applications through the following options - * `jar` - set the path of a storm application jar - * `config` - submit a customized storm configuration file + * `jar` - set the path of a Storm application jar + * `config` - submit the custom configuration file generated when launching Nimbus - That's it. Check the dashboard and you should see data flowing through your topology. + b. submit Storm applications through UI - *Note that submission from UI is not supported yet*. + 1. Click on the "Create" button on the applications page of the UI. + 2. Click on the "Submit Storm Application" item in the pull-down menu. + 3. In the popup console, upload the Storm application jar and the configuration file generated when launching Nimbus, + and fill in `storm.starter.ExclamationTopology exclamation` as arguments. + 4. Click on the "Submit" button. + Either way, check the dashboard and you should see data flowing through your topology. 
## How is it different from running on Storm @@ -145,11 +165,11 @@ Gearpump has flow control between tasks such that [sender cannot flood receiver] All Storm configurations are respected with the following priority order ``` -defaults.yaml < storm.yaml < application config < component config < custom user config +defaults.yaml < custom file config < application config < component config ``` where * application config is submit from Storm application along with the topology * component config is set in spout / bolt with `getComponentConfiguration` -* custom user config is specified with the `-config` option when submitting Storm application from command line \ No newline at end of file +* custom file config is specified with the `-config` option when submitting a Storm application from the command line, or uploaded from the UI \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala deleted file mode 100644 index 1d5b903..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm - -import backtype.storm.generated.{KillOptions, StormTopology, SubmitOptions} - -object Commands { - - case class Submit(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions) - - case class AppSubmitted(name: String, appId: Int) - - case class Kill(name: String, option: KillOptions) - - case class AppKilled(name: String, appId: Int) - - case object GetClusterInfo - - case class GetTopology(id: String) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala deleted file mode 100644 index 640c20c..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm - -import java.nio.ByteBuffer -import java.util.concurrent.TimeUnit -import java.util.{HashMap => JHashMap, Map => JMap} - -import akka.actor.ActorRef -import backtype.storm.Config -import backtype.storm.generated._ -import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer} -import backtype.storm.utils.Utils -import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _} -import io.gearpump.experiments.storm.util.StormUtil -import io.gearpump.util.ActorUtil.askActor - -import scala.concurrent.Await -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import scala.language.postfixOps - -object GearpumpThriftServer { - val THRIFT_PORT = StormUtil.getThriftPort - implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) - - private def createServer(handler: ActorRef): ThriftServer = { - val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(handler)) - val connectionType = ThriftConnectionType.NIMBUS - val config = Utils.readDefaultConfig().asInstanceOf[JMap[AnyRef, AnyRef]] - config.put(Config.NIMBUS_THRIFT_PORT, s"$THRIFT_PORT") - new ThriftServer(config, processor, connectionType) - } - - def apply(handler: ActorRef): GearpumpThriftServer = { - new GearpumpThriftServer(createServer(handler)) - } - - class GearpumpNimbus(handler: ActorRef) extends Nimbus.Iface { - - override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = { - val ask = askActor(handler, 
Submit(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE))) - Await.result(ask, 30 seconds) - } - - override def killTopologyWithOpts(name: String, options: KillOptions): Unit = { - Await.result(askActor(handler,Kill(name, options)), 10 seconds) - } - - override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = { - Await.result(askActor(handler,Submit(name, uploadedJarLocation, jsonConf, topology, options)), 10 seconds) - } - - override def uploadChunk(location: String, chunk: ByteBuffer): Unit = { - } - - override def getNimbusConf: String = { - throw new UnsupportedOperationException - } - - override def getTopology(id: String): StormTopology = { - Await.result(askActor[StormTopology](handler, GetTopology(id)), 10 seconds) - } - - override def getTopologyConf(id: String): String = { - throw new UnsupportedOperationException - } - - override def beginFileDownload(file: String): String = { - throw new UnsupportedOperationException - } - - override def getUserTopology(id: String): StormTopology = getTopology(id) - - override def activate(name: String): Unit = { - throw new UnsupportedOperationException - } - - override def rebalance(name: String, options: RebalanceOptions): Unit = { - throw new UnsupportedOperationException - } - - override def deactivate(name: String): Unit = { - throw new UnsupportedOperationException - } - - override def getTopologyInfo(id: String): TopologyInfo = { - throw new UnsupportedOperationException - } - - override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = { - throw new UnsupportedOperationException - } - - override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions()) - - override def downloadChunk(id: String): ByteBuffer = { - throw new UnsupportedOperationException - } - - override def beginFileUpload(): String = { - "local 
thrift server" - } - - override def getClusterInfo: ClusterSummary = { - Await.result(askActor[ClusterSummary](handler, GetClusterInfo), 10 seconds) - } - - override def finishFileUpload(location: String): Unit = { - } - - override def uploadNewCredentials(s: String, credentials: Credentials): Unit = { - throw new UnsupportedOperationException - } - } -} - -class GearpumpThriftServer(server: ThriftServer) extends Thread { - - override def run(): Unit = { - server.serve() - } - - def close(): Unit = { - server.stop() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala index 84d9460..dd8d782 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala @@ -18,131 +18,36 @@ package io.gearpump.experiments.storm -import java.io.File -import java.util.{Map => JMap} +import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient} +import io.gearpump.util.LogUtil +import org.slf4j.Logger -import akka.actor.{Actor, ActorSystem, Props} -import backtype.storm.Config -import backtype.storm.generated.{ClusterSummary, StormTopology, SupervisorSummary, TopologySummary} -import com.typesafe.config.ConfigValueFactory -import io.gearpump.cluster.UserConfig -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _} -import io.gearpump.experiments.storm.topology.GearpumpStormTopology -import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants} 
-import io.gearpump.streaming.StreamApplication -import io.gearpump.util.Constants._ -import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util} +object StormRunner { + private val LOG: Logger = LogUtil.getLogger(getClass) -import scala.collection.JavaConverters._ + private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient) -object StormRunner extends AkkaApp with ArgumentsParser { - override val options: Array[(String, CLIOption[Any])] = Array( - "jar" -> CLIOption[String]("<storm jar>", required = true), - "config" -> CLIOption[String]("<storm config path>", required = false), - "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false))) - - override val remainArgs = Array("topology_name") - - override def main(inputAkkaConf: Config, args: Array[String]): Unit = { - - val akkaConf = updateConfig(inputAkkaConf) - val config = parse(args) - - val verbose = config.getBoolean("verbose") - if (verbose) { - LogUtil.verboseLogToConsole - } - - val jar = config.getString("jar") - val stormConfig = config.getString("config") - val topology = config.remainArgs(0) - val stormArgs = config.remainArgs.drop(1) - - val system = ActorSystem("storm", akkaConf) - val clientContext = new ClientContext(akkaConf, system, null) - - val gearpumpNimbus = system.actorOf(Props(new Handler(clientContext, jar, stormConfig))) - val thriftServer = GearpumpThriftServer(gearpumpNimbus) - thriftServer.start() - - val stormOptions = Array("-Dstorm.options=" + - s"${Config.NIMBUS_HOST}=127.0.0.1,${Config.NIMBUS_THRIFT_PORT}=${GearpumpThriftServer.THRIFT_PORT}", - "-Dstorm.jar=" + jar, - s"-D${PREFER_IPV4}=true" - ) - - val classPath = Array(System.getProperty("java.class.path"), jar) - val process = Util.startProcess(stormOptions, classPath, topology, stormArgs) - - // wait till the process exit - val exit = process.exitValue() - - thriftServer.close() - clientContext.close() - system.shutdown() - system.awaitTermination() - - 
if (exit != 0) { - throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}") - } + private def usage: Unit = { + println(commands) + val keys = commands.keys.toList.sorted + Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") } - import Constants._ - private def updateConfig(config: Config): Config = { - val storm = s"<${GEARPUMP_HOME}>/lib/storm/*" - val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) - val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) - - val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath)) - .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath)) - - if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) { - val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) - updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig) + private def executeCommand(command : String, commandArgs : Array[String]): Unit = { + if (!commands.contains(command)) { + usage } else { - updated + commands(command).main(commandArgs) } } - - class Handler(clientContext: ClientContext, jar: String, fileConfig: String) extends Actor { - private var applications = Map.empty[String, Int] - private var topologies = Map.empty[String, StormTopology] - private val LOG = LogUtil.getLogger(classOf[Handler]) - - implicit val system = context.system - - def receive: Receive = { - case Kill(name, option) => - topologies -= name - clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found"))) - val appId = applications(name) - applications -= name - LOG.info(s"Killed topology $name") - sender ! 
AppKilled(name, appId) - case Submit(name, uploadedJarLocation, appConfig, topology, option) => - topologies += name -> topology - - val gearpumpStormTopology = GearpumpStormTopology(topology, appConfig, fileConfig) - val stormConfig = gearpumpStormTopology.getStormConfig - val processorGraph = GraphBuilder.build(gearpumpStormTopology) - val config = UserConfig.empty - .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) - .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) - val app = StreamApplication(name, processorGraph, config) - val appId = clientContext.submit(app, jar) - applications += name -> appId - LOG.info(s"Storm Application $appId submitted") - sender ! AppSubmitted(name, appId) - case GetClusterInfo => - val topologySummaryList = topologies.map { case (name, _) => - new TopologySummary(name, name, 0, 0, 0, 0, "") - }.toSeq - sender ! new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava) - case GetTopology(id) => - sender ! topologies(id) + def main(args: Array[String]) = { + if (args.length == 0) { + usage + } else { + val command = args(0) + val commandArgs = args.drop(1) + executeCommand(command, commandArgs) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala new file mode 100644 index 0000000..4204580 --- /dev/null +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.experiments.storm.main + +import java.io.{File, FileOutputStream, FileWriter} +import java.nio.ByteBuffer +import java.nio.channels.{Channels, WritableByteChannel} +import java.util.{Map => JMap, UUID} + +import akka.actor.ActorSystem +import backtype.storm.Config +import backtype.storm.generated._ +import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer} +import backtype.storm.utils.TimeCacheMap.ExpiredCallback +import backtype.storm.utils.{TimeCacheMap, Utils} +import com.typesafe.config.ConfigValueFactory +import io.gearpump.cluster.{MasterToAppMaster, UserConfig} +import io.gearpump.cluster.client.ClientContext +import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import io.gearpump.experiments.storm.topology.GearpumpStormTopology +import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil} +import io.gearpump.streaming.StreamApplication +import io.gearpump.util.{FileUtils, AkkaApp, Constants, LogUtil} +import org.apache.storm.shade.org.json.simple.JSONValue +import org.apache.storm.shade.org.yaml.snakeyaml.Yaml +import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor +import org.slf4j.Logger + +import scala.collection.JavaConverters._ + +object GearpumpNimbus extends AkkaApp 
with ArgumentsParser { + private val THRIFT_PORT = StormUtil.getThriftPort + private val OUTPUT = "output" + + override val options: Array[(String, CLIOption[Any])] = Array( + OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml")) + ) + + override def main(inputAkkaConf: Config, args: Array[String]): Unit = { + val parsed = parse(args) + val output = parsed.getString(OUTPUT) + val akkaConf = updateClientConfig(inputAkkaConf) + val system = ActorSystem("storm", akkaConf) + val clientContext = new ClientContext(akkaConf, system, null) + val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]] + val thriftConf: JMap[String, String] = Map(Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava + updateOutputStormConfig(thriftConf, output) + stormConf.putAll(thriftConf) + val thriftServer = createServer(clientContext, stormConf) + thriftServer.serve() + system.awaitTermination() + } + + private def createServer(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = { + val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, stormConf)) + val connectionType = ThriftConnectionType.NIMBUS + new ThriftServer(stormConf, processor, connectionType) + } + + private def updateOutputStormConfig(conf: JMap[String, String], output: String): Unit = { + // read existing config + val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]] + outputConfig.putAll(conf) + val yaml = new Yaml(new SafeConstructor) + val writer = new FileWriter(new File(output)) + yaml.dump(outputConfig, writer) + } + + import Constants._ + private def updateClientConfig(config: Config): Config = { + val storm = s"<${GEARPUMP_HOME}>/lib/storm/*" + val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) + val executorClassPath = s"$storm${File.pathSeparator}" + 
config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) + + val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath)) + .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath)) + + if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) { + val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) + updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig) + } else { + updated + } + } + +} + +class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface { + private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus]) + private var applications = Map.empty[String, Int] + private var topologies = Map.empty[String, (StormTopology, JMap[AnyRef, AnyRef])] + private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get + private val expiredCallback = new ExpiredCallback[String, WritableByteChannel] { + override def expire(k: String, v: WritableByteChannel): Unit = { + v.close() + } + } + private val fileCacheMap = new TimeCacheMap[String, WritableByteChannel](expireSeconds, expiredCallback) + + override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = { + submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE)) + } + + override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = { + + implicit val system = clientContext.system + val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf) + val stormConfig = gearpumpStormTopology.getStormConfig + val processorGraph = GraphBuilder.build(gearpumpStormTopology) + val config = UserConfig.empty + 
.withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) + .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) + val app = StreamApplication(name, processorGraph, config) + LOG.info(s"jar file uploaded to $uploadedJarLocation") + val appId = clientContext.submit(app, uploadedJarLocation) + applications += name -> appId + topologies += name -> (topology, stormConfig) + LOG.info(s"Storm Application $appId submitted") + } + + override def killTopologyWithOpts(name: String, options: KillOptions): Unit = { + clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found"))) + applications -= name + topologies -= name + LOG.info(s"Killed topology $name") + } + + override def getNimbusConf: String = { + JSONValue.toJSONString(stormConf) + } + + override def getTopology(name: String): StormTopology = { + updateApps + topologies.getOrElse(name, + throw new RuntimeException(s"topology $name not found"))._1 + } + + override def getTopologyConf(name: String): String = { + updateApps + JSONValue.toJSONString(topologies.getOrElse(name, + throw new RuntimeException(s"topology $name not found"))._2) + } + + override def getUserTopology(id: String): StormTopology = getTopology(id) + + override def beginFileUpload(): String = { + val location = s"stormjar-${UUID.randomUUID()}.jar" + val channel = Channels.newChannel(new FileOutputStream(location)) + fileCacheMap.put(location, channel) + LOG.info(s"Uploading file from client to $location") + location + } + + override def uploadChunk(location: String, chunk: ByteBuffer): Unit = { + if (!fileCacheMap.containsKey(location)) { + throw new RuntimeException(s"File for $location does not exist (or timed out)") + } else { + val channel = fileCacheMap.get(location) + channel.write(chunk) + fileCacheMap.put(location, channel) + } + } + + override def finishFileUpload(location: String): Unit = { + if (!fileCacheMap.containsKey(location)) { + throw new 
RuntimeException(s"File for $location does not exist (or timed out)") + } else { + val channel = fileCacheMap.get(location) + channel.close() + fileCacheMap.remove(location) + new File(location).delete() + } + } + + override def getClusterInfo: ClusterSummary = { + updateApps + val topologySummaryList = topologies.map { case (name, _) => + new TopologySummary(name, name, 0, 0, 0, 0, "") + }.toSeq + new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava) + } + + override def beginFileDownload(file: String): String = { + throw new UnsupportedOperationException + } + + override def uploadNewCredentials(s: String, credentials: Credentials): Unit = { + throw new UnsupportedOperationException + } + override def activate(name: String): Unit = { + throw new UnsupportedOperationException + } + + override def rebalance(name: String, options: RebalanceOptions): Unit = { + throw new UnsupportedOperationException + } + + override def deactivate(name: String): Unit = { + throw new UnsupportedOperationException + } + + override def getTopologyInfo(name: String): TopologyInfo = { + throw new UnsupportedOperationException + } + + override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = { + throw new UnsupportedOperationException + } + + override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions()) + + override def downloadChunk(name: String): ByteBuffer = { + throw new UnsupportedOperationException + } + + private def updateApps: Unit = { + clientContext.listApps.appMasters.foreach { app => + val name = app.appName + if (applications.contains(app.appName)) { + if (app.status != MasterToAppMaster.AppMasterActive) { + applications -= name + topologies -= name + } + } + } + } +} + + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala 
---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala new file mode 100644 index 0000000..70efbf3 --- /dev/null +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.gearpump.experiments.storm.main + +import backtype.storm.Config +import backtype.storm.utils.Utils +import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import io.gearpump.util.Constants._ +import io.gearpump.util.{AkkaApp, LogUtil, Util} + +object GearpumpStormClient extends AkkaApp with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array( + "jar" -> CLIOption[String]("<storm jar>", required = true), + "config" -> CLIOption[Int]("<storm config file>", required = true), + "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false))) + + override def main(inputAkkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + + val verbose = config.getBoolean("verbose") + if (verbose) { + LogUtil.verboseLogToConsole + } + + val jar = config.getString("jar") + val stormConfig = config.getString("config") + val topology = config.remainArgs(0) + val stormArgs = config.remainArgs.drop(1) + val stormOptions = Array( + s"-Dstorm.options=${getThriftOptions(stormConfig)}", + s"-Dstorm.jar=$jar", + s"-Dstorm.config.file=$stormConfig", + s"-D${PREFER_IPV4}=true" + ) + + val classPath = Array(s"${System.getProperty(GEARPUMP_HOME)}/lib/storm/*", jar) + val process = Util.startProcess(stormOptions, classPath, topology, stormArgs) + + // wait till the process exit + val exit = process.exitValue() + + if (exit != 0) { + throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}") + } + } + + private def getThriftOptions(stormConfig: String): String = { + val config = Utils.findAndReadConfigFile(stormConfig, true) + val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT) + s"${Config.NIMBUS_HOST}=127.0.0.1,${Config.NIMBUS_THRIFT_PORT}=$thriftPort" + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala 
---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala index 996498b..b88a200 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala @@ -18,7 +18,6 @@ package io.gearpump.experiments.storm.topology -import java.io._ import java.lang.{Iterable => JIterable} import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} @@ -35,8 +34,6 @@ import io.gearpump.experiments.storm.util.StormUtil._ import io.gearpump.streaming.Processor import io.gearpump.streaming.task.Task import io.gearpump.util.LogUtil -import org.apache.storm.shade.org.yaml.snakeyaml.Yaml -import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor import org.slf4j.Logger import scala.collection.JavaConversions._ @@ -45,43 +42,18 @@ object GearpumpStormTopology { private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology]) def apply( + name: String, topology: StormTopology, - appConfigInJson: String, - fileConfig: String)(implicit system: ActorSystem): GearpumpStormTopology = { + appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = { new GearpumpStormTopology( + name, topology, Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]], - parseJsonStringToMap(appConfigInJson), - readStormConfig(fileConfig) + parseJsonStringToMap(appConfigInJson) ) } - /** - * @param configFile user provided local config file - * @return a config Map loaded from config file - */ - private def readStormConfig(configFile: String): JMap[AnyRef, AnyRef] = { - var ret: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] - try { - val yaml = new 
Yaml(new SafeConstructor) - val input: InputStream = new FileInputStream(configFile) - try { - ret = yaml.load(new InputStreamReader(input)).asInstanceOf[JMap[AnyRef, AnyRef]] - } catch { - case e: IOException => - LOG.warn(s"failed to load config file $configFile") - } finally { - input.close() - } - } catch { - case e: FileNotFoundException => - LOG.warn(s"failed to find config file $configFile") - case t: Throwable => - LOG.error(t.getMessage) - } - ret - } } /** @@ -91,21 +63,20 @@ object GearpumpStormTopology { * 3. provides interface for Gearpump applications to use Storm topology * * an implicit ActorSystem is required to create Gearpump processors + * @param name topology name * @param topology Storm topology - * @param sysConfig configs from "defaults.yaml" and "storm.yaml" + * @param sysConfig configs from "defaults.yaml" and custom config file * @param appConfig config submitted from user application - * @param fileConfig custom file config set by user in command line */ private[storm] class GearpumpStormTopology( + name: String, topology: StormTopology, sysConfig: JMap[AnyRef, AnyRef], - appConfig: JMap[AnyRef, AnyRef], - fileConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) { - import io.gearpump.experiments.storm.topology.GearpumpStormTopology._ + appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) { private val spouts = topology.get_spouts() private val bolts = topology.get_bolts() - private val stormConfig = mergeConfigs(sysConfig, appConfig, fileConfig, getComponentConfigs(spouts, bolts)) + private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts)) private val spoutProcessors = spouts.map { case (id, spout) => id -> spoutToProcessor(id, spout, stormConfig.toMap) }.toMap private val boltProcessors = bolts.map { case (id, bolt) => @@ -114,7 +85,7 @@ private[storm] class GearpumpStormTopology( /** * @return merged Storm config with priority - * defaults.yaml < storm.yaml < application config < 
component config < custom file config + * defaults.yaml < custom file config < application config < component config */ def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig @@ -139,13 +110,12 @@ private[storm] class GearpumpStormTopology( private def mergeConfigs( sysConfig: JMap[AnyRef, AnyRef], appConfig: JMap[AnyRef, AnyRef], - fileConfig: JMap[AnyRef, AnyRef], componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = { val allConfig = new JHashMap[AnyRef, AnyRef] allConfig.putAll(sysConfig) allConfig.putAll(appConfig) allConfig.putAll(getMergedComponentConfig(componentConfigs, allConfig.toMap)) - allConfig.putAll(fileConfig) + allConfig.put(Config.TOPOLOGY_NAME, name) allConfig } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala deleted file mode 100644 index 20b63b3..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm - -import akka.actor.ActorSystem -import akka.testkit.TestProbe -import backtype.storm.generated.StormTopology -import io.gearpump.experiments.storm.util.TopologyUtil -import io.gearpump.cluster.TestUtil -import Commands.{GetTopology, Kill, Submit} -import GearpumpThriftServer.GearpumpNimbus -import org.scalatest.mock.MockitoSugar -import org.scalatest.{Matchers, WordSpec} - -import scala.concurrent.Future - -class GearpumpNimbusSpec extends WordSpec with Matchers with MockitoSugar { - - "GearpumpNimbus" should { - "submit and kill topology through ClientContext" in { - - implicit val system = ActorSystem("storm-test", TestUtil.DEFAULT_CONFIG) - implicit val dispatcher = system.dispatcher - - val handler = TestProbe() - val gearpumpNimbus = new GearpumpNimbus(handler.ref) - - val appId = 0 - val name = "test" - val uploadedJarLocation = "local" - val jsonConf = "storm_json_conf" - val topology = TopologyUtil.getTestTopology - - Future(gearpumpNimbus.submitTopology(name, uploadedJarLocation, jsonConf, topology)) - handler.expectMsgType[Submit] - - Future(gearpumpNimbus.getTopology(name)) - handler.expectMsgType[GetTopology] - handler.reply(new StormTopology) - - Future(gearpumpNimbus.getUserTopology(name)) - handler.expectMsgType[GetTopology] - handler.reply(new StormTopology) - - Future(gearpumpNimbus.killTopology(name)) - handler.expectMsgType[Kill] - - system.shutdown() - system.awaitTermination() - } - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala deleted file mode 100644 index b1736f0..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.experiments.storm - -import backtype.storm.security.auth.ThriftServer -import org.mockito.Mockito._ -import org.scalatest.WordSpec -import org.scalatest.mock.MockitoSugar - -class GearpumpThriftServerSpec extends WordSpec with MockitoSugar { - - "GearpumpThriftServer" should { - "run ThriftServer.serve" in { - val tServer = mock[ThriftServer] - val thriftServer = new GearpumpThriftServer(tServer) - - thriftServer.run() - - verify(tServer).serve() - } - - "stop ThriftServer" in { - val tServer = mock[ThriftServer] - val thriftServer = new GearpumpThriftServer(tServer) - - thriftServer.close() - - verify(tServer).stop() - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala deleted file mode 100644 index 95cf279..0000000 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package io.gearpump.experiments.storm - -import akka.actor.{ActorSystem, Props} -import akka.testkit.TestProbe -import io.gearpump.cluster.TestUtil -import io.gearpump.cluster.client.ClientContext -import io.gearpump.experiments.storm.Commands.{AppKilled, AppSubmitted, Kill, Submit} -import io.gearpump.experiments.storm.StormRunner.Handler -import io.gearpump.experiments.storm.util.TopologyUtil -import org.mockito.Matchers._ -import org.mockito.Mockito -import org.scalatest.{FlatSpec, Matchers} - -class StormRunnerSpec extends FlatSpec with Matchers { - it should "handle submit/kill correctly" in { - val conf = TestUtil.DEFAULT_CONFIG - implicit val system = ActorSystem("storm-test", conf) - - val uploadedJarLocation = "local" - val jsonConf = "storm_json_conf" - val topology = TopologyUtil.getTestTopology - - implicit val dispatcher = system.dispatcher - val clientContext = Mockito.mock(classOf[ClientContext]) - Mockito.when(clientContext.submit(any(), any())).thenReturn(0) - val handler = system.actorOf(Props(new Handler(clientContext, "jar", "user_config"))) - val client = TestProbe() - - client.send(handler, Submit("app", uploadedJarLocation, jsonConf, topology, null)) - client.expectMsg(AppSubmitted("app", 0)) - - - client.send(handler, Kill("app", null)) - client.expectMsg(AppKilled("app", 0)) - - system.shutdown() - system.awaitTermination() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala index d1c330b..8f10886 100644 --- 
a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala @@ -20,6 +20,7 @@ package io.gearpump.experiments.storm.topology import java.util.{HashMap => JHashMap, Map => JMap} +import backtype.storm.Config import io.gearpump.experiments.storm.processor.StormProcessor import io.gearpump.experiments.storm.producer.StormProducer import io.gearpump.experiments.storm.util.TopologyUtil @@ -40,28 +41,27 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar val sysConfig = newJavaConfig(name, sysVal) val appVal = "app" val appConfig = newJavaConfig(name, appVal) - val fileVal = "file" - val fileConfig = newJavaConfig(name, fileVal) implicit val system = MockUtil.system - val topology1 = new GearpumpStormTopology(stormTopology, newEmptyConfig, newEmptyConfig, newEmptyConfig) - topology1.getStormConfig shouldBe empty + val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, newEmptyConfig) + topology1.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology1" + topology1.getStormConfig should not contain name - val topology2 = new GearpumpStormTopology(stormTopology, sysConfig, newEmptyConfig, newEmptyConfig) + val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, newEmptyConfig) + topology2.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology2" topology2.getStormConfig.get(name) shouldBe sysVal - val topology3 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, newEmptyConfig) + val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig) + topology3.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology3" topology3.getStormConfig.get(name) shouldBe appVal - val topology4 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, fileConfig) - topology4.getStormConfig.get(name) 
shouldBe fileVal } "create Gearpump processors from Storm topology" in { val stormTopology = TopologyUtil.getTestTopology implicit val system = MockUtil.system val gearpumpStormTopology = - GearpumpStormTopology(stormTopology, null, "") + GearpumpStormTopology("app", stormTopology, null) val processors = gearpumpStormTopology.getProcessors stormTopology.get_spouts().foreach { case (spoutId, _) => val processor = processors(spoutId) @@ -80,7 +80,7 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar implicit val system = MockUtil.system val sysConfig = new JHashMap[AnyRef, AnyRef] val gearpumpStormTopology = - GearpumpStormTopology(stormTopology, null, "") + GearpumpStormTopology("app", stormTopology, null) val targets0 = gearpumpStormTopology.getTargets("1") targets0 should contain key "3" targets0 should contain key "4" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala index ed792f4..15acf4e 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala @@ -20,7 +20,6 @@ package io.gearpump.integrationtest.storm import backtype.storm.utils.{Utils, DRPCClient} -import io.gearpump.experiments.storm.GearpumpThriftServer import io.gearpump.integrationtest.Docker import io.gearpump.integrationtest.minicluster.BaseContainer import org.apache.log4j.Logger @@ -30,23 +29,28 @@ class StormClient(masterAddrs: Seq[(String, Int)]) { private val LOG = Logger.getLogger(getClass) private val STORM_HOST = "storm0" - private val STORM_CMD = "/opt/start 
storm" + private val STORM_NIMBUS = "/opt/start storm nimbus" + private val STORM_APP = "/opt/start storm app" private val STORM_DRPC = "storm-drpc" private val CONFIG_FILE = "storm.yaml" - private val NIMBUS_THRIFT_PORT = GearpumpThriftServer.THRIFT_PORT private val DRPC_PORT = 3772 private val DRPC_INVOCATIONS_PORT = 3773 private val container = new BaseContainer(STORM_HOST, STORM_DRPC, masterAddrs, - tunnelPorts = Set(NIMBUS_THRIFT_PORT, DRPC_PORT, DRPC_INVOCATIONS_PORT)) + tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT)) def start(): Unit = { container.createAndStart() + startNimbus + } + + private def startNimbus: String = { + Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE") } def submitStormApp(jar: String, mainClass: String, args: String = ""): Int = { try { - Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_CMD -config $CONFIG_FILE " + + Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_APP -config $CONFIG_FILE " + s"-jar $jar $mainClass $args").split("\n") .filter(_.contains("The application id is ")).head.split(" ").last.toInt } catch { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index b242b36..a3852c4 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -59,7 +59,7 @@ object Pack extends sbt.Build { "worker" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-DlogFilename=worker", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"), "services" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"), "yarnclient" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"), - "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", 
"-Djava.rmi.server.hostname=localhost") + "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}") ), packLibDir := Map( "lib" -> new ProjectsToPack(core.id, streaming.id), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/502dbae9/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala index e2b2c47..aa90726 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala @@ -221,7 +221,7 @@ object MasterService { */ def submitStormApp(jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = { submitAndDeleteTempFiles( - "io.gearpump.experiments.storm.StormRunner", + "io.gearpump.experiments.storm.main.GearpumpStormClient", argsArray = spaceSeparatedArgumentsToArray(args), fileMap = Map("jar" -> jar, "config" -> stormConf).filter(_._2.isDefined).mapValues(_.get), classPath = getStormApplicationClassPath,
