http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala deleted file mode 100644 index 7447308..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.lib - -import kafka.common.TopicAndPartition -import kafka.server.{KafkaConfig => KafkaServerConfig} -import kafka.utils.{TestUtils, TestZKUtils} -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfterEach, Matchers, PropSpec} - -import io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper -import io.gearpump.streaming.kafka.util.KafkaServerHarness - -class KafkaUtilSpec - extends PropSpec with PropertyChecks with BeforeAndAfterEach - with Matchers with KafkaServerHarness { - - val numServers = 1 - override val configs: List[KafkaServerConfig] = - for (props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false)) - yield new KafkaServerConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect - override val numPartitions = 4 - } - - override def beforeEach(): Unit = { - super.setUp() - } - - override def afterEach(): Unit = { - super.tearDown() - } - - import io.gearpump.streaming.kafka.lib.KafkaUtil._ - - property("KafkaUtil should be able to create topic") { - val args = { - Table( - ("topic", "partitions", "replicas"), - (TestUtils.tempTopic, 1, numServers), - (TestUtils.tempTopic, 2, numServers + 1), - ("", 1, numServers), - (TestUtils.tempTopic, 0, numServers), - (TestUtils.tempTopic, 1, 0) - ) - } - forAll(args) { - (topic: String, partitions: Int, replicas: Int) => - if (topic.nonEmpty && partitions > 0 && replicas > 0 && replicas <= numServers) { - createTopic(connectZk(), topic, partitions, replicas) shouldBe false - createTopic(connectZk(), topic, partitions, replicas) shouldBe true - } else { - intercept[RuntimeException] { - createTopic(connectZk(), topic, partitions, replicas) - } - } - } - } - - property("KafkaUtil should be able to get broker info") { - val brokerList = getBrokerList - val partitions = 2 - val replicas = numServers - val topic = TestUtils.tempTopic() - intercept[RuntimeException] { - getBroker(connectZk(), topic, partitions) - } - 
val partitionsToBrokers = createTopicUntilLeaderIsElected(topic, partitions, replicas) - 0.until(partitions).foreach { part => - val hostPorts = brokerList(partitionsToBrokers(part).get).split(":") - val broker = getBroker(connectZk(), topic, part) - - broker.host shouldBe hostPorts(0) - broker.port.toString shouldBe hostPorts(1) - } - } - - property("KafkaUtil should be able to get TopicAndPartitions info and group with KafkaGrouper") { - val grouper: KafkaGrouper = new KafkaGrouper { - override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) - : Array[TopicAndPartition] = { - topicAndPartitions - } - } - val topicNum = 3 - val topics = List.fill(topicNum)(TestUtils.tempTopic()) - topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, replicas = 1)) - KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe - topics.map(t => TopicAndPartition(t, 0)).toSet - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala deleted file mode 100644 index 3983874..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.lib.consumer - -import org.scalatest.{Matchers, WordSpec} - -class ExponentialBackoffSleeperSpec extends WordSpec with Matchers { - - "ExponentialBackOffSleeper" should { - "sleep for increasing duration" in { - val sleeper = new ExponentialBackoffSleeper( - backOffMultiplier = 2.0, - initialDurationMs = 100, - maximumDurationMs = 10000 - ) - sleeper.getSleepDuration shouldBe 100 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 200 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 400 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 800 - } - - "sleep for no more than maximum duration" in { - val sleeper = new ExponentialBackoffSleeper( - backOffMultiplier = 2.0, - initialDurationMs = 6400, - maximumDurationMs = 10000 - ) - sleeper.getSleepDuration shouldBe 6400 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 10000 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 10000 - } - - "sleep for initial duration after reset" in { - val sleeper = new ExponentialBackoffSleeper( - backOffMultiplier = 2.0, - initialDurationMs = 100, - maximumDurationMs = 10000 - ) - sleeper.getSleepDuration shouldBe 100 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 200 - sleeper.reset() - sleeper.getSleepDuration shouldBe 100 - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala deleted file mode 100644 index d955dfa..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala +++ /dev/null @@ 
-1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka.lib.consumer - -import java.util.concurrent.LinkedBlockingQueue - -import kafka.common.TopicAndPartition -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - val nonNegativeGen = Gen.choose[Int](0, 1000) - val positiveGen = Gen.choose[Int](1, 1000) - val startOffsetGen = Gen.choose[Long](0L, 1000L) - property("FetchThread should set startOffset to iterators") { - forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) { - (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) => - val topicAndPartition = mock[TopicAndPartition] - val consumer = mock[KafkaConsumer] - val createConsumer = (tp: TopicAndPartition) => consumer - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, - incomingQueue, fetchThreshold, fetchSleepMS) - fetchThread.setStartOffset(topicAndPartition, startOffset) - 
verify(consumer).setStartOffset(startOffset) - } - } - - val topicAndPartitionGen = for { - topic <- Gen.alphaStr - partition <- Gen.choose[Int](0, Int.MaxValue) - } yield TopicAndPartition(topic, partition) - property("FetchThread should only fetchMessage when the number " + - "of messages in queue is below the threshold") { - forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) { - (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int, - startOffset: Long, topicAndPartition: TopicAndPartition) => - val message = mock[KafkaMessage] - val consumer = mock[KafkaConsumer] - val createConsumer = (tp: TopicAndPartition) => consumer - when(consumer.hasNext).thenReturn(true) - when(consumer.next).thenReturn(message) - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, - incomingQueue, fetchThreshold, fetchSleepMS) - - 0.until(messageNum) foreach { _ => - fetchThread.fetchMessage - } - - incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold) - } - } - - property("FetchThread poll should try to retrieve and remove the head of incoming queue") { - val topicAndPartition = mock[TopicAndPartition] - val consumer = mock[KafkaConsumer] - val createConsumer = (tp: TopicAndPartition) => consumer - val kafkaMsg = mock[KafkaMessage] - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - incomingQueue.put(kafkaMsg) - val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, incomingQueue, 0, 0) - fetchThread.poll shouldBe Some(kafkaMsg) - fetchThread.poll shouldBe None - } - - val tpAndHasNextGen = for { - tp <- topicAndPartitionGen - hasNext <- Gen.oneOf(true, false) - } yield (tp, hasNext) - - val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen) - .map(_.toMap) suchThat (_.nonEmpty) - - property("FetchThread fetchMessage should return false when there are no more messages " + - "from any 
TopicAndPartition") { - forAll(tpHasNextMapGen, nonNegativeGen) { - (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) => - val createConsumer = (tp: TopicAndPartition) => { - val consumer = mock[KafkaConsumer] - val kafkaMsg = mock[KafkaMessage] - val hasNext = tpHasNextMap(tp) - when(consumer.hasNext).thenReturn(hasNext) - when(consumer.next).thenReturn(kafkaMsg) - consumer - } - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val fetchThread = new FetchThread(tpHasNextMap.keys.toArray, - createConsumer, incomingQueue, tpHasNextMap.size + 1, fetchSleepMS) - fetchThread.fetchMessage shouldBe tpHasNextMap.values.reduce(_ || _) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala deleted file mode 100644 index 524fc9b..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka.lib.consumer - -import com.twitter.bijection.Injection -import kafka.api.OffsetRequest -import kafka.common.TopicAndPartition -import kafka.consumer.SimpleConsumer -import kafka.message.{Message, MessageAndOffset} -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg))) - val messageNumGen = Gen.choose[Int](0, 1000) - val topicAndPartitionGen = for { - topic <- Gen.alphaStr - partition <- Gen.choose[Int](0, Int.MaxValue) - } yield (topic, partition) - - property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") { - forAll(messageGen, messageNumGen, topicAndPartitionGen) { - (message: Message, num: Int, topicAndPartition: (String, Int)) => - val (topic, partition) = topicAndPartition - val consumer = mock[SimpleConsumer] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator - val getIterator = (offset: Long) => iterator - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - 0.until(num).foreach { i => - kafkaConsumer.hasNext shouldBe true - val kafkaMessage = kafkaConsumer.next - 
kafkaMessage.offset shouldBe i.toLong - kafkaMessage.key shouldBe None - } - kafkaConsumer.hasNext shouldBe false - } - } - - val startOffsetGen = Gen.choose[Long](1L, 1000L) - property("KafkaConsumer setStartOffset should reset internal iterator") { - forAll(topicAndPartitionGen, startOffsetGen) { - (topicAndPartition: (String, Int), startOffset: Long) => - val (topic, partition) = topicAndPartition - val consumer = mock[SimpleConsumer] - val getIterator = mock[Long => Iterator[MessageAndOffset]] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - kafkaConsumer.setStartOffset(startOffset) - verify(getIterator).apply(startOffset) - } - } - - property("KafkaConsumer close should close SimpleConsumer") { - forAll(topicAndPartitionGen) { - (topicAndPartition: (String, Int)) => - val (topic, partition) = topicAndPartition - val consumer = mock[SimpleConsumer] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val getIterator = mock[Long => Iterator[MessageAndOffset]] - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - kafkaConsumer.close() - verify(consumer).close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala deleted file mode 100644 index 46067ab..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka.lib.grouper - -import kafka.common.TopicAndPartition -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class KafkaDefaultGrouperSpec extends PropSpec with PropertyChecks with Matchers { - property("KafkaDefaultGrouper should group TopicAndPartitions in a round-robin way") { - forAll(Gen.posNum[Int], Gen.posNum[Int], Gen.posNum[Int]) { - (topicNum: Int, partitionNum: Int, taskNum: Int) => { - val topicAndPartitions = for { - t <- 0.until(topicNum) - p <- 0.until(partitionNum) - } yield TopicAndPartition("topic" + t, p) - 0.until(taskNum).foreach { taskIndex => - val grouper = new KafkaDefaultGrouper - grouper.group(taskNum, taskIndex, topicAndPartitions.toArray).forall( - tp => topicAndPartitions.indexOf(tp) % taskNum == taskIndex) - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala ---------------------------------------------------------------------- diff --git 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala deleted file mode 100644 index ad315fe..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/KafkaServerHarness.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.util - -import java.util.Properties - -import kafka.admin.AdminUtils -import kafka.common.KafkaException -import kafka.server.{KafkaConfig => KafkaServerConfig, KafkaServer} -import kafka.utils.{TestUtils, Utils} - -trait KafkaServerHarness extends ZookeeperHarness { - val configs: List[KafkaServerConfig] - private var servers: List[KafkaServer] = null - private var brokerList: String = null - - def getServers: List[KafkaServer] = servers - def getBrokerList: Array[String] = brokerList.split(",") - - override def setUp() { - super.setUp - if (configs.size <= 0) { - throw new KafkaException("Must supply at least one server config.") - } - brokerList = TestUtils.getBrokerListStrFromConfigs(configs) - servers = configs.map(TestUtils.createServer(_)) - } - - override def tearDown() { - servers.map(server => server.shutdown()) - servers.map(server => server.config.logDirs.map(Utils.rm(_))) - super.tearDown - } - - def createTopicUntilLeaderIsElected( - topic: String, partitions: Int, replicas: Int, timeout: Long = 10000) - : Map[Int, Option[Int]] = { - val zkClient = connectZk() - try { - // Creates topic - AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties) - // Waits until the update metadata request for new topic reaches all servers - (0 until partitions).map { case i => - TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, timeout) - i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i, timeout) - }.toMap - } catch { - case e: Exception => throw e - } finally { - zkClient.close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala deleted file mode 100644 index 45fe760..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/util/ZookeeperHarness.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.util - -import kafka.utils.{TestZKUtils, Utils, ZKStringSerializer} -import kafka.zk.EmbeddedZookeeper -import org.I0Itec.zkclient.ZkClient - -trait ZookeeperHarness { - val zkConnect: String = TestZKUtils.zookeeperConnect - val zkConnectionTimeout = 60000 - val zkSessionTimeout = 60000 - private var zookeeper: EmbeddedZookeeper = null - - def getZookeeper: EmbeddedZookeeper = zookeeper - def connectZk: () => ZkClient = () => { - new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) - } - - def setUp() { - zookeeper = new EmbeddedZookeeper(zkConnect) - } - - def tearDown() { - Utils.swallow(zookeeper.shutdown()) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala new file mode 100644 index 0000000..2f8a533 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka + +import com.twitter.bijection.Injection +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.MockUtil + +class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + val dataGen = for { + topic <- Gen.alphaStr + key <- Gen.alphaStr + msg <- Gen.alphaStr + } yield (topic, Injection[String, Array[Byte]](key), Injection[String, Array[Byte]](msg)) + + property("KafkaSink write should send producer record") { + forAll(dataGen) { + (data: (String, Array[Byte], Array[Byte])) => + val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] + val (topic, key, msg) = data + val kafkaSink = new KafkaSink(() => producer, topic) + kafkaSink.write(Message((key, msg))) + verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]]( + r => r.topic == topic && (r.key sameElements key) && (r.value sameElements msg))) + kafkaSink.write(Message(msg)) + verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]]( + r => r.topic() == topic && (r.key == null) && (r.value() sameElements msg) + )) + kafkaSink.close() + } + } + + property("KafkaSink close should close kafka producer") { + val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] + val kafkaSink = new KafkaSink(() => producer, "topic") + kafkaSink.close() + verify(producer).close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala 
---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala new file mode 100644 index 0000000..8055d00 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.kafka
+
+import scala.util.{Failure, Success}
+
+import com.twitter.bijection.Injection
+import kafka.common.TopicAndPartition
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage}
+import org.apache.gearpump.streaming.kafka.lib.{KafkaOffsetManager, KafkaSourceConfig}
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
+import org.apache.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter}
+
+/**
+ * Property-based tests for KafkaSource in which every collaborator (fetch thread,
+ * offset manager, decoder, timestamp filter, storage factory, config) is a Mockito mock.
+ *
+ * The properties cover setStartTime, read and close.
+ */
+class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  // Both generators draw Longs from the closed range [0, 1000].
+  val startTimeGen = Gen.choose[Long](0L, 1000L)
+  val offsetGen = Gen.choose[Long](0L, 1000L)
+
+  // With no start time, the fetch thread is started and no start offset is ever set.
+  property("KafkaSource open sets consumer to earliest offset") {
+    val topicAndPartition = mock[TopicAndPartition]
+    val fetchThread = mock[FetchThread]
+    val offsetManager = mock[KafkaOffsetManager]
+    val messageDecoder = mock[MessageDecoder]
+    val timestampFilter = mock[TimeStampFilter]
+    val offsetStorageFactory = mock[OffsetStorageFactory]
+    val kafkaConfig = mock[KafkaSourceConfig]
+    val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+      timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
+
+    kafkaSource.setStartTime(None)
+
+    verify(fetchThread).start()
+    verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
+  }
+
+  // A StorageEmpty failure from resolveOffset leaves the start offset unset;
+  // any other failure is expected to surface as a RuntimeException.
+  property("KafkaSource open should not set consumer start offset if offset storage is empty") {
+    forAll(startTimeGen) { (startTime: Long) =>
+      val offsetManager = mock[KafkaOffsetManager]
+      val topicAndPartition = mock[TopicAndPartition]
+      val fetchThread = mock[FetchThread]
+      val messageDecoder = mock[MessageDecoder]
+      val timestampFilter = mock[TimeStampFilter]
+      val offsetStorageFactory = mock[OffsetStorageFactory]
+      val kafkaConfig = mock[KafkaSourceConfig]
+      val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+        timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
+
+      when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty))
+
+      source.setStartTime(Some(startTime))
+      verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
+      verify(fetchThread).start()
+
+      // Re-stub the same call with a generic failure to exercise the error path.
+      when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException))
+      intercept[RuntimeException] {
+        source.setStartTime(Some(startTime))
+      }
+      source.close()
+    }
+  }
+
+  // A successfully resolved offset is forwarded to the fetch thread for the partition.
+  property("KafkaSource open should set consumer start offset if offset storage is not empty") {
+    forAll(startTimeGen, offsetGen) {
+      (startTime: Long, offset: Long) =>
+        val offsetManager = mock[KafkaOffsetManager]
+        val topicAndPartition = mock[TopicAndPartition]
+        val fetchThread = mock[FetchThread]
+        val messageDecoder = mock[MessageDecoder]
+        val timestampFilter = mock[TimeStampFilter]
+        val offsetStorageFactory = mock[OffsetStorageFactory]
+        val kafkaConfig = mock[KafkaSourceConfig]
+        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+          timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
+
+        when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset))
+
+        source.setStartTime(Some(startTime))
+        verify(fetchThread).setStartOffset(topicAndPartition, offset)
+        verify(fetchThread).start()
+
+        // Re-stub the same call with a generic failure to exercise the error path.
+        when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException))
+        intercept[RuntimeException] {
+          source.setStartTime(Some(startTime))
+        }
+        source.close()
+    }
+  }
+
+  // read yields null when poll returns None, otherwise the message produced by the
+  // (mocked) decoder and filters.
+  property("KafkaSource read should return number of messages in best effort") {
+    val kafkaMsgGen = for {
+      topic <- Gen.alphaStr
+      partition <- Gen.choose[Int](0, 1000)
+      offset <- Gen.choose[Long](0L, 1000L)
+      key = None
+      msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
+    } yield KafkaMessage(TopicAndPartition(topic, partition), offset, key, msg)
+    val msgQueueGen = Gen.containerOf[Array, KafkaMessage](kafkaMsgGen)
+    forAll(msgQueueGen) {
+      (msgQueue: Array[KafkaMessage]) =>
+        val offsetManager = mock[KafkaOffsetManager]
+        val fetchThread = mock[FetchThread]
+        val messageDecoder = mock[MessageDecoder]
+
+        val timestampFilter = mock[TimeStampFilter]
+        val offsetStorageFactory = mock[OffsetStorageFactory]
+        val kafkaConfig = mock[KafkaSourceConfig]
+        // One shared mock offset manager is registered for every generated partition.
+        val offsetManagers = msgQueue.map(_.topicAndPartition -> offsetManager).toMap
+
+        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+          timestampFilter, Some(fetchThread), offsetManagers)
+
+        if (msgQueue.isEmpty) {
+          when(fetchThread.poll).thenReturn(None)
+          source.read() shouldBe null
+        } else {
+          msgQueue.indices.foreach { i =>
+            val message = Message(msgQueue(i).msg)
+            // Stubs are re-armed per message so each read sees exactly this element.
+            when(fetchThread.poll).thenReturn(Option(msgQueue(i)))
+            when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message)
+            when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message))
+            when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message))
+
+            source.read shouldBe message
+          }
+        }
+        source.close()
+    }
+  }
+
+  // Closing the source must close the offset manager registered for the partition.
+  property("KafkaSource close should close all offset managers") {
+    val offsetManager = mock[KafkaOffsetManager]
+    val topicAndPartition = mock[TopicAndPartition]
+    val fetchThread = mock[FetchThread]
+    val timestampFilter = mock[TimeStampFilter]
+    val messageDecoder = mock[MessageDecoder]
+    val offsetStorageFactory = mock[OffsetStorageFactory]
+    val kafkaConfig = mock[KafkaSourceConfig]
+    val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
+      timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
+    source.close()
+    verify(offsetManager).close()
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
new file mode 100644
index 0000000..e243eab
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import com.twitter.bijection.Injection
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/**
+ * Property check: the Message built by DefaultMessageDecoder.fromBytes carries
+ * the same bytes that were handed to the decoder.
+ */
+class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
+  property("DefaultMessageDecoder should keep the original bytes data in Message") {
+    val messageDecoder = new DefaultMessageDecoder()
+    forAll(Gen.alphaStr) { (text: String) =>
+      val payload = Injection[String, Array[Byte]](text)
+      val decoded = messageDecoder.fromBytes(payload)
+      decoded.msg shouldBe payload
+    }
+  }
+}
+
+/**
+ * Property check: the Message built by StringMessageDecoder.fromBytes carries
+ * the string whose encoded bytes were handed to the decoder.
+ */
+class StringMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
+  property("StringMessageDecoder should decode original bytes data into string") {
+    val messageDecoder = new StringMessageDecoder()
+    forAll(Gen.alphaStr) { (text: String) =>
+      val payload = Injection[String, Array[Byte]](text)
+      val decoded = messageDecoder.fromBytes(payload)
+      decoded.msg shouldBe text
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
new file mode 100644
index 0000000..987d975
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import scala.util.{Failure, Success}
+
+import com.twitter.bijection.Injection
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage
+import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow}
+
+/**
+ * Property-based tests for KafkaOffsetManager running against a mocked OffsetStorage.
+ *
+ * The properties cover filter (which appends offsets to storage), resolveOffset and close.
+ */
+class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  // Timestamps are drawn from the closed range [0, 1000].
+  val timeStampGen = Gen.choose[Long](0L, 1000L)
+  val messageGen = for {
+    msg <- Gen.alphaStr
+    time <- timeStampGen
+  } yield Message(msg, time)
+
+  // Each generated message is paired with its list index, which the test uses as its offset.
+  val messageAndOffsetsGen = Gen.listOf[Message](messageGen).map(_.zipWithIndex)
+
+  property("KafkaOffsetManager should append offset to storage in monotonically" +
+    " increasing time order") {
+    forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) =>
+      val offsetStorage = mock[OffsetStorage]
+      val offsetManager = new KafkaOffsetManager(offsetStorage)
+      // The fold accumulator is the largest timestamp seen so far, starting at 0.
+      messageAndOffsets.foldLeft(0L) { (max, messageAndOffset) =>
+        val (message, offset) = messageAndOffset
+        offsetManager.filter((message, offset.toLong)) shouldBe Option(message)
+        if (message.timestamp > max) {
+          // A strictly newer timestamp must have been appended together with its offset.
+          val newMax = message.timestamp
+          verify(offsetStorage).append(newMax, Injection[Long, Array[Byte]](offset.toLong))
+          newMax
+        } else {
+          // Otherwise the storage must show no interaction beyond those already verified.
+          verifyZeroInteractions(offsetStorage)
+          max
+        }
+      }
+      offsetManager.close()
+    }
+  }
+
+  val minTimeStampGen = Gen.choose[Long](0L, 500L)
+  val maxTimeStampGen = Gen.choose[Long](500L, 1000L)
+  property("KafkaOffsetManager resolveOffset should " +
+    "report StorageEmpty failure when storage is empty") {
+    forAll(timeStampGen) { (time: Long) =>
+      val offsetStorage = mock[OffsetStorage]
+      val offsetManager = new KafkaOffsetManager(offsetStorage)
+      when(offsetStorage.lookUp(time)).thenReturn(Failure(StorageEmpty))
+      offsetManager.resolveOffset(time) shouldBe Failure(StorageEmpty)
+
+      // An exception thrown by the storage itself is expected to propagate to the caller.
+      doThrow(new RuntimeException).when(offsetStorage).lookUp(time)
+      intercept[RuntimeException] {
+        offsetManager.resolveOffset(time)
+      }
+      offsetManager.close()
+    }
+  }
+
+  val offsetGen = Gen.choose[Long](0L, 1000L)
+  property("KafkaOffsetManager resolveOffset should return a valid" +
+    " offset when storage is not empty") {
+    forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) {
+      (time: Long, min: Long, max: Long, offset: Long) =>
+        val offsetStorage = mock[OffsetStorage]
+        val offsetManager = new KafkaOffsetManager(offsetStorage)
+        // The stubbed lookUp result is picked from how `time` compares with min and max:
+        // Underflow(min) resolves to min, Overflow(max) resolves to max, and a plain
+        // Success resolves to the stored offset.
+        if (time < min) {
+          when(offsetStorage.lookUp(time)).thenReturn(Failure(
+            Underflow(Injection[Long, Array[Byte]](min))))
+          offsetManager.resolveOffset(time) shouldBe Success(min)
+        } else if (time > max) {
+          when(offsetStorage.lookUp(time)).thenReturn(Failure(
+            Overflow(Injection[Long, Array[Byte]](max))))
+          offsetManager.resolveOffset(time) shouldBe Success(max)
+        } else {
+          when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, Array[Byte]](offset)))
+          offsetManager.resolveOffset(time) shouldBe Success(offset)
+        }
+
+        // An exception thrown by the storage itself is expected to propagate to the caller.
+        doThrow(new RuntimeException).when(offsetStorage).lookUp(time)
+        intercept[RuntimeException] {
+          offsetManager.resolveOffset(time)
+        }
+        offsetManager.close()
+    }
+  }
+
+  // Closing the manager must close the underlying storage.
+  property("KafkaOffsetManager close should close offset storage") {
+    val offsetStorage = mock[OffsetStorage]
+    val offsetManager = new KafkaOffsetManager(offsetStorage)
+    offsetManager.close()
+    verify(offsetStorage).close()
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
new file mode 100644
index 0000000..6e4a1c4
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+// TODO: Fix the UT failure!
+ +// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { +// val minTimeGen = Gen.choose[Long](1L, 500L) +// val maxTimeGen = Gen.choose[Long](500L, 999L) +// +// property("KafkaStorage lookup time should report StorageEmpty if storage is empty") { +// forAll { (time: Long, topic: String) => +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val getConsumer = () => mock[KafkaConsumer] +// val connectZk = () => mock[ZkClient] +// val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), +// connectZk()) +// storage.lookUp(time) shouldBe Failure(StorageEmpty) +// } +// } +// +// property("KafkaStorage lookup time should return data or report failure if storage not empty") { +// forAll(minTimeGen, maxTimeGen, Gen.alphaStr) {(minTime: Long, maxTime: Long, topic: String) => +// val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) => +// val offset = index.toLong +// time -> offset +// } +// val timeAndOffsetsMap = timeAndOffsets.toMap +// val data = timeAndOffsets.map { +// case (time, offset) => +// new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)), +// Injection[Long, Array[Byte]](offset)) +// }.toList +// +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val consumer = mock[KafkaConsumer] +// val getConsumer = () => consumer +// val connectZk = () => mock[ZkClient] +// +// val hasNexts = List.fill(data.tail.size)(true) :+ false +// when(consumer.hasNext).thenReturn(true, hasNexts:_*) +// when(consumer.next).thenReturn(data.head, data.tail:_*) +// +// val storage = new KafkaStorage(topic, topicExists = true, producer, +// getConsumer(), connectZk()) +// forAll(Gen.choose[Long](minTime, maxTime)) { +// time => +// storage.lookUp(time) match { +// case Success(array) => +// array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time))) +// case Failure(e) => fail("time in range should return 
Success with value") +// } +// } +// +// forAll(Gen.choose[Long](0L, minTime - 1)) { +// time => +// storage.lookUp(time) match { +// case Failure(e) => e shouldBe a [Underflow] +// e.asInstanceOf[Underflow].min should equal +// (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime))) +// case Success(_) => fail("time less than min should return Underflow failure") +// } +// } +// +// forAll(Gen.choose[Long](maxTime + 1, 1000L)) { +// time => +// storage.lookUp(time) match { +// case Failure(e) => e shouldBe a [Overflow] +// e.asInstanceOf[Overflow].max should equal +// (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime))) +// case Success(_) => fail("time larger than max should return Overflow failure") +// } +// } +// } +// } +// +// property("KafkaStorage append should send data to Kafka") { +// forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000), +// Gen.alphaStr, Gen.oneOf(true, false)) { +// (time: Long, offset: Long, topic: String, topicExists: Boolean) => +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val getConsumer = () => mock[KafkaConsumer] +// val connectZk = () => mock[ZkClient] +// val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk()) +// val offsetBytes = Injection[Long, Array[Byte]](offset) +// storage.append(time, offsetBytes) +// verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]()) +// } +// } +// +// val topicAndPartitionGen = for { +// topic <- Gen.alphaStr +// partition <- Gen.choose[Int](0, 100) +// } yield TopicAndPartition(topic, partition) +// property("KafkaStorage should load data from Kafka") { +// val kafkaMsgGen = for { +// timestamp <- Gen.choose[Long](1L, 1000L) +// offset <- Gen.choose[Long](0L, 1000L) +// } yield (timestamp, Injection[Long, Array[Byte]](offset)) +// val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen) +// +// val topicExistsGen = Gen.oneOf(true, false) +// +// forAll(topicAndPartitionGen, msgListGen) { 
+// (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) => +// val producer= mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val consumer = mock[KafkaConsumer] +// val getConsumer = () => consumer +// val connectZk = () => mock[ZkClient] +// val kafkaStorage = new KafkaStorage(topicAndPartition.topic, +// topicExists = true, producer, getConsumer(), connectZk()) +// msgList match { +// case Nil => +// when(consumer.hasNext).thenReturn(false) +// case list => +// val hasNexts = List.fill(list.tail.size)(true) :+ false +// val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) => +// KafkaMessage(topicAndPartition, index.toLong, +// Some(Injection[Long, Array[Byte]](timestamp)), bytes) +// } +// when(consumer.hasNext).thenReturn(true, hasNexts: _*) +// when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*) +// } +// kafkaStorage.load(consumer) shouldBe msgList +// } +// } +// +// property("KafkaStorage should not get consumer when topic doesn't exist") { +// forAll(Gen.alphaStr) { (topic: String) => +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val getConsumer = mock[() => KafkaConsumer] +// val connectZk = () => mock[ZkClient] +// val kafkaStorage = new KafkaStorage(topic, +// topicExists = false, producer, getConsumer(), connectZk()) +// verify(getConsumer, never()).apply() +// kafkaStorage.close() +// } +// } +// +// property("KafkaStorage should fail to load invalid KafkaMessage") { +// val invalidKafkaMsgGen = for { +// tp <- topicAndPartitionGen +// offset <- Gen.choose[Long](1L, 1000L) +// timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))), +// None) +// msg <- Gen.alphaStr.map(Injection[String, Array[Byte]]) +// } yield KafkaMessage(tp, offset, timestamp, msg) +// forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) => +// val consumer = mock[KafkaConsumer] +// val getConsumer = () => consumer +// val producer = 
mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val connectZk = () => mock[ZkClient] +// val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic, +// topicExists = true, producer, getConsumer(), connectZk()) +// when(consumer.hasNext).thenReturn(true, false) +// when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg) +// Try(kafkaStorage.load(consumer)).isFailure shouldBe true +// } +// } +// +// property("KafkaStorage close should close kafka producer and delete topic") { +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val getConsumer = () => mock[KafkaConsumer] +// val zkClient = mock[ZkClient] +// val connectZk = () => zkClient +// val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk()) +// kafkaStorage.close() +// verify(producer).close() +// verify(zkClient).createPersistent(anyString(), anyString()) +// } +// } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala new file mode 100644 index 0000000..099be3d --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib
+
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig => KafkaServerConfig}
+import kafka.utils.{TestUtils, TestZKUtils}
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{BeforeAndAfterEach, Matchers, PropSpec}
+
+import org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper
+import org.apache.gearpump.streaming.kafka.util.KafkaServerHarness
+
+/**
+ * Tests for the KafkaUtil helpers (createTopic, getBroker, getTopicAndPartitions),
+ * run against the servers provided by KafkaServerHarness.
+ *
+ * The harness is set up before and torn down after every property.
+ */
+class KafkaUtilSpec
+  extends PropSpec with PropertyChecks with BeforeAndAfterEach
+  with Matchers with KafkaServerHarness {
+
+  val numServers = 1
+  // One broker config per server, overriding the ZooKeeper connect string and the
+  // partition count.
+  override val configs: List[KafkaServerConfig] =
+    for (props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false))
+      yield new KafkaServerConfig(props) {
+        override val zkConnect = TestZKUtils.zookeeperConnect
+        override val numPartitions = 4
+      }
+
+  override def beforeEach(): Unit = {
+    super.setUp()
+  }
+
+  override def afterEach(): Unit = {
+    super.tearDown()
+  }
+
+  import org.apache.gearpump.streaming.kafka.lib.KafkaUtil._
+
+  property("KafkaUtil should be able to create topic") {
+    // Rows mix one valid combination with invalid ones: more replicas than servers,
+    // an empty topic name, zero partitions and zero replicas.
+    val args = {
+      Table(
+        ("topic", "partitions", "replicas"),
+        (TestUtils.tempTopic, 1, numServers),
+        (TestUtils.tempTopic, 2, numServers + 1),
+        ("", 1, numServers),
+        (TestUtils.tempTopic, 0, numServers),
+        (TestUtils.tempTopic, 1, 0)
+      )
+    }
+    forAll(args) {
+      (topic: String, partitions: Int, replicas: Int) =>
+        if (topic.nonEmpty && partitions > 0 && replicas > 0 && replicas <= numServers) {
+          // First call yields false, the repeat yields true - presumably the result
+          // reports whether the topic already existed; confirm against KafkaUtil.createTopic.
+          createTopic(connectZk(), topic, partitions, replicas) shouldBe false
+          createTopic(connectZk(), topic, partitions, replicas) shouldBe true
+        } else {
+          intercept[RuntimeException] {
+            createTopic(connectZk(), topic, partitions, replicas)
+          }
+        }
+    }
+  }
+
+  property("KafkaUtil should be able to get broker info") {
+    val brokerList = getBrokerList
+    val partitions = 2
+    val replicas = numServers
+    val topic = TestUtils.tempTopic()
+    // Looking up a broker before the topic has been created is expected to fail.
+    intercept[RuntimeException] {
+      getBroker(connectZk(), topic, partitions)
+    }
+    val partitionsToBrokers = createTopicUntilLeaderIsElected(topic, partitions, replicas)
+    0.until(partitions).foreach { part =>
+      // brokerList entries are split on ":" into host and port for comparison.
+      val hostPorts = brokerList(partitionsToBrokers(part).get).split(":")
+      val broker = getBroker(connectZk(), topic, part)
+
+      broker.host shouldBe hostPorts(0)
+      broker.port.toString shouldBe hostPorts(1)
+    }
+  }
+
+  property("KafkaUtil should be able to get TopicAndPartitions info and group with KafkaGrouper") {
+    // NOTE(review): `grouper` is constructed but never used in this property - confirm
+    // whether a grouping assertion is missing or the value can be dropped.
+    val grouper: KafkaGrouper = new KafkaGrouper {
+      override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
+        : Array[TopicAndPartition] = {
+        topicAndPartitions
+      }
+    }
+    val topicNum = 3
+    val topics = List.fill(topicNum)(TestUtils.tempTopic())
+    topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, replicas = 1))
+    // Every topic was created with a single partition, so only partition 0 is expected.
+    KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe
+      topics.map(t => TopicAndPartition(t, 0)).toSet
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
new file mode 100644
index 0000000..cc83b51
--- /dev/null
+++
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import org.scalatest.{Matchers, WordSpec}
+
+/**
+ * Checks the sleep durations reported by ExponentialBackoffSleeper: growth by the
+ * multiplier, capping at the maximum, and return to the initial value on reset.
+ */
+class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
+
+  "ExponentialBackOffSleeper" should {
+    "sleep for increasing duration" in {
+      val backoff = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0, initialDurationMs = 100, maximumDurationMs = 10000)
+      backoff.getSleepDuration shouldBe 100
+      // Each step doubles the previous duration.
+      Seq(200, 400, 800).foreach { expected =>
+        backoff.setNextSleepDuration()
+        backoff.getSleepDuration shouldBe expected
+      }
+    }
+
+    "sleep for no more than maximum duration" in {
+      val backoff = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0, initialDurationMs = 6400, maximumDurationMs = 10000)
+      backoff.getSleepDuration shouldBe 6400
+      // Doubling 6400 would exceed the maximum, so both steps report the cap.
+      Seq(10000, 10000).foreach { expected =>
+        backoff.setNextSleepDuration()
+        backoff.getSleepDuration shouldBe expected
+      }
+    }
+
+    "sleep for initial duration after reset" in {
+      val backoff = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0, initialDurationMs = 100, maximumDurationMs = 10000)
+      backoff.getSleepDuration shouldBe 100
+      backoff.setNextSleepDuration()
+      backoff.getSleepDuration shouldBe 200
+      backoff.reset()
+      backoff.getSleepDuration shouldBe 100
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
new file mode 100644
index 0000000..b6659a1
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import kafka.common.TopicAndPartition
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/**
+ * Property-based tests for FetchThread using mocked KafkaConsumer instances and a
+ * real LinkedBlockingQueue as the incoming message queue.
+ *
+ * The properties cover setStartOffset, fetchMessage and poll.
+ */
+class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+
+  val nonNegativeGen = Gen.choose[Int](0, 1000)
+  val positiveGen = Gen.choose[Int](1, 1000)
+  val startOffsetGen = Gen.choose[Long](0L, 1000L)
+  // The start offset set on the thread must reach the consumer created for that partition.
+  property("FetchThread should set startOffset to iterators") {
+    forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
+      (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
+        val topicAndPartition = mock[TopicAndPartition]
+        val consumer = mock[KafkaConsumer]
+        val createConsumer = (tp: TopicAndPartition) => consumer
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
+          incomingQueue, fetchThreshold, fetchSleepMS)
+        fetchThread.setStartOffset(topicAndPartition, startOffset)
+        verify(consumer).setStartOffset(startOffset)
+    }
+  }
+
+  val topicAndPartitionGen = for {
+    topic <- Gen.alphaStr
+    partition <- Gen.choose[Int](0, Int.MaxValue)
+  } yield TopicAndPartition(topic, partition)
+  property("FetchThread should only fetchMessage when the number " +
+    "of messages in queue is below the threshold") {
+    forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) {
+      (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
+        startOffset: Long, topicAndPartition: TopicAndPartition) =>
+        val message = mock[KafkaMessage]
+        val consumer = mock[KafkaConsumer]
+        val createConsumer = (tp: TopicAndPartition) => consumer
+        // The consumer always has another message available.
+        when(consumer.hasNext).thenReturn(true)
+        when(consumer.next).thenReturn(message)
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer,
+          incomingQueue, fetchThreshold, fetchSleepMS)
+
+        0.until(messageNum) foreach { _ =>
+          fetchThread.fetchMessage
+        }
+
+        // The queue never grows past the threshold, however many fetches are attempted.
+        incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold)
+    }
+  }
+
+  // poll returns the queued message once, then None for the now-empty queue.
+  property("FetchThread poll should try to retrieve and remove the head of incoming queue") {
+    val topicAndPartition = mock[TopicAndPartition]
+    val consumer = mock[KafkaConsumer]
+    val createConsumer = (tp: TopicAndPartition) => consumer
+    val kafkaMsg = mock[KafkaMessage]
+    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+    incomingQueue.put(kafkaMsg)
+    val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, incomingQueue, 0, 0)
+    fetchThread.poll shouldBe Some(kafkaMsg)
+    fetchThread.poll shouldBe None
+  }
+
+  val tpAndHasNextGen = for {
+    tp <- topicAndPartitionGen
+    hasNext <- Gen.oneOf(true, false)
+  } yield (tp, hasNext)
+
+  // Non-empty map from partition to whether its consumer reports a next message.
+  val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen)
+    .map(_.toMap) suchThat (_.nonEmpty)
+
+  property("FetchThread fetchMessage should return false when there are no more messages " +
+    "from any TopicAndPartition") {
+    forAll(tpHasNextMapGen, nonNegativeGen) {
+      (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) =>
+        // Each partition gets its own mock consumer, stubbed from the generated map.
+        val createConsumer = (tp: TopicAndPartition) => {
+          val consumer = mock[KafkaConsumer]
+          val kafkaMsg = mock[KafkaMessage]
+          val hasNext = tpHasNextMap(tp)
+          when(consumer.hasNext).thenReturn(hasNext)
+          when(consumer.next).thenReturn(kafkaMsg)
+          consumer
+        }
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        // The threshold is one above the partition count.
+        val fetchThread = new FetchThread(tpHasNextMap.keys.toArray,
+          createConsumer, incomingQueue, tpHasNextMap.size + 1, fetchSleepMS)
+        // Expected result is true when at least one partition reports a next message;
+        // the reduce is safe because the generator filters out empty maps.
+        fetchThread.fetchMessage shouldBe tpHasNextMap.values.reduce(_ || _)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala new file mode 100644 index 0000000..3c6a663 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.consumer
+
+import com.twitter.bijection.Injection
+import kafka.api.OffsetRequest
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{Message, MessageAndOffset}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/**
+ * Property-based tests for KafkaConsumer built on a mocked SimpleConsumer and an
+ * injected offset-to-iterator function.
+ *
+ * The properties cover iteration (hasNext / next), setStartOffset and close.
+ */
+class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
+  val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg)))
+  val messageNumGen = Gen.choose[Int](0, 1000)
+  val topicAndPartitionGen = for {
+    topic <- Gen.alphaStr
+    partition <- Gen.choose[Int](0, Int.MaxValue)
+  } yield (topic, partition)
+
+  property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") {
+    forAll(messageGen, messageNumGen, topicAndPartitionGen) {
+      (message: Message, num: Int, topicAndPartition: (String, Int)) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        // The earliest-offset lookup is stubbed to 0 before the consumer is constructed.
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        // The same message is repeated `num` times at offsets 0 until num.
+        val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator
+        val getIterator = (offset: Long) => iterator
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+        0.until(num).foreach { i =>
+          kafkaConsumer.hasNext shouldBe true
+          val kafkaMessage = kafkaConsumer.next
+          kafkaMessage.offset shouldBe i.toLong
+          kafkaMessage.key shouldBe None
+        }
+        kafkaConsumer.hasNext shouldBe false
+    }
+  }
+
+  val startOffsetGen = Gen.choose[Long](1L, 1000L)
+  // Setting a start offset must request a new iterator at exactly that offset.
+  property("KafkaConsumer setStartOffset should reset internal iterator") {
+    forAll(topicAndPartitionGen, startOffsetGen) {
+      (topicAndPartition: (String, Int), startOffset: Long) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        val getIterator = mock[Long => Iterator[MessageAndOffset]]
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+        kafkaConsumer.setStartOffset(startOffset)
+        verify(getIterator).apply(startOffset)
+    }
+  }
+
+  // Closing the wrapper must close the underlying SimpleConsumer.
+  property("KafkaConsumer close should close SimpleConsumer") {
+    forAll(topicAndPartitionGen) {
+      (topicAndPartition: (String, Int)) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        val getIterator = mock[Long => Iterator[MessageAndOffset]]
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator)
+        kafkaConsumer.close()
+        verify(consumer).close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
new file mode 100644
index 0000000..32b8685
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.grouper + +import kafka.common.TopicAndPartition +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +/** + * Property test for KafkaDefaultGrouper: for every task index, each TopicAndPartition handed to + * that task must sit at a position in the input whose index modulo taskNum equals the task index. + */ +class KafkaDefaultGrouperSpec extends PropSpec with PropertyChecks with Matchers { + property("KafkaDefaultGrouper should group TopicAndPartitions in a round-robin way") { + forAll(Gen.posNum[Int], Gen.posNum[Int], Gen.posNum[Int]) { + (topicNum: Int, partitionNum: Int, taskNum: Int) => { + // Every (topic, partition) combination is distinct, so indexOf below is unambiguous. + val topicAndPartitions = for { + t <- 0.until(topicNum) + p <- 0.until(partitionNum) + } yield TopicAndPartition("topic" + t, p) + 0.until(taskNum).foreach { taskIndex => + val grouper = new KafkaDefaultGrouper + // The Boolean has to be asserted; as a bare expression it was discarded and the + // property could never fail. + grouper.group(taskNum, taskIndex, topicAndPartitions.toArray).forall( + tp => topicAndPartitions.indexOf(tp) % taskNum == taskIndex) shouldBe true + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala new file mode 100644 index 0000000..99231f5 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.util + +import java.util.Properties + +import kafka.admin.AdminUtils +import kafka.common.KafkaException +import kafka.server.{KafkaConfig => KafkaServerConfig, KafkaServer} +import kafka.utils.{TestUtils, Utils} + +/** + * Test harness that creates one Kafka server per entry of configs, on top of the Zookeeper + * lifecycle inherited from ZookeeperHarness. + */ +trait KafkaServerHarness extends ZookeeperHarness { + val configs: List[KafkaServerConfig] + // Both fields stay null until setUp() has completed. + private var servers: List[KafkaServer] = null + private var brokerList: String = null + + def getServers: List[KafkaServer] = servers + def getBrokerList: Array[String] = brokerList.split(",") + + /** + * Runs the parent setUp, then creates one server per config and records the broker list. + * + * @throws KafkaException if configs is empty + */ + override def setUp(): Unit = { + super.setUp() + if (configs.isEmpty) { + throw new KafkaException("Must supply at least one server config.") + } + brokerList = TestUtils.getBrokerListStrFromConfigs(configs) + servers = configs.map(TestUtils.createServer(_)) + } + + /** + * Shuts down every server, removes their log directories, then runs the parent tearDown. + */ + override def tearDown(): Unit = { + // servers is still null when setUp() threw before any server was created. + val started = Option(servers).getOrElse(Nil) + try { + started.foreach(server => server.shutdown()) + started.foreach(server => server.config.logDirs.foreach(Utils.rm(_))) + } finally { + // Always run the parent tearDown, even if a server fails to shut down. + super.tearDown() + } + } + +
/** + * Creates the topic and, for each partition, waits for metadata propagation and for a leader. + * + * @param topic name of the topic to create + * @param partitions number of partitions; partition ids 0 until partitions are awaited + * @param replicas replication factor handed to AdminUtils.createTopic + * @param timeout value forwarded to both TestUtils wait calls + * @return partition id mapped to the result of TestUtils.waitUntilLeaderIsElectedOrChanged + */ + def createTopicUntilLeaderIsElected( + topic: String, partitions: Int, replicas: Int, timeout: Long = 10000) + : Map[Int, Option[Int]] = { + val zkClient = connectZk() + try { + // Creates topic + AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties) + // Waits until the update metadata request for new topic reaches all servers + (0 until partitions).map { i => + TestUtils.waitUntilMetadataIsPropagated(servers, topic, i, timeout) + i -> TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, i, timeout) + }.toMap + } finally { + // Exceptions propagate unchanged; the client is closed on every path. + zkClient.close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala new file mode 100644 index 0000000..8fb23c2 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.streaming.kafka.util + +import kafka.utils.{TestZKUtils, Utils, ZKStringSerializer} +import kafka.zk.EmbeddedZookeeper +import org.I0Itec.zkclient.ZkClient + +/** + * Test harness that owns an EmbeddedZookeeper instance and hands out ZkClient connections to it. + */ +trait ZookeeperHarness { + val zkConnect: String = TestZKUtils.zookeeperConnect + // Both timeouts are passed unchanged to the ZkClient constructor. + // NOTE(review): presumably milliseconds - confirm against ZkClient. + val zkConnectionTimeout: Int = 60000 + val zkSessionTimeout: Int = 60000 + // Stays null until setUp() has run. + private var zookeeper: EmbeddedZookeeper = null + + def getZookeeper: EmbeddedZookeeper = zookeeper + + /** + * Returns a factory; each invocation constructs a new ZkClient for zkConnect. The harness keeps + * no reference to the clients it creates. + */ + def connectZk: () => ZkClient = () => { + new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) + } + + /** Creates the EmbeddedZookeeper for zkConnect. */ + def setUp(): Unit = { + zookeeper = new EmbeddedZookeeper(zkConnect) + } + + /** + * Shuts the EmbeddedZookeeper down inside Utils.swallow. + * NOTE(review): swallow presumably suppresses any failure of the shutdown - confirm. + */ + def tearDown(): Unit = { + Utils.swallow(zookeeper.shutdown()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala ---------------------------------------------------------------------- diff --git a/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala b/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala deleted file mode 100644 index 3199128..0000000 --- a/external/monoid/src/main/scala/io/gearpump/streaming/monoid/AlgebirdMonoid.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.monoid - -import com.twitter.algebird.{Group => ABGroup, Monoid => ABMonoid} - -import io.gearpump.streaming.state.api.{Group, Monoid} - -class AlgebirdMonoid[T](monoid: ABMonoid[T]) extends Monoid[T] { - override def plus(l: T, r: T): T = monoid.plus(l, r) - - override def zero: T = monoid.zero -} - -class AlgebirdGroup[T](group: ABGroup[T]) extends Group[T] { - override def zero: T = group.zero - - override def plus(l: T, r: T): T = group.plus(l, r) - - override def minus(l: T, r: T): T = group.minus(l, r) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/monoid/src/main/scala/org/apache/gearpump/streaming/monoid/AlgebirdMonoid.scala ---------------------------------------------------------------------- diff --git a/external/monoid/src/main/scala/org/apache/gearpump/streaming/monoid/AlgebirdMonoid.scala b/external/monoid/src/main/scala/org/apache/gearpump/streaming/monoid/AlgebirdMonoid.scala new file mode 100644 index 0000000..2d831e1 --- /dev/null +++ b/external/monoid/src/main/scala/org/apache/gearpump/streaming/monoid/AlgebirdMonoid.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.monoid + +import com.twitter.algebird.{Group => ABGroup, Monoid => ABMonoid} +import org.apache.gearpump.streaming.state.api.{Group, Monoid} + +/** + * Adapts an Algebird Monoid to the state-api Monoid by delegating plus and zero to it. + */ +class AlgebirdMonoid[T](monoid: ABMonoid[T]) extends Monoid[T] { + override def plus(l: T, r: T): T = monoid.plus(l, r) + + override def zero: T = monoid.zero +} + +/** + * Adapts an Algebird Group to the state-api Group by delegating zero, plus and minus to it. + */ +class AlgebirdGroup[T](group: ABGroup[T]) extends Group[T] { + override def zero: T = group.zero + + override def plus(l: T, r: T): T = group.plus(l, r) + + override def minus(l: T, r: T): T = group.minus(l, r) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala ---------------------------------------------------------------------- diff --git a/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala b/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala deleted file mode 100644 index 9578e0a..0000000 --- a/external/serializer/src/main/scala/io/gearpump/streaming/serializer/ChillSerializer.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.serializer - -import scala.util.Try - -import com.twitter.chill.KryoInjection - -import io.gearpump.streaming.state.api.Serializer - -class ChillSerializer[T] extends Serializer[T] { - override def serialize(t: T): Array[Byte] = - KryoInjection(t) - - override def deserialize(bytes: Array[Byte]): Try[T] = - KryoInjection.invert(bytes).map(_.asInstanceOf[T]) -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/serializer/src/main/scala/org/apache/gearpump/streaming/serializer/ChillSerializer.scala ---------------------------------------------------------------------- diff --git a/external/serializer/src/main/scala/org/apache/gearpump/streaming/serializer/ChillSerializer.scala b/external/serializer/src/main/scala/org/apache/gearpump/streaming/serializer/ChillSerializer.scala new file mode 100644 index 0000000..29d26c0 --- /dev/null +++ b/external/serializer/src/main/scala/org/apache/gearpump/streaming/serializer/ChillSerializer.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.serializer + +import com.twitter.chill.KryoInjection +import org.apache.gearpump.streaming.state.api.Serializer + +import scala.util.Try + +/** + * Serializer implementation that delegates both directions to chill's KryoInjection. + */ +class ChillSerializer[T] extends Serializer[T] { + override def serialize(t: T): Array[Byte] = + KryoInjection(t) + + // NOTE(review): T is erased at runtime, so this cast is unchecked; a payload of another type + // would presumably only fail later, where the value is used - confirm. + override def deserialize(bytes: Array[Byte]): Try[T] = + KryoInjection.invert(bytes).map(_.asInstanceOf[T]) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/integrationtest/README.md ---------------------------------------------------------------------- diff --git a/integrationtest/README.md b/integrationtest/README.md index 53befc0..034632b 100644 --- a/integrationtest/README.md +++ b/integrationtest/README.md @@ -34,7 +34,7 @@ The integration test framework use docker to simulate a real cluster. The test s The test will launch a Gearpump cluster with 1 master and 2 worker nodes as 3 Docker containers. It might take 10-20 minutes to go through all the test cases. It depends on, how powerful your machine is. The Docker container itself does not have a Gearpump distribution. It will link your local build to the Docker container. When tests are finished, you can see the test result on the screen, or you can save them to a file with this command `sbt it:test > test_report.out`. To investigate Gearpump log, please check the directory `output/target/pack/logs`. -### How To test single integration test suite like `io.gearpump.integrationtest.checklist.CommandlineSpec`? +### How To test single integration test suite like `org.apache.gearpump.integrationtest.checklist.CommandlineSpec`? Unfortunately, I searched around, and didn't find a clean way to do this in sbt. Gearpump is using nested suite for integration test, which I think sbt don't support well with `sbt test-only <className>`.
Please also see discussion at: @@ -42,8 +42,8 @@ integration test, which I think sbt don't support well with `sbt test-only <clas For a not that clean solution, here is the steps: -1. Locate class `io.gearpump.integrationtest.suites.StandaloneModeSuite` source file at - `gearpump/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites` +1. Locate class `org.apache.gearpump.integrationtest.suites.StandaloneModeSuite` source file at + `gearpump/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/suites` 2. Document out suite you don't want to test like this: ```
