fix #1972, backoff retry kafka consuming on exception
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/9b1085cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/9b1085cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/9b1085cf Branch: refs/heads/master Commit: 9b1085cfe04c7d161372b60c499a00e89aaede51 Parents: 1db8606 Author: manuzhang <[email protected]> Authored: Fri Feb 19 12:14:36 2016 +0800 Committer: manuzhang <[email protected]> Committed: Fri Feb 19 21:20:42 2016 +0800 ---------------------------------------------------------------------- .../consumer/ExponentialBackoffSleeper.scala | 58 ++++++++++ .../kafka/lib/consumer/FetchThread.scala | 56 ++++++++-- .../kafka/lib/consumer/KafkaConsumer.scala | 2 + .../streaming/kafka/lib/FetchThreadSpec.scala | 108 ------------------- .../streaming/kafka/lib/KafkaConsumerSpec.scala | 89 --------------- .../ExponentialBackoffSleeperSpec.scala | 72 +++++++++++++ .../kafka/lib/consumer/FetchThreadSpec.scala | 108 +++++++++++++++++++ .../kafka/lib/consumer/KafkaConsumerSpec.scala | 88 +++++++++++++++ 8 files changed, 373 insertions(+), 208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala new file mode 100644 index 0000000..b821f15 --- /dev/null +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala @@ -0,0 +1,58 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package io.gearpump.streaming.kafka.lib.consumer + +/** + * Sleeps for an exponentially increasing duration on each call to sleep(), + * until the duration reaches the cap + * + * @param backOffMultiplier The factor by which the duration increases. + * @param initialDurationMs Time in milliseconds for initial sleep. + * @param maximumDurationMs Cap up to which we will increase the duration. 
+ */ +private[consumer] class ExponentialBackoffSleeper( + backOffMultiplier: Double = 2.0, + initialDurationMs: Long = 100, + maximumDurationMs: Long = 10000) { + + require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1") + require(initialDurationMs > 0, "initialDurationMs must be positive") + require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs") + + private var sleepDuration = initialDurationMs + + def reset(): Unit = { + sleepDuration = initialDurationMs + } + + def sleep(): Unit = { + Thread.sleep(sleepDuration) + setNextSleepDuration() + } + + def getSleepDuration: Long = sleepDuration + + def setNextSleepDuration(): Unit = { + val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long] + sleepDuration = math.min(math.max(initialDurationMs, next), maximumDurationMs) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala index 932939c..ee53151 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala @@ -34,29 +34,32 @@ object FetchThread { fetchSleepMS: Long, startOffsetTime: Long, consumerConfig: ConsumerConfig): FetchThread = { - val consumers: Map[TopicAndPartition, KafkaConsumer] = topicAndPartitions.map { - tp => - tp -> KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig) - }.toMap + val createConsumer = (tp: TopicAndPartition) => + KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig) + val incomingQueue = new 
LinkedBlockingQueue[KafkaMessage]() - new FetchThread(consumers, incomingQueue, fetchThreshold, fetchSleepMS) + new FetchThread(topicAndPartitions, createConsumer, incomingQueue, fetchThreshold, fetchSleepMS) } } /** * A thread to fetch messages from multiple kafka [[TopicAndPartition]]s and puts them * onto a queue, which is asynchronously polled by a consumer - * @param consumers [[KafkaConsumer]]s by kafka [[TopicAndPartition]]s + * + * @param createConsumer given a [[TopicAndPartition]], create a [[KafkaConsumer]] to connect to it * @param incomingQueue a queue to buffer incoming messages * @param fetchThreshold above which thread should stop fetching messages * @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold */ -private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer], +private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition], + createConsumer: TopicAndPartition => KafkaConsumer, incomingQueue: LinkedBlockingQueue[KafkaMessage], fetchThreshold: Int, fetchSleepMS: Long) extends Thread { import FetchThread._ + private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers + def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = { consumers(tp).setStartOffset(startOffset) } @@ -67,10 +70,29 @@ private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer override def run(): Unit = { try { - while (!Thread.currentThread.isInterrupted) { - val hasMoreMessages = fetchMessage - if (!hasMoreMessages || incomingQueue.size >= fetchThreshold) { - Thread.sleep(fetchSleepMS) + var nextOffsets = Map.empty[TopicAndPartition, Long] + var reset = false + val sleeper = new ExponentialBackoffSleeper( + backOffMultiplier = 2.0, + initialDurationMs = 100L, + maximumDurationMs = 10000L) + while (!Thread.currentThread().isInterrupted) { + try { + if (reset) { + nextOffsets = consumers.mapValues(_.getNextOffset) + resetConsumers(nextOffsets) + reset 
= false + } + val hasMoreMessages = fetchMessage + sleeper.reset() + if (!hasMoreMessages || incomingQueue.size >= fetchThreshold) { + Thread.sleep(fetchSleepMS) + } + } catch { + case exception: Exception => + LOG.warn(s"resetting consumers due to $exception") + reset = true + sleeper.sleep() } } } catch { @@ -99,4 +121,16 @@ private[kafka] class FetchThread(consumers: Map[TopicAndPartition, KafkaConsumer } } } + + private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = { + topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap + } + + private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = { + consumers.values.foreach(_.close()) + consumers = createAllConsumers + consumers.foreach { case (tp, consumer) => + consumer.setStartOffset(nextOffsets(tp)) + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala index 04b04ef..208a99d 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala @@ -92,6 +92,8 @@ class KafkaConsumer(consumer: SimpleConsumer, hasNextHelper(iterator, newIterator = false) } + def getNextOffset: Long = nextOffset + def close(): Unit = { consumer.close() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala 
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala deleted file mode 100644 index af59296..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/FetchThreadSpec.scala +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.lib - -import java.util.concurrent.LinkedBlockingQueue - -import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage, KafkaConsumer} -import kafka.common.TopicAndPartition -import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - val nonNegativeGen = Gen.choose[Int](0, 1000) - val positiveGen = Gen.choose[Int](1, 1000) - val startOffsetGen = Gen.choose[Long](0L, 1000L) - property("FetchThread should set startOffset to iterators") { - forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) { - (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) => - val topicAndPartition = mock[TopicAndPartition] - val consumer = mock[KafkaConsumer] - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val fetchThread = new FetchThread(Map(topicAndPartition -> consumer), - incomingQueue, fetchThreshold, fetchSleepMS) - fetchThread.setStartOffset(topicAndPartition, startOffset) - verify(consumer).setStartOffset(startOffset) - } - } - - val topicAndPartitionGen = for { - topic <- Gen.alphaStr - partition <- Gen.choose[Int](0, Int.MaxValue) - } yield TopicAndPartition(topic, partition) - property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") { - forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) { - (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int, - startOffset: Long, topicAndPartition: TopicAndPartition) => - val message = mock[KafkaMessage] - val consumer = mock[KafkaConsumer] - when(consumer.hasNext).thenReturn(true) - when(consumer.next).thenReturn(message) - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val 
fetchThread = new FetchThread( - Map(topicAndPartition -> consumer), - incomingQueue, fetchThreshold, fetchSleepMS) - - 0.until(messageNum) foreach { _ => - fetchThread.fetchMessage - } - - incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold) - } - } - - property("FetchThread poll should try to retrieve and remove the head of incoming queue") { - val topicAndPartition = mock[TopicAndPartition] - val consumer = mock[KafkaConsumer] - val kafkaMsg = mock[KafkaMessage] - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - incomingQueue.put(kafkaMsg) - val fetchThread = new FetchThread(Map(topicAndPartition -> consumer), incomingQueue, 0, 0) - fetchThread.poll shouldBe Some(kafkaMsg) - fetchThread.poll shouldBe None - } - - val tpAndHasNextGen = for { - tp <- topicAndPartitionGen - hasNext <- Gen.oneOf(true, false) - } yield (tp, hasNext) - val tpAndHasNextListGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen) suchThat (_.size > 0) - property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") { - forAll(tpAndHasNextListGen, nonNegativeGen) { - (tps: List[(TopicAndPartition, Boolean)], fetchSleepMS: Int) => - val tpAndIterators = tps.map { case (tp, hasNext) => - val consumer = mock[KafkaConsumer] - val kafkaMsg = mock[KafkaMessage] - when(consumer.hasNext).thenReturn(hasNext) - when(consumer.next).thenReturn(kafkaMsg) - tp -> consumer - }.toMap - - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val fetchThread = new FetchThread( - tpAndIterators, incomingQueue, tpAndIterators.size + 1, fetchSleepMS) - fetchThread.fetchMessage shouldBe tps.map(_._2).reduce(_ || _) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala ---------------------------------------------------------------------- diff --git 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala deleted file mode 100644 index aae545c..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaConsumerSpec.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.lib - -import com.twitter.bijection.Injection -import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer -import kafka.api.OffsetRequest -import kafka.common.TopicAndPartition -import kafka.consumer.SimpleConsumer -import kafka.message.{Message, MessageAndOffset} -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg))) - val messageNumGen = Gen.choose[Int](0, 1000) - val topicAndPartitionGen = for { - topic <- Gen.alphaStr - partition <- Gen.choose[Int](0, Int.MaxValue) - } yield (topic, partition) - - property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") { - forAll(messageGen, messageNumGen, topicAndPartitionGen) { - (message: Message, num: Int, topicAndPartition: (String, Int)) => - val (topic, partition) = topicAndPartition - val consumer = mock[SimpleConsumer] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator - val getIterator = (offset: Long) => iterator - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - 0.until(num).foreach { i => - kafkaConsumer.hasNext shouldBe true - val kafkaMessage = kafkaConsumer.next - kafkaMessage.offset shouldBe i.toLong - kafkaMessage.key shouldBe None - } - kafkaConsumer.hasNext shouldBe false - } - } - - val startOffsetGen = Gen.choose[Long](1L, 1000L) - property("KafkaConsumer setStartOffset should reset internal iterator") { - forAll(topicAndPartitionGen, startOffsetGen) { - (topicAndPartition: (String, Int), startOffset: Long) => - val (topic, 
partition) = topicAndPartition - val consumer = mock[SimpleConsumer] - val getIterator = mock[Long => Iterator[MessageAndOffset]] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - kafkaConsumer.setStartOffset(startOffset) - verify(getIterator).apply(startOffset) - } - } - - property("KafkaConsumer close should close SimpleConsumer") { - forAll(topicAndPartitionGen) { - (topicAndPartition: (String, Int)) => - val (topic, partition) = topicAndPartition - val consumer = mock[SimpleConsumer] - when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), - OffsetRequest.EarliestTime, -1)).thenReturn(0) - val getIterator = mock[Long => Iterator[MessageAndOffset]] - val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) - kafkaConsumer.close() - verify(consumer).close() - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala new file mode 100644 index 0000000..a20e575 --- /dev/null +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala @@ -0,0 +1,72 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package io.gearpump.streaming.kafka.lib.consumer + +import org.scalatest.{WordSpec, Matchers} + +class ExponentialBackoffSleeperSpec extends WordSpec with Matchers { + + "ExponentialBackOffSleeper" should { + "sleep for increasing duration" in { + val sleeper = new ExponentialBackoffSleeper( + backOffMultiplier = 2.0, + initialDurationMs = 100, + maximumDurationMs = 10000 + ) + sleeper.getSleepDuration shouldBe 100 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 200 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 400 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 800 + } + + "sleep for no more than maximum duration" in { + val sleeper = new ExponentialBackoffSleeper( + backOffMultiplier = 2.0, + initialDurationMs = 6400, + maximumDurationMs = 10000 + ) + sleeper.getSleepDuration shouldBe 6400 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 10000 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 10000 + } + + "sleep for initial duration after reset" in { + val sleeper = new ExponentialBackoffSleeper( + backOffMultiplier = 2.0, + initialDurationMs = 100, + maximumDurationMs = 10000 + ) + sleeper.getSleepDuration shouldBe 100 + sleeper.setNextSleepDuration() + sleeper.getSleepDuration shouldBe 200 + sleeper.reset() + sleeper.getSleepDuration shouldBe 100 + } + } +} + + 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala new file mode 100644 index 0000000..92d3c31 --- /dev/null +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.gearpump.streaming.kafka.lib.consumer + +import java.util.concurrent.LinkedBlockingQueue + +import kafka.common.TopicAndPartition +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + val nonNegativeGen = Gen.choose[Int](0, 1000) + val positiveGen = Gen.choose[Int](1, 1000) + val startOffsetGen = Gen.choose[Long](0L, 1000L) + property("FetchThread should set startOffset to iterators") { + forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) { + (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) => + val topicAndPartition = mock[TopicAndPartition] + val consumer = mock[KafkaConsumer] + val createConsumer = (tp: TopicAndPartition) => consumer + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, + incomingQueue, fetchThreshold, fetchSleepMS) + fetchThread.setStartOffset(topicAndPartition, startOffset) + verify(consumer).setStartOffset(startOffset) + } + } + + val topicAndPartitionGen = for { + topic <- Gen.alphaStr + partition <- Gen.choose[Int](0, Int.MaxValue) + } yield TopicAndPartition(topic, partition) + property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") { + forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, topicAndPartitionGen) { + (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int, + startOffset: Long, topicAndPartition: TopicAndPartition) => + val message = mock[KafkaMessage] + val consumer = mock[KafkaConsumer] + val createConsumer = (tp: TopicAndPartition) => consumer + when(consumer.hasNext).thenReturn(true) + when(consumer.next).thenReturn(message) + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val fetchThread = new 
FetchThread(Array(topicAndPartition), createConsumer, + incomingQueue, fetchThreshold, fetchSleepMS) + + 0.until(messageNum) foreach { _ => + fetchThread.fetchMessage + } + + incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold) + } + } + + property("FetchThread poll should try to retrieve and remove the head of incoming queue") { + val topicAndPartition = mock[TopicAndPartition] + val consumer = mock[KafkaConsumer] + val createConsumer = (tp: TopicAndPartition) => consumer + val kafkaMsg = mock[KafkaMessage] + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + incomingQueue.put(kafkaMsg) + val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, incomingQueue, 0, 0) + fetchThread.poll shouldBe Some(kafkaMsg) + fetchThread.poll shouldBe None + } + + val tpAndHasNextGen = for { + tp <- topicAndPartitionGen + hasNext <- Gen.oneOf(true, false) + } yield (tp, hasNext) + val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen).map(_.toMap) suchThat (_.nonEmpty) + property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") { + forAll(tpHasNextMapGen, nonNegativeGen) { + (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) => + val createConsumer = (tp: TopicAndPartition) => { + val consumer = mock[KafkaConsumer] + val kafkaMsg = mock[KafkaMessage] + val hasNext = tpHasNextMap(tp) + when(consumer.hasNext).thenReturn(hasNext) + when(consumer.next).thenReturn(kafkaMsg) + consumer + } + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val fetchThread = new FetchThread(tpHasNextMap.keys.toArray, + createConsumer, incomingQueue, tpHasNextMap.size + 1, fetchSleepMS) + fetchThread.fetchMessage shouldBe tpHasNextMap.values.reduce(_ || _) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/9b1085cf/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala 
---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala new file mode 100644 index 0000000..2669f60 --- /dev/null +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.gearpump.streaming.kafka.lib.consumer + +import com.twitter.bijection.Injection +import kafka.api.OffsetRequest +import kafka.common.TopicAndPartition +import kafka.consumer.SimpleConsumer +import kafka.message.{Message, MessageAndOffset} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, Array[Byte]](msg))) + val messageNumGen = Gen.choose[Int](0, 1000) + val topicAndPartitionGen = for { + topic <- Gen.alphaStr + partition <- Gen.choose[Int](0, Int.MaxValue) + } yield (topic, partition) + + property("KafkaConsumer should iterate MessageAndOffset calling hasNext and next") { + forAll(messageGen, messageNumGen, topicAndPartitionGen) { + (message: Message, num: Int, topicAndPartition: (String, Int)) => + val (topic, partition) = topicAndPartition + val consumer = mock[SimpleConsumer] + when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), + OffsetRequest.EarliestTime, -1)).thenReturn(0) + val iterator = 0.until(num).map(index => MessageAndOffset(message, index.toLong)).iterator + val getIterator = (offset: Long) => iterator + val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) + 0.until(num).foreach { i => + kafkaConsumer.hasNext shouldBe true + val kafkaMessage = kafkaConsumer.next + kafkaMessage.offset shouldBe i.toLong + kafkaMessage.key shouldBe None + } + kafkaConsumer.hasNext shouldBe false + } + } + + val startOffsetGen = Gen.choose[Long](1L, 1000L) + property("KafkaConsumer setStartOffset should reset internal iterator") { + forAll(topicAndPartitionGen, startOffsetGen) { + (topicAndPartition: (String, Int), startOffset: Long) => + val (topic, partition) = topicAndPartition + val consumer = 
mock[SimpleConsumer] + val getIterator = mock[Long => Iterator[MessageAndOffset]] + when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), + OffsetRequest.EarliestTime, -1)).thenReturn(0) + val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) + kafkaConsumer.setStartOffset(startOffset) + verify(getIterator).apply(startOffset) + } + } + + property("KafkaConsumer close should close SimpleConsumer") { + forAll(topicAndPartitionGen) { + (topicAndPartition: (String, Int)) => + val (topic, partition) = topicAndPartition + val consumer = mock[SimpleConsumer] + when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, partition), + OffsetRequest.EarliestTime, -1)).thenReturn(0) + val getIterator = mock[Long => Iterator[MessageAndOffset]] + val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, getIterator) + kafkaConsumer.close() + verify(consumer).close() + } + } +}
