Revert "fix #1895, make GearpumpNimbus a standalone service"
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/b8f8bb12 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/b8f8bb12 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/b8f8bb12 Branch: refs/heads/master Commit: b8f8bb12bd0fe51d45fc63962579ef5b71f7cbe1 Parents: 7f48c12 Author: ManuZhang <[email protected]> Authored: Tue Feb 2 07:54:53 2016 +0800 Committer: ManuZhang <[email protected]> Committed: Tue Feb 2 07:54:53 2016 +0800 ---------------------------------------------------------------------- docs/dev-storm.md | 40 +-- .../gearpump/experiments/storm/Commands.scala | 36 +++ .../storm/GearpumpThriftServer.scala | 143 +++++++++++ .../experiments/storm/StormRunner.scala | 137 ++++++++-- .../experiments/storm/main/GearpumpNimbus.scala | 247 ------------------- .../storm/main/GearpumpStormClient.scala | 70 ------ .../storm/topology/GearpumpStormTopology.scala | 52 +++- .../experiments/storm/GearpumpNimbusSpec.scala | 68 +++++ .../storm/GearpumpThriftServerSpec.scala | 48 ++++ .../experiments/storm/StormRunnerSpec.scala | 58 +++++ .../topology/GearpumpStormTopologySpec.scala | 20 +- .../integrationtest/storm/StormClient.scala | 14 +- project/Pack.scala | 2 +- .../io/gearpump/services/MasterService.scala | 2 +- 14 files changed, 537 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/docs/dev-storm.md ---------------------------------------------------------------------- diff --git a/docs/dev-storm.md b/docs/dev-storm.md index a22ed6c..a05c4ed 100644 --- a/docs/dev-storm.md +++ b/docs/dev-storm.md @@ -77,43 +77,23 @@ This section shows how to run an existing Storm jar in a local Gearpump cluster. 1. launch a local cluster ``` - bin/local + ./target/pack/bin/local ``` -2. start a local Gearpump Nimbus server - - Users need server's thrift port to submit topologies later. The thrift port is written to a yaml config file set with `-output` option. - Users can provide an existing config file where only `nimbus.thrift.port` is overwritten. If not provided, a new file `app.yaml` is created with the config. - +2. submit a topology from storm-starter. ``` - bin/storm nimbus -output [conf <custom yaml config>] + bin/storm -verbose -config storm.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation ``` - -3. submit Storm applications - - Users can either submit Storm applications through command line or UI. - - a. submit Storm applications through command line - - - ``` - bin/storm app -verbose -config app.yaml -jar storm-starter-${STORM_VERSION}.jar storm.starter.ExclamationTopology exclamation - ``` - Users are able to configure their applications through following options + Users are able to configure their applications through following options - * `jar` - set the path of a Storm application jar - * `config` - submit the custom configuration file generated when launching Nimbus + * `jar` - set the path of a storm application jar + * `config` - submit a customized storm configuration file - b. submit Storm application through UI + That's it. Check the dashboard and you should see data flowing through your topology. - 1. Click on the "Create" button on the applications page on UI. - 2. Click on the "Submit Storm Application" item in the pull down menu. - 3. In the popup console, upload the Storm application jar and the configuration file generated when launching Nimbus, - and fill in `storm.starter.ExclamationTopology exclamation` as arguments. - 4. Click on the "Submit" button + *Note that submission from UI is not supported yet*. - Either way, check the dashboard and you should see data flowing through your topology. ## How is it different from running on Storm @@ -165,11 +145,11 @@ Gearpump has flow control between tasks such that [sender cannot flood receiver] All Storm configurations are respected with the following priority order ``` -defaults.yaml < custom file config < application config < component config +defaults.yaml < storm.yaml < application config < component config < custom user config ``` where * application config is submit from Storm application along with the topology * component config is set in spout / bolt with `getComponentConfiguration` -* custom file config is specified with the `-config` option when submitting Storm application from command line or uploaded from UI \ No newline at end of file +* custom user config is specified with the `-config` option when submitting Storm application from command line \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala new file mode 100644 index 0000000..1d5b903 --- /dev/null +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/Commands.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.experiments.storm + +import backtype.storm.generated.{KillOptions, StormTopology, SubmitOptions} + +object Commands { + + case class Submit(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions) + + case class AppSubmitted(name: String, appId: Int) + + case class Kill(name: String, option: KillOptions) + + case class AppKilled(name: String, appId: Int) + + case object GetClusterInfo + + case class GetTopology(id: String) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala new file mode 100644 index 0000000..640c20c --- /dev/null +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/GearpumpThriftServer.scala @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.experiments.storm + +import java.nio.ByteBuffer +import java.util.concurrent.TimeUnit +import java.util.{HashMap => JHashMap, Map => JMap} + +import akka.actor.ActorRef +import backtype.storm.Config +import backtype.storm.generated._ +import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer} +import backtype.storm.utils.Utils +import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _} +import io.gearpump.experiments.storm.util.StormUtil +import io.gearpump.util.ActorUtil.askActor + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import scala.language.postfixOps + +object GearpumpThriftServer { + val THRIFT_PORT = StormUtil.getThriftPort + implicit val timeOut = akka.util.Timeout(3, TimeUnit.SECONDS) + + private def createServer(handler: ActorRef): ThriftServer = { + val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(handler)) + val connectionType = ThriftConnectionType.NIMBUS + val config = Utils.readDefaultConfig().asInstanceOf[JMap[AnyRef, AnyRef]] + config.put(Config.NIMBUS_THRIFT_PORT, s"$THRIFT_PORT") + new ThriftServer(config, processor, connectionType) + } + + def apply(handler: ActorRef): GearpumpThriftServer = { + new GearpumpThriftServer(createServer(handler)) + } + + class GearpumpNimbus(handler: ActorRef) extends Nimbus.Iface { + + override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = { + val ask = askActor(handler, Submit(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE))) + Await.result(ask, 30 seconds) + } + + override def killTopologyWithOpts(name: String, options: KillOptions): Unit = { + Await.result(askActor(handler,Kill(name, options)), 10 seconds) + } + + override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = { + Await.result(askActor(handler,Submit(name, uploadedJarLocation, jsonConf, topology, options)), 10 seconds) + } + + override def uploadChunk(location: String, chunk: ByteBuffer): Unit = { + } + + override def getNimbusConf: String = { + throw new UnsupportedOperationException + } + + override def getTopology(id: String): StormTopology = { + Await.result(askActor[StormTopology](handler, GetTopology(id)), 10 seconds) + } + + override def getTopologyConf(id: String): String = { + throw new UnsupportedOperationException + } + + override def beginFileDownload(file: String): String = { + throw new UnsupportedOperationException + } + + override def getUserTopology(id: String): StormTopology = getTopology(id) + + override def activate(name: String): Unit = { + throw new UnsupportedOperationException + } + + override def rebalance(name: String, options: RebalanceOptions): Unit = { + throw new UnsupportedOperationException + } + + override def deactivate(name: String): Unit = { + throw new UnsupportedOperationException + } + + override def getTopologyInfo(id: String): TopologyInfo = { + throw new UnsupportedOperationException + } + + override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = { + throw new UnsupportedOperationException + } + + override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions()) + + override def downloadChunk(id: String): ByteBuffer = { + throw new UnsupportedOperationException + } + + override def beginFileUpload(): String = { + "local thrift server" + } + + override def getClusterInfo: ClusterSummary = { + Await.result(askActor[ClusterSummary](handler, GetClusterInfo), 10 seconds) + } + + override def finishFileUpload(location: String): Unit = { + } + + override def uploadNewCredentials(s: String, credentials: Credentials): Unit = { + throw new UnsupportedOperationException + } + } +} + +class GearpumpThriftServer(server: ThriftServer) extends Thread { + + override def run(): Unit = { + server.serve() + } + + def close(): Unit = { + server.stop() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala index dd8d782..84d9460 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/StormRunner.scala @@ -18,36 +18,131 @@ package io.gearpump.experiments.storm -import io.gearpump.experiments.storm.main.{GearpumpNimbus, GearpumpStormClient} -import io.gearpump.util.LogUtil -import org.slf4j.Logger +import java.io.File +import java.util.{Map => JMap} -object StormRunner { - private val LOG: Logger = LogUtil.getLogger(getClass) +import akka.actor.{Actor, ActorSystem, Props} +import backtype.storm.Config +import backtype.storm.generated.{ClusterSummary, StormTopology, SupervisorSummary, TopologySummary} +import com.typesafe.config.ConfigValueFactory +import io.gearpump.cluster.UserConfig +import io.gearpump.cluster.client.ClientContext +import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import io.gearpump.experiments.storm.Commands.{GetClusterInfo, _} +import io.gearpump.experiments.storm.topology.GearpumpStormTopology +import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants} +import io.gearpump.streaming.StreamApplication +import io.gearpump.util.Constants._ +import io.gearpump.util.{AkkaApp, Constants, LogUtil, Util} - private val commands = Map("nimbus" -> GearpumpNimbus, "app" -> GearpumpStormClient) +import scala.collection.JavaConverters._ - private def usage: Unit = { - println(commands) - val keys = commands.keys.toList.sorted - Console.err.println("Usage: " + "<" + keys.mkString("|") + ">") +object StormRunner extends AkkaApp with ArgumentsParser { + override val options: Array[(String, CLIOption[Any])] = Array( + "jar" -> CLIOption[String]("<storm jar>", required = true), + "config" -> CLIOption[String]("<storm config path>", required = false), + "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false))) + + override val remainArgs = Array("topology_name") + + override def main(inputAkkaConf: Config, args: Array[String]): Unit = { + + val akkaConf = updateConfig(inputAkkaConf) + val config = parse(args) + + val verbose = config.getBoolean("verbose") + if (verbose) { + LogUtil.verboseLogToConsole + } + + val jar = config.getString("jar") + val stormConfig = config.getString("config") + val topology = config.remainArgs(0) + val stormArgs = config.remainArgs.drop(1) + + val system = ActorSystem("storm", akkaConf) + val clientContext = new ClientContext(akkaConf, system, null) + + val gearpumpNimbus = system.actorOf(Props(new Handler(clientContext, jar, stormConfig))) + val thriftServer = GearpumpThriftServer(gearpumpNimbus) + thriftServer.start() + + val stormOptions = Array("-Dstorm.options=" + + s"${Config.NIMBUS_HOST}=127.0.0.1,${Config.NIMBUS_THRIFT_PORT}=${GearpumpThriftServer.THRIFT_PORT}", + "-Dstorm.jar=" + jar, + s"-D${PREFER_IPV4}=true" + ) + + val classPath = Array(System.getProperty("java.class.path"), jar) + val process = Util.startProcess(stormOptions, classPath, topology, stormArgs) + + // wait till the process exit + val exit = process.exitValue() + + thriftServer.close() + clientContext.close() + system.shutdown() + system.awaitTermination() + + if (exit != 0) { + throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}") + } } - private def executeCommand(command : String, commandArgs : Array[String]): Unit = { - if (!commands.contains(command)) { - usage + import Constants._ + private def updateConfig(config: Config): Config = { + val storm = s"<${GEARPUMP_HOME}>/lib/storm/*" + val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) + val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) + + val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath)) + .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath)) + + if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) { + val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) + updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig) } else { - commands(command).main(commandArgs) + updated } } - def main(args: Array[String]) = { - if (args.length == 0) { - usage - } else { - val command = args(0) - val commandArgs = args.drop(1) - executeCommand(command, commandArgs) + + class Handler(clientContext: ClientContext, jar: String, fileConfig: String) extends Actor { + private var applications = Map.empty[String, Int] + private var topologies = Map.empty[String, StormTopology] + private val LOG = LogUtil.getLogger(classOf[Handler]) + + implicit val system = context.system + + def receive: Receive = { + case Kill(name, option) => + topologies -= name + clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found"))) + val appId = applications(name) + applications -= name + LOG.info(s"Killed topology $name") + sender ! AppKilled(name, appId) + case Submit(name, uploadedJarLocation, appConfig, topology, option) => + topologies += name -> topology + + val gearpumpStormTopology = GearpumpStormTopology(topology, appConfig, fileConfig) + val stormConfig = gearpumpStormTopology.getStormConfig + val processorGraph = GraphBuilder.build(gearpumpStormTopology) + val config = UserConfig.empty + .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) + .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) + val app = StreamApplication(name, processorGraph, config) + val appId = clientContext.submit(app, jar) + applications += name -> appId + LOG.info(s"Storm Application $appId submitted") + sender ! AppSubmitted(name, appId) + case GetClusterInfo => + val topologySummaryList = topologies.map { case (name, _) => + new TopologySummary(name, name, 0, 0, 0, 0, "") + }.toSeq + sender ! new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava) + case GetTopology(id) => + sender ! topologies(id) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala deleted file mode 100644 index 83328b4..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.main - -import java.io.{File, FileOutputStream, FileWriter} -import java.nio.ByteBuffer -import java.nio.channels.{Channels, WritableByteChannel} -import java.util.{Map => JMap, UUID} - -import akka.actor.ActorSystem -import backtype.storm.Config -import backtype.storm.generated._ -import backtype.storm.security.auth.{ThriftConnectionType, ThriftServer} -import backtype.storm.utils.TimeCacheMap.ExpiredCallback -import backtype.storm.utils.{TimeCacheMap, Utils} -import com.typesafe.config.ConfigValueFactory -import io.gearpump.cluster.{MasterToAppMaster, UserConfig} -import io.gearpump.cluster.client.ClientContext -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.experiments.storm.topology.GearpumpStormTopology -import io.gearpump.experiments.storm.util.{GraphBuilder, StormConstants, StormUtil} -import io.gearpump.streaming.StreamApplication -import io.gearpump.util.{FileUtils, AkkaApp, Constants, LogUtil} -import org.apache.storm.shade.org.json.simple.JSONValue -import org.apache.storm.shade.org.yaml.snakeyaml.Yaml -import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor -import org.slf4j.Logger - -import scala.collection.JavaConverters._ - -object GearpumpNimbus extends AkkaApp with ArgumentsParser { - private val THRIFT_PORT = StormUtil.getThriftPort - private val OUTPUT = "output" - - override val options: Array[(String, CLIOption[Any])] = Array( - OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml")) - ) - - override def main(inputAkkaConf: Config, args: Array[String]): Unit = { - val parsed = parse(args) - val output = parsed.getString(OUTPUT) - val akkaConf = updateClientConfig(inputAkkaConf) - val system = ActorSystem("storm", akkaConf) - val clientContext = new ClientContext(akkaConf, system, null) - val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]] - val thriftConf: JMap[String, String] = Map( - Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME), - Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava - updateOutputStormConfig(thriftConf, output) - stormConf.putAll(thriftConf) - val thriftServer = createServer(clientContext, stormConf) - thriftServer.serve() - system.awaitTermination() - } - - private def createServer(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]): ThriftServer = { - val processor = new Nimbus.Processor[GearpumpNimbus](new GearpumpNimbus(clientContext, stormConf)) - val connectionType = ThriftConnectionType.NIMBUS - new ThriftServer(stormConf, processor, connectionType) - } - - private def updateOutputStormConfig(conf: JMap[String, String], output: String): Unit = { - // read existing config - val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]] - outputConfig.putAll(conf) - val yaml = new Yaml(new SafeConstructor) - val writer = new FileWriter(new File(output)) - yaml.dump(outputConfig, writer) - } - - import Constants._ - private def updateClientConfig(config: Config): Config = { - val storm = s"<${GEARPUMP_HOME}>/lib/storm/*" - val appClassPath = s"$storm${File.pathSeparator}" + config.getString(GEARPUMP_APPMASTER_EXTRA_CLASSPATH) - val executorClassPath = s"$storm${File.pathSeparator}" + config.getString(Constants.GEARPUMP_EXECUTOR_EXTRA_CLASSPATH) - - val updated = config.withValue(GEARPUMP_APPMASTER_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(appClassPath)) - .withValue(GEARPUMP_EXECUTOR_EXTRA_CLASSPATH, ConfigValueFactory.fromAnyRef(executorClassPath)) - - if (config.hasPath(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) { - val serializerConfig = ConfigValueFactory.fromAnyRef(config.getString(StormConstants.STORM_SERIALIZATION_FRAMEWORK)) - updated.withValue(GEARPUMP_SERIALIZER_POOL, serializerConfig) - } else { - updated - } - } - -} - -class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface { - private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus]) - private var applications = Map.empty[String, Int] - private var topologies = Map.empty[String, (StormTopology, JMap[AnyRef, AnyRef])] - private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get - private val expiredCallback = new ExpiredCallback[String, WritableByteChannel] { - override def expire(k: String, v: WritableByteChannel): Unit = { - v.close() - } - } - private val fileCacheMap = new TimeCacheMap[String, WritableByteChannel](expireSeconds, expiredCallback) - - override def submitTopology(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology): Unit = { - submitTopologyWithOpts(name, uploadedJarLocation, jsonConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE)) - } - - override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = { - - implicit val system = clientContext.system - val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf) - val stormConfig = gearpumpStormTopology.getStormConfig - val processorGraph = GraphBuilder.build(gearpumpStormTopology) - val config = UserConfig.empty - .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology) - .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) - val app = StreamApplication(name, processorGraph, config) - LOG.info(s"jar file uploaded to $uploadedJarLocation") - val appId = clientContext.submit(app, uploadedJarLocation) - applications += name -> appId - topologies += name -> (topology, stormConfig) - LOG.info(s"Storm Application $appId submitted") - } - - override def killTopologyWithOpts(name: String, options: KillOptions): Unit = { - clientContext.shutdown(applications.getOrElse(name, throw new RuntimeException(s"topology $name not found"))) - applications -= name - topologies -= name - LOG.info(s"Killed topology $name") - } - - override def getNimbusConf: String = { - JSONValue.toJSONString(stormConf) - } - - override def getTopology(name: String): StormTopology = { - updateApps - topologies.getOrElse(name, - throw new RuntimeException(s"topology $name not found"))._1 - } - - override def getTopologyConf(name: String): String = { - updateApps - JSONValue.toJSONString(topologies.getOrElse(name, - throw new RuntimeException(s"topology $name not found"))._2) - } - - override def getUserTopology(id: String): StormTopology = getTopology(id) - - override def beginFileUpload(): String = { - val location = s"stormjar-${UUID.randomUUID()}.jar" - val channel = Channels.newChannel(new FileOutputStream(location)) - fileCacheMap.put(location, channel) - LOG.info(s"Uploading file from client to $location") - location - } - - override def uploadChunk(location: String, chunk: ByteBuffer): Unit = { - if (!fileCacheMap.containsKey(location)) { - throw new RuntimeException(s"File for $location does not exist (or timed out)") - } else { - val channel = fileCacheMap.get(location) - channel.write(chunk) - fileCacheMap.put(location, channel) - } - } - - override def finishFileUpload(location: String): Unit = { - if (!fileCacheMap.containsKey(location)) { - throw new RuntimeException(s"File for $location does not exist (or timed out)") - } else { - val channel = fileCacheMap.get(location) - channel.close() - fileCacheMap.remove(location) - new File(location).delete() - } - } - - override def getClusterInfo: ClusterSummary = { - updateApps - val topologySummaryList = topologies.map { case (name, _) => - new TopologySummary(name, name, 0, 0, 0, 0, "") - }.toSeq - new ClusterSummary(List[SupervisorSummary]().asJava, 0, topologySummaryList.asJava) - } - - override def beginFileDownload(file: String): String = { - throw new UnsupportedOperationException - } - - override def uploadNewCredentials(s: String, credentials: Credentials): Unit = { - throw new UnsupportedOperationException - } - override def activate(name: String): Unit = { - throw new UnsupportedOperationException - } - - override def rebalance(name: String, options: RebalanceOptions): Unit = { - throw new UnsupportedOperationException - } - - override def deactivate(name: String): Unit = { - throw new UnsupportedOperationException - } - - override def getTopologyInfo(name: String): TopologyInfo = { - throw new UnsupportedOperationException - } - - override def getTopologyInfoWithOpts(s: String, getInfoOptions: GetInfoOptions): TopologyInfo = { - throw new UnsupportedOperationException - } - - override def killTopology(name: String): Unit = killTopologyWithOpts(name, new KillOptions()) - - override def downloadChunk(name: String): ByteBuffer = { - throw new UnsupportedOperationException - } - - private def updateApps: Unit = { - clientContext.listApps.appMasters.foreach { app => - val name = app.appName - if (applications.contains(app.appName)) { - if (app.status != MasterToAppMaster.AppMasterActive) { - applications -= name - topologies -= name - } - } - } - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala deleted file mode 100644 index b813ae0..0000000 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.experiments.storm.main - -import backtype.storm.Config -import backtype.storm.utils.Utils -import io.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import io.gearpump.util.Constants._ -import io.gearpump.util.{AkkaApp, LogUtil, Util} - -object GearpumpStormClient extends AkkaApp with ArgumentsParser { - - override val options: Array[(String, CLIOption[Any])] = Array( - "jar" -> CLIOption[String]("<storm jar>", required = true), - "config" -> CLIOption[Int]("<storm config file>", required = true), - "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false))) - - override def main(inputAkkaConf: Config, args: Array[String]): Unit = { - val config = parse(args) - - val verbose = config.getBoolean("verbose") - if (verbose) { - LogUtil.verboseLogToConsole - } - - val jar = config.getString("jar") - val stormConfig = config.getString("config") - val topology = config.remainArgs(0) - val stormArgs = config.remainArgs.drop(1) - val stormOptions = Array( - s"-Dstorm.options=${getThriftOptions(stormConfig)}", - s"-Dstorm.jar=$jar", - s"-Dstorm.config.file=$stormConfig", - s"-D${PREFER_IPV4}=true" - ) - - val classPath = Array(s"${System.getProperty(GEARPUMP_HOME)}/lib/storm/*", jar) - val process = Util.startProcess(stormOptions, classPath, topology, stormArgs) - - // wait till the process exit - val exit = process.exitValue() - - if (exit != 0) { - throw new Exception(s"failed to submit jar, exit code $exit, error summary: ${process.logger.error}") - } - } - - private def getThriftOptions(stormConfig: String): String = { - val config = Utils.findAndReadConfigFile(stormConfig, true) - val host = config.get(Config.NIMBUS_HOST) - val thriftPort = config.get(Config.NIMBUS_THRIFT_PORT) - s"${Config.NIMBUS_HOST}=$host,${Config.NIMBUS_THRIFT_PORT}=$thriftPort" - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala index b88a200..996498b 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopology.scala @@ -18,6 +18,7 @@ package io.gearpump.experiments.storm.topology +import java.io._ import java.lang.{Iterable => JIterable} import java.util.{ArrayList => JArrayList, HashMap => JHashMap, List => JList, Map => JMap} @@ -34,6 +35,8 @@ import io.gearpump.experiments.storm.util.StormUtil._ import io.gearpump.streaming.Processor import io.gearpump.streaming.task.Task import io.gearpump.util.LogUtil +import org.apache.storm.shade.org.yaml.snakeyaml.Yaml +import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor import org.slf4j.Logger import scala.collection.JavaConversions._ @@ -42,18 +45,43 @@ object GearpumpStormTopology { private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpStormTopology]) def apply( - name: String, topology: StormTopology, - appConfigInJson: String)(implicit system: ActorSystem): GearpumpStormTopology = { + appConfigInJson: String, + fileConfig: String)(implicit system: ActorSystem): GearpumpStormTopology = { new GearpumpStormTopology( - name, topology, Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]], - parseJsonStringToMap(appConfigInJson) + parseJsonStringToMap(appConfigInJson), + readStormConfig(fileConfig) ) } + /** + * @param configFile user provided local config file + * @return a config Map loaded from config file + */ + private def readStormConfig(configFile: String): JMap[AnyRef, AnyRef] = { + var ret: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] + try { + val yaml = new Yaml(new SafeConstructor) + val input: InputStream = new FileInputStream(configFile) + try { + ret = yaml.load(new InputStreamReader(input)).asInstanceOf[JMap[AnyRef, AnyRef]] + } catch { + case e: IOException => + LOG.warn(s"failed to load config file $configFile") + } finally { + input.close() + } + } catch { + case e: FileNotFoundException => + LOG.warn(s"failed to find config file $configFile") + case t: Throwable => + LOG.error(t.getMessage) + } + ret + } } /** @@ -63,20 +91,21 @@ object GearpumpStormTopology { * 3. provides interface for Gearpump applications to use Storm topology * * an implicit ActorSystem is required to create Gearpump processors - * @param name topology name * @param topology Storm topology - * @param sysConfig configs from "defaults.yaml" and custom config file + * @param sysConfig configs from "defaults.yaml" and "storm.yaml" * @param appConfig config submitted from user application + * @param fileConfig custom file config set by user in command line */ private[storm] class GearpumpStormTopology( - name: String, topology: StormTopology, sysConfig: JMap[AnyRef, AnyRef], - appConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) { + appConfig: JMap[AnyRef, AnyRef], + fileConfig: JMap[AnyRef, AnyRef])(implicit system: ActorSystem) { + import io.gearpump.experiments.storm.topology.GearpumpStormTopology._ private val spouts = topology.get_spouts() private val bolts = topology.get_bolts() - private val stormConfig = mergeConfigs(sysConfig, appConfig, getComponentConfigs(spouts, bolts)) + private val stormConfig = mergeConfigs(sysConfig, appConfig, fileConfig, getComponentConfigs(spouts, bolts)) private val spoutProcessors = spouts.map { case (id, spout) => id -> spoutToProcessor(id, spout, stormConfig.toMap) }.toMap private val boltProcessors = bolts.map { case (id, bolt) => @@ -85,7 +114,7 @@ private[storm] class GearpumpStormTopology( /** * @return merged Storm config with priority - * defaults.yaml < custom file config < application config < component config + * defaults.yaml < storm.yaml < application config < component config < custom file config */ def getStormConfig: JMap[AnyRef, AnyRef] = stormConfig @@ -110,12 +139,13 @@ private[storm] class GearpumpStormTopology( private def mergeConfigs( sysConfig: JMap[AnyRef, AnyRef], appConfig: JMap[AnyRef, AnyRef], + fileConfig: JMap[AnyRef, AnyRef], componentConfigs: Iterable[JMap[AnyRef, AnyRef]]): JMap[AnyRef, AnyRef] = { val allConfig = new JHashMap[AnyRef, AnyRef] allConfig.putAll(sysConfig) allConfig.putAll(appConfig) allConfig.putAll(getMergedComponentConfig(componentConfigs, allConfig.toMap)) - allConfig.put(Config.TOPOLOGY_NAME, name) + allConfig.putAll(fileConfig) allConfig } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala new file mode 100644 index 0000000..20b63b3 --- /dev/null +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpNimbusSpec.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.experiments.storm + +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import backtype.storm.generated.StormTopology +import io.gearpump.experiments.storm.util.TopologyUtil +import io.gearpump.cluster.TestUtil +import Commands.{GetTopology, Kill, Submit} +import GearpumpThriftServer.GearpumpNimbus +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, WordSpec} + +import scala.concurrent.Future + +class GearpumpNimbusSpec extends WordSpec with Matchers with MockitoSugar { + + "GearpumpNimbus" should { + "submit and kill topology through ClientContext" in { + + implicit val system = ActorSystem("storm-test", TestUtil.DEFAULT_CONFIG) + implicit val dispatcher = system.dispatcher + + val handler = TestProbe() + val gearpumpNimbus = new GearpumpNimbus(handler.ref) + + val appId = 0 + val name = "test" + val uploadedJarLocation = "local" + val jsonConf = "storm_json_conf" + val topology = TopologyUtil.getTestTopology + + Future(gearpumpNimbus.submitTopology(name, uploadedJarLocation, jsonConf, topology)) + handler.expectMsgType[Submit] + + Future(gearpumpNimbus.getTopology(name)) + handler.expectMsgType[GetTopology] + handler.reply(new StormTopology) + + Future(gearpumpNimbus.getUserTopology(name)) + handler.expectMsgType[GetTopology] + handler.reply(new StormTopology) + + Future(gearpumpNimbus.killTopology(name)) + handler.expectMsgType[Kill] + + system.shutdown() + system.awaitTermination() + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala new file mode 100644 index 0000000..b1736f0 --- /dev/null +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/GearpumpThriftServerSpec.scala @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.gearpump.experiments.storm + +import backtype.storm.security.auth.ThriftServer +import org.mockito.Mockito._ +import org.scalatest.WordSpec +import org.scalatest.mock.MockitoSugar + +class GearpumpThriftServerSpec extends WordSpec with MockitoSugar { + + "GearpumpThriftServer" should { + "run ThriftServer.serve" in { + val tServer = mock[ThriftServer] + val thriftServer = new GearpumpThriftServer(tServer) + + thriftServer.run() + + verify(tServer).serve() + } + + "stop ThriftServer" in { + val tServer = mock[ThriftServer] + val thriftServer = new GearpumpThriftServer(tServer) + + thriftServer.close() + + verify(tServer).stop() + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala new file mode 100644 index 0000000..95cf279 --- /dev/null +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/StormRunnerSpec.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package io.gearpump.experiments.storm + +import akka.actor.{ActorSystem, Props} +import akka.testkit.TestProbe +import io.gearpump.cluster.TestUtil +import io.gearpump.cluster.client.ClientContext +import io.gearpump.experiments.storm.Commands.{AppKilled, AppSubmitted, Kill, Submit} +import io.gearpump.experiments.storm.StormRunner.Handler +import io.gearpump.experiments.storm.util.TopologyUtil +import org.mockito.Matchers._ +import org.mockito.Mockito +import org.scalatest.{FlatSpec, Matchers} + +class StormRunnerSpec extends FlatSpec with Matchers { + it should "handle submit/kill correctly" in { + val conf = TestUtil.DEFAULT_CONFIG + implicit val system = ActorSystem("storm-test", conf) + + val uploadedJarLocation = "local" + val jsonConf = "storm_json_conf" + val topology = TopologyUtil.getTestTopology + + implicit val dispatcher = system.dispatcher + val clientContext = Mockito.mock(classOf[ClientContext]) + Mockito.when(clientContext.submit(any(), any())).thenReturn(0) + val handler = system.actorOf(Props(new Handler(clientContext, "jar", "user_config"))) + val client = TestProbe() + + client.send(handler, Submit("app", uploadedJarLocation, jsonConf, topology, null)) + client.expectMsg(AppSubmitted("app", 0)) + + + client.send(handler, Kill("app", null)) + client.expectMsg(AppKilled("app", 0)) + + system.shutdown() + system.awaitTermination() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala index 8f10886..d1c330b 100644 --- a/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala +++ b/experiments/storm/src/test/scala/io/gearpump/experiments/storm/topology/GearpumpStormTopologySpec.scala @@ -20,7 +20,6 @@ package io.gearpump.experiments.storm.topology import java.util.{HashMap => JHashMap, Map => JMap} -import backtype.storm.Config import io.gearpump.experiments.storm.processor.StormProcessor import io.gearpump.experiments.storm.producer.StormProducer import io.gearpump.experiments.storm.util.TopologyUtil @@ -41,27 +40,28 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar val sysConfig = newJavaConfig(name, sysVal) val appVal = "app" val appConfig = newJavaConfig(name, appVal) + val fileVal = "file" + val fileConfig = newJavaConfig(name, fileVal) implicit val system = MockUtil.system - val topology1 = new GearpumpStormTopology("topology1", stormTopology, newEmptyConfig, newEmptyConfig) - topology1.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology1" - topology1.getStormConfig should not contain name + val topology1 = new GearpumpStormTopology(stormTopology, newEmptyConfig, newEmptyConfig, newEmptyConfig) + topology1.getStormConfig shouldBe empty - val topology2 = new GearpumpStormTopology("topology2", stormTopology, sysConfig, newEmptyConfig) - topology2.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology2" + val topology2 = new GearpumpStormTopology(stormTopology, sysConfig, newEmptyConfig, newEmptyConfig) topology2.getStormConfig.get(name) shouldBe sysVal - val topology3 = new GearpumpStormTopology("topology3", stormTopology, sysConfig, appConfig) - topology3.getStormConfig(Config.TOPOLOGY_NAME) shouldBe "topology3" + val topology3 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, newEmptyConfig) topology3.getStormConfig.get(name) shouldBe appVal + val topology4 = new GearpumpStormTopology(stormTopology, sysConfig, appConfig, fileConfig) + topology4.getStormConfig.get(name) shouldBe fileVal } "create Gearpump processors from Storm topology" in { val stormTopology = TopologyUtil.getTestTopology implicit val system = MockUtil.system val gearpumpStormTopology = - GearpumpStormTopology("app", stormTopology, null) + GearpumpStormTopology(stormTopology, null, "") val processors = gearpumpStormTopology.getProcessors stormTopology.get_spouts().foreach { case (spoutId, _) => val processor = processors(spoutId) @@ -80,7 +80,7 @@ class GearpumpStormTopologySpec extends WordSpec with Matchers with MockitoSugar implicit val system = MockUtil.system val sysConfig = new JHashMap[AnyRef, AnyRef] val gearpumpStormTopology = - GearpumpStormTopology("app", stormTopology, null) + GearpumpStormTopology(stormTopology, null, "") val targets0 = gearpumpStormTopology.getTargets("1") targets0 should contain key "3" targets0 should contain key "4" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala index 15acf4e..ed792f4 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala @@ -20,6 +20,7 @@ package io.gearpump.integrationtest.storm import backtype.storm.utils.{Utils, DRPCClient} +import io.gearpump.experiments.storm.GearpumpThriftServer import io.gearpump.integrationtest.Docker import io.gearpump.integrationtest.minicluster.BaseContainer import org.apache.log4j.Logger @@ -29,28 +30,23 @@ class StormClient(masterAddrs: Seq[(String, Int)]) { private val LOG = Logger.getLogger(getClass) private val STORM_HOST = "storm0" - private val STORM_NIMBUS = "/opt/start storm nimbus" - private val STORM_APP = "/opt/start storm app" + private val STORM_CMD = "/opt/start storm" private val STORM_DRPC = "storm-drpc" private val CONFIG_FILE = "storm.yaml" + private val NIMBUS_THRIFT_PORT = GearpumpThriftServer.THRIFT_PORT private val DRPC_PORT = 3772 private val DRPC_INVOCATIONS_PORT = 3773 private val container = new BaseContainer(STORM_HOST, STORM_DRPC, masterAddrs, - tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT)) + tunnelPorts = Set(NIMBUS_THRIFT_PORT, DRPC_PORT, DRPC_INVOCATIONS_PORT)) def start(): Unit = { container.createAndStart() - startNimbus - } - - private def startNimbus: String = { - Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE") } def submitStormApp(jar: String, mainClass: String, args: String = ""): Int = { try { - Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_APP -config $CONFIG_FILE " + + Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_CMD -config $CONFIG_FILE " + s"-jar $jar $mainClass $args").split("\n") .filter(_.contains("The application id is ")).head.split(" ").last.toInt } catch { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/project/Pack.scala ---------------------------------------------------------------------- diff --git a/project/Pack.scala b/project/Pack.scala index a3852c4..b242b36 100644 --- a/project/Pack.scala +++ b/project/Pack.scala @@ -59,7 +59,7 @@ object Pack extends sbt.Build { "worker" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-DlogFilename=worker", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"), "services" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"), "yarnclient" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost"), - "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}") + "storm" -> Seq("-server", "-Djava.net.preferIPv4Stack=true", "-Dgearpump.home=${PROG_HOME}", "-Djava.rmi.server.hostname=localhost") ), packLibDir := Map( "lib" -> new ProjectsToPack(core.id, streaming.id), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/b8f8bb12/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala index aa90726..e2b2c47 100644 --- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala @@ -221,7 +221,7 @@ object MasterService { */ def submitStormApp(jar: Option[File], stormConf: Option[File], args: String, systemConfig: Config): Boolean = { submitAndDeleteTempFiles( - "io.gearpump.experiments.storm.main.GearpumpStormClient", + "io.gearpump.experiments.storm.StormRunner", argsArray = spaceSeparatedArgumentsToArray(args), fileMap = Map("jar" -> jar, "config" -> stormConf).filter(_._2.isDefined).mapValues(_.get), classPath = getStormApplicationClassPath,
