Repository: incubator-gearpump Updated Branches: refs/heads/master 32f150717 -> 04c3975d6
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala deleted file mode 100644 index 6e4a1c4..0000000 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib - -// TODO: Fix the UT failure! 
- -// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { -// val minTimeGen = Gen.choose[Long](1L, 500L) -// val maxTimeGen = Gen.choose[Long](500L, 999L) -// -// property("KafkaStorage lookup time should report StorageEmpty if storage is empty") { -// forAll { (time: Long, topic: String) => -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val getConsumer = () => mock[KafkaConsumer] -// val connectZk = () => mock[ZkClient] -// val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), -// connectZk()) -// storage.lookUp(time) shouldBe Failure(StorageEmpty) -// } -// } -// -// property("KafkaStorage lookup time should return data or report failure if storage not empty") { -// forAll(minTimeGen, maxTimeGen, Gen.alphaStr) {(minTime: Long, maxTime: Long, topic: String) => -// val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) => -// val offset = index.toLong -// time -> offset -// } -// val timeAndOffsetsMap = timeAndOffsets.toMap -// val data = timeAndOffsets.map { -// case (time, offset) => -// new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)), -// Injection[Long, Array[Byte]](offset)) -// }.toList -// -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val consumer = mock[KafkaConsumer] -// val getConsumer = () => consumer -// val connectZk = () => mock[ZkClient] -// -// val hasNexts = List.fill(data.tail.size)(true) :+ false -// when(consumer.hasNext).thenReturn(true, hasNexts:_*) -// when(consumer.next).thenReturn(data.head, data.tail:_*) -// -// val storage = new KafkaStorage(topic, topicExists = true, producer, -// getConsumer(), connectZk()) -// forAll(Gen.choose[Long](minTime, maxTime)) { -// time => -// storage.lookUp(time) match { -// case Success(array) => -// array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time))) -// case Failure(e) => fail("time in range should return 
Success with value") -// } -// } -// -// forAll(Gen.choose[Long](0L, minTime - 1)) { -// time => -// storage.lookUp(time) match { -// case Failure(e) => e shouldBe a [Underflow] -// e.asInstanceOf[Underflow].min should equal -// (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime))) -// case Success(_) => fail("time less than min should return Underflow failure") -// } -// } -// -// forAll(Gen.choose[Long](maxTime + 1, 1000L)) { -// time => -// storage.lookUp(time) match { -// case Failure(e) => e shouldBe a [Overflow] -// e.asInstanceOf[Overflow].max should equal -// (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime))) -// case Success(_) => fail("time larger than max should return Overflow failure") -// } -// } -// } -// } -// -// property("KafkaStorage append should send data to Kafka") { -// forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000), -// Gen.alphaStr, Gen.oneOf(true, false)) { -// (time: Long, offset: Long, topic: String, topicExists: Boolean) => -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val getConsumer = () => mock[KafkaConsumer] -// val connectZk = () => mock[ZkClient] -// val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk()) -// val offsetBytes = Injection[Long, Array[Byte]](offset) -// storage.append(time, offsetBytes) -// verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]()) -// } -// } -// -// val topicAndPartitionGen = for { -// topic <- Gen.alphaStr -// partition <- Gen.choose[Int](0, 100) -// } yield TopicAndPartition(topic, partition) -// property("KafkaStorage should load data from Kafka") { -// val kafkaMsgGen = for { -// timestamp <- Gen.choose[Long](1L, 1000L) -// offset <- Gen.choose[Long](0L, 1000L) -// } yield (timestamp, Injection[Long, Array[Byte]](offset)) -// val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen) -// -// val topicExistsGen = Gen.oneOf(true, false) -// -// forAll(topicAndPartitionGen, msgListGen) { 
-// (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) => -// val producer= mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val consumer = mock[KafkaConsumer] -// val getConsumer = () => consumer -// val connectZk = () => mock[ZkClient] -// val kafkaStorage = new KafkaStorage(topicAndPartition.topic, -// topicExists = true, producer, getConsumer(), connectZk()) -// msgList match { -// case Nil => -// when(consumer.hasNext).thenReturn(false) -// case list => -// val hasNexts = List.fill(list.tail.size)(true) :+ false -// val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) => -// KafkaMessage(topicAndPartition, index.toLong, -// Some(Injection[Long, Array[Byte]](timestamp)), bytes) -// } -// when(consumer.hasNext).thenReturn(true, hasNexts: _*) -// when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*) -// } -// kafkaStorage.load(consumer) shouldBe msgList -// } -// } -// -// property("KafkaStorage should not get consumer when topic doesn't exist") { -// forAll(Gen.alphaStr) { (topic: String) => -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val getConsumer = mock[() => KafkaConsumer] -// val connectZk = () => mock[ZkClient] -// val kafkaStorage = new KafkaStorage(topic, -// topicExists = false, producer, getConsumer(), connectZk()) -// verify(getConsumer, never()).apply() -// kafkaStorage.close() -// } -// } -// -// property("KafkaStorage should fail to load invalid KafkaMessage") { -// val invalidKafkaMsgGen = for { -// tp <- topicAndPartitionGen -// offset <- Gen.choose[Long](1L, 1000L) -// timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))), -// None) -// msg <- Gen.alphaStr.map(Injection[String, Array[Byte]]) -// } yield KafkaMessage(tp, offset, timestamp, msg) -// forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) => -// val consumer = mock[KafkaConsumer] -// val getConsumer = () => consumer -// val producer = 
mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val connectZk = () => mock[ZkClient] -// val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic, -// topicExists = true, producer, getConsumer(), connectZk()) -// when(consumer.hasNext).thenReturn(true, false) -// when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg) -// Try(kafkaStorage.load(consumer)).isFailure shouldBe true -// } -// } -// -// property("KafkaStorage close should close kafka producer and delete topic") { -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val getConsumer = () => mock[KafkaConsumer] -// val zkClient = mock[ZkClient] -// val connectZk = () => zkClient -// val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk()) -// kafkaStorage.close() -// verify(producer).close() -// verify(zkClient).createPersistent(anyString(), anyString()) -// } -// } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala deleted file mode 100644 index 099be3d..0000000 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib - -import kafka.common.TopicAndPartition -import kafka.server.{KafkaConfig => KafkaServerConfig} -import kafka.utils.{TestUtils, TestZKUtils} -import org.scalatest.prop.PropertyChecks -import org.scalatest.{BeforeAndAfterEach, Matchers, PropSpec} - -import org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper -import org.apache.gearpump.streaming.kafka.util.KafkaServerHarness - -class KafkaUtilSpec - extends PropSpec with PropertyChecks with BeforeAndAfterEach - with Matchers with KafkaServerHarness { - - val numServers = 1 - override val configs: List[KafkaServerConfig] = - for (props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false)) - yield new KafkaServerConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect - override val numPartitions = 4 - } - - override def beforeEach(): Unit = { - super.setUp() - } - - override def afterEach(): Unit = { - super.tearDown() - } - - import org.apache.gearpump.streaming.kafka.lib.KafkaUtil._ - - property("KafkaUtil should be able to create topic") { - val args = { - Table( - ("topic", "partitions", "replicas"), - (TestUtils.tempTopic, 1, numServers), - (TestUtils.tempTopic, 2, numServers + 1), - ("", 1, numServers), - (TestUtils.tempTopic, 0, numServers), - (TestUtils.tempTopic, 1, 0) - ) - } - forAll(args) { - (topic: String, partitions: Int, replicas: Int) => - if (topic.nonEmpty && partitions > 0 && replicas > 0 && replicas <= numServers) { - createTopic(connectZk(), topic, partitions, replicas) shouldBe 
false - createTopic(connectZk(), topic, partitions, replicas) shouldBe true - } else { - intercept[RuntimeException] { - createTopic(connectZk(), topic, partitions, replicas) - } - } - } - } - - property("KafkaUtil should be able to get broker info") { - val brokerList = getBrokerList - val partitions = 2 - val replicas = numServers - val topic = TestUtils.tempTopic() - intercept[RuntimeException] { - getBroker(connectZk(), topic, partitions) - } - val partitionsToBrokers = createTopicUntilLeaderIsElected(topic, partitions, replicas) - 0.until(partitions).foreach { part => - val hostPorts = brokerList(partitionsToBrokers(part).get).split(":") - val broker = getBroker(connectZk(), topic, part) - - broker.host shouldBe hostPorts(0) - broker.port.toString shouldBe hostPorts(1) - } - } - - property("KafkaUtil should be able to get TopicAndPartitions info and group with KafkaGrouper") { - val grouper: KafkaGrouper = new KafkaGrouper { - override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) - : Array[TopicAndPartition] = { - topicAndPartitions - } - } - val topicNum = 3 - val topics = List.fill(topicNum)(TestUtils.tempTopic()) - topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, replicas = 1)) - KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe - topics.map(t => TopicAndPartition(t, 0)).toSet - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala deleted file mode 100644 index cc83b51..0000000 --- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib.consumer - -import org.scalatest.{Matchers, WordSpec} - -class ExponentialBackoffSleeperSpec extends WordSpec with Matchers { - - "ExponentialBackOffSleeper" should { - "sleep for increasing duration" in { - val sleeper = new ExponentialBackoffSleeper( - backOffMultiplier = 2.0, - initialDurationMs = 100, - maximumDurationMs = 10000 - ) - sleeper.getSleepDuration shouldBe 100 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 200 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 400 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 800 - } - - "sleep for no more than maximum duration" in { - val sleeper = new ExponentialBackoffSleeper( - backOffMultiplier = 2.0, - initialDurationMs = 6400, - maximumDurationMs = 10000 - ) - sleeper.getSleepDuration shouldBe 6400 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 10000 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 10000 - } - 
- "sleep for initial duration after reset" in { - val sleeper = new ExponentialBackoffSleeper( - backOffMultiplier = 2.0, - initialDurationMs = 100, - maximumDurationMs = 10000 - ) - sleeper.getSleepDuration shouldBe 100 - sleeper.setNextSleepDuration() - sleeper.getSleepDuration shouldBe 200 - sleeper.reset() - sleeper.getSleepDuration shouldBe 100 - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala deleted file mode 100644 index b6659a1..0000000 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka.lib.consumer - -import java.util.concurrent.LinkedBlockingQueue - -import kafka.common.TopicAndPartition -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - val nonNegativeGen = Gen.choose[Int](0, 1000) - val positiveGen = Gen.choose[Int](1, 1000) - val startOffsetGen = Gen.choose[Long](0L, 1000L) - property("FetchThread should set startOffset to iterators") { - forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) { - (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) => - val topicAndPartition = mock[TopicAndPartition] - val consumer = mock[KafkaConsumer] - val createConsumer = (tp: TopicAndPartition) => consumer - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, - incomingQueue, fetchThreshold, fetchSleepMS) - fetchThread.setStartOffset(topicAndPartition, startOffset) - verify(consumer).setStartOffset(startOffset) - } - } - - val topicAndPartitionGen = for { - topic <- Gen.alphaStr - partition <- Gen.choose[Int](0, Int.MaxValue) - } yield TopicAndPartition(topic, partition) - property("FetchThread should only fetchMessage when the number " + - "of messages in queue is below the threshold") { - forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) { - (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int, - startOffset: Long, topicAndPartition: TopicAndPartition) => - val message = mock[KafkaMessage] - val consumer = mock[KafkaConsumer] - val createConsumer = (tp: TopicAndPartition) => consumer - when(consumer.hasNext).thenReturn(true) - when(consumer.next).thenReturn(message) - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val 
fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, - incomingQueue, fetchThreshold, fetchSleepMS) - - 0.until(messageNum) foreach { _ => - fetchThread.fetchMessage - } - - incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold) - } - } - - property("FetchThread poll should try to retrieve and remove the head of incoming queue") { - val topicAndPartition = mock[TopicAndPartition] - val consumer = mock[KafkaConsumer] - val createConsumer = (tp: TopicAndPartition) => consumer - val kafkaMsg = mock[KafkaMessage] - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - incomingQueue.put(kafkaMsg) - val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, incomingQueue, 0, 0) - fetchThread.poll shouldBe Some(kafkaMsg) - fetchThread.poll shouldBe None - } - - val tpAndHasNextGen = for { - tp <- topicAndPartitionGen - hasNext <- Gen.oneOf(true, false) - } yield (tp, hasNext) - - val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen) - .map(_.toMap) suchThat (_.nonEmpty) - - property("FetchThread fetchMessage should return false when there are no more messages " + - "from any TopicAndPartition") { - forAll(tpHasNextMapGen, nonNegativeGen) { - (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) => - val createConsumer = (tp: TopicAndPartition) => { - val consumer = mock[KafkaConsumer] - val kafkaMsg = mock[KafkaMessage] - val hasNext = tpHasNextMap(tp) - when(consumer.hasNext).thenReturn(hasNext) - when(consumer.next).thenReturn(kafkaMsg) - consumer - } - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val fetchThread = new FetchThread(tpHasNextMap.keys.toArray, - createConsumer, incomingQueue, tpHasNextMap.size + 1, fetchSleepMS) - fetchThread.fetchMessage shouldBe tpHasNextMap.values.reduce(_ || _) - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala deleted file mode 100644 index 3c6a663..0000000 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka.lib.consumer - -import com.twitter.bijection.Injection -import kafka.api.OffsetRequest -import kafka.common.TopicAndPartition -import kafka.consumer.SimpleConsumer -import kafka.message.{Message, MessageAndOffset} -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg))) - val messageNumGen = Gen.choose[Int](0, 1000) - val topicAndPartitionGen = for { - topic <- Gen.alphaStr - partition <- Gen.choose[Int](0, Int.MaxValue) - } yield (topic, partition) - - property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") { - forAll(messageGen, messageNumGen, topicAndPartitionGen) { - (message: Message, num: Int, topicAndPartition: (String, Int)) => - val (topic, partition) = topicAndPartition - val consumer = mock[SimpleConsumer] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator - val getIterator = (offset: Long) => iterator - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - 0.until(num).foreach { i => - kafkaConsumer.hasNext shouldBe true - val kafkaMessage = kafkaConsumer.next - kafkaMessage.offset shouldBe i.toLong - kafkaMessage.key shouldBe None - } - kafkaConsumer.hasNext shouldBe false - } - } - - val startOffsetGen = Gen.choose[Long](1L, 1000L) - property("KafkaConsumer setStartOffset should reset internal iterator") { - forAll(topicAndPartitionGen, startOffsetGen) { - (topicAndPartition: (String, Int), startOffset: Long) => - val (topic, partition) = topicAndPartition - val consumer = 
mock[SimpleConsumer] - val getIterator = mock[Long => Iterator[MessageAndOffset]] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - kafkaConsumer.setStartOffset(startOffset) - verify(getIterator).apply(startOffset) - } - } - - property("KafkaConsumer close should close SimpleConsumer") { - forAll(topicAndPartitionGen) { - (topicAndPartition: (String, Int)) => - val (topic, partition) = topicAndPartition - val consumer = mock[SimpleConsumer] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val getIterator = mock[Long => Iterator[MessageAndOffset]] - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - kafkaConsumer.close() - verify(consumer).close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala deleted file mode 100644 index 32b8685..0000000 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib.grouper - -import kafka.common.TopicAndPartition -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class KafkaDefaultGrouperSpec extends PropSpec with PropertyChecks with Matchers { - property("KafkaDefaultGrouper should group TopicAndPartitions in a round-robin way") { - forAll(Gen.posNum[Int], Gen.posNum[Int], Gen.posNum[Int]) { - (topicNum: Int, partitionNum: Int, taskNum: Int) => { - val topicAndPartitions = for { - t <- 0.until(topicNum) - p <- 0.until(partitionNum) - } yield TopicAndPartition("topic" + t, p) - 0.until(taskNum).foreach { taskIndex => - val grouper = new KafkaDefaultGrouper - grouper.group(taskNum, taskIndex, topicAndPartitions.toArray).forall( - tp => topicAndPartitions.indexOf(tp) % taskNum == taskIndex) - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala new file mode 100644 index 0000000..843aab7 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source + +import com.twitter.bijection.Injection +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers { + property("DefaultMessageDecoder should keep the original bytes data in Message") { + val decoder = new DefaultMessageDecoder() + forAll(Gen.chooseNum[Int](0, 100), Gen.alphaStr) { (k: Int, v: String) => + val kbytes = Injection[Int, Array[Byte]](k) + val vbytes = Injection[String, Array[Byte]](v) + val timestamp = System.currentTimeMillis() + val message = decoder.fromBytes(kbytes, vbytes) + message.msg shouldBe vbytes + message.timestamp should be >= timestamp + } + } +} + +class StringMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers { + property("StringMessageDecoder should decode original bytes data into string") { + val decoder = new StringMessageDecoder() + forAll(Gen.alphaStr, Gen.alphaStr) { (k: String, v: String) => + val kbytes = Injection[String, Array[Byte]](k) + val vbytes = Injection[String, Array[Byte]](v) + val timestamp = System.currentTimeMillis() + val 
message = decoder.fromBytes(kbytes, vbytes) + message.msg shouldBe v + message.timestamp should be >= timestamp + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeperSpec.scala new file mode 100644 index 0000000..83c4584 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeperSpec.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.lib.source.consumer + +import org.scalatest.{Matchers, WordSpec} + +class ExponentialBackoffSleeperSpec extends WordSpec with Matchers { + + "ExponentialBackOffSleeper" should { + "sleep for increasing duration" in { + val sleeper = new ExponentialBackoffSleeper( + backOffMultiplier = 2.0, + initialDurationMs = 100, + maximumDurationMs = 10000 + ) + sleeper.getSleepDuration shouldBe 100 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 200 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 400 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 800 + } + + "sleep for no more than maximum duration" in { + val sleeper = new ExponentialBackoffSleeper( + backOffMultiplier = 2.0, + initialDurationMs = 6400, + maximumDurationMs = 10000 + ) + sleeper.getSleepDuration shouldBe 6400 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 10000 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 10000 + } + + "sleep for initial duration after reset" in { + val sleeper = new ExponentialBackoffSleeper( + backOffMultiplier = 2.0, + initialDurationMs = 100, + maximumDurationMs = 10000 + ) + sleeper.getSleepDuration shouldBe 100 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 200 + sleeper.reset() + sleeper.getSleepDuration shouldBe 100 + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThreadSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThreadSpec.scala new file mode 100644 index 0000000..0569060 --- /dev/null +++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThreadSpec.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source.consumer + +import java.util.concurrent.LinkedBlockingQueue + +import kafka.common.TopicAndPartition +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + val nonNegativeGen = Gen.choose[Int](0, 1000) + val positiveGen = Gen.choose[Int](1, 1000) + val startOffsetGen = Gen.choose[Long](0L, 1000L) + property("FetchThread should set startOffset to iterators") { + forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) { + (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) => + val topicAndPartition = mock[TopicAndPartition] + val consumer = mock[KafkaConsumer] + val createConsumer = (tp: TopicAndPartition) => consumer + val sleeper = mock[ExponentialBackoffSleeper] + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val fetchThread = new 
FetchThread(createConsumer, + incomingQueue, sleeper, fetchThreshold, fetchSleepMS) + fetchThread.setTopicAndPartitions(Array(topicAndPartition)) + fetchThread.setStartOffset(topicAndPartition, startOffset) + verify(consumer).setStartOffset(startOffset) + } + } + + val topicAndPartitionGen = for { + topic <- Gen.alphaStr + partition <- Gen.choose[Int](0, Int.MaxValue) + } yield TopicAndPartition(topic, partition) + property("FetchThread should only fetchMessage when the number " + + "of messages in queue is below the threshold") { + forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) { + (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int, + startOffset: Long, topicAndPartition: TopicAndPartition) => + val message = mock[KafkaMessage] + val consumer = mock[KafkaConsumer] + val createConsumer = (tp: TopicAndPartition) => consumer + + 0.until(messageNum) foreach { _ => + when(consumer.hasNext).thenReturn(true) + when(consumer.next()).thenReturn(message) + } + + val sleeper = mock[ExponentialBackoffSleeper] + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val fetchThread = new FetchThread(createConsumer, + incomingQueue, sleeper, fetchThreshold, fetchSleepMS) + + fetchThread.setTopicAndPartitions(Array(topicAndPartition)) + + 0.until(messageNum) foreach { _ => + fetchThread.runLoop() + } + + verify(sleeper, times(messageNum)).reset() + incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold) + } + } + + property("FetchThread poll should try to retrieve and remove the head of incoming queue") { + val topicAndPartition = mock[TopicAndPartition] + val consumer = mock[KafkaConsumer] + val createConsumer = (tp: TopicAndPartition) => consumer + val kafkaMsg = mock[KafkaMessage] + val sleeper = mock[ExponentialBackoffSleeper] + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + incomingQueue.put(kafkaMsg) + val fetchThread = new FetchThread(createConsumer, incomingQueue, sleeper, 0, 0) + 
fetchThread.setTopicAndPartitions(Array(topicAndPartition)) + fetchThread.poll shouldBe Some(kafkaMsg) + fetchThread.poll shouldBe None + } + + val tpAndHasNextGen = for { + tp <- topicAndPartitionGen + hasNext <- Gen.oneOf(true, false) + } yield (tp, hasNext) + + val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen) + .map(_.toMap) suchThat (_.nonEmpty) + + property("FetchThread fetchMessage should return false when there are no more messages " + + "from any TopicAndPartition") { + forAll(tpHasNextMapGen, nonNegativeGen) { + (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) => + val createConsumer = (tp: TopicAndPartition) => { + val consumer = mock[KafkaConsumer] + val kafkaMsg = mock[KafkaMessage] + val hasNext = tpHasNextMap(tp) + when(consumer.hasNext).thenReturn(hasNext) + when(consumer.next()).thenReturn(kafkaMsg) + consumer + } + val sleeper = mock[ExponentialBackoffSleeper] + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val fetchThread = new FetchThread(createConsumer, incomingQueue, sleeper, + tpHasNextMap.size + 1, fetchSleepMS) + fetchThread.setTopicAndPartitions(tpHasNextMap.keys.toArray) + fetchThread.runLoop() + val hasMoreMessages = tpHasNextMap.values.reduce(_ || _) + if (!hasMoreMessages) { + verify(sleeper).sleep(fetchSleepMS) + } + } + } + + property("FetchThread should reset consumers on exception") { + forAll(startOffsetGen) { (offset: Long) => + val topicAndPartition = mock[TopicAndPartition] + val consumer = mock[KafkaConsumer] + val createConsumer = (tp: TopicAndPartition) => consumer + val sleeper = mock[ExponentialBackoffSleeper] + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val fetchThread = new FetchThread(createConsumer, incomingQueue, sleeper, 1, 0) + fetchThread.setTopicAndPartitions(Array(topicAndPartition)) + + when(consumer.hasNext).thenReturn(true) + when(consumer.next()).thenThrow(new RuntimeException) + fetchThread.runLoop() + // sleep on exception + 
verify(sleeper).sleep() + + // reset on previous exception + when(consumer.getNextOffset).thenReturn(offset) + when(consumer.hasNext).thenReturn(false) + fetchThread.runLoop() + verify(consumer).close() + // consumer is reset + verify(consumer).setStartOffset(offset) + // reset sleeper on successful fetch + verify(sleeper).reset() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumerSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumerSpec.scala new file mode 100644 index 0000000..4f8a239 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumerSpec.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.lib.source.consumer + +import com.twitter.bijection.Injection +import kafka.api.OffsetRequest +import kafka.common.TopicAndPartition +import kafka.consumer.SimpleConsumer +import kafka.message.{Message, MessageAndOffset} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg))) + val messageNumGen = Gen.choose[Int](0, 1000) + val topicAndPartitionGen = for { + topic <- Gen.alphaStr + partition <- Gen.choose[Int](0, Int.MaxValue) + } yield (topic, partition) + + property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") { + forAll(messageGen, messageNumGen, topicAndPartitionGen) { + (message: Message, num: Int, topicAndPartition: (String, Int)) => + val (topic, partition) = topicAndPartition + val consumer = mock[SimpleConsumer] + when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), + OffsetRequest.EarliestTime, -1)).thenReturn(0) + val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator + val getIterator = (offset: Long) => iterator + val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) + 0.until(num).foreach { i => + kafkaConsumer.hasNext shouldBe true + val kafkaMessage = kafkaConsumer.next + kafkaMessage.offset shouldBe i.toLong + kafkaMessage.key shouldBe None + } + kafkaConsumer.hasNext shouldBe false + } + } + + val startOffsetGen = Gen.choose[Long](1L, 1000L) + property("KafkaConsumer setStartOffset should reset internal iterator") { + forAll(topicAndPartitionGen, startOffsetGen) { + (topicAndPartition: (String, Int), startOffset: Long) => + val (topic, partition) = topicAndPartition + val 
consumer = mock[SimpleConsumer] + val getIterator = mock[Long => Iterator[MessageAndOffset]] + when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), + OffsetRequest.EarliestTime, -1)).thenReturn(0) + val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) + kafkaConsumer.setStartOffset(startOffset) + verify(getIterator).apply(startOffset) + } + } + + property("KafkaConsumer close should close SimpleConsumer") { + forAll(topicAndPartitionGen) { + (topicAndPartition: (String, Int)) => + val (topic, partition) = topicAndPartition + val consumer = mock[SimpleConsumer] + when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), + OffsetRequest.EarliestTime, -1)).thenReturn(0) + val getIterator = mock[Long => Iterator[MessageAndOffset]] + val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) + kafkaConsumer.close() + verify(consumer).close() + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouperSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouperSpec.scala new file mode 100644 index 0000000..bd80826 --- /dev/null +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouperSpec.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source.grouper + +import kafka.common.TopicAndPartition +import org.scalacheck.Gen +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class DefaultPartitionGrouperSpec extends PropSpec with PropertyChecks with Matchers { + property("DefaultPartitionGrouper should group TopicAndPartitions in a round-robin way") { + forAll(Gen.posNum[Int], Gen.posNum[Int], Gen.posNum[Int]) { + (topicNum: Int, partitionNum: Int, taskNum: Int) => { + val topicAndPartitions = for { + t <- 0.until(topicNum) + p <- 0.until(partitionNum) + } yield TopicAndPartition("topic" + t, p) + 0.until(taskNum).foreach { taskIndex => + val grouper = new DefaultPartitionGrouper + grouper.group(taskNum, taskIndex, topicAndPartitions.toArray).forall( + tp => topicAndPartitions.indexOf(tp) % taskNum == taskIndex) shouldBe true + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala new file mode 100644 index 0000000..b2db243 --- /dev/null +++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.kafka.lib.util + +import java.util.Properties + +import kafka.common.TopicAndPartition +import kafka.server.{KafkaConfig => KafkaServerConfig} +import kafka.utils.{TestZKUtils, TestUtils} +import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer +import org.apache.gearpump.streaming.kafka.util.{KafkaConfig, KafkaServerHarness} +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, BeforeAndAfterEach, PropSpec} + +class KafkaClientSpec extends PropSpec with PropertyChecks + with BeforeAndAfterEach with Matchers with KafkaServerHarness { + + val numServers = 1 + override val configs: List[KafkaServerConfig] = + for (props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false)) + yield new KafkaServerConfig(props) { + override val zkConnect = TestZKUtils.zookeeperConnect + override val numPartitions = 4 + } + + override def 
beforeEach(): Unit = { + super.setUp() + } + + override def afterEach(): Unit = { + super.tearDown() + } + + + property("KafkaClient should be able to create topic") { + val args = { + Table( + ("topic", "partitions", "replicas"), + (TestUtils.tempTopic(), 1, numServers), + (TestUtils.tempTopic(), 2, numServers + 1), + ("", 1, numServers), + (TestUtils.tempTopic(), 0, numServers), + (TestUtils.tempTopic(), 1, 0) + ) + } + forAll(args) { + (topic: String, partitions: Int, replicas: Int) => + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList) + val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient) + if (topic.nonEmpty && partitions > 0 && replicas > 0 && replicas <= numServers) { + + kafkaClient.createTopic(topic, partitions, replicas) shouldBe false + kafkaClient.createTopic(topic, partitions, replicas) shouldBe true + } else { + intercept[RuntimeException] { + kafkaClient.createTopic(topic, partitions, replicas) + } + } + } + } + + property("KafkaClient should be able to get broker info") { + val brokerList = getBrokerList.split(",", -1) + val partitions = 2 + val replicas = numServers + val topic = TestUtils.tempTopic() + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList) + val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient) + intercept[RuntimeException] { + kafkaClient.getBroker(topic, partitions) + } + val partitionsToBrokers = createTopicUntilLeaderIsElected(topic, partitions, replicas) + 0.until(partitions).foreach { part => + val hostPorts = brokerList(partitionsToBrokers(part).get).split(":") + val broker = kafkaClient.getBroker(topic, part) + + broker.host shouldBe hostPorts(0) + broker.port.toString shouldBe hostPorts(1) + } + } + + property("KafkaClient should be able to get TopicAndPartitions info") { + val props = new 
Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList) + val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient) + val topicNum = 3 + val topics = List.fill(topicNum)(TestUtils.tempTopic()) + topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, replicas = 1)) + val actual = kafkaClient.getTopicAndPartitions(topics).toSet + val expected = topics.map(t => TopicAndPartition(t, 0)).toSet + actual shouldBe expected + } + + property("KafkaClient should be able to create consumer") { + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList) + val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient) + val partitions = 2 + val topic = TestUtils.tempTopic() + createTopicUntilLeaderIsElected(topic, partitions, replicas = 1) + 0.until(partitions).foreach { par => + kafkaClient.createConsumer(topic, par, + startOffsetTime = -2) shouldBe a [KafkaConsumer] + } + + } + + property("KafkaClient should be able to create producer") { + val props = new Properties + props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect) + props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList) + val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient) + kafkaClient.createProducer(new ByteArraySerializer, + new ByteArraySerializer) shouldBe a [KafkaProducer[_, _]] + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala index 99231f5..b6124e5 100644 --- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala @@ -31,10 +31,10 @@ trait KafkaServerHarness extends ZookeeperHarness { private var brokerList: String = null def getServers: List[KafkaServer] = servers - def getBrokerList: Array[String] = brokerList.split(",") + def getBrokerList: String = brokerList override def setUp() { - super.setUp + super.setUp() if (configs.size <= 0) { throw new KafkaException("Must supply at least one server config.") } @@ -43,15 +43,14 @@ trait KafkaServerHarness extends ZookeeperHarness { } override def tearDown() { - servers.map(server => server.shutdown()) - servers.map(server => server.config.logDirs.map(Utils.rm(_))) - super.tearDown + servers.foreach(_.shutdown()) + servers.foreach(_.config.logDirs.foreach(Utils.rm)) + super.tearDown() } - def createTopicUntilLeaderIsElected( - topic: String, partitions: Int, replicas: Int, timeout: Long = 10000) - : Map[Int, Option[Int]] = { - val zkClient = connectZk() + def createTopicUntilLeaderIsElected(topic: String, partitions: Int, + replicas: Int, timeout: Long = 10000): Map[Int, Option[Int]] = { + val zkClient = getZkClient try { // Creates topic AdminUtils.createTopic(zkClient, topic, partitions, replicas, new Properties) @@ -62,8 +61,6 @@ trait KafkaServerHarness extends ZookeeperHarness { }.toMap } catch { case e: Exception => throw e - } finally { - zkClient.close() } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala index 8fb23c2..9121ea4 100644 --- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala @@ -27,17 +27,19 @@ trait ZookeeperHarness { val zkConnectionTimeout = 60000 val zkSessionTimeout = 60000 private var zookeeper: EmbeddedZookeeper = null + private var zkClient: ZkClient = null def getZookeeper: EmbeddedZookeeper = zookeeper - def connectZk: () => ZkClient = () => { - new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) - } + + def getZkClient: ZkClient = zkClient def setUp() { zookeeper = new EmbeddedZookeeper(zkConnect) + zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer) } def tearDown() { + zkClient.close() Utils.swallow(zookeeper.shutdown()) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala index e40d8cd..f04dc9b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala @@ -47,7 +47,8 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory]( PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get - val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, taskContext) + val checkpointStore = checkpointStoreFactory.getCheckpointStore( + s"app$appId-task${taskId.processorId}_${taskId.index}") val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get 
val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore) // System time interval to attempt checkpoint http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala index 753050e..2591856 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala @@ -44,7 +44,7 @@ class InMemoryCheckpointStore extends CheckpointStore { } class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory { - override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore = { + override def getCheckpointStore(name: String): CheckpointStore = { new InMemoryCheckpointStore } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala index 2ef8610..4650ac2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala @@ -36,7 +36,14 @@ trait CheckpointStore { def close(): Unit } +/** + * Creates CheckpointStore instance at runtime + */ trait CheckpointStoreFactory extends java.io.Serializable 
{ - def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): CheckpointStore + /** + * @param name a unique name which maps to a unique path in checkpoint store + * @return a CheckpointStore instance + */ + def getCheckpointStore(name: String): CheckpointStore } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala index 0615078..3ea33a5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala @@ -21,9 +21,14 @@ package org.apache.gearpump.streaming.transaction.api import org.apache.gearpump.Message /** - * MessageDecoder decodes raw bytes to Message It is usually written by end user and - * passed into TimeReplayableSource + * Decodes raw bytes to Message. 
+ * It is usually written by the end user and passed into TimeReplayableSource */ trait MessageDecoder extends java.io.Serializable { - def fromBytes(bytes: Array[Byte]): Message + /** + * @param key key of a kafka message, can be null + * @param value value of a kafka message + * @return a gearpump Message + */ + def fromBytes(key: Array[Byte], value: Array[Byte]): Message } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala deleted file mode 100644 index 616894a..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- */ - -package org.apache.gearpump.streaming.transaction.api - -import org.apache.gearpump.{Message, TimeStamp} - -import scala.util.Try - -/** - * Filters offsets and store the mapping from timestamp to offset - */ -trait MessageFilter { - def filter(messageAndOffset: (Message, Long)): Option[Message] -} - -/** - * Resolves timestamp to offset by look up the underlying storage - */ -trait OffsetTimeStampResolver { - def resolveOffset(time: TimeStamp): Try[Long] -} - -/** - * Manages message's offset on TimeReplayableSource and timestamp - */ -trait OffsetManager extends MessageFilter with OffsetTimeStampResolver { - def close(): Unit -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala deleted file mode 100644 index 40fc088..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.transaction.api - -import org.apache.gearpump.TimeStamp - -import scala.util.Try - -object OffsetStorage { - - /** - * StorageEmpty means no data has been stored - */ - case object StorageEmpty extends Throwable - - /** - * Overflow means the looked up time is - * larger than the maximum stored TimeStamp - */ - case class Overflow(maxTimestamp: Array[Byte]) extends Throwable - - /** - * Underflow means the looked up time is - * smaller than the minimum stored TimeStamp - */ - case class Underflow(minTimestamp: Array[Byte]) extends Throwable -} - -/** - * OffsetStorage stores the mapping from TimeStamp to Offset - */ -trait OffsetStorage { - /** - * Tries to look up the time in the OffsetStorage return the corresponding Offset if the time is - * in the range of stored TimeStamps or one of the failure info (StorageEmpty, Overflow, - * Underflow) - * - * @param time the time to look for - * @return the corresponding offset if the time is in the range, otherwise failure - */ - def lookUp(time: TimeStamp): Try[Array[Byte]] - - def append(time: TimeStamp, offset: Array[Byte]): Unit - - def close(): Unit -} - -trait OffsetStorageFactory extends java.io.Serializable { - def getOffsetStorage(dir: String): OffsetStorage -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala index 16f98d5..c2d47e3 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala 
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala @@ -26,5 +26,11 @@ import org.apache.gearpump.streaming.source.DataSource * Subclass should be able to replay messages on recovery from the time * when an application crashed. */ -trait TimeReplayableSource extends DataSource +trait TimeReplayableSource extends DataSource { + /** + * Sets store factory which creates a checkpoint store + * at runtime to checkpoint (timestamp, source_offsets) + */ + def setCheckpointStore(factory: CheckpointStoreFactory) +}
