http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala index a898adf..46067ab 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -39,5 +39,4 @@ class KafkaDefaultGrouperSpec extends PropSpec with PropertyChecks with Matchers } } } - }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala index 2ee7260..ad315fe 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,7 +23,7 @@ import java.util.Properties import kafka.admin.AdminUtils import kafka.common.KafkaException import kafka.server.{KafkaConfig => KafkaServerConfig, KafkaServer} -import kafka.utils.{Utils, TestUtils} +import kafka.utils.{TestUtils, Utils} trait KafkaServerHarness extends ZookeeperHarness { val configs: List[KafkaServerConfig] @@ -35,8 +35,9 @@ trait KafkaServerHarness extends ZookeeperHarness { override def setUp() { super.setUp - if (configs.size <= 0) + if (configs.size <= 0) { throw new KafkaException("Must supply at least one server config.") + } brokerList = TestUtils.getBrokerListStrFromConfigs(configs) servers = configs.map(TestUtils.createServer(_)) } @@ -47,12 +48,14 @@ trait KafkaServerHarness extends ZookeeperHarness { super.tearDown } - def createTopicUntilLeaderIsElected(topic: String, partitions: Int, replicas: Int, timeout: Long = 10000) = { + def createTopicUntilLeaderIsElected( + topic: String, partitions: Int, replicas: Int, timeout: Long = 10000) + : Map[Int, Option[Int]] = { val zkClient = connectZk() try { - // create topic + // Creates topic AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties) - // wait until the update metadata request for new topic reaches all servers + // Waits until the update metadata request for new topic reaches all servers (0 until partitions).map { case i => TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, timeout) i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i, timeout) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala index a2b25be..45fe760 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,7 +18,7 @@ package io.gearpump.streaming.kafka.util -import kafka.utils.{ZKStringSerializer, Utils, TestZKUtils} +import kafka.utils.{TestZKUtils, Utils, ZKStringSerializer} import kafka.zk.EmbeddedZookeeper import org.I0Itec.zkclient.ZkClient @@ -29,8 +29,9 @@ trait ZookeeperHarness { private var zookeeper: EmbeddedZookeeper = null def getZookeeper: EmbeddedZookeeper = zookeeper - def connectZk = () => new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) - + def connectZk: () => ZkClient = () => { + new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + } def setUp() { zookeeper = new EmbeddedZookeeper(zkConnect) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala ---------------------------------------------------------------------- diff --git a/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala b/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala index 39e2b24..3199128 100644 --- a/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala +++ b/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,7 +18,8 @@ package io.gearpump.streaming.monoid -import com.twitter.algebird.{Monoid => ABMonoid, Group => ABGroup} +import com.twitter.algebird.{Group => ABGroup, Monoid => ABMonoid} + import io.gearpump.streaming.state.api.{Group, Monoid} class AlgebirdMonoid[T](monoid: ABMonoid[T]) extends Monoid[T] { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala ---------------------------------------------------------------------- diff --git a/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala b/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala index f2f085a..9578e0a 100644 --- a/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala +++ b/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,10 +18,11 @@ package io.gearpump.streaming.serializer +import scala.util.Try + import com.twitter.chill.KryoInjection -import io.gearpump.streaming.state.api.Serializer -import scala.util.Try +import io.gearpump.streaming.state.api.Serializer class ChillSerializer[T] extends Serializer[T] { override def serialize(t: T): Array[Byte] = http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/README.md ---------------------------------------------------------------------- diff --git a/integrationtest/README.md b/integrationtest/README.md index 95fa6c5..4d85b00 100644 --- a/integrationtest/README.md +++ b/integrationtest/README.md @@ -60,7 +60,6 @@ For a not that clean solution, here is the steps: ``` 3. Run `sbt it:test` - ## Manual test by creating docker cluster manually To launch a Gearpump cluster manually, you can run the commands as follows. http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/resources/log4j.properties b/integrationtest/core/src/it/resources/log4j.properties index f7d0832..e45bd9a 100644 --- a/integrationtest/core/src/it/resources/log4j.properties +++ b/integrationtest/core/src/it/resources/log4j.properties @@ -7,7 +7,7 @@ # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala index 570e98d..32d8bb7 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/MiniClusterProvider.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,15 +20,13 @@ package io.gearpump.integrationtest import io.gearpump.integrationtest.minicluster.MiniCluster /** - * Provides an instance of SUT. - * - * By default it will instantiate a standalone Gearpump mini cluster. + * Provides a min cluster of Gearpump, which contains one or more masters, and workers. */ object MiniClusterProvider { private var instance = new MiniCluster - def get = instance + def get: MiniCluster = instance def set(instance: MiniCluster): MiniCluster = { this.instance = instance @@ -40,5 +38,4 @@ object MiniClusterProvider { * test spec will be responsible for cluster creation. */ var managed = false - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala index c07222f..6e4a471 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/TestSpecBase.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,10 +17,11 @@ */ package io.gearpump.integrationtest +import org.scalatest._ + import io.gearpump.cluster.MasterToAppMaster import io.gearpump.cluster.MasterToAppMaster.AppMasterData import io.gearpump.util.LogUtil -import org.scalatest._ /** * The abstract test spec @@ -55,7 +56,7 @@ trait TestSpecBase var restartClusterRequired: Boolean = false - override def beforeEach(td: TestData) = { + override def beforeEach(td: TestData): Unit = { LOGGER.debug(s">### =============================================================") LOGGER.debug(s">###1 Prepare test: ${td.name}\n") @@ -67,7 +68,7 @@ trait TestSpecBase LOGGER.debug(s">###2 Start test: ${td.name}\n") } - override def afterEach(td: TestData) = { + override def afterEach(td: TestData): Unit = { LOGGER.debug(s"<### =============================================================") LOGGER.debug(s"<###3 End test: ${td.name}\n") @@ -88,4 +89,4 @@ trait TestSpecBase app.appName shouldEqual expectedAppName app } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala index 1c9fdfd..9c0c779 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/CommandLineSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -124,7 +124,6 @@ class CommandLineSpec extends TestSpecBase { appId } - private def expectAppIsRunningByParsingOutput(appId: Int, expectedName: String): Unit = { val actual = commandLineClient.queryApp(appId) actual should include(s"application: $appId, ") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala index 7c9176a..321b395 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,10 +17,11 @@ */ package io.gearpump.integrationtest.checklist -import io.gearpump.integrationtest.{Util, TestSpecBase} -import io.gearpump.integrationtest.kafka._ import org.scalatest.TestData +import io.gearpump.integrationtest.kafka._ +import io.gearpump.integrationtest.{TestSpecBase, Util} + /** * The test spec checks the Kafka datasource connector */ @@ -40,7 +41,7 @@ class ConnectorKafkaSpec extends TestSpecBase { super.afterAll() } - override def afterEach(test: TestData) = { + override def afterEach(test: TestData): Unit = { super.afterEach(test) if (producer != null) { producer.stop() @@ -68,7 +69,7 @@ class ConnectorKafkaSpec extends TestSpecBase { // verify expectAppIsRunning(appId, "KafkaReadWrite") - Util.retryUntil(()=>kafkaCluster.getLatestOffset(sinkTopic) == messageNum, + Util.retryUntil(() => kafkaCluster.getLatestOffset(sinkTopic) == messageNum, "kafka all message written") } } @@ -97,23 +98,23 @@ class ConnectorKafkaSpec extends TestSpecBase { // verify #1 expectAppIsRunning(appId, "KafkaReadWrite") - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") // verify #2 val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, + Util.retryUntil(() => restClient.queryExecutorBrief(appId) + .map(_.executorId).max > executorToKill, s"executor $executorToKill killed") // verify #3 val detector = new MessageLossDetector(producer.lastWriteNum) val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort) - Util.retryUntil(()=>{ + Util.retryUntil(() => { kafkaReader.read() detector.allReceived }, "kafka all message read") } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala index c8c57f5..588e7c7 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,7 +17,7 @@ */ package io.gearpump.integrationtest.checklist -import io.gearpump.integrationtest.{Util, TestSpecBase} +import io.gearpump.integrationtest.{TestSpecBase, Util} import io.gearpump.metrics.Metrics.Meter import io.gearpump.streaming._ import io.gearpump.streaming.appmaster.ProcessorSummary @@ -50,7 +50,7 @@ class DynamicDagSpec extends TestSpecBase { val formerProcessors = restClient.queryStreamingAppDetail(appId).processors replaceProcessor(appId, 1, sumTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(()=>{ + Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 }, "new processor successfully added") @@ -65,7 +65,7 @@ class DynamicDagSpec extends TestSpecBase { val formerProcessors = restClient.queryStreamingAppDetail(appId).processors replaceProcessor(appId, 0, splitTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(()=>{ + Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 }, "new processor added") @@ -80,7 +80,7 @@ class DynamicDagSpec extends TestSpecBase { val formerProcessors = restClient.queryStreamingAppDetail(appId).processors replaceProcessor(appId, 1, sumTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(()=>{ + Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 }, "new processor added") @@ -88,12 +88,12 @@ class DynamicDagSpec extends TestSpecBase { val fakeTaskClass = "io.gearpump.streaming.examples.wordcount.Fake" replaceProcessor(appId, laterProcessors.keySet.max, fakeTaskClass) - Util.retryUntil(()=>{ + Util.retryUntil(() => { val processorsAfterFailure = restClient.queryStreamingAppDetail(appId).processors processorsAfterFailure.size == laterProcessors.size }, "new processor added") val currentClock = restClient.queryStreamingAppDetail(appId).clock - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > currentClock, + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > currentClock, "app clock is advancing") } @@ -106,19 +106,18 @@ class DynamicDagSpec extends TestSpecBase { val formerProcessors = restClient.queryStreamingAppDetail(appId).processors replaceProcessor(appId, 1, sumTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(()=>{ + Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 }, "new processor added") processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") restClient.killAppMaster(appId) shouldBe true - Util.retryUntil(()=>restClient.queryApp(appId).appMasterPath != formerAppMaster, + Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster, "new AppMaster created") val processors = restClient.queryStreamingAppDetail(appId).processors processors.size shouldEqual laterProcessors.size } - } private def expectSolJarSubmittedWithAppId(): Int = { @@ -126,7 +125,7 @@ class DynamicDagSpec extends TestSpecBase { val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length) success shouldBe true expectAppIsRunning(appId, solName) - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") appId } @@ -147,7 +146,7 @@ class DynamicDagSpec extends TestSpecBase { } private def processorHasThroughput(appId: Int, processorId: Int, metrics: String): Unit = { - Util.retryUntil(()=>{ + Util.retryUntil(() => { val actual = restClient.queryStreamingAppMetrics(appId, current = false, path = "processor" + processorId) val throughput = actual.metrics.filter(_.value.name.endsWith(metrics)) @@ -155,5 +154,4 @@ class DynamicDagSpec extends TestSpecBase { throughput.forall(_.value.asInstanceOf[Meter].count > 0L) }, "new processor has message received") } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala index efee3c6..20d1b12 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,11 +17,11 @@ */ package io.gearpump.integrationtest.checklist -import io.gearpump.integrationtest.Docker._ +import org.apache.log4j.Logger + import io.gearpump.integrationtest.{Docker, TestSpecBase, Util} import io.gearpump.streaming._ import io.gearpump.streaming.appmaster.ProcessorSummary -import org.apache.log4j.Logger /** * The test spec will perform destructive operations to check the stability @@ -49,12 +49,13 @@ class ExampleSpec extends TestSpecBase { def verify(): Boolean = { val workerNum = cluster.getWorkerHosts.length - val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, workerNum, args.mkString(" ")).split("\n"). + val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, + workerNum, args.mkString(" ")).split("\n"). filterNot(line => line.startsWith("[INFO]") || line.isEmpty) expectedHostNames.forall(result.contains) } - Util.retryUntil(()=>verify(), + Util.retryUntil(() => verify(), s"executors started on all expected hosts ${expectedHostNames.mkString(", ")}") } } @@ -66,10 +67,12 @@ class ExampleSpec extends TestSpecBase { "can submit immediately after killing a former one" in { // setup val formerAppId = restClient.getNextAvailableAppId() - val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + val formerSubmissionSuccess = + restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) formerSubmissionSuccess shouldBe true expectAppIsRunning(formerAppId, wordCountName) - Util.retryUntil(()=>restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running") + Util.retryUntil(() => + restClient.queryStreamingAppDetail(formerAppId).clock > 0, "app running") restClient.killApp(formerAppId) // exercise @@ -109,9 +112,9 @@ class ExampleSpec extends TestSpecBase { expectAppIsRunning(appId, appName) // exercise - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app submitted") val formerClock = restClient.queryStreamingAppDetail(appId).clock - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > formerClock, + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > formerClock, "app clock is advancing") } @@ -126,14 +129,14 @@ class ExampleSpec extends TestSpecBase { val expectedProcessorId = formerProcessors.size val expectedParallelism = processor0.parallelism + 1 val expectedDescription = processor0.description + "new" - val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass, expectedParallelism, - description = expectedDescription) + val replaceMe = new ProcessorDescription(processor0.id, processor0.taskClass, + expectedParallelism, description = expectedDescription) // exercise val success = restClient.replaceStreamingAppProcessor(appId, replaceMe) success shouldBe true var laterProcessors: Map[ProcessorId, ProcessorSummary] = null - Util.retryUntil(()=>{ + Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors laterProcessors.size == formerProcessors.size + 1 }, "new process added") @@ -142,5 +145,4 @@ class ExampleSpec extends TestSpecBase { laterProcessor0.description shouldEqual expectedDescription } } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala index 35c31cd..3f70339 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala @@ -1,12 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.gearpump.integrationtest.checklist -import io.gearpump.integrationtest.Docker._ +import org.apache.log4j.Logger + import io.gearpump.integrationtest.hadoop.HadoopCluster._ -import io.gearpump.integrationtest.{Util, TestSpecBase} -import io.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader, MessageLossDetector, NumericalDataProducer} import io.gearpump.integrationtest.kafka.KafkaCluster._ -import org.apache.log4j.Logger +import io.gearpump.integrationtest.kafka.{ResultVerifier, SimpleKafkaReader} +import io.gearpump.integrationtest.{TestSpecBase, Util} +/** + * Checks message delivery consistency, like at-least-once, and exactly-once. + */ class MessageDeliverySpec extends TestSpecBase { private val LOG = Logger.getLogger(getClass) @@ -49,7 +70,8 @@ class MessageDeliverySpec extends TestSpecBase { // verify #1 expectAppIsRunning(appId, "MessageCount") - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app is running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, + "app is running") // wait for checkpoint to take place Thread.sleep(1000) @@ -57,17 +79,19 @@ class MessageDeliverySpec extends TestSpecBase { LOG.info("Trigger message replay by kill and restart the executors") val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(()=> restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, - s"executor $executorToKill killed") + Util.retryUntil(() => restClient.queryExecutorBrief(appId) + .map(_.executorId).max > executorToKill, s"executor $executorToKill killed") producer.stop() val producedNumbers = producer.producedNumbers - LOG.info(s"In total, numbers in range[${producedNumbers.start}, ${producedNumbers.end - 1}] have been written to Kafka") + LOG.info(s"In total, numbers in range[${producedNumbers.start}" + + s", ${producedNumbers.end - 1}] have been written to Kafka") // verify #3 val kafkaSourceOffset = kafkaCluster.getLatestOffset(sourceTopic) - assert(producedNumbers.size == kafkaSourceOffset, "produced message should match Kafka queue size") + assert(producedNumbers.size == kafkaSourceOffset, + "produced message should match Kafka queue size") LOG.info(s"The Kafka source topic $sourceTopic offset is " + kafkaSourceOffset) @@ -83,11 +107,13 @@ class MessageDeliverySpec extends TestSpecBase { val kafkaReader = new SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, port = kafkaCluster.advertisedPort) - Util.retryUntil(()=>{ + Util.retryUntil(() => { kafkaReader.read() - LOG.info(s"Received message count: ${detector.latestMessageCount}, expect: ${producedNumbers.size}") + LOG.info(s"Received message count: ${detector.latestMessageCount}, " + + s"expect: ${producedNumbers.size}") detector.latestMessageCount == producedNumbers.size - }, "MessageCountApp calculated message count matches expected in case of message replay") + }, "MessageCountApp calculated message count matches " + + "expected in case of message replay") } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala index 722cb33..f4abde7 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,13 +17,13 @@ */ package io.gearpump.integrationtest.checklist +import scala.concurrent.duration._ + import io.gearpump.cluster.MasterToAppMaster import io.gearpump.cluster.master.MasterStatus import io.gearpump.cluster.worker.WorkerSummary import io.gearpump.integrationtest.{TestSpecBase, Util} -import scala.concurrent.duration._ - /** * The test spec checks REST service usage */ @@ -62,7 +62,8 @@ class RestServiceSpec extends TestSpecBase { "reject a repeated submission request while the application is running" in { // setup val appId = restClient.getNextAvailableAppId() - val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) + val formerSubmissionSuccess = restClient.submitApp(wordCountJar, + cluster.getWorkerHosts.length) formerSubmissionSuccess shouldBe true expectAppIsRunning(appId, wordCountName) @@ -77,14 +78,16 @@ class RestServiceSpec extends TestSpecBase { success shouldBe false } - "submit a wordcount application with 4 split and 3 sum processors and expect parallelism of processors match the given number" in { + "submit a wordcount application with 4 split and 3 sum processors and expect " + + "parallelism of processors match the given number" in { // setup val splitNum = 4 val sumNum = 3 val appId = restClient.getNextAvailableAppId() // exercise - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, s"-split $splitNum -sum $sumNum") + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, + s"-split $splitNum -sum $sumNum") success shouldBe true expectAppIsRunning(appId, wordCountName) val processors = restClient.queryStreamingAppDetail(appId).processors @@ -104,7 +107,8 @@ class RestServiceSpec extends TestSpecBase { // exercise expectMetricsAvailable( - restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty, "metrics available") + restClient.queryStreamingAppMetrics(appId, current = true).metrics.nonEmpty, + "metrics available") val actual = restClient.queryStreamingAppMetrics(appId, current = true) actual.path shouldEqual s"app$appId.processor*" actual.metrics.foreach(metric => { @@ -119,7 +123,8 @@ class RestServiceSpec extends TestSpecBase { }, "metrics available") } - "can obtain application corresponding executors' metrics and the metrics will keep changing" in { + "can obtain application corresponding executors' metrics and " + + "the metrics will keep changing" in { // setup val appId = restClient.getNextAvailableAppId() val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) @@ -128,7 +133,8 @@ class RestServiceSpec extends TestSpecBase { // exercise expectMetricsAvailable( - restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty, "metrics available") + restClient.queryExecutorMetrics(appId, current = true).metrics.nonEmpty, + "metrics available") val actual = restClient.queryExecutorMetrics(appId, current = true) actual.path shouldEqual s"app$appId.executor*" actual.metrics.foreach(metric => { @@ -194,7 +200,7 @@ class RestServiceSpec extends TestSpecBase { // exercise var runningWorkers: Array[WorkerSummary] = Array.empty - Util.retryUntil(()=>{ + Util.retryUntil(() => { runningWorkers = restClient.listRunningWorkers() runningWorkers.length == expectedWorkersCount }, "all workers running") @@ -207,13 +213,14 @@ class RestServiceSpec extends TestSpecBase { // setup restartClusterRequired = true val formerWorkersCount = cluster.getWorkerHosts.length - Util.retryUntil(()=>restClient.listRunningWorkers().length == formerWorkersCount, + Util.retryUntil(() => restClient.listRunningWorkers().length == formerWorkersCount, "all workers running") val workerName = "newWorker" // exercise cluster.addWorkerNode(workerName) - Util.retryUntil(()=>restClient.listRunningWorkers().length > formerWorkersCount, "new worker added") + Util.retryUntil(() => restClient.listRunningWorkers().length > formerWorkersCount, + "new worker added") cluster.getWorkerHosts.length shouldEqual formerWorkersCount + 1 restClient.listRunningWorkers().length shouldEqual formerWorkersCount + 1 } @@ -252,7 +259,8 @@ class RestServiceSpec extends TestSpecBase { restClient.listRunningWorkers().foreach { worker => val workerId = worker.workerId expectMetricsAvailable( - restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty, "metrics available") + restClient.queryWorkerMetrics(workerId, current = true).metrics.nonEmpty, + "metrics available") val actual = restClient.queryWorkerMetrics(workerId, current = true) actual.path shouldEqual s"worker$workerId" actual.metrics.foreach(metric => { @@ -323,13 +331,14 @@ class RestServiceSpec extends TestSpecBase { val originSplitNum = 4 val originSumNum = 3 val originAppId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, s"-split $originSplitNum -sum $originSumNum") + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, + s"-split $originSplitNum -sum $originSumNum") success shouldBe true expectAppIsRunning(originAppId, wordCountName) val originAppDetail = restClient.queryStreamingAppDetail(originAppId) // exercise - Util.retryUntil(()=>restClient.restartApp(originAppId), "app restarted") + Util.retryUntil(() => restClient.restartApp(originAppId), "app restarted") val killedApp = restClient.queryApp(originAppId) killedApp.appId shouldEqual originAppId killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive @@ -355,7 +364,6 @@ class RestServiceSpec extends TestSpecBase { private def expectMetricsAvailable(condition: => Boolean, conditionDescription: String): Unit = { val config = restClient.queryMasterConfig() val reportInterval = Duration(config.getString("gearpump.metrics.report-interval-ms") + "ms") - Util.retryUntil(()=>condition, conditionDescription, interval = reportInterval) + Util.retryUntil(() => condition, conditionDescription, interval = reportInterval) } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala index ffad3ec..b002f70 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StabilitySpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,15 +17,16 @@ */ package io.gearpump.integrationtest.checklist -import io.gearpump.WorkerId +import scala.concurrent.duration.Duration + import io.gearpump.cluster.MasterToAppMaster +import io.gearpump.cluster.worker.WorkerId import io.gearpump.integrationtest.{TestSpecBase, Util} import io.gearpump.util.{Constants, LogUtil} -import scala.concurrent.duration.Duration - /** - * The test spec will perform destructive operations to check the stability + * The test spec will perform destructive operations to check the stability. Operations + * contains shutting-down appmaster, executor, or worker, and etc.. */ class StabilitySpec extends TestSpecBase { @@ -36,13 +37,13 @@ class StabilitySpec extends TestSpecBase { // setup val appId = commandLineClient.submitApp(wordCountJar) val formerAppMaster = restClient.queryApp(appId).appMasterPath - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") ensureClockStoredInMaster() // exercise restClient.killAppMaster(appId) shouldBe true // todo: how long master will begin to recover and how much time for the recovering? - Util.retryUntil(()=>restClient.queryApp(appId).appMasterPath != formerAppMaster, + Util.retryUntil(() => restClient.queryApp(appId).appMasterPath != formerAppMaster, "appmaster killed and restarted") // verify @@ -56,14 +57,15 @@ class StabilitySpec extends TestSpecBase { "will create a new executor and application will replay from the latest application clock" in { // setup val appId = commandLineClient.submitApp(wordCountJar) - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max ensureClockStoredInMaster() // exercise restClient.killExecutor(appId, executorToKill) shouldBe true // todo: how long appmaster will begin to recover and how much time for the recovering? - Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, + Util.retryUntil(() => restClient.queryExecutorBrief(appId) + .map(_.executorId).max > executorToKill, s"executor $executorToKill killed and restarted") // verify @@ -85,7 +87,7 @@ class StabilitySpec extends TestSpecBase { // setup restartClusterRequired = true val appId = commandLineClient.submitApp(wordCountJar) - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") val allexecutors = restClient.queryExecutorBrief(appId) val maxExecutor = allexecutors.sortBy(_.executorId).last @@ -93,8 +95,10 @@ class StabilitySpec extends TestSpecBase { val appMaster = allexecutors.find(_.executorId == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) - LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, worker: ${maxExecutor.workerId}") - val executorsSharingSameWorker = allexecutors.filter(_.workerId == maxExecutor.workerId).map(_.executorId) + LOG.info(s"Max executor Id is executor: ${maxExecutor.executorId}, " + + s"worker: ${maxExecutor.workerId}") + val executorsSharingSameWorker = allexecutors + .filter(_.workerId == maxExecutor.workerId).map(_.executorId) LOG.info(s"These executors sharing the same worker Id ${maxExecutor.workerId}," + s" ${executorsSharingSameWorker.mkString(",")}") @@ -102,7 +106,8 @@ class StabilitySpec extends TestSpecBase { val workerIdToKill = maxExecutor.workerId cluster.removeWorkerNode(hostName(workerIdToKill)) - val appMasterKilled = executorsSharingSameWorker.exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) + val appMasterKilled = executorsSharingSameWorker + .exists(_ == Constants.APPMASTER_DEFAULT_EXECUTOR_ID) def executorsMigrated(): Boolean = { val executors = restClient.queryExecutorBrief(appId) @@ -118,7 +123,7 @@ class StabilitySpec extends TestSpecBase { } } - Util.retryUntil(()=> { + Util.retryUntil(() => { executorsMigrated() }, s"new executor created with id > ${maxExecutor.executorId} when worker is killed") @@ -144,13 +149,14 @@ class StabilitySpec extends TestSpecBase { // verify val aliveWorkers = cluster.getWorkerHosts - Util.retryUntil(()=>aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)), + Util.retryUntil(() => aliveWorkers.forall(worker => !cluster.nodeIsOnline(worker)), "all workers down") } } private def ensureClockStoredInMaster(): Unit = { - // todo: 5000ms is a fixed sync period in clock service. we wait for 5000ms to assume the clock is stored + // TODO: 5000ms is a fixed sync period in clock service. + // we wait for 5000ms to assume the clock is stored Thread.sleep(5000) } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala index 710cfa7..7d0a672 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -22,8 +22,8 @@ import io.gearpump.integrationtest.storm.StormClient import io.gearpump.integrationtest.{TestSpecBase, Util} /** - * The test spec checks the compatibility of running Storm applications - */ + * The test spec checks the compatibility of running Storm applications + */ class StormCompatibilitySpec extends TestSpecBase { private lazy val stormClient = { @@ -58,7 +58,7 @@ class StormCompatibilitySpec extends TestSpecBase { "Storm over Gearpump" should withStorm { stormVersion => - s"support basic topologies ($stormVersion)" in { + s"support basic topologies ($stormVersion)" in { val stormJar = getStormJar(stormVersion) val topologyName = getTopologyName("exclamation", stormVersion) @@ -70,7 +70,7 @@ class StormCompatibilitySpec extends TestSpecBase { appName = topologyName) // verify - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") } s"support to run a python version of wordcount ($stormVersion)" in { @@ -85,7 +85,7 @@ class StormCompatibilitySpec extends TestSpecBase { appName = topologyName) // verify - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") } s"support DRPC ($stormVersion)" in { @@ -102,7 +102,7 @@ class StormCompatibilitySpec extends TestSpecBase { val drpcClient = stormClient.getDRPCClient(cluster.getNetworkGateway) // verify - Util.retryUntil(()=>{ + Util.retryUntil(() => { drpcClient.execute("reach", "notaurl.com") == "0" }, "drpc reach == 0") drpcClient.execute("reach", "foo.com/blog/1") shouldEqual "16" @@ -121,16 +121,17 @@ class StormCompatibilitySpec extends TestSpecBase { appName = topologyName) // verify - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") } s"support at-least-once semantics with Storm's Kafka connector ($stormVersion)" in { val stormJar = getStormJar(stormVersion) val topologyName = getTopologyName("storm_kafka", stormVersion) - val stormKafkaTopology = s"io.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology" + val stormKafkaTopology = + s"io.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology" - import KafkaCluster._ + import io.gearpump.integrationtest.kafka.KafkaCluster._ withKafkaCluster(cluster) { kafkaCluster => val sourcePartitionNum = 2 @@ -142,7 +143,7 @@ class StormCompatibilitySpec extends TestSpecBase { val args = Array("-topologyName", topologyName, "-sourceTopic", sourceTopic, "-sinkTopic", sinkTopic, "-zookeeperConnect", zookeeper, "-brokerList", brokerList, - "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum" + "-spoutNum", s"$sourcePartitionNum", "-boltNum", s"$sinkPartitionNum" ) kafkaCluster.createTopic(sourceTopic, sourcePartitionNum) @@ -156,22 +157,24 @@ class StormCompatibilitySpec extends TestSpecBase { args = args.mkString(" "), appName = topologyName) - Util.retryUntil(()=>restClient.queryStreamingAppDetail(appId).clock > 0, "app running") + Util.retryUntil(() => + restClient.queryStreamingAppDetail(appId).clock > 0, "app running") // kill executor and verify at-least-once is guaranteed on application restart val executorToKill = restClient.queryExecutorBrief(appId).map(_.executorId).max restClient.killExecutor(appId, executorToKill) shouldBe true - Util.retryUntil(()=>restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, + Util.retryUntil(() => + restClient.queryExecutorBrief(appId).map(_.executorId).max > executorToKill, s"executor $executorToKill killed") // verify no message loss val detector = new - MessageLossDetector(producer.lastWriteNum) + MessageLossDetector(producer.lastWriteNum) val kafkaReader = new - SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, - port = kafkaCluster.advertisedPort) + SimpleKafkaReader(detector, sinkTopic, host = kafkaCluster.advertisedHost, + port = kafkaCluster.advertisedPort) - Util.retryUntil (()=>{ + Util.retryUntil(() => { kafkaReader.read() detector.allReceived }, "all kafka message read") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala index 65d0d4c..4178e66 100644 --- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala +++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,13 +17,15 @@ */ package io.gearpump.integrationtest.suites +import org.scalatest._ + import io.gearpump.integrationtest.MiniClusterProvider import io.gearpump.integrationtest.checklist._ import io.gearpump.integrationtest.minicluster.MiniCluster -import org.scalatest._ /** - * Launch a Gearpump cluster in standalone mode and run all test specs + * Launch a Gearpump cluster in standalone mode and run all test specs. To test a specific + * test spec, you need to comment out other lines. */ class StandaloneModeSuite extends Suites( new CommandLineSpec, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala index 64ef9a7..aee2681 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Docker.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,7 +17,6 @@ */ package io.gearpump.integrationtest -import io.gearpump.integrationtest.ShellExec._ import org.apache.log4j.Logger /** @@ -38,7 +37,6 @@ object Docker { ShellExec.exec(s"docker exec $container $command", s"EXEC $container") } - /** * @throws RuntimeException in case retval != 0 */ @@ -48,7 +46,6 @@ object Docker { } } - final def executeSilently(container: String, command: String): Boolean = { trace(container, s"Execute silently $command") { doExecuteSilently(container, command) @@ -119,12 +116,14 @@ object Docker { /** * @throws RuntimeException in case particular container is created already */ - private def createAndStartContainer(name: String, options: String, command: String, image: String): String = { - ShellExec.execAndCaptureOutput(s"docker run $options --name $name $image $command", s"MAKE $name") + private def createAndStartContainer( + name: String, options: String, command: String, image: String): String = { + ShellExec.execAndCaptureOutput(s"docker run $options " + + s"--name $name $image $command", s"MAKE $name") } final def killAndRemoveContainer(name: String): Boolean = { - trace(name, s"kill and remove container"){ + trace(name, s"kill and remove container") { ShellExec.exec(s"docker rm -f $name", s"STOP $name") } } @@ -141,7 +140,8 @@ object Docker { ShellExec.execAndCaptureOutput(s"docker inspect $option $container", s"EXEC $container") } - final def curl(container: String, url: String, options: Array[String] = Array.empty[String]): String = { + final def curl(container: String, url: String, options: Array[String] = Array.empty[String]) + : String = { trace(container, s"curl $url") { doExecute(container, s"curl -s ${options.mkString(" ")} $url") } @@ -172,7 +172,9 @@ object Docker { } private def trace[T](container: String, msg: String)(fun: => T): T = { - Console.println() + // scalastyle:off println + Console.println() // A empty line to let the output looks better. + // scalastyle:on println LOG.debug(s"Container $container====>> $msg") LOG.debug("INPUT==>>") val response = fun @@ -198,8 +200,12 @@ object Docker { x.toString } - val preview = if (output.length > PREVIEW_MAX_LENGTH) - output.substring(0, PREVIEW_MAX_LENGTH) + "..." else output + val preview = if (output.length > PREVIEW_MAX_LENGTH) { + output.substring(0, PREVIEW_MAX_LENGTH) + "..." + } + else { + output + } preview } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala index 1c01b51..9045efe 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/ShellExec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,15 +17,14 @@ */ package io.gearpump.integrationtest -import org.apache.commons.lang.text.{StrMatcher, StrTokenizer} -import org.apache.log4j.Logger -import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer - +import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent._ import scala.concurrent.duration._ import scala.sys.process._ -import scala.collection.JavaConversions._ + +import org.apache.log4j.Logger +import org.apache.storm.shade.org.eclipse.jetty.util.QuotedStringTokenizer /** * The class is used to execute command in a shell @@ -41,7 +40,7 @@ object ShellExec { */ private def splitQuotedString(str: String): List[String] = { val splitter = new QuotedStringTokenizer(str, " \t\n\r") - splitter.asInstanceOf[java.util.Enumeration[String]].toList + splitter.asInstanceOf[java.util.Enumeration[String]].asScala.toList } def exec(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): Boolean = { @@ -49,7 +48,8 @@ object ShellExec { val p = splitQuotedString(command).run() val f = Future(blocking(p.exitValue())) // wrap in Future - val retval = try { + val retval = { + try { Await.result(f, timeout) } catch { case _: TimeoutException => @@ -57,12 +57,13 @@ object ShellExec { p.destroy() p.exitValue() } - + } LOG.debug(s"$sender <= exit $retval") retval == 0 } - def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT): String = { + def execAndCaptureOutput(command: String, sender: String, timeout: Duration = PROCESS_TIMEOUT) + : String = { LOG.debug(s"$sender => `$command`") val buf = new StringBuilder @@ -70,17 +71,22 @@ object ShellExec { (e: String) => buf.append(e).append("\n")) val p = splitQuotedString(command).run(processLogger) val f = Future(blocking(p.exitValue())) // wrap in Future - val retval = try { + val retval = { + try { Await.result(f, timeout) } catch { case _: TimeoutException => p.destroy() p.exitValue() } + } val output = buf.toString().trim val PREVIEW_MAX_LENGTH = 200 - val preview = if (output.length > PREVIEW_MAX_LENGTH) - output.substring(0, PREVIEW_MAX_LENGTH) + "..." else output + val preview = if (output.length > PREVIEW_MAX_LENGTH) { + output.substring(0, PREVIEW_MAX_LENGTH) + "..." + } else { + output + } LOG.debug(s"$sender <= `$preview` exit $retval") if (retval != 0) { @@ -89,5 +95,4 @@ object ShellExec { } output } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala index d4dc1ee..71429b2 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/Util.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,11 +17,11 @@ */ package io.gearpump.integrationtest -import org.apache.log4j.Logger - import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} +import org.apache.log4j.Logger + object Util { private val LOG = Logger.getLogger(getClass) @@ -40,8 +40,9 @@ object Util { } } - def retryUntil(condition: ()=> Boolean, conditionDescription: String, maxTries: Int = 15, - interval: Duration = 10.seconds): Unit = { + def retryUntil( + condition: () => Boolean, conditionDescription: String, maxTries: Int = 15, + interval: Duration = 10.seconds): Unit = { var met = false var tries = 0 @@ -56,7 +57,8 @@ object Util { tries += 1 if (!met) { - LOG.error(s"Failed due to (false == $conditionDescription), retrying for the ${tries} times...") + LOG.error(s"Failed due to (false == $conditionDescription), " + + s"retrying for the ${tries} times...") Thread.sleep(interval.toMillis) } else { LOG.info(s"Success ($conditionDescription) after ${tries} retries") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala index 2217efe..6d277f0 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala @@ -1,10 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.gearpump.integrationtest.hadoop -import io.gearpump.integrationtest.{Util, Docker} import org.apache.log4j.Logger +import io.gearpump.integrationtest.{Docker, Util} + object HadoopCluster { + /** Starts a Hadoop cluster */ def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = { val hadoopCluster = new HadoopCluster try { @@ -27,10 +47,11 @@ class HadoopCluster { def start(): Unit = { Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "") - Util.retryUntil(()=>isAlive, "Hadoop cluster is alive") + Util.retryUntil(() => isAlive, "Hadoop cluster is alive") LOG.info("Hadoop cluster is started.") } + // Checks whether the cluster is alive by listing "/" private def isAlive: Boolean = { Docker.executeSilently(HADOOP_HOST, "/usr/local/hadoop/bin/hadoop fs -ls /") } @@ -43,5 +64,4 @@ class HadoopCluster { def shutDown(): Unit = { Docker.killAndRemoveContainer(HADOOP_HOST) } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala index 3244d24..862fc58 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,12 +17,14 @@ */ package io.gearpump.integrationtest.kafka +import org.apache.log4j.Logger + import io.gearpump.integrationtest.minicluster.MiniCluster import io.gearpump.integrationtest.{Docker, Util} -import org.apache.log4j.Logger object KafkaCluster { + /** Starts a Kafka cluster */ def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): Unit = { val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka") try { @@ -34,7 +36,7 @@ object KafkaCluster { } def withDataProducer(topic: String, brokerList: String) - (testCode: NumericalDataProducer => Unit): Unit = { + (testCode: NumericalDataProducer => Unit): Unit = { val producer = new NumericalDataProducer(topic, brokerList) try { producer.start() @@ -43,7 +45,6 @@ object KafkaCluster { producer.stop() } } - } /** @@ -67,7 +68,7 @@ class KafkaCluster(val advertisedHost: String, zkChroot: String = "") { "ZK_CHROOT" -> zkChroot), tunnelPorts = Set(ZOOKEEPER_PORT, BROKER_PORT) ) - Util.retryUntil(()=>isAlive, "kafka cluster is alive") + Util.retryUntil(() => isAlive, "kafka cluster is alive") LOG.debug("kafka cluster is started.") } @@ -118,14 +119,17 @@ class KafkaCluster(val advertisedHost: String, zkChroot: String = "") { kafkaFetchLatestOffset(KAFKA_HOST, topic, KAFKA_HOME, getBrokerListConnectString) } - private def kafkaListTopics(container: String, kafkaHome: String, zookeeperConnectionString: String): String = { + private def kafkaListTopics( + container: String, kafkaHome: String, zookeeperConnectionString: String): String = { + LOG.debug(s"|=> Kafka list topics...") Docker.execute(container, s"$kafkaHome/bin/kafka-topics.sh" + s" --zookeeper $zookeeperConnectionString -list") } - private def kafkaFetchLatestOffset(container: String, topic: String, kafkaHome: String, brokersList: String): Int = { + private def kafkaFetchLatestOffset( + container: String, topic: String, kafkaHome: String, brokersList: String): Int = { LOG.debug(s"|=> Get latest offset of topic $topic...") val output = Docker.execute(container, s"$kafkaHome/bin/kafka-run-class.sh kafka.tools.GetOffsetShell" + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala index d319d6b..03bb9fb 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,11 +19,12 @@ package io.gearpump.integrationtest.kafka import java.util.Properties -import io.gearpump.streaming.serializer.ChillSerializer import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.ByteArraySerializer import org.apache.log4j.Logger +import io.gearpump.streaming.serializer.ChillSerializer + class NumericalDataProducer(topic: String, bootstrapServers: String) { private val LOG = Logger.getLogger(getClass) @@ -44,7 +45,7 @@ class NumericalDataProducer(topic: String, bootstrapServers: String) { producer.close() } - /** How many message we have written in total*/ + /** How many message we have written in total */ def producedNumbers: Range = { Range(1, lastWriteNum + 1) } @@ -52,7 +53,8 @@ class NumericalDataProducer(topic: String, bootstrapServers: String) { private def createProducer: KafkaProducer[Array[Byte], Array[Byte]] = { val properties = new Properties() properties.setProperty("bootstrap.servers", bootstrapServers) - new KafkaProducer[Array[Byte], Array[Byte]](properties, new ByteArraySerializer, new ByteArraySerializer) + new KafkaProducer[Array[Byte], Array[Byte]](properties, + new ByteArraySerializer, new ByteArraySerializer) } private val produceThread = new Thread(new Runnable { @@ -71,5 +73,4 @@ class NumericalDataProducer(topic: String, bootstrapServers: String) { } } }) - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala index efeedae..98aceba 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -39,5 +39,4 @@ class MessageLossDetector(totalNum: Int) extends ResultVerifier { def received(num: Int): Boolean = { bitSets(num) } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala index e2e1dbc..29289d9 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,14 +17,16 @@ */ package io.gearpump.integrationtest.kafka -import io.gearpump.streaming.serializer.ChillSerializer +import scala.util.{Failure, Success} + import kafka.api.FetchRequestBuilder import kafka.consumer.SimpleConsumer import kafka.utils.Utils -import scala.util.{Failure, Success} + +import io.gearpump.streaming.serializer.ChillSerializer class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int = 0, - host: String, port: Int) { + host: String, port: Int) { private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "") private val serializer = new ChillSerializer[Int] @@ -44,5 +46,4 @@ class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: Int } } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala index 8ecd9cd..e5c7cb7 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/BaseContainer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,18 +17,16 @@ */ package io.gearpump.integrationtest.minicluster -import java.io.File +import scala.sys.process._ import io.gearpump.integrationtest.Docker -import scala.sys.process._ - /** * A helper to instantiate the base image for different usage. */ class BaseContainer(val host: String, command: String, - masterAddrs: List[(String, Int)], - tunnelPorts: Set[Int] = Set.empty) { + masterAddrs: List[(String, Int)], + tunnelPorts: Set[Int] = Set.empty) { private val IMAGE_NAME = "stanleyxu2005/gearpump-launcher" private val DOCKER_IMAGE_GEARPUMP_HOME = "/opt/gearpump" @@ -59,5 +57,4 @@ class BaseContainer(val host: String, command: String, def killAndRemove(): Unit = { Docker.killAndRemoveContainer(host) } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala index 50988ca..5eebd30 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,9 +17,10 @@ */ package io.gearpump.integrationtest.minicluster +import org.apache.log4j.Logger + import io.gearpump.cluster.MasterToAppMaster import io.gearpump.integrationtest.Docker -import org.apache.log4j.Logger /** * A command-line client to operate a Gearpump cluster http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala index 25c211f..bfdecee 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/MiniCluster.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,12 +18,11 @@ package io.gearpump.integrationtest.minicluster import java.io.IOException +import scala.collection.mutable.ListBuffer -import io.gearpump.cluster.master.MasterNode -import io.gearpump.integrationtest.{Docker, Util} import org.apache.log4j.Logger -import scala.collection.mutable.ListBuffer +import io.gearpump.integrationtest.{Docker, Util} /** * This class is a test driver for end-to-end integration test. @@ -53,7 +52,7 @@ class MiniCluster { def start(workerNum: Int = 2): Unit = { // Kill master - MASTER_ADDRS.foreach{case (host, _) => + MASTER_ADDRS.foreach { case (host, _) => if (Docker.containerExists(host)) { Docker.killAndRemoveContainer(host) } @@ -61,7 +60,7 @@ class MiniCluster { // Kill existing workers workers ++= (0 until workerNum).map("worker" + _) - workers.foreach{ worker => + workers.foreach { worker => if (Docker.containerExists(worker)) { Docker.killAndRemoveContainer(worker) } @@ -73,7 +72,7 @@ class MiniCluster { }) // Start Workers - workers.foreach{worker => + workers.foreach { worker => val container = new BaseContainer(worker, "worker", MASTER_ADDRS) container.createAndStart() } @@ -94,7 +93,8 @@ class MiniCluster { container.createAndStart() workers += host } else { - throw new IOException(s"Cannot add new worker $host, as worker with same hostname already exists") + throw new IOException(s"Cannot add new worker $host, " + + s"as worker with same hostname already exists") } } @@ -102,7 +102,7 @@ class MiniCluster { * @throws RuntimeException if rest client is not authenticated after N attempts */ private def expectRestClientAuthenticated(): Unit = { - Util.retryUntil(()=>{ + Util.retryUntil(() => { restClient.login() LOG.info("rest client has been authenticated") true @@ -113,7 +113,7 @@ class MiniCluster { * @throws RuntimeException if service is not available after N attempts */ private def expectClusterAvailable(): Unit = { - Util.retryUntil(()=>{ + Util.retryUntil(() => { val response = restClient.queryMaster() LOG.info(s"cluster is now available with response: $response.") response.aliveFor > 0 @@ -148,8 +148,9 @@ class MiniCluster { def restart(): Unit = { shutDown() - Util.retryUntil(()=> - !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists), "all docker containers killed") + Util.retryUntil(() => { + !(getMasterHosts ++ getWorkerHosts).exists(Docker.containerExists) + }, "all docker containers killed") LOG.info("all docker containers have been killed. restarting...") start() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala index b565f41..087f188 100644 --- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala +++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,29 +17,30 @@ */ package io.gearpump.integrationtest.minicluster +import scala.reflect.ClassTag + import com.typesafe.config.{Config, ConfigFactory} -import io.gearpump.WorkerId -import io.gearpump.cluster.{AppJar, MasterToAppMaster} +import org.apache.log4j.Logger +import upickle.Js +import upickle.default._ + +import io.gearpump.cluster.AppMasterToMaster.MasterData import io.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersData} import io.gearpump.cluster.MasterToClient.HistoryMetrics +import io.gearpump.cluster.master.MasterSummary +import io.gearpump.cluster.worker.{WorkerId, WorkerSummary} +import io.gearpump.cluster.{AppJar, MasterToAppMaster} import io.gearpump.integrationtest.{Docker, Util} import io.gearpump.services.AppMasterService.Status import io.gearpump.services.MasterService.{AppSubmissionResult, BuiltinPartitioners} +// NOTE: This cannot be removed!!! +import io.gearpump.services.util.UpickleUtil._ import io.gearpump.streaming.ProcessorDescription -import io.gearpump.cluster.AppMasterToMaster.MasterData -import io.gearpump.cluster.master.MasterSummary -import io.gearpump.cluster.worker.WorkerSummary import io.gearpump.streaming.appmaster.AppMaster.ExecutorBrief import io.gearpump.streaming.appmaster.DagManager.{DAGOperationResult, ReplaceProcessor} import io.gearpump.streaming.appmaster.StreamAppMasterSummary import io.gearpump.streaming.executor.Executor.ExecutorSummary import io.gearpump.util.{Constants, Graph} -import org.apache.log4j.Logger -import upickle.Js -import upickle.default._ -import io.gearpump.services.util.UpickleUtil._ - -import scala.reflect.ClassTag /** * A REST client to operate a Gearpump cluster @@ -50,14 +51,16 @@ class RestClient(host: String, port: Int) { private val cookieFile: String = "cookie.txt" - implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = upickle.default.Reader[Graph[Int, String]] { + implicit val graphReader: upickle.default.Reader[Graph[Int, String]] = + upickle.default.Reader[Graph[Int, String]] { case Js.Obj(verties, edges) => val vertexList = upickle.default.readJs[List[Int]](verties._2) val edgeList = upickle.default.readJs[List[(Int, String, Int)]](edges._2) Graph(vertexList, edgeList) } - private def decodeAs[T](expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try { + private def decodeAs[T]( + expr: String)(implicit reader: upickle.default.Reader[T], classTag: ClassTag[T]): T = try { read[T](expr) } catch { case ex: Throwable => @@ -91,7 +94,8 @@ class RestClient(host: String, port: Int) { listApps().length + 1 } - def submitApp(jar: String, executorNum: Int, args: String = "", config: String = ""): Boolean = try { + def submitApp(jar: String, executorNum: Int, args: String = "", config: String = "") + : Boolean = try { var endpoint = "master/submitapp" var options = Seq(s"jar=@$jar") @@ -130,7 +134,8 @@ class RestClient(host: String, port: Int) { decodeAs[StreamAppMasterSummary](resp) } - def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*"): HistoryMetrics = { + def queryStreamingAppMetrics(appId: Int, current: Boolean, path: String = "processor*") + : HistoryMetrics = { val args = if (current) "?readLatest=true" else "" val resp = callApi(s"appmaster/$appId/metrics/app$appId.$path$args") decodeAs[HistoryMetrics](resp) @@ -196,7 +201,8 @@ class RestClient(host: String, port: Int) { def replaceStreamingAppProcessor(appId: Int, replaceMe: ProcessorDescription): Boolean = try { val replaceOperation = new ReplaceProcessor(replaceMe.id, replaceMe) val args = upickle.default.write(replaceOperation) - val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args), CRUD_POST) + val resp = callApi(s"appmaster/$appId/dynamicdag?args=" + Util.encodeUriComponent(args), + CRUD_POST) decodeAs[DAGOperationResult](resp) true } catch {
