fix #1975, fix storm integration test
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/920ad262 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/920ad262 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/920ad262 Branch: refs/heads/master Commit: 920ad262f9f9e88611d520df9b40d1ff8363f81c Parents: 42806a9 Author: manuzhang <[email protected]> Authored: Tue Feb 2 16:58:00 2016 +0800 Committer: manuzhang <[email protected]> Committed: Wed Feb 24 16:37:01 2016 +0800 ---------------------------------------------------------------------- .../experiments/storm/main/GearpumpNimbus.scala | 27 ++++++---- .../storm/main/GearpumpStormClient.scala | 2 +- .../checklist/StormCompatibilitySpec.scala | 51 +++++++++++-------- .../suites/StandaloneModeSuite.scala | 4 +- .../integrationtest/storm/StormClient.scala | 52 +++++++++----------- project/Build.scala | 3 +- .../streaming/appmaster/ClockService.scala | 2 - 7 files changed, 74 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala index 6d5b0aa..e9973e5 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -39,14 +39,15 @@ import io.gearpump.streaming.StreamApplication import io.gearpump.util.{AkkaApp, Constants, LogUtil} import org.apache.storm.shade.org.json.simple.JSONValue import org.apache.storm.shade.org.yaml.snakeyaml.Yaml -import org.apache.storm.shade.org.yaml.snakeyaml.constructor.SafeConstructor import org.slf4j.Logger import scala.collection.JavaConverters._ +import scala.concurrent.Future object GearpumpNimbus extends AkkaApp with ArgumentsParser { private val THRIFT_PORT = StormUtil.getThriftPort private val OUTPUT = "output" + private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus]) override val options: Array[(String, CLIOption[Any])] = Array( OUTPUT -> CLIOption[String]("<output path for configuration file>", required = false, defaultValue = Some("app.yaml")) @@ -57,15 +58,20 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser { val output = parsed.getString(OUTPUT) val akkaConf = updateClientConfig(inputAkkaConf) val system = ActorSystem("storm", akkaConf) + val clientContext = new ClientContext(akkaConf, system, null) val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]] - val thriftConf: JMap[String, String] = Map( - Config.NIMBUS_HOST -> ("" + akkaConf.getString(Constants.GEARPUMP_HOSTNAME)), - Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava + val thriftConf: JMap[AnyRef, AnyRef] = Map( + Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME), + Config.NIMBUS_THRIFT_PORT -> s"$THRIFT_PORT").asJava.asInstanceOf[JMap[AnyRef, AnyRef]] updateStormConfig(thriftConf, output) stormConf.putAll(thriftConf) - val thriftServer = createServer(clientContext, stormConf) - thriftServer.serve() + + import scala.concurrent.ExecutionContext.Implicits.global + Future { + val thriftServer = createServer(clientContext, stormConf) + thriftServer.serve() + } system.awaitTermination() } @@ -75,12 +81,12 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser { new ThriftServer(stormConf, processor, connectionType) } - private def updateStormConfig(thriftConfig: JMap[String, String], output: String): Unit = { + private def updateStormConfig(thriftConfig: JMap[AnyRef, AnyRef], output: String): Unit = { val updatedConfig: JMap[AnyRef, AnyRef] = new JHashMap[AnyRef, AnyRef] val outputConfig = Utils.findAndReadConfigFile(output, false).asInstanceOf[JMap[AnyRef, AnyRef]] updatedConfig.putAll(outputConfig) updatedConfig.putAll(thriftConfig) - val yaml = new Yaml(new SafeConstructor) + val yaml = new Yaml val serialized = yaml.dumpAsMap(updatedConfig) val writer = new FileWriter(new File(output)) try { @@ -112,7 +118,8 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser { } class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRef]) extends Nimbus.Iface { - private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus]) + import io.gearpump.experiments.storm.main.GearpumpNimbus._ + private var applications = Map.empty[String, Int] private var topologies = Map.empty[String, TopologyData] private val expireSeconds = StormUtil.getInt(stormConf, Config.NIMBUS_FILE_COPY_EXPIRATION_SECS).get @@ -128,7 +135,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe } override def submitTopologyWithOpts(name: String, uploadedJarLocation: String, jsonConf: String, topology: StormTopology, options: SubmitOptions): Unit = { - + LOG.info(s"Submitted topology $name") implicit val system = clientContext.system val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf) val stormConfig = gearpumpStormTopology.getStormConfig http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala index 643e926..d4c0e8a 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpStormClient.scala @@ -46,7 +46,7 @@ object GearpumpStormClient extends AkkaApp with ArgumentsParser { val stormOptions = Array( s"-Dstorm.options=${getThriftOptions(stormConfig)}", s"-Dstorm.jar=$jar", - s"-Dstorm.config.file=$stormConfig", + s"-Dstorm.conf.file=$stormConfig", s"-D${PREFER_IPV4}=true" ) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala index 30c6d57..3baabf6 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala @@ -26,7 +26,7 @@ import io.gearpump.integrationtest.{TestSpecBase, Util} */ class StormCompatibilitySpec extends TestSpecBase { - private lazy val stormClient = new StormClient(cluster.getMastersAddresses) + private lazy val stormClient = new StormClient(cluster.getMastersAddresses, restClient) override def beforeAll(): Unit = { super.beforeAll() @@ -74,27 +74,33 @@ class StormCompatibilitySpec extends TestSpecBase { "Storm over Gearpump" should withStorm { stormVersion => - s"support basic topologies ($stormVersion)" in { + s"support basic topologies ($stormVersion)" in { val stormJar = getStormJar(stormVersion) val topologyName = getTopologyName("exclamation", stormVersion) + // exercise - val appId = stormClient.submitStormApp(stormJar, - mainClass = "storm.starter.ExclamationTopology", args = topologyName) + val appId = stormClient.submitStormApp( + jar = stormJar, + mainClass = "storm.starter.ExclamationTopology", + args = topologyName, + appName = topologyName) // verify - val actual = expectAppIsRunning(appId, topologyName) - Util.retryUntil(restClient.queryStreamingAppDetail(actual.appId).clock > 0) + Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) } s"support to run a python version of wordcount ($stormVersion)" in { val stormJar = getStormJar(stormVersion) val topologyName = getTopologyName("wordcount", stormVersion) + // exercise - val appId = stormClient.submitStormApp(stormJar, - mainClass = "storm.starter.WordCountTopology", args = topologyName) + val appId = stormClient.submitStormApp( + jar = stormJar, + mainClass = "storm.starter.WordCountTopology", + args = topologyName, + appName = topologyName) // verify - expectAppIsRunning(appId, topologyName) Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) } @@ -104,8 +110,11 @@ class StormCompatibilitySpec extends TestSpecBase { // input (user and follower) data are already prepared in memory val stormJar = getStormJar(stormVersion) val topologyName = getTopologyName("reach", stormVersion) - stormClient.submitStormApp(stormJar, - mainClass = "storm.starter.ReachTopology", args = topologyName) + stormClient.submitStormApp( + jar = stormJar, + mainClass = "storm.starter.ReachTopology", + args = topologyName, + appName = topologyName) val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway) // verify @@ -119,13 +128,16 @@ class StormCompatibilitySpec extends TestSpecBase { s"support tick tuple ($stormVersion)" in { val stormJar = getStormJar(stormVersion) val topologyName = getTopologyName("slidingWindowCounts", stormVersion) + // exercise - val appId = stormClient.submitStormApp(stormJar, - mainClass = "storm.starter.RollingTopWords", args = s"$topologyName remote") + val appId = stormClient.submitStormApp( + jar = stormJar, + mainClass = "storm.starter.RollingTopWords", + args = s"$topologyName remote", + appName = topologyName) // verify - val actual = expectAppIsRunning(appId, topologyName) - Util.retryUntil(restClient.queryStreamingAppDetail(actual.appId).clock > 0) + Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) } s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in { @@ -142,8 +154,6 @@ class StormCompatibilitySpec extends TestSpecBase { val brokerList = kafkaCluster.getBrokerListConnectString val sourceTopic = "topic1" val sinkTopic = "topic2" - val appsCount = restClient.listApps().length - val appId = appsCount + 1 val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic, "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList, @@ -155,11 +165,12 @@ class StormCompatibilitySpec extends TestSpecBase { // generate number sequence (1, 2, 3, ...) to the topic withDataProducer(sourceTopic, brokerList) { producer => - stormClient.submitStormApp(stormJar, + val appId = stormClient.submitStormApp( + jar = stormJar, mainClass = stormKafkaTopology, - args = args.mkString(" ")) + args = args.mkString(" "), + appName = topologyName) - expectAppIsRunning(appId, topologyName) Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) // kill executor and verify at-least-once is guaranteed on application restart http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala index 944f542..c7a2c3e 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala @@ -32,8 +32,8 @@ class StandaloneModeSuite extends Suites( new RestServiceSpec, new ExampleSpec, new DynamicDagSpec, - new StabilitySpec/*, - new StormCompatibilitySpec*/ + new StabilitySpec, + new StormCompatibilitySpec ) with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala index 15acf4e..a018280 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala @@ -18,46 +18,37 @@ package io.gearpump.integrationtest.storm - import backtype.storm.utils.{Utils, DRPCClient} -import io.gearpump.integrationtest.Docker -import io.gearpump.integrationtest.minicluster.BaseContainer -import org.apache.log4j.Logger - +import io.gearpump.integrationtest.{Util, Docker} +import io.gearpump.integrationtest.minicluster.{RestClient, BaseContainer} -class StormClient(masterAddrs: Seq[(String, Int)]) { +class StormClient(masterAddrs: Seq[(String, Int)], restClient: RestClient) { - private val LOG = Logger.getLogger(getClass) - private val STORM_HOST = "storm0" - private val STORM_NIMBUS = "/opt/start storm nimbus" - private val STORM_APP = "/opt/start storm app" - private val STORM_DRPC = "storm-drpc" - private val CONFIG_FILE = "storm.yaml" + private val CONFIG_FILE = "/opt/gearpump/storm.yaml" + private val DRPC_HOST = "storm0" private val DRPC_PORT = 3772 private val DRPC_INVOCATIONS_PORT = 3773 + private val STORM_DRPC = "storm-drpc" + private val NIMBUS_HOST = "storm1" + private val STORM_NIMBUS = "storm nimbus" + private val STORM_APP = "/opt/start storm app" - private val container = new BaseContainer(STORM_HOST, STORM_DRPC, masterAddrs, + private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs, tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT)) + private val nimbusContainer = new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs) def start(): Unit = { - container.createAndStart() - startNimbus - } - - private def startNimbus: String = { - Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE") + drpcContainer.createAndStart() + nimbusContainer.createAndStart() } - def submitStormApp(jar: String, mainClass: String, args: String = ""): Int = { - try { - Docker.execAndCaptureOutput(STORM_HOST, s"$STORM_APP -config $CONFIG_FILE " + - s"-jar $jar $mainClass $args").split("\n") - .filter(_.contains("The application id is ")).head.split(" ").last.toInt - } catch { - case ex: Throwable => - LOG.warn(s"swallowed an exception: $ex") - -1 - } + def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = { + Util.retryUntil({ + Docker.exec(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " + + s"-jar $jar $mainClass $args") + restClient.listRunningApps().exists(_.appName == appName) + }) + restClient.listRunningApps().filter(_.appName == appName).head.appId } def getDRPCClient(drpcServerIp: String): DRPCClient = { @@ -66,7 +57,8 @@ class StormClient(masterAddrs: Seq[(String, Int)]) { } def shutDown(): Unit = { - container.killAndRemove() + drpcContainer.killAndRemove() + nimbusContainer.killAndRemove() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index 830e7c2..3f91f1e 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -385,8 +385,7 @@ object Build extends sbt.Build { "commons-httpclient" % "commons-httpclient" % commonsHttpVersion, "org.apache.hadoop" % "hadoop-mapreduce-client-core" % clouderaVersion, "org.apache.hadoop" % "hadoop-yarn-server-resourcemanager" % clouderaVersion % "provided", - "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % clouderaVersion % "provided", - "org.specs2" %% "specs2-mock" % "3.6.4" % "test" + "org.apache.hadoop" % "hadoop-yarn-server-nodemanager" % clouderaVersion % "provided" ) ) ) dependsOn(services % "test->test;compile->compile", core % "provided") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/920ad262/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala index 578a15d..67fd592 100644 --- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala @@ -348,8 +348,6 @@ object ClockService { def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = { taskClocks(taskIndex) = clock _min = Longs.min(taskClocks: _*) - - Console.print(s"min clock of $processorId is ${_min}") } }
