http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala new file mode 100644 index 0000000..d8bdc1e --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.checklist + +import org.scalatest.TestData + +import org.apache.gearpump.integrationtest.kafka._ +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} + +/** + * The test spec checks the Kafka datasource connector + */ +class ConnectorKafkaSpec extends TestSpecBase { + + private lazy val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway) + private lazy val kafkaJar = cluster.queryBuiltInExampleJars("kafka-").head + private var producer: NumericalDataProducer = null + + override def beforeAll(): Unit = { + super.beforeAll() + kafkaCluster.start() + } + + override def afterAll(): Unit = { + kafkaCluster.shutDown() + super.afterAll() + } + + override def afterEach(test: TestData): Unit = { + super.afterEach(test) + if (producer != null) { + producer.stop() + producer = null + } + } + + "KafkaSource and KafkaSink" should { + "read from and write to kafka" in { + // setup + val sourceTopic = "topic1" + val sinkTopic = "topic2" + val messageNum = 10000 + kafkaCluster.produceDataToKafka(sourceTopic, messageNum) + + // exercise + val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite", + "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, + "-brokerList", kafkaCluster.getBrokerListConnectString, + "-sourceTopic", sourceTopic, + "-sinkTopic", sinkTopic).mkString(" ") + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args) + success shouldBe true + + // verify + expectAppIsRunning(appId, "KafkaReadWrite") + Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum, + "kafka all message written") + } + } + + "Gearpump with Kafka" should { + "support at-least-once message delivery" in { + // setup + val sourcePartitionNum = 2 + val sourceTopic = "topic3" + val sinkTopic = "topic4" + // Generate number sequence (1, 2, 3, ...) to the topic + kafkaCluster.createTopic(sourceTopic, sourcePartitionNum) + producer = new NumericalDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) + producer.start() + + // exercise + val args = Array("org.apache.gearpump.streaming.examples.kafka.KafkaReadWrite", + "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, + "-brokerList", kafkaCluster.getBrokerListConnectString, + "-sourceTopic", sourceTopic, + "-sinkTopic", sinkTopic, + "-source", sourcePartitionNum).mkString(" ") + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args) + success shouldBe true + + // verify #1 + expectAppIsRunning(appId, "KafkaReadWrite") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + + // verify #2 + val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max + restClient.killExecutor(appId, executorToKill) shouldBe true + Util.retryUntil(() => restClient.queryExecutorBrief(appId) + .map(_.executorId).max > executorToKill, + s"executor $executorToKill killed") + + // verify #3 + val detector = new MessageLossDetector(producer.lastWriteNum) + val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, + host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort) + Util.retryUntil(() => { + kafkaReader.read() + detector.allReceived + }, "kafka all message read") + } + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala new file mode 100644 index 0000000..56b33c1 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.checklist + +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} +import org.apache.gearpump.metrics.Metrics.Meter +import org.apache.gearpump.streaming._ +import org.apache.gearpump.streaming.appmaster.ProcessorSummary + +class DynamicDagSpec extends TestSpecBase { + + lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head + val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split" + val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum" + val solName = "sol" + + "dynamic dag" should { + "can retrieve a list of built-in partitioner classes" in { + val partitioners = restClient.queryBuiltInPartitioners() + partitioners.length should be > 0 + partitioners.foreach(clazz => + clazz should startWith("org.apache.gearpump.partitioner.") + ) + } + + "can compose a wordcount application from scratch" in { + // todo: blocked by #1450 + } + + "can replace down stream with wordcount's sum processor (new processor will have metrics)" in { + // setup + val appId = expectSolJarSubmittedWithAppId() + + // exercise + val formerProcessors = restClient.queryStreamingAppDetail(appId).processors + replaceProcessor(appId, 1, sumTaskClass) + var laterProcessors: Map[ProcessorId, ProcessorSummary] = null + Util.retryUntil(() => { + laterProcessors = restClient.queryStreamingAppDetail(appId).processors + laterProcessors.size == formerProcessors.size + 1 + }, "new processor successfully added") + processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") + } + + "can replace up stream with wordcount's split processor (new processor will have metrics)" in { + // setup + val appId = expectSolJarSubmittedWithAppId() + + // exercise + val formerProcessors = restClient.queryStreamingAppDetail(appId).processors + replaceProcessor(appId, 0, splitTaskClass) + var laterProcessors: Map[ProcessorId, ProcessorSummary] = null + Util.retryUntil(() => { + laterProcessors = restClient.queryStreamingAppDetail(appId).processors + laterProcessors.size == formerProcessors.size + 1 + }, "new processor added") + processorHasThroughput(appId, laterProcessors.keySet.max, "sendThroughput") + } + + "fall back to last dag version when replacing a processor failid" in { + // setup + val appId = expectSolJarSubmittedWithAppId() + + // exercise + val formerProcessors = restClient.queryStreamingAppDetail(appId).processors + replaceProcessor(appId, 1, sumTaskClass) + var laterProcessors: Map[ProcessorId, ProcessorSummary] = null + Util.retryUntil(() => { + laterProcessors = restClient.queryStreamingAppDetail(appId).processors + laterProcessors.size == formerProcessors.size + 1 + }, "new processor added") + processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") + + val fakeTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Fake" + replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass) + Util.retryUntil(() => { + val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors + processorsAfterFailure.size == laterProcessors.size + }, "new processor added") + val currentClock = restClient.queryStreamingAppDetail(appId).clock + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock, + "app clock is advancing") + } + + "fall back to last dag version when AppMaster HA triggered" in { + // setup + val appId = expectSolJarSubmittedWithAppId() + + // exercise + val formerAppMaster = restClient.queryApp(appId).appMasterPath + val formerProcessors = restClient.queryStreamingAppDetail(appId).processors + replaceProcessor(appId, 1, sumTaskClass) + var laterProcessors: Map[ProcessorId, ProcessorSummary] = null + Util.retryUntil(() => { + laterProcessors = restClient.queryStreamingAppDetail(appId).processors + laterProcessors.size == formerProcessors.size + 1 + }, "new processor added") + processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") + + restClient.killAppMaster(appId) shouldBe true + Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster, + "new AppMaster created") + val processors = restClient.queryStreamingAppDetail(appId).processors + processors.size shouldEqual laterProcessors.size + } + } + + private def expectSolJarSubmittedWithAppId(): Int = { + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length) + success shouldBe true + expectAppIsRunning(appId, solName) + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + appId + } + + private def replaceProcessor( + appId: Int, + formerProcessorId: Int, + newTaskClass: String, + newProcessorDescription: String = "", + newParallelism: Int = 1): Unit = { + val uploadedJar = restClient.uploadJar(wordCountJar) + val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass, + newParallelism, newProcessorDescription, + jar = uploadedJar) + + // exercise + val success = restClient.replaceStreamingAppProcessor(appId, replaceMe) + success shouldBe true + } + + private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = { + Util.retryUntil(() => { + val actual = restClient.queryStreamingAppMetrics(appId, current = false, + path = "processor" + processorId) + val throughput = actual.metrics.filter(_.value.name.endsWith(metrics)) + throughput.size should be > 0 + throughput.forall(_.value.asInstanceOf[Meter].count > 0L) + }, "new processor has message received") + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala new file mode 100644 index 0000000..27e4665 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/ExampleSpec.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.checklist + +import org.apache.log4j.Logger + +import org.apache.gearpump.integrationtest.{Docker, TestSpecBase, Util} +import org.apache.gearpump.streaming._ +import org.apache.gearpump.streaming.appmaster.ProcessorSummary + +/** + * The test spec will perform destructive operations to check the stability + */ +class ExampleSpec extends TestSpecBase { + + private val LOG = Logger.getLogger(getClass) + + "distributed shell" should { + "execute commands on machines where its executors are running" in { + val distShellJar = cluster.queryBuiltInExampleJars("distributedshell-").head + val mainClass = "org.apache.gearpump.examples.distributedshell.DistributedShell" + val clientClass = "org.apache.gearpump.examples.distributedshell.DistributedShellClient" + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass) + success shouldBe true + expectAppIsRunning(appId, "DistributedShell") + val args = Array( + clientClass, + "-appid", appId.toString, + "-command", "hostname" + ) + + val expectedHostNames = cluster.getWorkerHosts.map(Docker.getHostName(_)) + + def verify(): Boolean = { + val workerNum = cluster.getWorkerHosts.length + val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, + workerNum, args.mkString(" ")).split("\n"). + filterNot(line => line.startsWith("[INFO]") || line.isEmpty) + expectedHostNames.forall(result.contains) + } + + Util.retryUntil(() => verify(), + s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}") + } + } + + "wordcount" should { + val wordCountJarNamePrefix = "wordcount-" + behave like streamingApplication(wordCountJarNamePrefix, wordCountName) + + "can submit immediately after killing a former one" in { + // setup + val formerAppId = restClient.getNextAvailableAppId() + val formerSubmissionSuccess = + restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + formerSubmissionSuccess shouldBe true + expectAppIsRunning(formerAppId, wordCountName) + Util.retryUntil(() => + restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running") + restClient.killApp(formerAppId) + + // exercise + val appId = formerAppId + 1 + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe true + expectAppIsRunning(appId, wordCountName) + } + } + + "wordcount(java)" should { + val wordCountJavaJarNamePrefix = "wordcountjava-" + val wordCountJavaName = "wordcountJava" + behave like streamingApplication(wordCountJavaJarNamePrefix, wordCountJavaName) + } + + "sol" should { + val solJarNamePrefix = "sol-" + val solName = "sol" + behave like streamingApplication(solJarNamePrefix, solName) + } + + "complexdag" should { + val dynamicDagJarNamePrefix = "complexdag-" + val dynamicDagName = "dag" + behave like streamingApplication(dynamicDagJarNamePrefix, dynamicDagName) + } + + def streamingApplication(jarNamePrefix: String, appName: String): Unit = { + lazy val jar = cluster.queryBuiltInExampleJars(jarNamePrefix).head + + "can obtain application clock and the clock will keep changing" in { + // setup + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(jar, cluster.getWorkerHosts.length) + success shouldBe true + expectAppIsRunning(appId, appName) + + // exercise + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted") + val formerClock = restClient.queryStreamingAppDetail(appId).clock + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock, + "app clock is advancing") + } + + "can change the parallelism and description of a processor" in { + // setup + val appId = restClient.getNextAvailableAppId() + val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length) + formerSubmissionSuccess shouldBe true + expectAppIsRunning(appId, appName) + val formerProcessors = restClient.queryStreamingAppDetail(appId).processors + val processor0 = formerProcessors.get(0).get + val expectedProcessorId = formerProcessors.size + val expectedParallelism = processor0.parallelism + 1 + val expectedDescription = processor0.description + "new" + val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass, + expectedParallelism, description = expectedDescription) + + // exercise + val success = restClient.replaceStreamingAppProcessor(appId, replaceMe) + success shouldBe true + var laterProcessors: Map[ProcessorId, ProcessorSummary] = null + Util.retryUntil(() => { + laterProcessors = restClient.queryStreamingAppDetail(appId).processors + laterProcessors.size == formerProcessors.size + 1 + }, "new process added") + val laterProcessor0 = laterProcessors.get(expectedProcessorId).get + laterProcessor0.parallelism shouldEqual expectedParallelism + laterProcessor0.description shouldEqual expectedDescription + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala new file mode 100644 index 0000000..bb9982a --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/MessageDeliverySpec.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.integrationtest.checklist + +import org.apache.log4j.Logger + +import org.apache.gearpump.integrationtest.hadoop.HadoopCluster._ +import org.apache.gearpump.integrationtest.kafka.KafkaCluster._ +import org.apache.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader} +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} + +/** + * Checks message delivery consistency, like at-least-once, and exactly-once. + */ +class MessageDeliverySpec extends TestSpecBase { + + private val LOG = Logger.getLogger(getClass) + + override def beforeAll(): Unit = { + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + } + + "Gearpump" should { + "support exactly-once message delivery" in { + withKafkaCluster(cluster) { kafkaCluster => + // setup + val sourcePartitionNum = 1 + val sourceTopic = "topic1" + val sinkTopic = "topic2" + + // Generate number sequence (1, 2, 3, ...) to the topic + kafkaCluster.createTopic(sourceTopic, sourcePartitionNum) + + withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) { producer => + + withHadoopCluster { hadoopCluster => + // exercise + val args = Array("org.apache.gearpump.streaming.examples.state.MessageCountApp", + "-defaultFS", hadoopCluster.getDefaultFS, + "-zookeeperConnect", kafkaCluster.getZookeeperConnectString, + "-brokerList", kafkaCluster.getBrokerListConnectString, + "-sourceTopic", sourceTopic, + "-sinkTopic", sinkTopic, + "-sourceTask", sourcePartitionNum).mkString(" ") + val appId = restClient.getNextAvailableAppId() + + val stateJar = cluster.queryBuiltInExampleJars("state-").head + val success = restClient.submitApp(stateJar, executorNum = 1, args = args) + success shouldBe true + + // verify #1 + expectAppIsRunning(appId, "MessageCount") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, + "app is running") + + // wait for checkpoint to take place + Thread.sleep(1000) + + LOG.info("Trigger message replay by kill and restart the executors") + val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max + restClient.killExecutor(appId, executorToKill) shouldBe true + Util.retryUntil(() => restClient.queryExecutorBrief(appId) + .map(_.executorId).max > executorToKill, s"executor $executorToKill killed") + + producer.stop() + val producedNumbers = producer.producedNumbers + LOG.info(s"In total, numbers in range[${producedNumbers.start}" + + s", ${producedNumbers.end - 1}] have been written to Kafka") + + // verify #3 + val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic) + + assert(producedNumbers.size == kafkaSourceOffset, + "produced message should match Kafka queue size") + + LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset) + + // The sink processor of this job (MessageCountApp) writes total message + // count to Kafka Sink periodically (once every checkpoint interval). + // The detector keep record of latest message count. + val detector = new ResultVerifier { + var latestMessageCount: Int = 0 + override def onNext(messageCount: Int): Unit = { + this.latestMessageCount = messageCount + } + } + + val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, + host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort) + Util.retryUntil(() => { + kafkaReader.read() + LOG.info(s"Received message count: ${detector.latestMessageCount}, " + + s"expect: ${producedNumbers.size}") + detector.latestMessageCount == producedNumbers.size + }, "MessageCountApp calculated message count matches " + + "expected in case of message replay") + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala new file mode 100644 index 0000000..2f5bb64 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala @@ -0,0 +1,369 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.checklist + +import scala.concurrent.duration._ + +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.master.MasterStatus +import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary} +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} + +/** + * The test spec checks REST service usage + */ +class RestServiceSpec extends TestSpecBase { + + "query system version" should { + "retrieve the current version number" in { + restClient.queryVersion() should not be empty + } + } + + "list applications" should { + "retrieve 0 application after cluster just started" in { + restClient.listRunningApps().length shouldEqual 0 + } + + "retrieve 1 application after the first application submission" in { + // exercise + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe true + expectAppIsRunning(appId, wordCountName) + restClient.listRunningApps().length shouldEqual 1 + } + } + + "submit application (wordcount)" should { + "find a running application after submission" in { + // exercise + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe true + expectAppIsRunning(appId, wordCountName) + } + + "reject a repeated submission request while the application is running" in { + // setup + val appId = restClient.getNextAvailableAppId() + val formerSubmissionSuccess = restClient.submitApp(wordCountJar, + cluster.getWorkerHosts.length) + formerSubmissionSuccess shouldBe true + expectAppIsRunning(appId, wordCountName) + + // exercise + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe false + } + + "reject an invalid submission (the jar file path is incorrect)" in { + // exercise + val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length) + success shouldBe false + } + + "submit a wordcount application with 4 split and 3 sum processors and expect " + + "parallelism of processors match the given number" in { + // setup + val splitNum = 4 + val sumNum = 3 + val appId = restClient.getNextAvailableAppId() + + // exercise + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, + s"-split $splitNum -sum $sumNum") + success shouldBe true + expectAppIsRunning(appId, wordCountName) + val processors = restClient.queryStreamingAppDetail(appId).processors + processors.size shouldEqual 2 + val splitProcessor = processors.get(0).get + splitProcessor.parallelism shouldEqual splitNum + val sumProcessor = processors.get(1).get + sumProcessor.parallelism shouldEqual sumNum + } + + "can obtain application metrics and the metrics will keep changing" in { + // setup + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe true + expectAppIsRunning(appId, wordCountName) + + // exercise + expectMetricsAvailable( + restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty, + "metrics available") + val actual = restClient.queryStreamingAppMetrics(appId, current = true) + actual.path shouldEqual s"app$appId.processor*" + actual.metrics.foreach(metric => { + metric.time should be > 0L + metric.value should not be null + }) + val formerMetricsDump = actual.metrics.toString() + + expectMetricsAvailable({ + val laterMetrics = restClient.queryStreamingAppMetrics(appId, current = true).metrics + laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump + }, "metrics available") + } + + "can obtain application corresponding executors' metrics and " + + "the metrics will keep changing" in { + // setup + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe true + expectAppIsRunning(appId, wordCountName) + + // exercise + expectMetricsAvailable( + restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty, + "metrics available") + val actual = restClient.queryExecutorMetrics(appId, current = true) + actual.path shouldEqual s"app$appId.executor*" + actual.metrics.foreach(metric => { + metric.time should be > 0L + metric.value should not be null + }) + val formerMetricsDump = actual.metrics.toString() + + expectMetricsAvailable({ + val laterMetrics = restClient.queryExecutorMetrics(appId, current = true).metrics + laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump + }, "metrics available") + } + } + + "kill application" should { + "a running application should be killed" in { + // setup + val appId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe true + expectAppIsRunning(appId, wordCountName) + + // exercise + killAppAndVerify(appId) + } + + "should fail when attempting to kill a stopped application" in { + // setup + val appId = restClient.getNextAvailableAppId() + val submissionSucess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + submissionSucess shouldBe true + expectAppIsRunning(appId, wordCountName) + killAppAndVerify(appId) + + // exercise + val success = restClient.killApp(appId) + success shouldBe false + } + + "should fail when attempting to kill a non-exist application" in { + // setup + val freeAppId = restClient.listApps().length + 1 + + // exercise + val success = restClient.killApp(freeAppId) + success shouldBe false + } + } + + "cluster information" should { + "retrieve 1 master for a non-HA cluster" in { + // exercise + val masterSummary = restClient.queryMaster() + masterSummary.cluster.map(_.toTuple) shouldEqual cluster.getMastersAddresses + masterSummary.aliveFor should be > 0L + masterSummary.masterStatus shouldEqual MasterStatus.Synced + } + + "retrieve the same number of workers as cluster has" in { + // setup + val expectedWorkersCount = cluster.getWorkerHosts.size + + // exercise + var runningWorkers: Array[WorkerSummary] = Array.empty + Util.retryUntil(() => { + runningWorkers = restClient.listRunningWorkers() + runningWorkers.length == expectedWorkersCount + }, "all workers running") + runningWorkers.foreach { worker => + worker.state shouldEqual MasterToAppMaster.AppMasterActive + } + } + + "find a newly added worker instance" in { + // setup + restartClusterRequired = true + val formerWorkersCount = cluster.getWorkerHosts.length + Util.retryUntil(() => restClient.listRunningWorkers().length == formerWorkersCount, + "all workers running") + val workerName = "newWorker" + + // exercise + cluster.addWorkerNode(workerName) + Util.retryUntil(() => restClient.listRunningWorkers().length > formerWorkersCount, + "new worker added") + cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1 + restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1 + } + + "retrieve 0 worker, if cluster is started without any workers" in { + // setup + restartClusterRequired = true + cluster.shutDown() + + // exercise + cluster.start(workerNum = 0) + cluster.getWorkerHosts.length shouldEqual 0 + restClient.listRunningWorkers().length shouldEqual 0 + } + + "can obtain master's metrics and the metrics will keep changing" in { + // exercise + expectMetricsAvailable( + restClient.queryMasterMetrics(current = true).metrics.nonEmpty, "metrics available") + val actual = restClient.queryMasterMetrics(current = true) + actual.path shouldEqual s"master" + actual.metrics.foreach(metric => { + metric.time should be > 0L + metric.value should not be null + }) + val formerMetricsDump = actual.metrics.toString() + + expectMetricsAvailable({ + val laterMetrics = restClient.queryMasterMetrics(current = true).metrics + laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump + }, "metrics available") + } + + "can obtain workers' metrics and the metrics will keep changing" in { + // exercise + restClient.listRunningWorkers().foreach { worker => + val workerId = worker.workerId + expectMetricsAvailable( + restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty, + "metrics available") + val actual = restClient.queryWorkerMetrics(workerId, current = true) + actual.path shouldEqual s"worker${WorkerId.render(workerId)}" + actual.metrics.foreach(metric => { + metric.time should be > 0L + metric.value should not be null + }) + val formerMetricsDump = actual.metrics.toString() + + expectMetricsAvailable({ + val laterMetrics = restClient.queryWorkerMetrics(workerId, current = true).metrics + laterMetrics.nonEmpty && laterMetrics.toString() != formerMetricsDump + }, "metrics available") + } + } + } + + "configuration" should { + "retrieve the configuration of master and match particular values" in { + // exercise + val actual = restClient.queryMasterConfig() + actual.hasPath("gearpump") shouldBe true + actual.hasPath("gearpump.cluster") shouldBe true + actual.getString("gearpump.hostname") shouldEqual cluster.getMasterHosts.mkString(",") + } + + "retrieve the configuration of worker X and match particular values" in { + // exercise + restClient.listRunningWorkers().foreach { worker => + val actual = restClient.queryWorkerConfig(worker.workerId) + actual.hasPath("gearpump") shouldBe true + actual.hasPath("gearpump.worker") shouldBe true + } + } + + "retrieve the configuration of executor X and match particular values" in { + // setup + val appId = restClient.getNextAvailableAppId() + + // exercise + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe true + restClient.queryExecutorBrief(appId).foreach { executor => + val executorId = executor.executorId + val actual = restClient.queryExecutorConfig(appId, executorId) + actual.hasPath("gearpump") shouldBe true + actual.hasPath("gearpump.executor") shouldBe true + actual.getInt("gearpump.applicationId") shouldEqual appId + actual.getInt("gearpump.executorId") shouldEqual executorId + } + } + + "retrieve the configuration of application X and match particular values" in { + // setup + val appId = restClient.getNextAvailableAppId() + + // exercise + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + success shouldBe true + val actual = restClient.queryAppMasterConfig(appId) + actual.hasPath("gearpump") shouldBe true + actual.hasPath("gearpump.appmaster") shouldBe true + } + } + + "application life-cycle" should { + "newly started application should be configured same as the previous one, after restart" in { + // setup + val originSplitNum = 4 + val originSumNum = 3 + val originAppId = restClient.getNextAvailableAppId() + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, + s"-split $originSplitNum -sum $originSumNum") + success shouldBe true + expectAppIsRunning(originAppId, wordCountName) + val originAppDetail = restClient.queryStreamingAppDetail(originAppId) + + // exercise + Util.retryUntil(() => restClient.restartApp(originAppId), "app restarted") + val killedApp = restClient.queryApp(originAppId) + killedApp.appId shouldEqual originAppId + killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive + val runningApps = restClient.listRunningApps() + runningApps.length shouldEqual 1 + val newAppDetail = restClient.queryStreamingAppDetail(runningApps.head.appId) + newAppDetail.appName shouldEqual originAppDetail.appName + newAppDetail.processors.size shouldEqual originAppDetail.processors.size + newAppDetail.processors.get(0).get.parallelism shouldEqual originSplitNum + newAppDetail.processors.get(1).get.parallelism shouldEqual originSumNum + } + } + + private def killAppAndVerify(appId: Int): Unit = { + val success = restClient.killApp(appId) + success shouldBe true + + val actualApp = restClient.queryApp(appId) + actualApp.appId shouldEqual appId + actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive + } + + private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = { + val config = restClient.queryMasterConfig() + val reportInterval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms") + Util.retryUntil(() => condition, conditionDescription, interval = reportInterval) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala new file mode 100644 index 0000000..4b15055 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.checklist + +import scala.concurrent.duration.Duration + +import org.apache.gearpump.cluster.MasterToAppMaster +import org.apache.gearpump.cluster.worker.WorkerId +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} +import org.apache.gearpump.util.{Constants, LogUtil} + +/** + * The test spec will perform destructive operations to check the stability. Operations + * contains shutting-down appmaster, executor, or worker, and etc.. + */ +class StabilitySpec extends TestSpecBase { + + private val LOG = LogUtil.getLogger(getClass) + + "kill appmaster" should { + "restart the whole application" in { + // setup + val appId = commandLineClient.submitApp(wordCountJar) + val formerAppMaster = restClient.queryApp(appId).appMasterPath + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + ensureClockStoredInMaster() + + // exercise + restClient.killAppMaster(appId) shouldBe true + // todo: how long master will begin to recover and how much time for the recovering? + Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster, + "appmaster killed and restarted") + + // verify + val laterAppMaster = restClient.queryStreamingAppDetail(appId) + laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive + laterAppMaster.clock should be > 0L + } + } + + "kill executor" should { + "will create a new executor and application will replay from the latest application clock" in { + // setup + val appId = commandLineClient.submitApp(wordCountJar) + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max + ensureClockStoredInMaster() + + // exercise + restClient.killExecutor(appId, executorToKill) shouldBe true + // todo: how long appmaster will begin to recover and how much time for the recovering? + Util.retryUntil(() => restClient.queryExecutorBrief(appId) + .map(_.executorId).max > executorToKill, + s"executor $executorToKill killed and restarted") + + // verify + val laterAppMaster = restClient.queryStreamingAppDetail(appId) + laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive + laterAppMaster.clock should be > 0L + } + } + + private def hostName(workerId: WorkerId): String = { + val worker = restClient.listRunningWorkers().find(_.workerId == workerId) + // Parse hostname from JVM info (in format: PID@hostname) + val hostname = worker.get.jvmName.split("@")(1) + hostname + } + + "kill worker" should { + "worker will not recover but all its executors will be migrated to other workers" in { + // setup + restartClusterRequired = true + val appId = commandLineClient.submitApp(wordCountJar) + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + + val allexecutors = restClient.queryExecutorBrief(appId) + val maxExecutor = allexecutors.sortBy(_.executorId).last + ensureClockStoredInMaster() + + val appMaster = allexecutors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) + + LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " + + s"worker: ${maxExecutor.workerId}") + val executorsSharingSameWorker = allexecutors + .filter(_.workerId == maxExecutor.workerId).map(_.executorId) + LOG.info(s"These executors sharing the same worker Id ${maxExecutor.workerId}," + + s" ${executorsSharingSameWorker.mkString(",")}") + + // kill the worker and expect restarting all killed executors on other workers. + val workerIdToKill = maxExecutor.workerId + cluster.removeWorkerNode(hostName(workerIdToKill)) + + val appMasterKilled = executorsSharingSameWorker + .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) + + def executorsMigrated(): Boolean = { + val executors = restClient.queryExecutorBrief(appId) + val newAppMaster = executors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) + + if (appMasterKilled) { + newAppMaster.get.workerId != appMaster.get.workerId + } else { + // New executors will be started to replace killed executors. + // The new executors will be assigned larger executor Id. We use this trick to detect + // Whether new executors have been started successfully. + executors.map(_.executorId).max > maxExecutor.executorId + } + } + + Util.retryUntil(() => { + executorsMigrated() + }, s"new executor created with id > ${maxExecutor.executorId} when worker is killed") + + // verify + val laterAppMaster = restClient.queryStreamingAppDetail(appId) + laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive + laterAppMaster.clock should be > 0L + } + } + + "kill master" should { + "master will be down and all workers will attempt to reconnect and suicide after X seconds" in { + // setup + restartClusterRequired = true + val masters = cluster.getMasterHosts + val config = restClient.queryMasterConfig() + val shutDownTimeout = Duration(config.getString("akka.cluster.auto-down-unreachable-after")) + + // exercise + masters.foreach(cluster.removeMasterNode) + info(s"will sleep ${shutDownTimeout.toSeconds}s and then check workers are down") + Thread.sleep(shutDownTimeout.toMillis) + + // verify + val aliveWorkers = cluster.getWorkerHosts + Util.retryUntil(() => aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)), + "all workers down") + } + } + + private def ensureClockStoredInMaster(): Unit = { + // TODO: 5000ms is a fixed sync period in clock service. + // we wait for 5000ms to assume the clock is stored + Thread.sleep(5000) + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala new file mode 100644 index 0000000..b327bf4 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.checklist + +import org.apache.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, SimpleKafkaReader} +import org.apache.gearpump.integrationtest.storm.StormClient +import org.apache.gearpump.integrationtest.{TestSpecBase, Util} + +/** + * The test spec checks the compatibility of running Storm applications + */ +class StormCompatibilitySpec extends TestSpecBase { + + private lazy val stormClient = { + new StormClient(cluster, restClient) + } + + val `version0.9` = "09" + val `version0.10` = "010" + + override def beforeAll(): Unit = { + super.beforeAll() + stormClient.start() + } + + override def afterAll(): Unit = { + stormClient.shutDown() + super.afterAll() + } + + def withStorm(testCode: String => Unit): Unit = { + testCode(`version0.9`) + testCode(`version0.10`) + } + + def getTopologyName(name: String, stormVersion: String): String = { + s"${name}_$stormVersion" + } + + def getStormJar(stormVersion: String): String = { + cluster.queryBuiltInITJars(s"storm$stormVersion-").head + } + + "Storm over Gearpump" should withStorm { + stormVersion => + s"support basic topologies ($stormVersion)" in { + val stormJar = getStormJar(stormVersion) + val topologyName = getTopologyName("exclamation", stormVersion) + + // exercise + val appId = stormClient.submitStormApp( + jar = stormJar, + mainClass = "storm.starter.ExclamationTopology", + args = topologyName, + appName = topologyName) + + // verify + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + } + + s"support to run a python version of wordcount ($stormVersion)" in { + val stormJar = getStormJar(stormVersion) + val topologyName = getTopologyName("wordcount", stormVersion) + + // exercise + val appId = stormClient.submitStormApp( + jar = stormJar, + mainClass = "storm.starter.WordCountTopology", + args = topologyName, + appName = topologyName) + + // verify + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + } + + s"support DRPC ($stormVersion)" in { + // ReachTopology computes the Twitter url reached by users and their followers + // using Storm Distributed RPC feature + // input (user and follower) data are already prepared in memory + val stormJar = getStormJar(stormVersion) + val topologyName = getTopologyName("reach", stormVersion) + stormClient.submitStormApp( + jar = stormJar, + mainClass = "storm.starter.ReachTopology", + args = topologyName, + appName = topologyName) + val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway) + + // verify + Util.retryUntil(() => { + drpcClient.execute("reach", "notaurl.com") == "0" + }, "drpc reach == 0") + drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16" + drpcClient.execute("reach", "engineering.twitter.com/blog/5") shouldEqual "14" + } + + s"support tick tuple ($stormVersion)" in { + val stormJar = getStormJar(stormVersion) + val topologyName = getTopologyName("slidingWindowCounts", stormVersion) + + // exercise + val appId = stormClient.submitStormApp( + jar = stormJar, + mainClass = "storm.starter.RollingTopWords", + args = s"$topologyName remote", + appName = topologyName) + + // verify + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + } + + s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in { + + val stormJar = getStormJar(stormVersion) + val topologyName = getTopologyName("storm_kafka", stormVersion) + val stormKafkaTopology = + s"org.apache.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology" + + import org.apache.gearpump.integrationtest.kafka.KafkaCluster._ + withKafkaCluster(cluster) { + kafkaCluster => + val sourcePartitionNum = 2 + val sinkPartitionNum = 1 + val zookeeper = kafkaCluster.getZookeeperConnectString + val brokerList = kafkaCluster.getBrokerListConnectString + val sourceTopic = "topic1" + val sinkTopic = "topic2" + + val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic, + "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList, + "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum" + ) + + kafkaCluster.createTopic(sourceTopic, sourcePartitionNum) + + // generate number sequence (1, 2, 3, ...) to the topic + withDataProducer(sourceTopic, brokerList) { producer => + + val appId = stormClient.submitStormApp( + jar = stormJar, + mainClass = stormKafkaTopology, + args = args.mkString(" "), + appName = topologyName) + + Util.retryUntil(() => + restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + + // kill executor and verify at-least-once is guaranteed on application restart + val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max + restClient.killExecutor(appId, executorToKill) shouldBe true + Util.retryUntil(() => + restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, + s"executor $executorToKill killed") + + // verify no message loss + val detector = new + MessageLossDetector(producer.lastWriteNum) + val kafkaReader = new + SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, + port = kafkaCluster.advertisedPort) + + Util.retryUntil(() => { + kafkaReader.read() + detector.allReceived + }, "all kafka message read") + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala new file mode 100644 index 0000000..7942308 --- /dev/null +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites/StandaloneModeSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.integrationtest.suites + +import org.scalatest._ + +import org.apache.gearpump.integrationtest.MiniClusterProvider +import org.apache.gearpump.integrationtest.checklist._ +import org.apache.gearpump.integrationtest.minicluster.MiniCluster + +/** + * Launch a Gearpump cluster in standalone mode and run all test specs. To test a specific + * test spec, you need to comment out other lines. + */ +class StandaloneModeSuite extends Suites( + new CommandLineSpec, + new RestServiceSpec, + new ExampleSpec, + new DynamicDagSpec, + new StormCompatibilitySpec, + new StabilitySpec, + new ConnectorKafkaSpec, + new MessageDeliverySpec +) with BeforeAndAfterAll { + + override def beforeAll(): Unit = { + super.beforeAll() + MiniClusterProvider.managed = true + MiniClusterProvider.set(new MiniCluster).start() + } + + override def afterAll(): Unit = { + MiniClusterProvider.get.shutDown() + super.afterAll() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala deleted file mode 100644 index f315ad3..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala +++ /dev/null @@ -1,211 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest - -import org.apache.log4j.Logger - -/** - * The class is used to execute Docker commands. - */ -object Docker { - - private val LOG = Logger.getLogger(getClass) - - /** - * @throws RuntimeException in case retval != 0 - */ - private def doExecute(container: String, command: String): String = { - ShellExec.execAndCaptureOutput(s"docker exec $container $command", s"EXEC $container") - } - - private def doExecuteSilently(container: String, command: String): Boolean = { - ShellExec.exec(s"docker exec $container $command", s"EXEC $container") - } - - /** - * @throws RuntimeException in case retval != 0 - */ - final def execute(container: String, command: String): String = { - trace(container, s"Execute $command") { - doExecute(container, command) - } - } - - final def executeSilently(container: String, command: String): Boolean = { - trace(container, s"Execute silently $command") { - doExecuteSilently(container, command) - } - } - - final def listContainers(): Seq[String] = { - trace("", s"Listing how many containers...") { - ShellExec.execAndCaptureOutput("docker ps -q -a", "LIST") - .split("\n").filter(_.nonEmpty) - } - } - - final def containerIsRunning(name: String): Boolean = { - trace(name, s"Check container running or not...") { - ShellExec.execAndCaptureOutput(s"docker ps -q --filter name=$name", s"FIND $name").nonEmpty - } - } - - final def getContainerIPAddr(name: String): String = { - trace(name, s"Get Ip Address") { - Docker.inspect(name, "--format={{.NetworkSettings.IPAddress}}") - } - } - - final def containerExists(name: String): Boolean = { - trace(name, s"Check container existing or not...") { - ShellExec.execAndCaptureOutput(s"docker ps -q -a --filter name=$name", s"FIND $name").nonEmpty - } - } - - /** - * @throws RuntimeException in case particular container is created already - */ - final def createAndStartContainer(name: String, image: String, command: String, - environ: Map[String, String] = Map.empty, // key, value - volumes: Map[String, String] = Map.empty, // from, to - knownHosts: Set[String] = Set.empty, - tunnelPorts: Set[Int] = Set.empty): String = { - - if (containerExists(name)) { - killAndRemoveContainer(name) - } - - trace(name, s"Create and start $name ($image)...") { - - val optsBuilder = new StringBuilder - optsBuilder.append("-d") // run in background - optsBuilder.append(" -h " + name) // use container name as hostname - optsBuilder.append(" -v /etc/localtime:/etc/localtime:ro") // synchronize timezone settings - - environ.foreach { case (key, value) => - optsBuilder.append(s" -e $key=$value") - } - volumes.foreach { case (from, to) => - optsBuilder.append(s" -v $from:$to") - } - knownHosts.foreach(host => - optsBuilder.append(" --link " + host) - ) - tunnelPorts.foreach(port => - optsBuilder.append(s" -p $port:$port") - ) - createAndStartContainer(name, optsBuilder.toString(), command, image) - } - } - - /** - * @throws RuntimeException in case particular container is created already - */ - private def createAndStartContainer( - name: String, options: String, command: String, image: String): String = { - ShellExec.execAndCaptureOutput(s"docker run $options " + - s"--name $name $image $command", s"MAKE $name") - } - - final def killAndRemoveContainer(name: String): Boolean = { - trace(name, s"kill and remove container") { - ShellExec.exec(s"docker rm -f $name", s"STOP $name") - } - } - - final def killAndRemoveContainer(names: Array[String]): Boolean = { - assert(names.length > 0) - val args = names.mkString(" ") - trace(names.mkString(","), s"kill and remove containers") { - ShellExec.exec(s"docker rm -f $args", s"STOP $args.") - } - } - - private def inspect(container: String, option: String): String = { - ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container") - } - - final def curl(container: String, url: String, options: Array[String] = Array.empty[String]) - : String = { - trace(container, s"curl $url") { - doExecute(container, s"curl -s ${options.mkString(" ")} $url") - } - } - - final def getHostName(container: String): String = { - trace(container, s"Get hostname of container...") { - doExecute(container, "hostname") - } - } - - final def getNetworkGateway(container: String): String = { - trace(container, s"Get gateway of container...") { - doExecute(container, "ip route").split("\\s+")(2) - } - } - final def killProcess(container: String, pid: Int, signal: String = "SIGKILL"): Boolean = { - trace(container, s"Kill process pid: $pid") { - doExecuteSilently(container, s"kill -$signal $pid") - } - } - - final def findJars(container: String, folder: String): Array[String] = { - trace(container, s"Find jars under $folder") { - doExecute(container, s"find $folder") - .split("\n").filter(_.endsWith(".jar")) - } - } - - private def trace[T](container: String, msg: String)(fun: => T): T = { - // scalastyle:off println - Console.println() // A empty line to let the output looks better. - // scalastyle:on println - LOG.debug(s"Container $container====>> $msg") - LOG.debug("INPUT==>>") - val response = fun - LOG.debug("<<==OUTPUT") - - LOG.debug(brief(response)) - - LOG.debug(s"<<====Command END. Container $container, $msg \n") - response - } - - private val PREVIEW_MAX_LENGTH = 1024 - - private def brief[T](input: T): String = { - val output = input match { - case true => - "Success|True" - case false => - "Failure|False" - case x: Array[Any] => - "Success: [" + x.mkString(",") + "]" - case x => - x.toString - } - - val preview = if (output.length > PREVIEW_MAX_LENGTH) { - output.substring(0, PREVIEW_MAX_LENGTH) + "..." - } - else { - output - } - preview - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala deleted file mode 100644 index 25d7ee3..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest - -import scala.collection.JavaConverters._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent._ -import scala.concurrent.duration._ -import scala.sys.process._ - -import org.apache.log4j.Logger -import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer - -/** - * The class is used to execute command in a shell - */ -object ShellExec { - - private val LOG = Logger.getLogger(getClass) - private val PROCESS_TIMEOUT = 2.minutes - - /** - * The builtin command line parser by ProcessBuilder (implicit sys.process) don't - * respect the quote chars (' and ") - */ - private def splitQuotedString(str: String): List[String] = { - val splitter = new QuotedStringTokenizer(str, " \t\n\r") - splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList - } - - def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = { - LOG.debug(s"$sender => `$command`") - - val p = splitQuotedString(command).run() - val f = Future(blocking(p.exitValue())) // wrap in Future - val retval = { - try { - Await.result(f, timeout) - } catch { - case _: TimeoutException => - LOG.error(s"timeout to execute command `$command`") - p.destroy() - p.exitValue() - } - } - LOG.debug(s"$sender <= exit $retval") - retval == 0 - } - - def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT) - : String = { - LOG.debug(s"$sender => `$command`") - - val buf = new StringBuilder - val processLogger = ProcessLogger((o: String) => buf.append(o).append("\n"), - (e: String) => buf.append(e).append("\n")) - val p = splitQuotedString(command).run(processLogger) - val f = Future(blocking(p.exitValue())) // wrap in Future - val retval = { - try { - Await.result(f, timeout) - } catch { - case _: TimeoutException => - p.destroy() - p.exitValue() - } - } - val output = buf.toString().trim - val PREVIEW_MAX_LENGTH = 200 - val preview = if (output.length > PREVIEW_MAX_LENGTH) { - output.substring(0, PREVIEW_MAX_LENGTH) + "..." - } else { - output - } - - LOG.debug(s"$sender <= `$preview` exit $retval") - if (retval != 0) { - throw new RuntimeException( - s"exited ($retval) by executing `$command`") - } - output - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala deleted file mode 100644 index 7e7085d..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest - -import scala.concurrent.duration._ -import scala.util.{Failure, Success, Try} - -import org.apache.log4j.Logger - -object Util { - - private val LOG = Logger.getLogger(getClass) - - def encodeUriComponent(s: String): String = { - try { - java.net.URLEncoder.encode(s, "UTF-8") - .replaceAll("\\+", "%20") - .replaceAll("\\%21", "!") - .replaceAll("\\%27", "'") - .replaceAll("\\%28", "(") - .replaceAll("\\%29", ")") - .replaceAll("\\%7E", "~") - } catch { - case ex: Throwable => s - } - } - - def retryUntil( - condition: () => Boolean, conditionDescription: String, maxTries: Int = 15, - interval: Duration = 10.seconds): Unit = { - var met = false - var tries = 0 - - while (!met && tries < maxTries) { - - met = Try(condition()) match { - case Success(true) => true - case Success(false) => false - case Failure(ex) => false - } - - tries += 1 - - if (!met) { - LOG.error(s"Failed due to (false == $conditionDescription), " + - s"retrying for the ${tries} times...") - Thread.sleep(interval.toMillis) - } else { - LOG.info(s"Success ($conditionDescription) after ${tries} retries") - } - } - - if (!met) { - throw new Exception(s"Failed after ${tries} retries, ($conditionDescription) == false") - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala deleted file mode 100644 index f836abd..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.integrationtest.hadoop - -import org.apache.log4j.Logger - -import org.apache.gearpump.integrationtest.{Docker, Util} - -object HadoopCluster { - - /** Starts a Hadoop cluster */ - def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = { - val hadoopCluster = new HadoopCluster - try { - hadoopCluster.start() - testCode(hadoopCluster) - } finally { - hadoopCluster.shutDown() - } - } -} -/** - * This class maintains a single node Hadoop cluster - */ -class HadoopCluster { - - private val LOG = Logger.getLogger(getClass) - private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0" - private val HADOOP_HOST = "hadoop0" - - def start(): Unit = { - Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "") - - Util.retryUntil(() => isAlive, "Hadoop cluster is alive") - LOG.info("Hadoop cluster is started.") - } - - // Checks whether the cluster is alive by listing "/" - private def isAlive: Boolean = { - Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /") - } - - def getDefaultFS: String = { - val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST) - s"hdfs://$hostIPAddr:9000" - } - - def shutDown(): Unit = { - Docker.killAndRemoveContainer(HADOOP_HOST) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala deleted file mode 100644 index 15ba084..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.kafka - -import org.apache.log4j.Logger - -import org.apache.gearpump.integrationtest.minicluster.MiniCluster -import org.apache.gearpump.integrationtest.{Docker, Util} - -object KafkaCluster { - - /** Starts a Kafka cluster */ - def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = { - val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka") - try { - kafkaCluster.start() - testCode(kafkaCluster) - } finally { - kafkaCluster.shutDown() - } - } - - def withDataProducer(topic: String, brokerList: String) - (testCode: NumericalDataProducer => Unit): Unit = { - val producer = new NumericalDataProducer(topic, brokerList) - try { - producer.start() - testCode(producer) - } finally { - producer.stop() - } - } -} - -/** - * This class maintains a single node Kafka cluster with integrated Zookeeper. - */ -class KafkaCluster(val advertisedHost: String, zkChroot: String = "") { - - private val LOG = Logger.getLogger(getClass) - private val KAFKA_DOCKER_IMAGE = "spotify/kafka" - private val KAFKA_HOST = "kafka0" - private val KAFKA_HOME = "/opt/kafka_2.11-0.8.2.1/" - private val ZOOKEEPER_PORT = 2181 - private val BROKER_PORT = 9092 - val advertisedPort = BROKER_PORT - - def start(): Unit = { - Docker.createAndStartContainer(KAFKA_HOST, KAFKA_DOCKER_IMAGE, "", - environ = Map( - "ADVERTISED_HOST" -> advertisedHost, - "ADVERTISED_PORT" -> BROKER_PORT.toString, - "ZK_CHROOT" -> zkChroot), - tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT) - ) - Util.retryUntil(() => isAlive, "kafka cluster is alive") - LOG.debug("kafka cluster is started.") - } - - def isAlive: Boolean = { - !listTopics().contains("Connection refused") - } - - def shutDown(): Unit = { - Docker.killAndRemoveContainer(KAFKA_HOST) - } - - private lazy val hostIPAddr = Docker.getContainerIPAddr(KAFKA_HOST) - - def listTopics(): String = { - kafkaListTopics(KAFKA_HOST, KAFKA_HOME, getZookeeperConnectString) - } - - def getZookeeperConnectString: String = { - s"$hostIPAddr:$ZOOKEEPER_PORT/$zkChroot" - } - - def getBrokerListConnectString: String = { - s"$hostIPAddr:$BROKER_PORT" - } - - def createTopic(topic: String, partitions: Int = 1): Unit = { - LOG.debug(s"|=> Create kafka topic $topic with $partitions partitions") - - Docker.executeSilently(KAFKA_HOST, - s"$KAFKA_HOME/bin/kafka-topics.sh" + - s" --zookeeper $getZookeeperConnectString" + - s" --create --topic $topic --partitions $partitions --replication-factor 1") - } - - def produceDataToKafka(topic: String, messageNum: Int): Unit = { - Docker.executeSilently(KAFKA_HOST, - s"$KAFKA_HOME/bin/kafka-topics.sh" + - s" --zookeeper $getZookeeperConnectString" + - s" --create --topic $topic --partitions 1 --replication-factor 1") - - Docker.executeSilently(KAFKA_HOST, - s"$KAFKA_HOME/bin/kafka-producer-perf-test.sh" + - s" --broker-list $getBrokerListConnectString" + - s" --topic $topic --messages $messageNum") - } - - def getLatestOffset(topic: String): Int = { - kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString) - } - - private def kafkaListTopics( - container: String, kafkaHome: String, zookeeperConnectionString: String): String = { - - LOG.debug(s"|=> Kafka list topics...") - Docker.execute(container, - s"$kafkaHome/bin/kafka-topics.sh" + - s" --zookeeper $zookeeperConnectionString -list") - } - - private def kafkaFetchLatestOffset( - container: String, topic: String, kafkaHome: String, brokersList: String): Int = { - LOG.debug(s"|=> Get latest offset of topic $topic...") - val output = Docker.execute(container, - s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" + - s" --broker-list $brokersList " + - s" --topic $topic --time -1") - output.split(":")(2).toInt - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala deleted file mode 100644 index 1cf3125..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.kafka - -import java.util.Properties - -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.apache.log4j.Logger - -import org.apache.gearpump.streaming.serializer.ChillSerializer - -class NumericalDataProducer(topic: String, bootstrapServers: String) { - - private val LOG = Logger.getLogger(getClass) - private val producer = createProducer - private val WRITE_SLEEP_NANOS = 10 - private val serializer = new ChillSerializer[Int] - var lastWriteNum = 0 - - def start(): Unit = { - produceThread.start() - } - - def stop(): Unit = { - if (produceThread.isAlive) { - produceThread.interrupt() - produceThread.join() - } - producer.close() - } - - /** How many message we have written in total */ - def producedNumbers: Range = { - Range(1, lastWriteNum + 1) - } - - private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = { - val properties = new Properties() - properties.setProperty("bootstrap.servers", bootstrapServers) - new KafkaProducer[Array[Byte], Array[Byte]](properties, - new ByteArraySerializer, new ByteArraySerializer) - } - - private val produceThread = new Thread(new Runnable { - override def run(): Unit = { - try { - while (!Thread.currentThread.isInterrupted) { - lastWriteNum += 1 - val msg = serializer.serialize(lastWriteNum) - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg) - producer.send(record) - Thread.sleep(0, WRITE_SLEEP_NANOS) - } - } catch { - case ex: InterruptedException => - LOG.error("message producing is stopped by an interrupt") - } - } - }) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala deleted file mode 100644 index 1f773d3..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.kafka - -import scala.collection.mutable - -trait ResultVerifier { - def onNext(num: Int): Unit -} - -class MessageLossDetector(totalNum: Int) extends ResultVerifier { - private val bitSets = new mutable.BitSet(totalNum) - var result = List.empty[Int] - - override def onNext(num: Int): Unit = { - bitSets.add(num) - result :+= num - } - - def allReceived: Boolean = { - 1.to(totalNum).forall(bitSets) - } - - def received(num: Int): Boolean = { - bitSets(num) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e9ea6e2f/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala deleted file mode 100644 index 392ca86..0000000 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.integrationtest.kafka - -import scala.util.{Failure, Success} - -import kafka.api.FetchRequestBuilder -import kafka.consumer.SimpleConsumer -import kafka.utils.Utils - -import org.apache.gearpump.streaming.serializer.ChillSerializer - -class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0, - host: String, port: Int) { - - private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "") - private val serializer = new ChillSerializer[Int] - private var offset = 0L - - def read(): Unit = { - val messageSet = consumer.fetch( - new FetchRequestBuilder().addFetch(topic, partition, offset, Int.MaxValue).build() - ).messageSet(topic, partition) - - for (messageAndOffset <- messageSet) { - serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match { - case Success(msg) => - offset = messageAndOffset.nextOffset - verifier.onNext(msg) - case Failure(e) => throw e - } - } - } -} \ No newline at end of file
