GEARPUMP-9, Clean and fix integration test
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/d092e80f Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/d092e80f Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/d092e80f Branch: refs/heads/master Commit: d092e80f375e944eb978edc68dcea035fc9f723b Parents: e7a7f54 Author: Sean Zhong <[email protected]> Authored: Sat Apr 2 18:46:45 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Apr 26 14:26:04 2016 +0800 ---------------------------------------------------------------------- .sbtopts | 2 +- core/src/main/scala/io/gearpump/util/Util.scala | 2 +- .../experiments/storm/main/GearpumpNimbus.scala | 2 +- .../experiments/storm/util/StormUtil.scala | 2 +- integrationtest/README.md | 78 ++++++-- .../gearpump/integrationtest/TestSpecBase.scala | 27 ++- .../checklist/ConnectorKafkaSpec.scala | 17 +- .../checklist/DynamicDagSpec.scala | 32 ++-- .../integrationtest/checklist/ExampleSpec.scala | 21 +- .../checklist/MessageDeliverySpec.scala | 44 +++-- .../checklist/RestServiceSpec.scala | 31 +-- .../checklist/StabilitySpec.scala | 68 +++++-- .../checklist/StormCompatibilitySpec.scala | 30 +-- .../suites/StandaloneModeSuite.scala | 7 +- .../io/gearpump/integrationtest/Docker.scala | 192 ++++++++++++++----- .../io/gearpump/integrationtest/ShellExec.scala | 24 ++- .../io/gearpump/integrationtest/Util.scala | 41 ++-- .../integrationtest/hadoop/HadoopCluster.scala | 8 +- .../integrationtest/kafka/KafkaCluster.scala | 35 ++-- .../kafka/NumericalDataProducer.scala | 7 +- .../integrationtest/kafka/ResultVerifier.scala | 2 + .../minicluster/BaseContainer.scala | 14 +- .../minicluster/CommandLineClient.scala | 20 +- .../minicluster/MiniCluster.scala | 66 ++++--- .../minicluster/RestClient.scala | 17 +- .../integrationtest/storm/StormClient.scala | 40 +++- jvm.sbt | 6 +- project/Build.scala | 5 +- project/BuildIntegrationTest.scala | 1 + 
project/travis/jvmopts | 1 + .../streaming/appmaster/AppMaster.scala | 20 +- .../appmaster/StreamAppMasterSummary.scala | 14 +- 32 files changed, 597 insertions(+), 279 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/.sbtopts ---------------------------------------------------------------------- diff --git a/.sbtopts b/.sbtopts index f6f24bf..c280c61 100644 --- a/.sbtopts +++ b/.sbtopts @@ -1,4 +1,4 @@ -J-XX:+CMSClassUnloadingEnabled -J-XX:+UseConcMarkSweepGC -J-Xss6M --J-XX:MaxMetaspaceSize=512m +-J-XX:MaxMetaspaceSize=1024m http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/core/src/main/scala/io/gearpump/util/Util.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/io/gearpump/util/Util.scala b/core/src/main/scala/io/gearpump/util/Util.scala index 03815a7..96eddfd 100644 --- a/core/src/main/scala/io/gearpump/util/Util.scala +++ b/core/src/main/scala/io/gearpump/util/Util.scala @@ -66,7 +66,7 @@ object Util { arguments : Array[String]) : RichProcess = { val java = System.getProperty("java.home") + "/bin/java" val command = List(java) ++ options ++ List("-cp", classPath.mkString(File.pathSeparator), mainClass) ++ arguments - LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")}; options: ${options.mkString(" ")}") + LOG.info(s"Starting executor process java $mainClass ${arguments.mkString(" ")} \n ${options.mkString(" ")}") val logger = new ProcessLogRedirector() val process = Process(command).run(logger) new RichProcess(process, logger) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git 
a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala index c16ab41..dbbe738 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -45,7 +45,7 @@ import scala.collection.JavaConverters._ import scala.concurrent.Future object GearpumpNimbus extends AkkaApp with ArgumentsParser { - private val THRIFT_PORT = StormUtil.getThriftPort + private val THRIFT_PORT = StormUtil.getThriftPort() private val OUTPUT = "output" private val LOG: Logger = LogUtil.getLogger(classOf[GearpumpNimbus]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala index 33a6181..4f4fced 100644 --- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala +++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/util/StormUtil.scala @@ -125,7 +125,7 @@ object StormUtil { } } - def getThriftPort: Int = { + def getThriftPort(): Int = { Util.findFreePort.getOrElse( throw new Exception("unable to find free port for thrift server")) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/README.md ---------------------------------------------------------------------- diff --git a/integrationtest/README.md b/integrationtest/README.md index 9afb44f..95fa6c5 100644 --- a/integrationtest/README.md +++ b/integrationtest/README.md @@ -1,15 +1,27 @@ ## Getting Started -To run the integration test, you need a Linux with 
Kernel version >= 3.10 and Docker 1.7 (or higher). The test framework will use two Docker images. They will be downloaded at the first time, when you launching the tests. You can also prepare them manually: +To run the integration test, you need a Linux with Kernel version >= 3.10 and Docker 1.7 (or higher). The test framework will use several Docker images. +These docker images **NEED** to be prepared **BEFOREHAND** to avoid timeout during testing: - * [The Gearpump Cluster Launcher and Storm Client](https://hub.docker.com/r/stanleyxu2005/gpct-jdk8) - `docker pull stanleyxu2005/gpct-jdk8` + * [The Gearpump Cluster Launcher and Storm Client](https://hub.docker.com/r/stanleyxu2005/gearpump-launcher/) + `docker pull stanleyxu2005/gearpump-launcher` * [The standalone single node Kafka cluster with Zookeeper](https://hub.docker.com/r/spotify/kafka/) `docker pull spotify/kafka` + * [The Hadoop image](https://hub.docker.com/r/sequenceiq/hadoop-docker/) + `docker pull sequenceiq/hadoop-docker:2.6.0` ### Install Docker -The integration test framework is majorly developed on CentOS 7.0. We assume the command `docker` has added to `sudo` group without asking password. `sudo usermod -aG docker $(whoami)` Here is the [installation guide of Docker for CentOS](http://docs.docker.com/engine/installation/centos/). You can find more installation guide for other Linux/Mac OS distributions. If your machine is behind a firewall, here are [some leads](https://github.com/gearpump/gearpump-docker) to make it right. +The integration test framework uses docker to simulate a real cluster. The test script assumes the docker + command can be executed without prefixing with `sudo`, and without asking password. + + In Docker site, there are instructions about how to configure docker to avoid using sudo, with command like + this: `sudo usermod -aG docker $(whoami)` + + Please refer to the Docker documents for more information. + + 1. 
For CentOS, please check [How to create a docker group in CentOS](https://docs.docker.com/engine/installation/linux/centos/#create-a-docker-group) + 2. For Ubuntu, please check [How to create a docker group in Ubuntu](https://docs.docker.com/engine/installation/linux/ubuntulinux/#create-a-docker-group). ### Step to Run Tests @@ -22,26 +34,52 @@ The integration test framework is majorly developed on CentOS 7.0. We assume the The test will launch a Gearpump cluster with 1 master and 2 worker nodes as 3 Docker containers. It might take 10-20 minutes to go through all the test cases. It depends on, how powerful your machine is. The Docker container itself does not have a Gearpump distribution. It will link your local build to the Docker container. When tests are finished, you can see the test result on the screen, or you can save them to a file with this command `sbt it:test > test_report.out`. To investigate Gearpump log, please check the directory `output/target/pack/logs`. -## Manual Test +### How To test single integration test suite like `io.gearpump.integrationtest.checklist.CommandlineSpec`? + +Unfortunately, I searched around, and didn't find a clean way to do this in sbt. Gearpump is using nested suite for +integration test, which I think sbt doesn't support well with `sbt test-only <className>`. Please also see discussion at: +`https://groups.google.com/forum/#!topic/scalatest-users/l8FK7_I0agU` + +For a not that clean solution, here are the steps: + +1. Locate class `io.gearpump.integrationtest.suites.StandaloneModeSuite` source file at + `gearpump/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites` +2. Comment out the suites you don't want to test like this: + + ``` + class StandaloneModeSuite extends Suites( + new CommandLineSpec + // ,new ConnectorKafkaSpec, + // new RestServiceSpec, + // new ExampleSpec, + // new DynamicDagSpec, + // new StabilitySpec, + // new StormCompatibilitySpec, + // new MessageDeliverySpec + ) + ``` +3. 
Run `sbt it:test` + + +## Manual test by creating docker cluster manually + +To launch a Gearpump cluster manually, you can run the commands as follows. +You can launch as many worker containers as you wish, but only one master for the time being. -To launch a Gearpump cluster manually, you can run the commands as follows. You can launch so many worker containers, as you wish. But only one master for the time being. ``` export GEARPUMP_HOME=/path/to/gearpump/dist -docker run -d \ - -h master0 --name master0 \ - -v $GEARPUMP_HOME:/opt/gearpump \ - -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 \ - -p 8090:8090 \ - stanleyxu2005/gpct-jdk8 \ - master -ip master0 -port 3000 - -docker run -d \ - --link master0 \ - -v $GEARPUMP_HOME:/opt/gearpump \ - -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 \ - stanleyxu2005/gpct-jdk8 \ - worker +## Start Master node +docker run -d -h master0 -v /etc/localtime:/etc/localtime:ro -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 -v $GEARPUMP_HOME:/opt/gearpump -v /tmp/gearpump:/var/log/gearpump --name master0 stanleyxu2005/gearpump-launcher master -ip master0 -port 3000 + +## Start Worker0 node +docker run -d -h worker0 -v /etc/localtime:/etc/localtime:ro -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 -v $GEARPUMP_HOME:/opt/gearpump -v /tmp/gearpump:/var/log/gearpump --link master0 --name worker0 stanleyxu2005/gearpump-launcher worker + +## ... + +## Start Worker1 node +docker run -d -h worker1 -v /etc/localtime:/etc/localtime:ro -e JAVA_OPTS=-Dgearpump.cluster.masters.0=master0:3000 -v $GEARPUMP_HOME:/opt/gearpump -v /tmp/gearpump:/var/log/gearpump --link master0 --name worker1 stanleyxu2005/gearpump-launcher worker + ``` Have a nice test drive! 
\ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala index 6c40a73..c07222f 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala @@ -19,27 +19,28 @@ package io.gearpump.integrationtest import io.gearpump.cluster.MasterToAppMaster import io.gearpump.cluster.MasterToAppMaster.AppMasterData -import org.apache.log4j.Logger +import io.gearpump.util.LogUtil import org.scalatest._ /** * The abstract test spec */ -trait TestSpecBase extends WordSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll { +trait TestSpecBase + extends WordSpec with Matchers with BeforeAndAfterEachTestData with BeforeAndAfterAll { - private val LOG = Logger.getLogger(getClass) + private def LOGGER = LogUtil.getLogger(getClass) override def beforeAll(): Unit = { super.beforeAll() if (!MiniClusterProvider.managed) { - LOG.info("Will test with a default standalone mini cluster") + LOGGER.info("Will test with a default standalone mini cluster") MiniClusterProvider.get.start() } } override def afterAll(): Unit = { if (!MiniClusterProvider.managed) { - LOG.info("Will shutdown the default mini cluster") + LOGGER.info("Will shutdown the default mini cluster") MiniClusterProvider.get.shutDown() } super.afterAll() @@ -54,16 +55,25 @@ trait TestSpecBase extends WordSpec with Matchers with BeforeAndAfterEach with B var restartClusterRequired: Boolean = false - override def beforeEach() = { + override def beforeEach(td: TestData) = { + + LOGGER.debug(s">### 
=============================================================") + LOGGER.debug(s">###1 Prepare test: ${td.name}\n") + assert(cluster != null, "Configure MiniCluster properly in suite spec") cluster.isAlive shouldBe true restClient.listRunningApps().isEmpty shouldBe true + LOGGER.debug(s">### =============================================================") + LOGGER.debug(s">###2 Start test: ${td.name}\n") } - override def afterEach() = { + override def afterEach(td: TestData) = { + LOGGER.debug(s"<### =============================================================") + LOGGER.debug(s"<###3 End test: ${td.name}\n") + if (restartClusterRequired || !cluster.isAlive) { restartClusterRequired = false - LOG.info("Will restart the cluster for next test case") + LOGGER.info("Will restart the cluster for next test case") cluster.restart() } else { restClient.listRunningApps().foreach(app => { @@ -78,5 +88,4 @@ trait TestSpecBase extends WordSpec with Matchers with BeforeAndAfterEach with B app.appName shouldEqual expectedAppName app } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala index 59c956b..7c9176a 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala @@ -19,6 +19,7 @@ package io.gearpump.integrationtest.checklist import io.gearpump.integrationtest.{Util, TestSpecBase} import io.gearpump.integrationtest.kafka._ +import org.scalatest.TestData /** * The test spec checks the Kafka datasource connector @@ -39,8 +40,8 @@ 
class ConnectorKafkaSpec extends TestSpecBase { super.afterAll() } - override def afterEach() = { - super.afterEach() + override def afterEach(test: TestData) = { + super.afterEach(test) if (producer != null) { producer.stop() producer = null @@ -67,7 +68,8 @@ class ConnectorKafkaSpec extends TestSpecBase { // verify expectAppIsRunning(appId, "KafkaReadWrite") - Util.retryUntil(kafkaCluster.getLatestOffset(sinkTopic) == messageNum) + Util.retryUntil(()=>kafkaCluster.getLatestOffset(sinkTopic) == messageNum, + "kafka all message written") } } @@ -95,21 +97,22 @@ class ConnectorKafkaSpec extends TestSpecBase { // verify #1 expectAppIsRunning(appId, "KafkaReadWrite") - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") // verify #2 val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill) + Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, + s"executor $executorToKill killed") // verify #3 val detector = new MessageLossDetector(producer.lastWriteNum) val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort) - Util.retryUntil({ + Util.retryUntil(()=>{ kafkaReader.read() detector.allReceived - }) + }, "kafka all message read") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala index 
44f98aa..c8c57f5 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala @@ -50,10 +50,10 @@ class DynamicDagSpec extends TestSpecBase { val formerProcessors = restClient.queryStreamingAppDetail(appId).processors replaceProcessor(appId, 1, sumTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil({ + Util.retryUntil(()=>{ laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 - }) + }, "new processor successfully added") processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") } @@ -65,10 +65,10 @@ class DynamicDagSpec extends TestSpecBase { val formerProcessors = restClient.queryStreamingAppDetail(appId).processors replaceProcessor(appId, 0, splitTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil({ + Util.retryUntil(()=>{ laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 - }) + }, "new processor added") processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput") } @@ -80,20 +80,21 @@ class DynamicDagSpec extends TestSpecBase { val formerProcessors = restClient.queryStreamingAppDetail(appId).processors replaceProcessor(appId, 1, sumTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil({ + Util.retryUntil(()=>{ laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 - }) + }, "new processor added") processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") val fakeTaskClass = "io.gearpump.streaming.examples.wordcount.Fake" replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass) - Util.retryUntil({ + Util.retryUntil(()=>{ val 
processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors processorsAfterFailure.size == laterProcessors.size - }) + }, "new processor added") val currentClock = restClient.queryStreamingAppDetail(appId).clock - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > currentClock) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > currentClock, + "app clock is advancing") } "fall back to last dag version when AppMaster HA triggered" in { @@ -105,14 +106,15 @@ class DynamicDagSpec extends TestSpecBase { val formerProcessors = restClient.queryStreamingAppDetail(appId).processors replaceProcessor(appId, 1, sumTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil({ + Util.retryUntil(()=>{ laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 - }) + }, "new processor added") processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") restClient.killAppMaster(appId) shouldBe true - Util.retryUntil(restClient.queryApp(appId).appMasterPath != formerAppMaster) + Util.retryUntil(()=>restClient.queryApp(appId).appMasterPath != formerAppMaster, + "new AppMaster created") val processors = restClient.queryStreamingAppDetail(appId).processors processors.size shouldEqual laterProcessors.size } @@ -124,7 +126,7 @@ class DynamicDagSpec extends TestSpecBase { val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, solName) - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") appId } @@ -145,13 +147,13 @@ class DynamicDagSpec extends TestSpecBase { } private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = { - Util.retryUntil({ + Util.retryUntil(()=>{ val actual = restClient.queryStreamingAppMetrics(appId, 
current = false, path = "processor" + processorId) val throughput = actual.metrics.filter(_.value.name.endsWith(metrics)) throughput.size should be > 0 throughput.forall(_.value.asInstanceOf[Meter].count > 0L) - }) + }, "new processor has message received") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala index 2772bc3..efee3c6 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala @@ -17,15 +17,19 @@ */ package io.gearpump.integrationtest.checklist +import io.gearpump.integrationtest.Docker._ import io.gearpump.integrationtest.{Docker, TestSpecBase, Util} import io.gearpump.streaming._ import io.gearpump.streaming.appmaster.ProcessorSummary +import org.apache.log4j.Logger /** * The test spec will perform destructive operations to check the stability */ class ExampleSpec extends TestSpecBase { + private val LOG = Logger.getLogger(getClass) + "distributed shell" should { "execute commands on machines where its executors are running" in { val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head @@ -40,7 +44,8 @@ class ExampleSpec extends TestSpecBase { "-appid", appId.toString, "-command", "hostname" ) - val expectedHostNames = cluster.getWorkerHosts.map(Docker.execAndCaptureOutput(_, "hostname")) + + val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_)) def verify(): Boolean = { val workerNum = cluster.getWorkerHosts.length @@ -49,7 +54,8 @@ class ExampleSpec extends TestSpecBase { 
expectedHostNames.forall(result.contains) } - Util.retryUntil(verify()) + Util.retryUntil(()=>verify(), + s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}") } } @@ -63,7 +69,7 @@ class ExampleSpec extends TestSpecBase { val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) formerSubmissionSuccess shouldBe true expectAppIsRunning(formerAppId, wordCountName) - Util.retryUntil(restClient.queryStreamingAppDetail(formerAppId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running") restClient.killApp(formerAppId) // exercise @@ -103,9 +109,10 @@ class ExampleSpec extends TestSpecBase { expectAppIsRunning(appId, appName) // exercise - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted") val formerClock = restClient.queryStreamingAppDetail(appId).clock - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > formerClock) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > formerClock, + "app clock is advancing") } "can change the parallelism and description of a processor" in { @@ -126,10 +133,10 @@ class ExampleSpec extends TestSpecBase { val success = restClient.replaceStreamingAppProcessor(appId, replaceMe) success shouldBe true var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil({ + Util.retryUntil(()=>{ laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 - }) + }, "new process added") val laterProcessor0 = laterProcessors.get(expectedProcessorId).get laterProcessor0.parallelism shouldEqual expectedParallelism laterProcessor0.description shouldEqual expectedDescription 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala index a9fdee5..35c31cd 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala @@ -1,12 +1,16 @@ package io.gearpump.integrationtest.checklist +import io.gearpump.integrationtest.Docker._ import io.gearpump.integrationtest.hadoop.HadoopCluster._ import io.gearpump.integrationtest.{Util, TestSpecBase} -import io.gearpump.integrationtest.kafka.{SimpleKafkaReader, MessageLossDetector, NumericalDataProducer} +import io.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader, MessageLossDetector, NumericalDataProducer} import io.gearpump.integrationtest.kafka.KafkaCluster._ +import org.apache.log4j.Logger class MessageDeliverySpec extends TestSpecBase { + private val LOG = Logger.getLogger(getClass) + override def beforeAll(): Unit = { super.beforeAll() } @@ -15,10 +19,6 @@ class MessageDeliverySpec extends TestSpecBase { super.afterAll() } - override def afterEach() = { - super.afterEach() - } - "Gearpump" should { "support exactly-once message delivery" in { withKafkaCluster(cluster) { kafkaCluster => @@ -49,27 +49,45 @@ class MessageDeliverySpec extends TestSpecBase { // verify #1 expectAppIsRunning(appId, "MessageCount") - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app is running") // wait for checkpoint to take place Thread.sleep(1000) - // verify #2 + 
LOG.info("Trigger message replay by kill and restart the executors") val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill) + Util.retryUntil(()=> restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, + s"executor $executorToKill killed") producer.stop() + val producedNumbers = producer.producedNumbers + LOG.info(s"In total, numbers in range[${producedNumbers.start}, ${producedNumbers.end - 1}] have been written to Kafka") // verify #3 - val count = kafkaCluster.getLatestOffset(sourceTopic) + 1 - val detector = new MessageLossDetector(count) + val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic) + + assert(producedNumbers.size == kafkaSourceOffset, "produced message should match Kafka queue size") + + LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset) + + // The sink processor of this job (MessageCountApp) writes total message + // count to Kafka Sink periodically (once every checkpoint interval). + // The detector keep record of latest message count. 
+ val detector = new ResultVerifier { + var latestMessageCount: Int = 0 + override def onNext(messageCount: Int): Unit = { + this.latestMessageCount = messageCount + } + } + val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort) - Util.retryUntil({ + Util.retryUntil(()=>{ kafkaReader.read() - detector.received(count) - }) + LOG.info(s"Received message count: ${detector.latestMessageCount}, expect: ${producedNumbers.size}") + detector.latestMessageCount == producedNumbers.size + }, "MessageCountApp calculated message count matches expected in case of message replay") } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala index 92ed3ec..722cb33 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala @@ -104,7 +104,7 @@ class RestServiceSpec extends TestSpecBase { // exercise expectMetricsAvailable( - restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty) + restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty, "metrics available") val actual = restClient.queryStreamingAppMetrics(appId, current = true) actual.path shouldEqual s"app$appId.processor*" actual.metrics.foreach(metric => { @@ -116,7 +116,7 @@ class RestServiceSpec extends TestSpecBase { expectMetricsAvailable({ val laterMetrics = restClient.queryStreamingAppMetrics(appId, current = true).metrics laterMetrics.nonEmpty && laterMetrics.toString() != 
formerMetricsDump - }) + }, "metrics available") } "can obtain application corresponding executors' metrics and the metrics will keep changing" in { @@ -128,7 +128,7 @@ class RestServiceSpec extends TestSpecBase { // exercise expectMetricsAvailable( - restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty) + restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty, "metrics available") val actual = restClient.queryExecutorMetrics(appId, current = true) actual.path shouldEqual s"app$appId.executor*" actual.metrics.foreach(metric => { @@ -140,7 +140,7 @@ class RestServiceSpec extends TestSpecBase { expectMetricsAvailable({ val laterMetrics = restClient.queryExecutorMetrics(appId, current = true).metrics laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump - }) + }, "metrics available") } } @@ -194,10 +194,10 @@ class RestServiceSpec extends TestSpecBase { // exercise var runningWorkers: Array[WorkerSummary] = Array.empty - Util.retryUntil({ + Util.retryUntil(()=>{ runningWorkers = restClient.listRunningWorkers() runningWorkers.length == expectedWorkersCount - }) + }, "all workers running") runningWorkers.foreach { worker => worker.state shouldEqual MasterToAppMaster.AppMasterActive } @@ -207,12 +207,13 @@ class RestServiceSpec extends TestSpecBase { // setup restartClusterRequired = true val formerWorkersCount = cluster.getWorkerHosts.length - Util.retryUntil(restClient.listRunningWorkers().length == formerWorkersCount) + Util.retryUntil(()=>restClient.listRunningWorkers().length == formerWorkersCount, + "all workers running") val workerName = "newWorker" // exercise cluster.addWorkerNode(workerName) - Util.retryUntil(restClient.listRunningWorkers().length > formerWorkersCount) + Util.retryUntil(()=>restClient.listRunningWorkers().length > formerWorkersCount, "new worker added") cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1 restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1 } 
@@ -231,7 +232,7 @@ class RestServiceSpec extends TestSpecBase { "can obtain master's metrics and the metrics will keep changing" in { // exercise expectMetricsAvailable( - restClient.queryMasterMetrics(current = true).metrics.nonEmpty) + restClient.queryMasterMetrics(current = true).metrics.nonEmpty, "metrics available") val actual = restClient.queryMasterMetrics(current = true) actual.path shouldEqual s"master" actual.metrics.foreach(metric => { @@ -243,7 +244,7 @@ class RestServiceSpec extends TestSpecBase { expectMetricsAvailable({ val laterMetrics = restClient.queryMasterMetrics(current = true).metrics laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump - }) + }, "metrics available") } "can obtain workers' metrics and the metrics will keep changing" in { @@ -251,7 +252,7 @@ class RestServiceSpec extends TestSpecBase { restClient.listRunningWorkers().foreach { worker => val workerId = worker.workerId expectMetricsAvailable( - restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty) + restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty, "metrics available") val actual = restClient.queryWorkerMetrics(workerId, current = true) actual.path shouldEqual s"worker$workerId" actual.metrics.foreach(metric => { @@ -263,7 +264,7 @@ class RestServiceSpec extends TestSpecBase { expectMetricsAvailable({ val laterMetrics = restClient.queryWorkerMetrics(workerId, current = true).metrics laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump - }) + }, "metrics available") } } } @@ -328,7 +329,7 @@ class RestServiceSpec extends TestSpecBase { val originAppDetail = restClient.queryStreamingAppDetail(originAppId) // exercise - Util.retryUntil(restClient.restartApp(originAppId)) + Util.retryUntil(()=>restClient.restartApp(originAppId), "app restarted") val killedApp = restClient.queryApp(originAppId) killedApp.appId shouldEqual originAppId killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive @@ 
-351,10 +352,10 @@ class RestServiceSpec extends TestSpecBase { actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive } - private def expectMetricsAvailable(condition: => Boolean): Unit = { + private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = { val config = restClient.queryMasterConfig() val reportInterval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms") - Util.retryUntil(condition, interval = reportInterval) + Util.retryUntil(()=>condition, conditionDescription, interval = reportInterval) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala index 0de08b6..ffad3ec 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala @@ -17,8 +17,10 @@ */ package io.gearpump.integrationtest.checklist +import io.gearpump.WorkerId import io.gearpump.cluster.MasterToAppMaster import io.gearpump.integrationtest.{TestSpecBase, Util} +import io.gearpump.util.{Constants, LogUtil} import scala.concurrent.duration.Duration @@ -27,18 +29,21 @@ import scala.concurrent.duration.Duration */ class StabilitySpec extends TestSpecBase { + private val LOG = LogUtil.getLogger(getClass) + "kill appmaster" should { "restart the whole application" in { // setup val appId = commandLineClient.submitApp(wordCountJar) val formerAppMaster = restClient.queryApp(appId).appMasterPath - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + 
Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") ensureClockStoredInMaster() // exercise restClient.killAppMaster(appId) shouldBe true // todo: how long master will begin to recover and how much time for the recovering? - Util.retryUntil(restClient.queryApp(appId).appMasterPath != formerAppMaster) + Util.retryUntil(()=>restClient.queryApp(appId).appMasterPath != formerAppMaster, + "appmaster killed and restarted") // verify val laterAppMaster = restClient.queryStreamingAppDetail(appId) @@ -51,14 +56,15 @@ class StabilitySpec extends TestSpecBase { "will create a new executor and application will replay from the latest application clock" in { // setup val appId = commandLineClient.submitApp(wordCountJar) - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max ensureClockStoredInMaster() // exercise restClient.killExecutor(appId, executorToKill) shouldBe true // todo: how long appmaster will begin to recover and how much time for the recovering? 
- Util.retryUntil(restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill) + Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, + s"executor $executorToKill killed and restarted") // verify val laterAppMaster = restClient.queryStreamingAppDetail(appId) @@ -67,20 +73,54 @@ class StabilitySpec extends TestSpecBase { } } + private def hostName(workerId: WorkerId): String = { + val worker = restClient.listRunningWorkers().find(_.workerId == workerId) + // Parse hostname from JVM info (in format: PID@hostname) + val hostname = worker.get.jvmName.split("@")(1) + hostname + } + "kill worker" should { "worker will not recover but all its executors will be migrated to other workers" in { // setup restartClusterRequired = true val appId = commandLineClient.submitApp(wordCountJar) - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) - val maximalExecutorId = restClient.queryExecutorBrief(appId).map(_.executorId).max - val workerToKill = cluster.getWorkerHosts.head + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + + val allexecutors = restClient.queryExecutorBrief(appId) + val maxExecutor = allexecutors.sortBy(_.executorId).last ensureClockStoredInMaster() - // exercise - cluster.removeWorkerNode(workerToKill) - // todo: how long master will begin to recover and how much time for the recovering? 
- Util.retryUntil(restClient.queryExecutorBrief(appId).map(_.executorId).max > maximalExecutorId) + val appMaster = allexecutors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) + + LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, worker: ${maxExecutor.workerId}") + val executorsSharingSameWorker = allexecutors.filter(_.workerId == maxExecutor.workerId).map(_.executorId) + LOG.info(s"These executors sharing the same worker Id ${maxExecutor.workerId}," + + s" ${executorsSharingSameWorker.mkString(",")}") + + // kill the worker and expect restarting all killed executors on other workers. + val workerIdToKill = maxExecutor.workerId + cluster.removeWorkerNode(hostName(workerIdToKill)) + + val appMasterKilled = executorsSharingSameWorker.exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) + + def executorsMigrated(): Boolean = { + val executors = restClient.queryExecutorBrief(appId) + val newAppMaster = executors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) + + if (appMasterKilled) { + newAppMaster.get.workerId != appMaster.get.workerId + } else { + // New executors will be started to replace killed executors. + // The new executors will be assigned larger executor Id. We use this trick to detect + // Whether new executors have been started successfully. 
+ executors.map(_.executorId).max > maxExecutor.executorId + } + } + + Util.retryUntil(()=> { + executorsMigrated() + }, s"new executor created with id > ${maxExecutor.executorId} when worker is killed") // verify val laterAppMaster = restClient.queryStreamingAppDetail(appId) @@ -104,7 +144,8 @@ class StabilitySpec extends TestSpecBase { // verify val aliveWorkers = cluster.getWorkerHosts - Util.retryUntil(aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker))) + Util.retryUntil(()=>aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)), + "all workers down") } } @@ -112,5 +153,4 @@ class StabilitySpec extends TestSpecBase { // todo: 5000ms is a fixed sync period in clock service. we wait for 5000ms to assume the clock is stored Thread.sleep(5000) } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala index ef737a0..710cfa7 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala @@ -26,7 +26,12 @@ import io.gearpump.integrationtest.{TestSpecBase, Util} */ class StormCompatibilitySpec extends TestSpecBase { - private lazy val stormClient = new StormClient(cluster.getMastersAddresses, restClient) + private lazy val stormClient = { + new StormClient(cluster, restClient) + } + + val `version0.9` = "09" + val `version0.10` = "010" override def beforeAll(): Unit = { super.beforeAll() @@ -39,8 +44,8 @@ class StormCompatibilitySpec extends TestSpecBase { } def 
withStorm(testCode: String => Unit): Unit = { - testCode("09") - testCode("010") + testCode(`version0.9`) + testCode(`version0.10`) } def getTopologyName(name: String, stormVersion: String): String = { @@ -65,7 +70,7 @@ class StormCompatibilitySpec extends TestSpecBase { appName = topologyName) // verify - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") } s"support to run a python version of wordcount ($stormVersion)" in { @@ -80,7 +85,7 @@ class StormCompatibilitySpec extends TestSpecBase { appName = topologyName) // verify - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") } s"support DRPC ($stormVersion)" in { @@ -97,9 +102,9 @@ class StormCompatibilitySpec extends TestSpecBase { val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway) // verify - Util.retryUntil { + Util.retryUntil(()=>{ drpcClient.execute("reach", "notaurl.com") == "0" - } + }, "drpc reach == 0") drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16" drpcClient.execute("reach", "engineering.twitter.com/blog/5") shouldEqual "14" } @@ -116,7 +121,7 @@ class StormCompatibilitySpec extends TestSpecBase { appName = topologyName) // verify - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") } s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in { @@ -151,12 +156,13 @@ class StormCompatibilitySpec extends TestSpecBase { args = args.mkString(" "), appName = topologyName) - Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0) + Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") // kill executor and verify at-least-once is guaranteed on application restart val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill) + Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, + s"executor $executorToKill killed") // verify no message loss val detector = new @@ -165,10 +171,10 @@ class StormCompatibilitySpec extends TestSpecBase { SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort) - Util.retryUntil { + Util.retryUntil (()=>{ kafkaReader.read() detector.allReceived - } + }, "all kafka message read") } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala index b202d01..65d0d4c 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala @@ -26,16 +26,14 @@ import org.scalatest._ * Launch a Gearpump cluster in standalone mode and run all test specs */ class StandaloneModeSuite extends Suites( - new CommandLineSpec, - new ConnectorKafkaSpec, new RestServiceSpec, new ExampleSpec, new DynamicDagSpec, - new StabilitySpec, new StormCompatibilitySpec, + new StabilitySpec, + new ConnectorKafkaSpec, new MessageDeliverySpec - ) with BeforeAndAfterAll { override def beforeAll(): Unit = { @@ -48,5 +46,4 @@ class StandaloneModeSuite extends Suites( MiniClusterProvider.get.shutDown() super.afterAll() } - } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala index 3656219..64ef9a7 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala @@ -17,95 +17,189 @@ */ package io.gearpump.integrationtest +import io.gearpump.integrationtest.ShellExec._ +import org.apache.log4j.Logger + /** * The class is used to execute Docker commands. */ object Docker { - def listContainers(): Seq[String] = { - ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST") - .split("\n").filter(_.nonEmpty) + private val LOG = Logger.getLogger(getClass) + + /** + * @throws RuntimeException in case retval != 0 + */ + private def doExecute(container: String, command: String): String = { + ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container") } - def containerIsRunning(name: String): Boolean = { - ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty + private def doExecuteSilently(container: String, command: String): Boolean = { + ShellExec.exec(s"docker exec $container $command", s"EXEC $container") } - def getContainerIPAddr(name: String): String = { - Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}") + + /** + * @throws RuntimeException in case retval != 0 + */ + final def execute(container: String, command: String): String = { + trace(container, s"Execute $command") { + doExecute(container, command) + } } - def containerExists(name: String): Boolean = { - ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty + + final def 
executeSilently(container: String, command: String): Boolean = { + trace(container, s"Execute silently $command") { + doExecuteSilently(container, command) + } + } + + final def listContainers(): Seq[String] = { + trace("", s"Listing how many containers...") { + ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST") + .split("\n").filter(_.nonEmpty) + } + } + + final def containerIsRunning(name: String): Boolean = { + trace(name, s"Check container running or not...") { + ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty + } + } + + final def getContainerIPAddr(name: String): String = { + trace(name, s"Get Ip Address") { + Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}") + } + } + + final def containerExists(name: String): Boolean = { + trace(name, s"Check container existing or not...") { + ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty + } } /** * @throws RuntimeException in case particular container is created already */ - def createAndStartContainer(name: String, image: String, command: String, - environ: Map[String, String] = Map.empty, // key, value - volumes: Map[String, String] = Map.empty, // from, to - knownHosts: Set[String] = Set.empty, - tunnelPorts: Set[Int] = Set.empty): String = { + final def createAndStartContainer(name: String, image: String, command: String, + environ: Map[String, String] = Map.empty, // key, value + volumes: Map[String, String] = Map.empty, // from, to + knownHosts: Set[String] = Set.empty, + tunnelPorts: Set[Int] = Set.empty): String = { + if (containerExists(name)) { killAndRemoveContainer(name) } - val optsBuilder = new StringBuilder - optsBuilder.append("-d") // run in background - optsBuilder.append(" -h " + name) // use container name as hostname - optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings - - environ.foreach { case (key, value) => - optsBuilder.append(s" -e 
$key=$value") - } - volumes.foreach { case (from, to) => - optsBuilder.append(s" -v $from:$to") + trace(name, s"Create and start $name ($image)...") { + + val optsBuilder = new StringBuilder + optsBuilder.append("-d") // run in background + optsBuilder.append(" -h " + name) // use container name as hostname + optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings + + environ.foreach { case (key, value) => + optsBuilder.append(s" -e $key=$value") + } + volumes.foreach { case (from, to) => + optsBuilder.append(s" -v $from:$to") + } + knownHosts.foreach(host => + optsBuilder.append(" --link " + host) + ) + tunnelPorts.foreach(port => + optsBuilder.append(s" -p $port:$port") + ) + createAndStartContainer(name, optsBuilder.toString(), command, image) } - knownHosts.foreach(host => - optsBuilder.append(" --link " + host) - ) - tunnelPorts.foreach(port => - optsBuilder.append(s" -p $port:$port") - ) - - createAndStartContainer(name, optsBuilder.toString(), command, image) } /** * @throws RuntimeException in case particular container is created already */ - def createAndStartContainer(name: String, options: String, command: String, image: String): String = { + private def createAndStartContainer(name: String, options: String, command: String, image: String): String = { ShellExec.execAndCaptureOutput(s"docker run $options --name $name $image $command", s"MAKE $name") } - def killAndRemoveContainer(name: String): Boolean = { - ShellExec.exec(s"docker rm -f $name", s"STOP $name") + final def killAndRemoveContainer(name: String): Boolean = { + trace(name, s"kill and remove container"){ + ShellExec.exec(s"docker rm -f $name", s"STOP $name") + } } - def killAndRemoveContainer(names: Array[String]): Boolean = { + final def killAndRemoveContainer(names: Array[String]): Boolean = { assert(names.length > 0) val args = names.mkString(" ") - ShellExec.exec(s"docker rm -f $args", s"STOP MUL.") + trace(names.mkString(","), s"kill and remove 
containers") { + ShellExec.exec(s"docker rm -f $args", s"STOP $args.") + } } - def exec(container: String, command: String): Boolean = { - ShellExec.exec(s"docker exec $container $command", s"EXEC $container") + private def inspect(container: String, option: String): String = { + ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container") } - def inspect(container: String, option: String): String = { - ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container") + final def curl(container: String, url: String, options: Array[String] = Array.empty[String]): String = { + trace(container, s"curl $url") { + doExecute(container, s"curl -s ${options.mkString(" ")} $url") + } } - /** - * @throws RuntimeException in case retval != 0 - */ - def execAndCaptureOutput(container: String, command: String): String = { - ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container") + final def getHostName(container: String): String = { + trace(container, s"Get hostname of container...") { + doExecute(container, "hostname") + } } - def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = { - exec(container, s"kill -$signal $pid") + final def getNetworkGateway(container: String): String = { + trace(container, s"Get gateway of container...") { + doExecute(container, "ip route").split("\\s+")(2) + } + } + final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = { + trace(container, s"Kill process pid: $pid") { + doExecuteSilently(container, s"kill -$signal $pid") + } } -} + final def findJars(container: String, folder: String): Array[String] = { + trace(container, s"Find jars under $folder") { + doExecute(container, s"find $folder") + .split("\n").filter(_.endsWith(".jar")) + } + } + + private def trace[T](container: String, msg: String)(fun: => T): T = { + Console.println() + LOG.debug(s"Container $container====>> $msg") + 
LOG.debug("INPUT==>>") + val response = fun + LOG.debug("<<==OUTPUT") + + LOG.debug(brief(response)) + + LOG.debug(s"<<====Command END. Container $container, $msg \n") + response + } + + private val PREVIEW_MAX_LENGTH = 1024 + + private def brief[T](input: T): String = { + val output = input match { + case true => + "Success|True" + case false => + "Failure|False" + case x: Array[Any] => + "Success: [" + x.mkString(",") + "]" + case x => + x.toString + } + + val preview = if (output.length > PREVIEW_MAX_LENGTH) + output.substring(0, PREVIEW_MAX_LENGTH) + "..." else output + preview + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala index 642d94e..1c01b51 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala @@ -17,12 +17,15 @@ */ package io.gearpump.integrationtest +import org.apache.commons.lang.text.{StrMatcher, StrTokenizer} import org.apache.log4j.Logger +import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent._ import scala.concurrent.duration._ import scala.sys.process._ +import scala.collection.JavaConversions._ /** * The class is used to execute command in a shell @@ -32,10 +35,19 @@ object ShellExec { private val LOG = Logger.getLogger(getClass) private val PROCESS_TIMEOUT = 2.minutes + /** + * The builtin command line parser by ProcessBuilder (implicit sys.process) don't + * respect the quote chars (' and ") + */ + private def splitQuotedString(str: String): 
List[String] = { + val splitter = new QuotedStringTokenizer(str, " \t\n\r") + splitter.asInstanceOf[java.util.Enumeration[String]].toList + } + def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = { - LOG.debug(s"$sender -> `$command`") + LOG.debug(s"$sender => `$command`") - val p = command.run() + val p = splitQuotedString(command).run() val f = Future(blocking(p.exitValue())) // wrap in Future val retval = try { Await.result(f, timeout) @@ -46,7 +58,7 @@ object ShellExec { p.exitValue() } - LOG.debug(s"$sender <- exit $retval") + LOG.debug(s"$sender <= exit $retval") retval == 0 } @@ -56,7 +68,7 @@ object ShellExec { val buf = new StringBuilder val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"), (e: String) => buf.append(e).append("\n")) - val p = command.run(processLogger) + val p = splitQuotedString(command).run(processLogger) val f = Future(blocking(p.exitValue())) // wrap in Future val retval = try { Await.result(f, timeout) @@ -66,9 +78,9 @@ object ShellExec { p.exitValue() } val output = buf.toString().trim - val PREVIEW_MAX_LENGTH = 1024 + val PREVIEW_MAX_LENGTH = 200 val preview = if (output.length > PREVIEW_MAX_LENGTH) - output.substring(0, PREVIEW_MAX_LENGTH) + "\n..." else output + output.substring(0, PREVIEW_MAX_LENGTH) + "..." 
else output LOG.debug(s"$sender <= `$preview` exit $retval") if (retval != 0) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala index ad4a7bd..d4dc1ee 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala @@ -20,6 +20,7 @@ package io.gearpump.integrationtest import org.apache.log4j.Logger import scala.concurrent.duration._ +import scala.util.{Failure, Success, Try} object Util { @@ -39,25 +40,31 @@ object Util { } } - def retryUntil(condition: => Boolean, attempts: Int = 15, - interval: Duration = 20.seconds): Unit = { + def retryUntil(condition: ()=> Boolean, conditionDescription: String, maxTries: Int = 15, + interval: Duration = 10.seconds): Unit = { var met = false - var attemptsLeft = attempts - - while (!met) { - try { - attemptsLeft -= 1 - met = condition - if (!met) { - throw new RuntimeException( - s"condition is not met after ${attempts - attemptsLeft} retries") - } - } catch { - case ex if attemptsLeft > 0 => - LOG.debug(s"condition is not met (maybe machine is slow). 
retry in ${interval.toSeconds}s ($attemptsLeft attempts left)") - Thread.sleep(interval.toMillis) + var tries = 0 + + while (!met && tries < maxTries) { + + met = Try(condition()) match { + case Success(true) => true + case Success(false) => false + case Failure(ex) => false + } + + tries += 1 + + if (!met) { + LOG.error(s"Failed due to (false == $conditionDescription), retrying for the ${tries} times...") + Thread.sleep(interval.toMillis) + } else { + LOG.info(s"Success ($conditionDescription) after ${tries} retries") } } - } + if (!met) { + throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false") + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala index 1f14094..2217efe 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala @@ -1,6 +1,6 @@ package io.gearpump.integrationtest.hadoop -import io.gearpump.integrationtest.Docker +import io.gearpump.integrationtest.{Util, Docker} import org.apache.log4j.Logger object HadoopCluster { @@ -26,9 +26,15 @@ class HadoopCluster { def start(): Unit = { Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "") + + Util.retryUntil(()=>isAlive, "Hadoop cluster is alive") LOG.info("Hadoop cluster is started.") } + private def isAlive: Boolean = { + Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /") + } + def getDefaultFS: String = { val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST) s"hdfs://$hostIPAddr:9000" 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala index b053ffe..3244d24 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala @@ -67,8 +67,8 @@ class KafkaCluster(val advertisedHost: String, zkChroot: String = "") { "ZK_CHROOT" -> zkChroot), tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT) ) - Util.retryUntil(isAlive) - LOG.info("kafka cluster is started.") + Util.retryUntil(()=>isAlive, "kafka cluster is alive") + LOG.debug("kafka cluster is started.") } def isAlive: Boolean = { @@ -82,9 +82,7 @@ class KafkaCluster(val advertisedHost: String, zkChroot: String = "") { private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST) def listTopics(): String = { - Docker.execAndCaptureOutput(KAFKA_HOST, - s"$KAFKA_HOME/bin/kafka-topics.sh" + - s" --zookeeper $getZookeeperConnectString -list") + kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString) } def getZookeeperConnectString: String = { @@ -96,30 +94,43 @@ class KafkaCluster(val advertisedHost: String, zkChroot: String = "") { } def createTopic(topic: String, partitions: Int = 1): Unit = { - Docker.exec(KAFKA_HOST, + LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions") + + Docker.executeSilently(KAFKA_HOST, s"$KAFKA_HOME/bin/kafka-topics.sh" + s" --zookeeper $getZookeeperConnectString" + s" --create --topic $topic --partitions $partitions --replication-factor 1") } def produceDataToKafka(topic: String, messageNum: Int): Unit = { - Docker.exec(KAFKA_HOST, + 
Docker.executeSilently(KAFKA_HOST, s"$KAFKA_HOME/bin/kafka-topics.sh" + s" --zookeeper $getZookeeperConnectString" + s" --create --topic $topic --partitions 1 --replication-factor 1") - Docker.exec(KAFKA_HOST, + Docker.executeSilently(KAFKA_HOST, s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" + s" --broker-list $getBrokerListConnectString" + s" --topic $topic --messages $messageNum") } def getLatestOffset(topic: String): Int = { - val output = Docker.execAndCaptureOutput(KAFKA_HOST, - s"$KAFKA_HOME/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" + - s" --broker-list $getBrokerListConnectString " + + kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString) + } + + private def kafkaListTopics(container: String, kafkaHome: String, zookeeperConnectionString: String): String = { + LOG.debug(s"|=> Kafka list topics...") + Docker.execute(container, + s"$kafkaHome/bin/kafka-topics.sh" + + s" --zookeeper $zookeeperConnectionString -list") + } + + private def kafkaFetchLatestOffset(container: String, topic: String, kafkaHome: String, brokersList: String): Int = { + LOG.debug(s"|=> Get latest offset of topic $topic...") + val output = Docker.execute(container, + s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" + + s" --broker-list $brokersList " + s" --topic $topic --time -1") output.split(":")(2).toInt } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala index 13de780..d319d6b 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala +++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala @@ -44,6 +44,11 @@ class NumericalDataProducer(topic: String, bootstrapServers: String) { producer.close() } + /** How many message we have written in total*/ + def producedNumbers: Range = { + Range(1, lastWriteNum + 1) + } + private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = { val properties = new Properties() properties.setProperty("bootstrap.servers", bootstrapServers) @@ -62,7 +67,7 @@ class NumericalDataProducer(topic: String, bootstrapServers: String) { } } catch { case ex: InterruptedException => - LOG.info("message producing is stopped by an interrupt") + LOG.error("message producing is stopped by an interrupt") } } }) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala index 7330529..efeedae 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala @@ -25,9 +25,11 @@ trait ResultVerifier { class MessageLossDetector(totalNum: Int) extends ResultVerifier { private val bitSets = new mutable.BitSet(totalNum) + var result = List.empty[Int] override def onNext(num: Int): Unit = { bitSets.add(num) + result :+= num } def allReceived: Boolean = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala ---------------------------------------------------------------------- diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala index 9d7e39f..8ecd9cd 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala @@ -17,6 +17,8 @@ */ package io.gearpump.integrationtest.minicluster +import java.io.File + import io.gearpump.integrationtest.Docker import scala.sys.process._ @@ -29,10 +31,10 @@ class BaseContainer(val host: String, command: String, tunnelPorts: Set[Int] = Set.empty) { private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher" - private val IMAGE_SUT_HOME = "/opt/gearpump" - private val IMAGE_LOG_HOME = "/var/log/gearpump" - private val LOCAL_SUT_HOME = "pwd".!!.trim + "/output/target/pack" - private val LOCAL_LOG_HOME = { + private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump" + private val DOCKER_IMAGE_LOG_HOME = "/var/log/gearpump" + private val HOST_GEARPUMP_HOME = "pwd".!!.trim + "/output/target/pack" + private val HOST_LOG_HOME = { val dir = "/tmp/gearpump" s"mkdir -p $dir".!! 
s"mktemp -p $dir -d".!!.trim @@ -48,8 +50,8 @@ class BaseContainer(val host: String, command: String, Docker.createAndStartContainer(host, IMAGE_NAME, command, environ = Map("JAVA_OPTS" -> CLUSTER_OPTS), volumes = Map( - LOCAL_SUT_HOME -> IMAGE_SUT_HOME, - LOCAL_LOG_HOME -> IMAGE_LOG_HOME), + HOST_GEARPUMP_HOME -> DOCKER_IMAGE_GEARPUMP_HOME, + HOST_LOG_HOME -> DOCKER_IMAGE_LOG_HOME), knownHosts = masterAddrs.map(_._1).filter(_ != host).toSet, tunnelPorts = tunnelPorts) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala index 506fc67..50988ca 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala @@ -29,7 +29,7 @@ class CommandLineClient(host: String) { private val LOG = Logger.getLogger(getClass) def listApps(): Array[String] = { - execAndCaptureOutput("gear info").split("\n").filter( + gearCommand(host, "gear info").split("\n").filter( _.startsWith("application: ") ) } @@ -50,15 +50,16 @@ class CommandLineClient(host: String) { } def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = { - execAndCaptureOutput(s"gear app -verbose true -jar $jar -executors $executorNum $args") + gearCommand(host, s"gear app -verbose true -jar $jar -executors $executorNum $args") } def submitApp(jar: String, args: String = ""): Int = { + LOG.debug(s"|=> Submit Application $jar...") submitAppUse("gear app", jar, args) } private def submitAppUse(launcher: String, jar: String, args: String = 
""): Int = try { - execAndCaptureOutput(s"$launcher -jar $jar $args").split("\n") + gearCommand(host, s"$launcher -jar $jar $args").split("\n") .filter(_.contains("The application id is ")).head.split(" ").last.toInt } catch { case ex: Throwable => @@ -67,15 +68,16 @@ class CommandLineClient(host: String) { } def killApp(appId: Int): Boolean = { - exec(s"gear kill -appid $appId") + tryGearCommand(host, s"gear kill -appid $appId") } - private def exec(command: String): Boolean = { - Docker.exec(host, s"/opt/start $command") + private def gearCommand(container: String, command: String): String = { + LOG.debug(s"|=> Gear command $command in container $container...") + Docker.execute(container, s"/opt/start $command") } - private def execAndCaptureOutput(command: String): String = { - Docker.execAndCaptureOutput(host, s"/opt/start $command") + private def tryGearCommand(container: String, command: String): Boolean = { + LOG.debug(s"|=> Gear command $command in container $container...") + Docker.executeSilently(container, s"/opt/start $command") } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala index c17a20d..25c211f 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala @@ -17,6 +17,8 @@ */ package io.gearpump.integrationtest.minicluster +import java.io.IOException + import io.gearpump.cluster.master.MasterNode import io.gearpump.integrationtest.{Docker, Util} import org.apache.log4j.Logger @@ -49,18 +51,36 @@ class MiniCluster { 
private var workers: ListBuffer[String] = ListBuffer.empty def start(workerNum: Int = 2): Unit = { - // Masters' membership cannot be modified at runtime + + // Kill master + MASTER_ADDRS.foreach{case (host, _) => + if (Docker.containerExists(host)) { + Docker.killAndRemoveContainer(host) + } + } + + // Kill existing workers + workers ++= (0 until workerNum).map("worker" + _) + workers.foreach{ worker => + if (Docker.containerExists(worker)) { + Docker.killAndRemoveContainer(worker) + } + } + + // Start Masters MASTER_ADDRS.foreach({ case (host, port) => addMasterNode(host, port) }) + + // Start Workers + workers.foreach{worker => + val container = new BaseContainer(worker, "worker", MASTER_ADDRS) + container.createAndStart() + } + + // Check cluster status expectRestClientAuthenticated() expectClusterAvailable() - - // Workers' membership can be modified at runtime - (0 to workerNum - 1).foreach(index => { - val host = "worker" + index - addWorkerNode(host) - }) } private def addMasterNode(host: String, port: Int): Unit = { @@ -69,31 +89,35 @@ class MiniCluster { } def addWorkerNode(host: String): Unit = { - val container = new BaseContainer(host, "worker", MASTER_ADDRS) - container.createAndStart() - workers += host + if (workers.find(_ == host).isEmpty) { + val container = new BaseContainer(host, "worker", MASTER_ADDRS) + container.createAndStart() + workers += host + } else { + throw new IOException(s"Cannot add new worker $host, as worker with same hostname already exists") + } } /** * @throws RuntimeException if rest client is not authenticated after N attempts */ private def expectRestClientAuthenticated(): Unit = { - Util.retryUntil({ + Util.retryUntil(()=>{ restClient.login() LOG.info("rest client has been authenticated") true - }) + }, "login successfully") } /** * @throws RuntimeException if service is not available after N attempts */ private def expectClusterAvailable(): Unit = { - Util.retryUntil({ + Util.retryUntil(()=>{ val response = 
restClient.queryMaster() LOG.info(s"cluster is now available with response: $response.") response.aliveFor > 0 - }) + }, "cluster running") } def isAlive: Boolean = { @@ -101,7 +125,7 @@ class MiniCluster { } def getNetworkGateway: String = { - Docker.execAndCaptureOutput(MASTER_ADDRS.head._1, "ip route").split("\\s+")(2) + Docker.getNetworkGateway(MASTER_ADDRS.head._1) } def shutDown(): Unit = { @@ -124,9 +148,9 @@ class MiniCluster { def restart(): Unit = { shutDown() - Util.retryUntil( - !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists)) - LOG.info("all containers have been killed. restarting...") + Util.retryUntil(()=> + !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists), "all docker containers killed") + LOG.info("all docker containers have been killed. restarting...") start() } @@ -146,9 +170,8 @@ class MiniCluster { Docker.containerIsRunning(host) } - private def builtInJarsUnder(folder: String) = { - Docker.execAndCaptureOutput(getMasterHosts.head, s"find $SUT_HOME/$folder") - .split("\n").filter(_.endsWith(".jar")) + private def builtInJarsUnder(folder: String): Array[String] = { + Docker.findJars(getMasterHosts.head, s"$SUT_HOME/$folder") } private def queryBuiltInJars(folder: String, subtext: String): Seq[String] = { @@ -162,5 +185,4 @@ class MiniCluster { def queryBuiltInITJars(subtext: String): Seq[String] = { queryBuiltInJars("integrationtest", subtext) } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala index 7d83c93..b565f41 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala +++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala @@ -39,6 +39,8 @@ import upickle.Js import upickle.default._ import io.gearpump.services.util.UpickleUtil._ +import scala.reflect.ClassTag + /** * A REST client to operate a Gearpump cluster */ @@ -55,16 +57,16 @@ class RestClient(host: String, port: Int) { Graph(vertexList, edgeList) } - private def decodeAs[T: upickle.default.Reader](expr: String): T = try { + private def decodeAs[T](expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try { read[T](expr) } catch { case ex: Throwable => - LOG.error(ex) + LOG.error(s"Failed to decode Rest response to ${classTag.runtimeClass.getSimpleName}") throw ex } def queryVersion(): String = { - callFromRoot("version") + curl("version") } def listWorkers(): Array[WorkerSummary] = { @@ -240,16 +242,15 @@ class RestClient(host: String, port: Int) { private val CRUD_DELETE = "-X DELETE" private def callApi(endpoint: String, option: String = ""): String = { - callFromRoot(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile")) + curl(s"api/v1.0/$endpoint", Array(option, s"--cookie $cookieFile")) } - private def callFromRoot(endpoint: String, options: Array[String] = Array.empty[String]): String = { - Docker.execAndCaptureOutput(host, s"curl -s ${options.mkString(" ")} http://$host:$port/$endpoint") + private def curl(endpoint: String, options: Array[String] = Array.empty[String]): String = { + Docker.curl(host, s"http://$host:$port/$endpoint", options) } def login(): Unit = { - callFromRoot("login", Array(CRUD_POST, s"--cookie-jar $cookieFile", + curl("login", Array(CRUD_POST, s"--cookie-jar $cookieFile", "--data username=admin", "--data password=admin")) } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala 
---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala index dbcc53d..7b460d7 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/storm/StormClient.scala @@ -20,11 +20,14 @@ package io.gearpump.integrationtest.storm import backtype.storm.utils.{Utils, DRPCClient} import io.gearpump.integrationtest.{Util, Docker} -import io.gearpump.integrationtest.minicluster.{RestClient, BaseContainer} +import io.gearpump.integrationtest.minicluster.{MiniCluster, RestClient, BaseContainer} -class StormClient(masterAddrs: List[(String, Int)], restClient: RestClient) { +import scala.util.Random - private val CONFIG_FILE = "/opt/gearpump/storm.yaml" +class StormClient(cluster: MiniCluster, restClient: RestClient) { + + private val masterAddrs: List[(String, Int)] = cluster.getMastersAddresses + private val CONFIG_FILE = s"/opt/gearpump/storm${new Random().nextInt()}.yaml" private val DRPC_HOST = "storm0" private val DRPC_PORT = 3772 private val DRPC_INVOCATIONS_PORT = 3773 @@ -35,19 +38,39 @@ class StormClient(masterAddrs: List[(String, Int)], restClient: RestClient) { private val drpcContainer = new BaseContainer(DRPC_HOST, STORM_DRPC, masterAddrs, tunnelPorts = Set(DRPC_PORT, DRPC_INVOCATIONS_PORT)) + private val nimbusContainer = new BaseContainer(NIMBUS_HOST, s"$STORM_NIMBUS -output $CONFIG_FILE", masterAddrs) def start(): Unit = { - drpcContainer.createAndStart() nimbusContainer.createAndStart() + ensureNimbusRunning() + + drpcContainer.createAndStart() + ensureDrpcServerRunning() + } + + private def ensureNimbusRunning(): Unit = { + Util.retryUntil(()=>{ + val response = Docker.execute(NIMBUS_HOST, "grep \"port\" " + CONFIG_FILE) + // Parse format 
nimbus.thrift.port: '39322' + val thriftPort = response.split(" ")(1).replace("'","").toInt + + Docker.executeSilently(NIMBUS_HOST, s"""sh -c "netstat -na | grep $thriftPort" """) + }, "Nimbus running") + } + + private def ensureDrpcServerRunning(): Unit = { + Util.retryUntil(()=>{ + Docker.executeSilently(DRPC_HOST, s"""sh -c "netstat -na | grep $DRPC_PORT " """) + }, "DRPC running") } def submitStormApp(jar: String, mainClass: String, args: String, appName: String): Int = { - Util.retryUntil({ - Docker.exec(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " + + Util.retryUntil(()=>{ + Docker.executeSilently(NIMBUS_HOST, s"$STORM_APP -config $CONFIG_FILE " + s"-jar $jar $mainClass $args") restClient.listRunningApps().exists(_.appName == appName) - }) + }, "app running") restClient.listRunningApps().filter(_.appName == appName).head.appId } @@ -57,6 +80,9 @@ class StormClient(masterAddrs: List[(String, Int)], restClient: RestClient) { } def shutDown(): Unit = { + + // clean the storm.yaml config file + Docker.executeSilently(NIMBUS_HOST, s"rm $CONFIG_FILE ") drpcContainer.killAndRemove() nimbusContainer.killAndRemove() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/jvm.sbt ---------------------------------------------------------------------- diff --git a/jvm.sbt b/jvm.sbt index e036446..341079f 100644 --- a/jvm.sbt +++ b/jvm.sbt @@ -1 +1,5 @@ -parallelExecution in (ThisBuild, Test) := false \ No newline at end of file +parallelExecution in (ThisBuild, Test) := false + +// http://www.scala-sbt.org/0.13/docs/Testing.html +logBuffered in IntegrationTest := false + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d092e80f/project/Build.scala ---------------------------------------------------------------------- diff --git a/project/Build.scala b/project/Build.scala index b521e53..3fc0ff9 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -176,13 +176,14 @@ object Build extends sbt.Build { "com.typesafe.akka" %% 
"akka-slf4j" % akkaVersion, "com.typesafe.akka" %% "akka-kernel" % akkaVersion, "com.github.intel-hadoop" %% "gearpump-shaded-akka-kryo" % kryoVersion, + "org.scala-lang" % "scala-reflect" % scalaVersionNumber, + "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4", "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test", "org.scalatest" %% "scalatest" % scalaTestVersion % "test", "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test", "org.mockito" % "mockito-core" % mockitoVersion % "test", "junit" % "junit" % junitVersion % "test" - ), - libraryDependencies <+= (scalaVersion)("org.scala-lang" % "scala-reflect" % _) + ) ) lazy val javadocSettings = Seq(
