fix GEARPUMP-150 correct the integration test file structure Author: huafengw <[email protected]>
Closes #26 from huafengw/name. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e9ea6e2f Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e9ea6e2f Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e9ea6e2f Branch: refs/heads/master Commit: e9ea6e2f3366ede2aa350204e8b3854a6d0081ca Parents: c80c069 Author: huafengw <[email protected]> Authored: Fri May 27 19:38:40 2016 +0800 Committer: manuzhang <[email protected]> Committed: Fri May 27 19:38:40 2016 +0800 ---------------------------------------------------------------------- .../integrationtest/MiniClusterProvider.scala | 41 --- .../gearpump/integrationtest/TestSpecBase.scala | 92 ----- .../checklist/CommandLineSpec.scala | 133 ------- .../checklist/ConnectorKafkaSpec.scala | 120 ------ .../checklist/DynamicDagSpec.scala | 157 -------- .../integrationtest/checklist/ExampleSpec.scala | 148 -------- .../checklist/MessageDeliverySpec.scala | 122 ------ .../checklist/RestServiceSpec.scala | 369 ------------------- .../checklist/StabilitySpec.scala | 162 -------- .../checklist/StormCompatibilitySpec.scala | 185 ---------- .../suites/StandaloneModeSuite.scala | 51 --- .../integrationtest/MiniClusterProvider.scala | 41 +++ .../gearpump/integrationtest/TestSpecBase.scala | 92 +++++ .../checklist/CommandLineSpec.scala | 133 +++++++ .../checklist/ConnectorKafkaSpec.scala | 120 ++++++ .../checklist/DynamicDagSpec.scala | 157 ++++++++ .../integrationtest/checklist/ExampleSpec.scala | 148 ++++++++ .../checklist/MessageDeliverySpec.scala | 122 ++++++ .../checklist/RestServiceSpec.scala | 369 +++++++++++++++++++ .../checklist/StabilitySpec.scala | 162 ++++++++ .../checklist/StormCompatibilitySpec.scala | 185 ++++++++++ .../suites/StandaloneModeSuite.scala | 51 +++ .../io/gearpump/integrationtest/Docker.scala | 211 ----------- .../io/gearpump/integrationtest/ShellExec.scala | 98 ----- 
.../io/gearpump/integrationtest/Util.scala | 72 ---- .../integrationtest/hadoop/HadoopCluster.scala | 67 ---- .../integrationtest/kafka/KafkaCluster.scala | 140 ------- .../kafka/NumericalDataProducer.scala | 76 ---- .../integrationtest/kafka/ResultVerifier.scala | 42 --- .../kafka/SimpleKafkaReader.scala | 49 --- .../minicluster/BaseContainer.scala | 60 --- .../minicluster/CommandLineClient.scala | 84 ----- .../minicluster/MiniCluster.scala | 189 ---------- .../minicluster/RestClient.scala | 268 -------------- .../integrationtest/storm/StormClient.scala | 91 ----- .../gearpump/integrationtest/Docker.scala | 211 +++++++++++ .../gearpump/integrationtest/ShellExec.scala | 98 +++++ .../apache/gearpump/integrationtest/Util.scala | 72 ++++ .../integrationtest/hadoop/HadoopCluster.scala | 67 ++++ .../integrationtest/kafka/KafkaCluster.scala | 140 +++++++ .../kafka/NumericalDataProducer.scala | 76 ++++ .../integrationtest/kafka/ResultVerifier.scala | 42 +++ .../kafka/SimpleKafkaReader.scala | 49 +++ .../minicluster/BaseContainer.scala | 60 +++ .../minicluster/CommandLineClient.scala | 84 +++++ .../minicluster/MiniCluster.scala | 189 ++++++++++ .../minicluster/RestClient.scala | 268 ++++++++++++++ .../integrationtest/storm/StormClient.scala | 91 +++++ .../integrationtest/storm/Adaptor.scala | 38 -- .../storm/Storm010KafkaTopology.scala | 95 ----- .../integrationtest/storm/Adaptor.scala | 38 ++ .../storm/Storm010KafkaTopology.scala | 95 +++++ .../integrationtest/storm/Adaptor.scala | 38 -- .../storm/Storm09KafkaTopology.scala | 95 ----- .../integrationtest/storm/Adaptor.scala | 38 ++ .../storm/Storm09KafkaTopology.scala | 95 +++++ 56 files changed, 3293 insertions(+), 3293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala 
---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala deleted file mode 100644 index 4a161c7..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest - -import org.apache.gearpump.integrationtest.minicluster.MiniCluster - -/** - * Provides a min cluster of Gearpump, which contains one or more masters, and workers. - */ -object MiniClusterProvider { - - private var instance = new MiniCluster - - def get: MiniCluster = instance - - def set(instance: MiniCluster): MiniCluster = { - this.instance = instance - instance - } - - /** - * Indicates whether test suite should create particular cluster. In case of false, every - * test spec will be responsible for cluster creation. 
- */ - var managed = false -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala deleted file mode 100644 index dabcc71..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.integrationtest - -import org.scalatest._ - -import org.apache.gearpump.cluster.MasterToAppMaster -import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData -import org.apache.gearpump.util.LogUtil - -/** - * The abstract test spec - */ -trait TestSpecBase - extends WordSpec with Matchers with BeforeAndAfterEachTestData with BeforeAndAfterAll { - - private def LOGGER = LogUtil.getLogger(getClass) - - override def beforeAll(): Unit = { - super.beforeAll() - if (!MiniClusterProvider.managed) { - LOGGER.info("Will test with a default standalone mini cluster") - MiniClusterProvider.get.start() - } - } - - override def afterAll(): Unit = { - if (!MiniClusterProvider.managed) { - LOGGER.info("Will shutdown the default mini cluster") - MiniClusterProvider.get.shutDown() - } - super.afterAll() - } - - lazy val cluster = MiniClusterProvider.get - lazy val commandLineClient = cluster.commandLineClient - lazy val restClient = cluster.restClient - - lazy val wordCountJar = cluster.queryBuiltInExampleJars("wordcount-").head - lazy val wordCountName = "wordCount" - - var restartClusterRequired: Boolean = false - - override def beforeEach(td: TestData): Unit = { - - LOGGER.debug(s">### =============================================================") - LOGGER.debug(s">###1 Prepare test: ${td.name}\n") - - assert(cluster != null, "Configure MiniCluster properly in suite spec") - cluster.isAlive shouldBe true - restClient.listRunningApps().isEmpty shouldBe true - LOGGER.debug(s">### =============================================================") - LOGGER.debug(s">###2 Start test: ${td.name}\n") - } - - override def afterEach(td: TestData): Unit = { - LOGGER.debug(s"<### =============================================================") - LOGGER.debug(s"<###3 End test: ${td.name}\n") - - if (restartClusterRequired || !cluster.isAlive) { - restartClusterRequired = false - LOGGER.info("Will restart the cluster for next test case") - 
cluster.restart() - } else { - restClient.listRunningApps().foreach(app => { - commandLineClient.killApp(app.appId) shouldBe true - }) - } - } - - def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = { - val app = restClient.queryApp(appId) - app.status shouldEqual MasterToAppMaster.AppMasterActive - app.appName shouldEqual expectedAppName - app - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala deleted file mode 100644 index eabc684..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.integrationtest.checklist - -import org.apache.gearpump.cluster.MasterToAppMaster -import org.apache.gearpump.integrationtest.TestSpecBase - -/** - * The test spec checks the command-line usage - */ -class CommandLineSpec extends TestSpecBase { - - "use `gear info` to list applications" should { - "retrieve 0 application after cluster just started" in { - // exercise - getRunningAppCount shouldEqual 0 - } - - "retrieve 1 application after the first application submission" in { - // setup - val appId = expectSubmitAppSuccess(wordCountJar) - expectAppIsRunningByParsingOutput(appId, wordCountName) - - // exercise - getRunningAppCount shouldEqual 1 - } - } - - "use `gear app` to submit application" should { - "find a running application after submission" in { - // exercise - val appId = expectSubmitAppSuccess(wordCountJar) - expectAppIsRunningByParsingOutput(appId, wordCountName) - } - - "reject a repeated submission request while the application is running" in { - // setup - val appId = expectSubmitAppSuccess(wordCountJar) - expectAppIsRunningByParsingOutput(appId, wordCountName) - - // exercise - val actualAppId = commandLineClient.submitApp(wordCountJar) - actualAppId shouldEqual -1 - } - - "reject an invalid submission (the jar file path is incorrect)" in { - // exercise - val actualAppId = commandLineClient.submitApp(wordCountJar + ".missing") - actualAppId shouldEqual -1 - } - } - - "use `gear kill` to kill application" should { - "a running application should be killed" in { - // setup - val appId = expectSubmitAppSuccess(wordCountJar) - - // exercise - val success = commandLineClient.killApp(appId) - success shouldBe true - } - - "should fail when attempting to kill a stopped application" in { - // setup - val appId = expectSubmitAppSuccess(wordCountJar) - var success = commandLineClient.killApp(appId) - success shouldBe true - - // exercise - success = commandLineClient.killApp(appId) - success shouldBe false - } - - "the 
EmbededCluster can be used as embedded cluster in process" in { - // setup - val args = "-debug true -sleep 10" - val appId = expectSubmitAppSuccess(wordCountJar, args) - var success = commandLineClient.killApp(appId) - success shouldBe true - } - - "should fail when attempting to kill a non-exist application" in { - // setup - val freeAppId = getNextAvailableAppId - - // exercise - val success = commandLineClient.killApp(freeAppId) - success shouldBe false - } - } - - "use `gear replay` to replay the application from current min clock" should { - "todo: description" in { - // todo: test code - } - } - - private def getRunningAppCount: Int = { - commandLineClient.listRunningApps().length - } - - private def getNextAvailableAppId: Int = { - commandLineClient.listApps().length + 1 - } - - private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = { - val appId = commandLineClient.submitApp(jar) - appId should not equal -1 - appId - } - - private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: String): Unit = { - val actual = commandLineClient.queryApp(appId) - actual should include(s"application: $appId, ") - actual should include(s"name: $expectedName, ") - actual should include(s"status: ${MasterToAppMaster.AppMasterActive}") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala deleted file mode 100644 index d8bdc1e..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more 
contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.checklist - -import org.scalatest.TestData - -import org.apache.gearpump.integrationtest.kafka._ -import org.apache.gearpump.integrationtest.{TestSpecBase, Util} - -/** - * The test spec checks the Kafka datasource connector - */ -class ConnectorKafkaSpec extends TestSpecBase { - - private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway) - private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka-").head - private var producer: NumericalDataProducer = null - - override def beforeAll(): Unit = { - super.beforeAll() - kafkaCluster.start() - } - - override def afterAll(): Unit = { - kafkaCluster.shutDown() - super.afterAll() - } - - override def afterEach(test: TestData): Unit = { - super.afterEach(test) - if (producer != null) { - producer.stop() - producer = null - } - } - - "KafkaSource and KafkaSink" should { - "read from and write to kafka" in { - // setup - val sourceTopic = "topic1" - val sinkTopic = "topic2" - val messageNum = 10000 - kafkaCluster.produceDataToKafka(sourceTopic, messageNum) - - // exercise - val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite", - "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, - "-brokerList", 
kafkaCluster.getBrokerListConnectString, - "-sourceTopic", sourceTopic, - "-sinkTopic", sinkTopic).mkString(" ") - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args) - success shouldBe true - - // verify - expectAppIsRunning(appId, "KafkaReadWrite") - Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum, - "kafka all message written") - } - } - - "Gearpump with Kafka" should { - "support at-least-once message delivery" in { - // setup - val sourcePartitionNum = 2 - val sourceTopic = "topic3" - val sinkTopic = "topic4" - // Generate number sequence (1, 2, 3, ...) to the topic - kafkaCluster.createTopic(sourceTopic, sourcePartitionNum) - producer = new NumericalDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) - producer.start() - - // exercise - val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite", - "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, - "-brokerList", kafkaCluster.getBrokerListConnectString, - "-sourceTopic", sourceTopic, - "-sinkTopic", sinkTopic, - "-source", sourcePartitionNum).mkString(" ") - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args) - success shouldBe true - - // verify #1 - expectAppIsRunning(appId, "KafkaReadWrite") - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - - // verify #2 - val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max - restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(() => restClient.queryExecutorBrief(appId) - .map(_.executorId).max > executorToKill, - s"executor $executorToKill killed") - - // verify #3 - val detector = new MessageLossDetector(producer.lastWriteNum) - val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, - host = kafkaCluster.advertisedHost, port = 
kafkaCluster.advertisedPort) - Util.retryUntil(() => { - kafkaReader.read() - detector.allReceived - }, "kafka all message read") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala deleted file mode 100644 index 56b33c1..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.integrationtest.checklist - -import org.apache.gearpump.integrationtest.{TestSpecBase, Util} -import org.apache.gearpump.metrics.Metrics.Meter -import org.apache.gearpump.streaming._ -import org.apache.gearpump.streaming.appmaster.ProcessorSummary - -class DynamicDagSpec extends TestSpecBase { - - lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head - val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split" - val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum" - val solName = "sol" - - "dynamic dag" should { - "can retrieve a list of built-in partitioner classes" in { - val partitioners = restClient.queryBuiltInPartitioners() - partitioners.length should be > 0 - partitioners.foreach(clazz => - clazz should startWith("org.apache.gearpump.partitioner.") - ) - } - - "can compose a wordcount application from scratch" in { - // todo: blocked by #1450 - } - - "can replace down stream with wordcount's sum processor (new processor will have metrics)" in { - // setup - val appId = expectSolJarSubmittedWithAppId() - - // exercise - val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - replaceProcessor(appId, 1, sumTaskClass) - var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(() => { - laterProcessors = restClient.queryStreamingAppDetail(appId).processors - laterProcessors.size == formerProcessors.size + 1 - }, "new processor successfully added") - processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") - } - - "can replace up stream with wordcount's split processor (new processor will have metrics)" in { - // setup - val appId = expectSolJarSubmittedWithAppId() - - // exercise - val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - replaceProcessor(appId, 0, splitTaskClass) - var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(() => { - laterProcessors = 
restClient.queryStreamingAppDetail(appId).processors - laterProcessors.size == formerProcessors.size + 1 - }, "new processor added") - processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput") - } - - "fall back to last dag version when replacing a processor failid" in { - // setup - val appId = expectSolJarSubmittedWithAppId() - - // exercise - val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - replaceProcessor(appId, 1, sumTaskClass) - var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(() => { - laterProcessors = restClient.queryStreamingAppDetail(appId).processors - laterProcessors.size == formerProcessors.size + 1 - }, "new processor added") - processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") - - val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake" - replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass) - Util.retryUntil(() => { - val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors - processorsAfterFailure.size == laterProcessors.size - }, "new processor added") - val currentClock = restClient.queryStreamingAppDetail(appId).clock - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock, - "app clock is advancing") - } - - "fall back to last dag version when AppMaster HA triggered" in { - // setup - val appId = expectSolJarSubmittedWithAppId() - - // exercise - val formerAppMaster = restClient.queryApp(appId).appMasterPath - val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - replaceProcessor(appId, 1, sumTaskClass) - var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(() => { - laterProcessors = restClient.queryStreamingAppDetail(appId).processors - laterProcessors.size == formerProcessors.size + 1 - }, "new processor added") - processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") - - 
restClient.killAppMaster(appId) shouldBe true - Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster, - "new AppMaster created") - val processors = restClient.queryStreamingAppDetail(appId).processors - processors.size shouldEqual laterProcessors.size - } - } - - private def expectSolJarSubmittedWithAppId(): Int = { - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length) - success shouldBe true - expectAppIsRunning(appId, solName) - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - appId - } - - private def replaceProcessor( - appId: Int, - formerProcessorId: Int, - newTaskClass: String, - newProcessorDescription: String = "", - newParallelism: Int = 1): Unit = { - val uploadedJar = restClient.uploadJar(wordCountJar) - val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass, - newParallelism, newProcessorDescription, - jar = uploadedJar) - - // exercise - val success = restClient.replaceStreamingAppProcessor(appId, replaceMe) - success shouldBe true - } - - private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = { - Util.retryUntil(() => { - val actual = restClient.queryStreamingAppMetrics(appId, current = false, - path = "processor" + processorId) - val throughput = actual.metrics.filter(_.value.name.endsWith(metrics)) - throughput.size should be > 0 - throughput.forall(_.value.asInstanceOf[Meter].count > 0L) - }, "new processor has message received") - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala deleted file mode 100644 index 27e4665..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.integrationtest.checklist - -import org.apache.log4j.Logger - -import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util} -import org.apache.gearpump.streaming._ -import org.apache.gearpump.streaming.appmaster.ProcessorSummary - -/** - * The test spec will perform destructive operations to check the stability - */ -class ExampleSpec extends TestSpecBase { - - private val LOG = Logger.getLogger(getClass) - - "distributed shell" should { - "execute commands on machines where its executors are running" in { - val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head - val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell" - val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient" - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass) - success shouldBe true - expectAppIsRunning(appId, "DistributedShell") - val args = Array( - clientClass, - "-appid", appId.toString, - "-command", "hostname" - ) - - val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_)) - - def verify(): Boolean = { - val workerNum = cluster.getWorkerHosts.length - val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, - workerNum, args.mkString(" ")).split("\n"). 
- filterNot(line => line.startsWith("[INFO]") || line.isEmpty) - expectedHostNames.forall(result.contains) - } - - Util.retryUntil(() => verify(), - s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}") - } - } - - "wordcount" should { - val wordCountJarNamePrefix = "wordcount-" - behave like streamingApplication(wordCountJarNamePrefix, wordCountName) - - "can submit immediately after killing a former one" in { - // setup - val formerAppId = restClient.getNextAvailableAppId() - val formerSubmissionSuccess = - restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - formerSubmissionSuccess shouldBe true - expectAppIsRunning(formerAppId, wordCountName) - Util.retryUntil(() => - restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running") - restClient.killApp(formerAppId) - - // exercise - val appId = formerAppId + 1 - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - success shouldBe true - expectAppIsRunning(appId, wordCountName) - } - } - - "wordcount(java)" should { - val wordCountJavaJarNamePrefix = "wordcountjava-" - val wordCountJavaName = "wordcountJava" - behave like streamingApplication(wordCountJavaJarNamePrefix, wordCountJavaName) - } - - "sol" should { - val solJarNamePrefix = "sol-" - val solName = "sol" - behave like streamingApplication(solJarNamePrefix, solName) - } - - "complexdag" should { - val dynamicDagJarNamePrefix = "complexdag-" - val dynamicDagName = "dag" - behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName) - } - - def streamingApplication(jarNamePrefix: String, appName: String): Unit = { - lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head - - "can obtain application clock and the clock will keep changing" in { - // setup - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(jar, cluster.getWorkerHosts.length) - success shouldBe true - expectAppIsRunning(appId, appName) - - // exercise - 
Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted") - val formerClock = restClient.queryStreamingAppDetail(appId).clock - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock, - "app clock is advancing") - } - - "can change the parallelism and description of a processor" in { - // setup - val appId = restClient.getNextAvailableAppId() - val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length) - formerSubmissionSuccess shouldBe true - expectAppIsRunning(appId, appName) - val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - val processor0 = formerProcessors.get(0).get - val expectedProcessorId = formerProcessors.size - val expectedParallelism = processor0.parallelism + 1 - val expectedDescription = processor0.description + "new" - val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass, - expectedParallelism, description = expectedDescription) - - // exercise - val success = restClient.replaceStreamingAppProcessor(appId, replaceMe) - success shouldBe true - var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(() => { - laterProcessors = restClient.queryStreamingAppDetail(appId).processors - laterProcessors.size == formerProcessors.size + 1 - }, "new process added") - val laterProcessor0 = laterProcessors.get(expectedProcessorId).get - laterProcessor0.parallelism shouldEqual expectedParallelism - laterProcessor0.description shouldEqual expectedDescription - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala deleted file mode 100644 index bb9982a..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.integrationtest.checklist - -import org.apache.log4j.Logger - -import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._ -import org.apache.gearpump.integrationtest.kafka.KafkaCluster._ -import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader} -import org.apache.gearpump.integrationtest.{TestSpecBase, Util} - -/** - * Checks message delivery consistency, like at-least-once, and exactly-once. 
- */ -class MessageDeliverySpec extends TestSpecBase { - - private val LOG = Logger.getLogger(getClass) - - override def beforeAll(): Unit = { - super.beforeAll() - } - - override def afterAll(): Unit = { - super.afterAll() - } - - "Gearpump" should { - "support exactly-once message delivery" in { - withKafkaCluster(cluster) { kafkaCluster => - // setup - val sourcePartitionNum = 1 - val sourceTopic = "topic1" - val sinkTopic = "topic2" - - // Generate number sequence (1, 2, 3, ...) to the topic - kafkaCluster.createTopic(sourceTopic, sourcePartitionNum) - - withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) { producer => - - withHadoopCluster { hadoopCluster => - // exercise - val args = Array("org.apache.gearpump.streaming.examples.state.MessageCountApp", - "-defaultFS", hadoopCluster.getDefaultFS, - "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, - "-brokerList", kafkaCluster.getBrokerListConnectString, - "-sourceTopic", sourceTopic, - "-sinkTopic", sinkTopic, - "-sourceTask", sourcePartitionNum).mkString(" ") - val appId = restClient.getNextAvailableAppId() - - val stateJar = cluster.queryBuiltInExampleJars("state-").head - val success = restClient.submitApp(stateJar, executorNum = 1, args = args) - success shouldBe true - - // verify #1 - expectAppIsRunning(appId, "MessageCount") - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, - "app is running") - - // wait for checkpoint to take place - Thread.sleep(1000) - - LOG.info("Trigger message replay by kill and restart the executors") - val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max - restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(() => restClient.queryExecutorBrief(appId) - .map(_.executorId).max > executorToKill, s"executor $executorToKill killed") - - producer.stop() - val producedNumbers = producer.producedNumbers - LOG.info(s"In total, numbers in range[${producedNumbers.start}" + - s", 
${producedNumbers.end - 1}] have been written to Kafka") - - // verify #3 - val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic) - - assert(producedNumbers.size == kafkaSourceOffset, - "produced message should match Kafka queue size") - - LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset) - - // The sink processor of this job (MessageCountApp) writes total message - // count to Kafka Sink periodically (once every checkpoint interval). - // The detector keep record of latest message count. - val detector = new ResultVerifier { - var latestMessageCount: Int = 0 - override def onNext(messageCount: Int): Unit = { - this.latestMessageCount = messageCount - } - } - - val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, - host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort) - Util.retryUntil(() => { - kafkaReader.read() - LOG.info(s"Received message count: ${detector.latestMessageCount}, " + - s"expect: ${producedNumbers.size}") - detector.latestMessageCount == producedNumbers.size - }, "MessageCountApp calculated message count matches " + - "expected in case of message replay") - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala deleted file mode 100644 index 2f5bb64..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala +++ /dev/null @@ -1,369 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.checklist - -import scala.concurrent.duration._ - -import org.apache.gearpump.cluster.MasterToAppMaster -import org.apache.gearpump.cluster.master.MasterStatus -import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} -import org.apache.gearpump.integrationtest.{TestSpecBase, Util} - -/** - * The test spec checks REST service usage - */ -class RestServiceSpec extends TestSpecBase { - - "query system version" should { - "retrieve the current version number" in { - restClient.queryVersion() should not be empty - } - } - - "list applications" should { - "retrieve 0 application after cluster just started" in { - restClient.listRunningApps().length shouldEqual 0 - } - - "retrieve 1 application after the first application submission" in { - // exercise - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - success shouldBe true - expectAppIsRunning(appId, wordCountName) - restClient.listRunningApps().length shouldEqual 1 - } - } - - "submit application (wordcount)" should { - "find a running application after submission" in { - // exercise - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar, 
cluster.getWorkerHosts.length) - success shouldBe true - expectAppIsRunning(appId, wordCountName) - } - - "reject a repeated submission request while the application is running" in { - // setup - val appId = restClient.getNextAvailableAppId() - val formerSubmissionSuccess = restClient.submitApp(wordCountJar, - cluster.getWorkerHosts.length) - formerSubmissionSuccess shouldBe true - expectAppIsRunning(appId, wordCountName) - - // exercise - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - success shouldBe false - } - - "reject an invalid submission (the jar file path is incorrect)" in { - // exercise - val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length) - success shouldBe false - } - - "submit a wordcount application with 4 split and 3 sum processors and expect " + - "parallelism of processors match the given number" in { - // setup - val splitNum = 4 - val sumNum = 3 - val appId = restClient.getNextAvailableAppId() - - // exercise - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, - s"-split $splitNum -sum $sumNum") - success shouldBe true - expectAppIsRunning(appId, wordCountName) - val processors = restClient.queryStreamingAppDetail(appId).processors - processors.size shouldEqual 2 - val splitProcessor = processors.get(0).get - splitProcessor.parallelism shouldEqual splitNum - val sumProcessor = processors.get(1).get - sumProcessor.parallelism shouldEqual sumNum - } - - "can obtain application metrics and the metrics will keep changing" in { - // setup - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - success shouldBe true - expectAppIsRunning(appId, wordCountName) - - // exercise - expectMetricsAvailable( - restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty, - "metrics available") - val actual = restClient.queryStreamingAppMetrics(appId, current = true) 
- actual.path shouldEqual s"app$appId.processor*" - actual.metrics.foreach(metric => { - metric.time should be > 0L - metric.value should not be null - }) - val formerMetricsDump = actual.metrics.toString() - - expectMetricsAvailable({ - val laterMetrics = restClient.queryStreamingAppMetrics(appId, current = true).metrics - laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump - }, "metrics available") - } - - "can obtain application corresponding executors' metrics and " + - "the metrics will keep changing" in { - // setup - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - success shouldBe true - expectAppIsRunning(appId, wordCountName) - - // exercise - expectMetricsAvailable( - restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty, - "metrics available") - val actual = restClient.queryExecutorMetrics(appId, current = true) - actual.path shouldEqual s"app$appId.executor*" - actual.metrics.foreach(metric => { - metric.time should be > 0L - metric.value should not be null - }) - val formerMetricsDump = actual.metrics.toString() - - expectMetricsAvailable({ - val laterMetrics = restClient.queryExecutorMetrics(appId, current = true).metrics - laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump - }, "metrics available") - } - } - - "kill application" should { - "a running application should be killed" in { - // setup - val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - success shouldBe true - expectAppIsRunning(appId, wordCountName) - - // exercise - killAppAndVerify(appId) - } - - "should fail when attempting to kill a stopped application" in { - // setup - val appId = restClient.getNextAvailableAppId() - val submissionSucess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - submissionSucess shouldBe true - expectAppIsRunning(appId, 
wordCountName) - killAppAndVerify(appId) - - // exercise - val success = restClient.killApp(appId) - success shouldBe false - } - - "should fail when attempting to kill a non-exist application" in { - // setup - val freeAppId = restClient.listApps().length + 1 - - // exercise - val success = restClient.killApp(freeAppId) - success shouldBe false - } - } - - "cluster information" should { - "retrieve 1 master for a non-HA cluster" in { - // exercise - val masterSummary = restClient.queryMaster() - masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses - masterSummary.aliveFor should be > 0L - masterSummary.masterStatus shouldEqual MasterStatus.Synced - } - - "retrieve the same number of workers as cluster has" in { - // setup - val expectedWorkersCount = cluster.getWorkerHosts.size - - // exercise - var runningWorkers: Array[WorkerSummary] = Array.empty - Util.retryUntil(() => { - runningWorkers = restClient.listRunningWorkers() - runningWorkers.length == expectedWorkersCount - }, "all workers running") - runningWorkers.foreach { worker => - worker.state shouldEqual MasterToAppMaster.AppMasterActive - } - } - - "find a newly added worker instance" in { - // setup - restartClusterRequired = true - val formerWorkersCount = cluster.getWorkerHosts.length - Util.retryUntil(() => restClient.listRunningWorkers().length == formerWorkersCount, - "all workers running") - val workerName = "newWorker" - - // exercise - cluster.addWorkerNode(workerName) - Util.retryUntil(() => restClient.listRunningWorkers().length > formerWorkersCount, - "new worker added") - cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1 - restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1 - } - - "retrieve 0 worker, if cluster is started without any workers" in { - // setup - restartClusterRequired = true - cluster.shutDown() - - // exercise - cluster.start(workerNum = 0) - cluster.getWorkerHosts.length shouldEqual 0 - 
restClient.listRunningWorkers().length shouldEqual 0 - } - - "can obtain master's metrics and the metrics will keep changing" in { - // exercise - expectMetricsAvailable( - restClient.queryMasterMetrics(current = true).metrics.nonEmpty, "metrics available") - val actual = restClient.queryMasterMetrics(current = true) - actual.path shouldEqual s"master" - actual.metrics.foreach(metric => { - metric.time should be > 0L - metric.value should not be null - }) - val formerMetricsDump = actual.metrics.toString() - - expectMetricsAvailable({ - val laterMetrics = restClient.queryMasterMetrics(current = true).metrics - laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump - }, "metrics available") - } - - "can obtain workers' metrics and the metrics will keep changing" in { - // exercise - restClient.listRunningWorkers().foreach { worker => - val workerId = worker.workerId - expectMetricsAvailable( - restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty, - "metrics available") - val actual = restClient.queryWorkerMetrics(workerId, current = true) - actual.path shouldEqual s"worker${WorkerId.render(workerId)}" - actual.metrics.foreach(metric => { - metric.time should be > 0L - metric.value should not be null - }) - val formerMetricsDump = actual.metrics.toString() - - expectMetricsAvailable({ - val laterMetrics = restClient.queryWorkerMetrics(workerId, current = true).metrics - laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump - }, "metrics available") - } - } - } - - "configuration" should { - "retrieve the configuration of master and match particular values" in { - // exercise - val actual = restClient.queryMasterConfig() - actual.hasPath("gearpump") shouldBe true - actual.hasPath("gearpump.cluster") shouldBe true - actual.getString("gearpump.hostname") shouldEqual cluster.getMasterHosts.mkString(",") - } - - "retrieve the configuration of worker X and match particular values" in { - // exercise - 
restClient.listRunningWorkers().foreach { worker => - val actual = restClient.queryWorkerConfig(worker.workerId) - actual.hasPath("gearpump") shouldBe true - actual.hasPath("gearpump.worker") shouldBe true - } - } - - "retrieve the configuration of executor X and match particular values" in { - // setup - val appId = restClient.getNextAvailableAppId() - - // exercise - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - success shouldBe true - restClient.queryExecutorBrief(appId).foreach { executor => - val executorId = executor.executorId - val actual = restClient.queryExecutorConfig(appId, executorId) - actual.hasPath("gearpump") shouldBe true - actual.hasPath("gearpump.executor") shouldBe true - actual.getInt("gearpump.applicationId") shouldEqual appId - actual.getInt("gearpump.executorId") shouldEqual executorId - } - } - - "retrieve the configuration of application X and match particular values" in { - // setup - val appId = restClient.getNextAvailableAppId() - - // exercise - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) - success shouldBe true - val actual = restClient.queryAppMasterConfig(appId) - actual.hasPath("gearpump") shouldBe true - actual.hasPath("gearpump.appmaster") shouldBe true - } - } - - "application life-cycle" should { - "newly started application should be configured same as the previous one, after restart" in { - // setup - val originSplitNum = 4 - val originSumNum = 3 - val originAppId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, - s"-split $originSplitNum -sum $originSumNum") - success shouldBe true - expectAppIsRunning(originAppId, wordCountName) - val originAppDetail = restClient.queryStreamingAppDetail(originAppId) - - // exercise - Util.retryUntil(() => restClient.restartApp(originAppId), "app restarted") - val killedApp = restClient.queryApp(originAppId) - killedApp.appId shouldEqual originAppId - 
killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive - val runningApps = restClient.listRunningApps() - runningApps.length shouldEqual 1 - val newAppDetail = restClient.queryStreamingAppDetail(runningApps.head.appId) - newAppDetail.appName shouldEqual originAppDetail.appName - newAppDetail.processors.size shouldEqual originAppDetail.processors.size - newAppDetail.processors.get(0).get.parallelism shouldEqual originSplitNum - newAppDetail.processors.get(1).get.parallelism shouldEqual originSumNum - } - } - - private def killAppAndVerify(appId: Int): Unit = { - val success = restClient.killApp(appId) - success shouldBe true - - val actualApp = restClient.queryApp(appId) - actualApp.appId shouldEqual appId - actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive - } - - private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = { - val config = restClient.queryMasterConfig() - val reportInterval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms") - Util.retryUntil(() => condition, conditionDescription, interval = reportInterval) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala deleted file mode 100644 index 4b15055..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.checklist - -import scala.concurrent.duration.Duration - -import org.apache.gearpump.cluster.MasterToAppMaster -import org.apache.gearpump.cluster.worker.WorkerId -import org.apache.gearpump.integrationtest.{TestSpecBase, Util} -import org.apache.gearpump.util.{Constants, LogUtil} - -/** - * The test spec will perform destructive operations to check the stability. Operations - * contains shutting-down appmaster, executor, or worker, and etc.. - */ -class StabilitySpec extends TestSpecBase { - - private val LOG = LogUtil.getLogger(getClass) - - "kill appmaster" should { - "restart the whole application" in { - // setup - val appId = commandLineClient.submitApp(wordCountJar) - val formerAppMaster = restClient.queryApp(appId).appMasterPath - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - ensureClockStoredInMaster() - - // exercise - restClient.killAppMaster(appId) shouldBe true - // todo: how long master will begin to recover and how much time for the recovering? 
- Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster, - "appmaster killed and restarted") - - // verify - val laterAppMaster = restClient.queryStreamingAppDetail(appId) - laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive - laterAppMaster.clock should be > 0L - } - } - - "kill executor" should { - "will create a new executor and application will replay from the latest application clock" in { - // setup - val appId = commandLineClient.submitApp(wordCountJar) - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max - ensureClockStoredInMaster() - - // exercise - restClient.killExecutor(appId, executorToKill) shouldBe true - // todo: how long appmaster will begin to recover and how much time for the recovering? - Util.retryUntil(() => restClient.queryExecutorBrief(appId) - .map(_.executorId).max > executorToKill, - s"executor $executorToKill killed and restarted") - - // verify - val laterAppMaster = restClient.queryStreamingAppDetail(appId) - laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive - laterAppMaster.clock should be > 0L - } - } - - private def hostName(workerId: WorkerId): String = { - val worker = restClient.listRunningWorkers().find(_.workerId == workerId) - // Parse hostname from JVM info (in format: PID@hostname) - val hostname = worker.get.jvmName.split("@")(1) - hostname - } - - "kill worker" should { - "worker will not recover but all its executors will be migrated to other workers" in { - // setup - restartClusterRequired = true - val appId = commandLineClient.submitApp(wordCountJar) - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - - val allexecutors = restClient.queryExecutorBrief(appId) - val maxExecutor = allexecutors.sortBy(_.executorId).last - ensureClockStoredInMaster() - - val appMaster = allexecutors.find(_.executorId 
== Constants.APPMASTER_DEFAULT_EXECUTOR_ID) - - LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " + - s"worker: ${maxExecutor.workerId}") - val executorsSharingSameWorker = allexecutors - .filter(_.workerId == maxExecutor.workerId).map(_.executorId) - LOG.info(s"These executors sharing the same worker Id ${maxExecutor.workerId}," + - s" ${executorsSharingSameWorker.mkString(",")}") - - // kill the worker and expect restarting all killed executors on other workers. - val workerIdToKill = maxExecutor.workerId - cluster.removeWorkerNode(hostName(workerIdToKill)) - - val appMasterKilled = executorsSharingSameWorker - .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) - - def executorsMigrated(): Boolean = { - val executors = restClient.queryExecutorBrief(appId) - val newAppMaster = executors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) - - if (appMasterKilled) { - newAppMaster.get.workerId != appMaster.get.workerId - } else { - // New executors will be started to replace killed executors. - // The new executors will be assigned larger executor Id. We use this trick to detect - // Whether new executors have been started successfully. 
- executors.map(_.executorId).max > maxExecutor.executorId - } - } - - Util.retryUntil(() => { - executorsMigrated() - }, s"new executor created with id > ${maxExecutor.executorId} when worker is killed") - - // verify - val laterAppMaster = restClient.queryStreamingAppDetail(appId) - laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive - laterAppMaster.clock should be > 0L - } - } - - "kill master" should { - "master will be down and all workers will attempt to reconnect and suicide after X seconds" in { - // setup - restartClusterRequired = true - val masters = cluster.getMasterHosts - val config = restClient.queryMasterConfig() - val shutDownTimeout = Duration(config.getString("akka.cluster.auto-down-unreachable-after")) - - // exercise - masters.foreach(cluster.removeMasterNode) - info(s"will sleep ${shutDownTimeout.toSeconds}s and then check workers are down") - Thread.sleep(shutDownTimeout.toMillis) - - // verify - val aliveWorkers = cluster.getWorkerHosts - Util.retryUntil(() => aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)), - "all workers down") - } - } - - private def ensureClockStoredInMaster(): Unit = { - // TODO: 5000ms is a fixed sync period in clock service. 
- // we wait for 5000ms to assume the clock is stored - Thread.sleep(5000) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala deleted file mode 100644 index b327bf4..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gearpump.integrationtest.checklist - -import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader} -import org.apache.gearpump.integrationtest.storm.StormClient -import org.apache.gearpump.integrationtest.{TestSpecBase, Util} - -/** - * The test spec checks the compatibility of running Storm applications - */ -class StormCompatibilitySpec extends TestSpecBase { - - private lazy val stormClient = { - new StormClient(cluster, restClient) - } - - val `version0.9` = "09" - val `version0.10` = "010" - - override def beforeAll(): Unit = { - super.beforeAll() - stormClient.start() - } - - override def afterAll(): Unit = { - stormClient.shutDown() - super.afterAll() - } - - def withStorm(testCode: String => Unit): Unit = { - testCode(`version0.9`) - testCode(`version0.10`) - } - - def getTopologyName(name: String, stormVersion: String): String = { - s"${name}_$stormVersion" - } - - def getStormJar(stormVersion: String): String = { - cluster.queryBuiltInITJars(s"storm$stormVersion-").head - } - - "Storm over Gearpump" should withStorm { - stormVersion => - s"support basic topologies ($stormVersion)" in { - val stormJar = getStormJar(stormVersion) - val topologyName = getTopologyName("exclamation", stormVersion) - - // exercise - val appId = stormClient.submitStormApp( - jar = stormJar, - mainClass = "storm.starter.ExclamationTopology", - args = topologyName, - appName = topologyName) - - // verify - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - } - - s"support to run a python version of wordcount ($stormVersion)" in { - val stormJar = getStormJar(stormVersion) - val topologyName = getTopologyName("wordcount", stormVersion) - - // exercise - val appId = stormClient.submitStormApp( - jar = stormJar, - mainClass = "storm.starter.WordCountTopology", - args = topologyName, - appName = topologyName) - - // verify - Util.retryUntil(() => 
restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - } - - s"support DRPC ($stormVersion)" in { - // ReachTopology computes the Twitter url reached by users and their followers - // using Storm Distributed RPC feature - // input (user and follower) data are already prepared in memory - val stormJar = getStormJar(stormVersion) - val topologyName = getTopologyName("reach", stormVersion) - stormClient.submitStormApp( - jar = stormJar, - mainClass = "storm.starter.ReachTopology", - args = topologyName, - appName = topologyName) - val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway) - - // verify - Util.retryUntil(() => { - drpcClient.execute("reach", "notaurl.com") == "0" - }, "drpc reach == 0") - drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16" - drpcClient.execute("reach", "engineering.twitter.com/blog/5") shouldEqual "14" - } - - s"support tick tuple ($stormVersion)" in { - val stormJar = getStormJar(stormVersion) - val topologyName = getTopologyName("slidingWindowCounts", stormVersion) - - // exercise - val appId = stormClient.submitStormApp( - jar = stormJar, - mainClass = "storm.starter.RollingTopWords", - args = s"$topologyName remote", - appName = topologyName) - - // verify - Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - } - - s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in { - - val stormJar = getStormJar(stormVersion) - val topologyName = getTopologyName("storm_kafka", stormVersion) - val stormKafkaTopology = - s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology" - - import org.apache.gearpump.integrationtest.kafka.KafkaCluster._ - withKafkaCluster(cluster) { - kafkaCluster => - val sourcePartitionNum = 2 - val sinkPartitionNum = 1 - val zookeeper = kafkaCluster.getZookeeperConnectString - val brokerList = kafkaCluster.getBrokerListConnectString - val sourceTopic = "topic1" - val sinkTopic = "topic2" - - 
val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic, - "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList, - "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum" - ) - - kafkaCluster.createTopic(sourceTopic, sourcePartitionNum) - - // generate number sequence (1, 2, 3, ...) to the topic - withDataProducer(sourceTopic, brokerList) { producer => - - val appId = stormClient.submitStormApp( - jar = stormJar, - mainClass = stormKafkaTopology, - args = args.mkString(" "), - appName = topologyName) - - Util.retryUntil(() => - restClient.queryStreamingAppDetail(appId).clock > 0, "app running") - - // kill executor and verify at-least-once is guaranteed on application restart - val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max - restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(() => - restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, - s"executor $executorToKill killed") - - // verify no message loss - val detector = new - MessageLossDetector(producer.lastWriteNum) - val kafkaReader = new - SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, - port = kafkaCluster.advertisedPort) - - Util.retryUntil(() => { - kafkaReader.read() - detector.allReceived - }, "all kafka message read") - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala deleted file mode 100644 index 7942308..0000000 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala +++ /dev/null 
@@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.suites - -import org.scalatest._ - -import org.apache.gearpump.integrationtest.MiniClusterProvider -import org.apache.gearpump.integrationtest.checklist._ -import org.apache.gearpump.integrationtest.minicluster.MiniCluster - -/** - * Launch a Gearpump cluster in standalone mode and run all test specs. To test a specific - * test spec, you need to comment out other lines. 
- */ -class StandaloneModeSuite extends Suites( - new CommandLineSpec, - new RestServiceSpec, - new ExampleSpec, - new DynamicDagSpec, - new StormCompatibilitySpec, - new StabilitySpec, - new ConnectorKafkaSpec, - new MessageDeliverySpec -) with BeforeAndAfterAll { - - override def beforeAll(): Unit = { - super.beforeAll() - MiniClusterProvider.managed = true - MiniClusterProvider.set(new MiniCluster).start() - } - - override def afterAll(): Unit = { - MiniClusterProvider.get.shutDown() - super.afterAll() - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala new file mode 100644 index 0000000..4a161c7 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/MiniClusterProvider.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.integrationtest + +import org.apache.gearpump.integrationtest.minicluster.MiniCluster + +/** + * Provides a mini cluster of Gearpump, which contains one or more masters and workers. + */ +object MiniClusterProvider { + + private var instance = new MiniCluster + + def get: MiniCluster = instance + + def set(instance: MiniCluster): MiniCluster = { + this.instance = instance + instance + } + + /** + * Indicates whether the test suite should create the cluster. If false, every + * test spec will be responsible for cluster creation. + */ + var managed = false +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala new file mode 100644 index 0000000..dabcc71 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest + +import org.scalatest._ + +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData +import org.apache.gearpump.util.LogUtil + +/** + * The abstract test spec + */ +trait TestSpecBase + extends WordSpec with Matchers with BeforeAndAfterEachTestData with BeforeAndAfterAll { + + private def LOGGER = LogUtil.getLogger(getClass) + + override def beforeAll(): Unit = { + super.beforeAll() + if (!MiniClusterProvider.managed) { + LOGGER.info("Will test with a default standalone mini cluster") + MiniClusterProvider.get.start() + } + } + + override def afterAll(): Unit = { + if (!MiniClusterProvider.managed) { + LOGGER.info("Will shutdown the default mini cluster") + MiniClusterProvider.get.shutDown() + } + super.afterAll() + } + + lazy val cluster = MiniClusterProvider.get + lazy val commandLineClient = cluster.commandLineClient + lazy val restClient = cluster.restClient + + lazy val wordCountJar = cluster.queryBuiltInExampleJars("wordcount-").head + lazy val wordCountName = "wordCount" + + var restartClusterRequired: Boolean = false + + override def beforeEach(td: TestData): Unit = { + + LOGGER.debug(s">### =============================================================") + LOGGER.debug(s">###1 Prepare test: ${td.name}\n") + + assert(cluster != null, "Configure MiniCluster properly in suite spec") + cluster.isAlive shouldBe true + restClient.listRunningApps().isEmpty shouldBe true + LOGGER.debug(s">### =============================================================") + LOGGER.debug(s">###2 Start test: ${td.name}\n") + } + + override def afterEach(td: TestData): Unit = { + LOGGER.debug(s"<### =============================================================") + LOGGER.debug(s"<###3 End test: ${td.name}\n") + + if (restartClusterRequired || !cluster.isAlive) { + 
restartClusterRequired = false + LOGGER.info("Will restart the cluster for next test case") + cluster.restart() + } else { + restClient.listRunningApps().foreach(app => { + commandLineClient.killApp(app.appId) shouldBe true + }) + } + } + + def expectAppIsRunning(appId: Int, expectedAppName: String): AppMasterData = { + val app = restClient.queryApp(appId) + app.status shouldEqual MasterToAppMaster.AppMasterActive + app.appName shouldEqual expectedAppName + app + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala new file mode 100644 index 0000000..eabc684 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.integrationtest.checklist + +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.integrationtest.TestSpecBase + +/** + * The test spec checks the command-line usage + */ +class CommandLineSpec extends TestSpecBase { + + "use `gear info` to list applications" should { + "retrieve 0 application after cluster just started" in { + // exercise + getRunningAppCount shouldEqual 0 + } + + "retrieve 1 application after the first application submission" in { + // setup + val appId = expectSubmitAppSuccess(wordCountJar) + expectAppIsRunningByParsingOutput(appId, wordCountName) + + // exercise + getRunningAppCount shouldEqual 1 + } + } + + "use `gear app` to submit application" should { + "find a running application after submission" in { + // exercise + val appId = expectSubmitAppSuccess(wordCountJar) + expectAppIsRunningByParsingOutput(appId, wordCountName) + } + + "reject a repeated submission request while the application is running" in { + // setup + val appId = expectSubmitAppSuccess(wordCountJar) + expectAppIsRunningByParsingOutput(appId, wordCountName) + + // exercise + val actualAppId = commandLineClient.submitApp(wordCountJar) + actualAppId shouldEqual -1 + } + + "reject an invalid submission (the jar file path is incorrect)" in { + // exercise + val actualAppId = commandLineClient.submitApp(wordCountJar + ".missing") + actualAppId shouldEqual -1 + } + } + + "use `gear kill` to kill application" should { + "a running application should be killed" in { + // setup + val appId = expectSubmitAppSuccess(wordCountJar) + + // exercise + val success = commandLineClient.killApp(appId) + success shouldBe true + } + + "should fail when attempting to kill a stopped application" in { + // setup + val appId = expectSubmitAppSuccess(wordCountJar) + var success = commandLineClient.killApp(appId) + success shouldBe true + + // exercise + success = commandLineClient.killApp(appId) + success shouldBe false + } + + "the 
EmbededCluster can be used as embedded cluster in process" in { + // setup + val args = "-debug true -sleep 10" + val appId = expectSubmitAppSuccess(wordCountJar, args) + var success = commandLineClient.killApp(appId) + success shouldBe true + } + + "should fail when attempting to kill a non-exist application" in { + // setup + val freeAppId = getNextAvailableAppId + + // exercise + val success = commandLineClient.killApp(freeAppId) + success shouldBe false + } + } + + "use `gear replay` to replay the application from current min clock" should { + "todo: description" in { + // todo: test code + } + } + + private def getRunningAppCount: Int = { + commandLineClient.listRunningApps().length + } + + private def getNextAvailableAppId: Int = { + commandLineClient.listApps().length + 1 + } + + private def expectSubmitAppSuccess(jar: String, args: String = ""): Int = { + val appId = commandLineClient.submitApp(jar) + appId should not equal -1 + appId + } + + private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: String): Unit = { + val actual = commandLineClient.queryApp(appId) + actual should include(s"application: $appId, ") + actual should include(s"name: $expectedName, ") + actual should include(s"status: ${MasterToAppMaster.AppMasterActive}") + } +}
