http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala index 968834e..3dede8e 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaSource.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,21 +19,20 @@ package io.gearpump.streaming.kafka import java.util.Properties +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success} + +import kafka.common.TopicAndPartition +import org.slf4j.Logger -import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig, KafkaUtil, KafkaOffsetManager} import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage} +import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaOffsetManager, KafkaSourceConfig, KafkaUtil} import io.gearpump.streaming.source.DefaultTimeStampFilter import io.gearpump.streaming.task.TaskContext +import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty import io.gearpump.streaming.transaction.api._ -import kafka.common.TopicAndPartition -import OffsetStorage.StorageEmpty import io.gearpump.util.LogUtil import io.gearpump.{Message, TimeStamp} -import org.slf4j.Logger - -import scala.collection.mutable.ArrayBuffer -import scala.util.{Failure, Success} - object KafkaSource { 
private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource]) @@ -44,15 +43,19 @@ object KafkaSource { * from multiple Kafka TopicAndPartition in a round-robin way. * * This is a TimeReplayableSource which is able to replay messages given a start time. - * Each kafka message is tagged with a timestamp by [[io.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) mapping - * is stored to a [[OffsetStorage]]. On recovery, we could retrieve the previously stored offset - * from the [[OffsetStorage]] by timestamp and start to read from there. + * Each kafka message is tagged with a timestamp by + * [[io.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) mapping + * is stored to a [[io.gearpump.streaming.transaction.api.OffsetStorage]]. On recovery, + * we could retrieve the previously stored offset from the + * [[io.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and start to read from + * there. * - * kafka message is wrapped into gearpump [[Message]] and further filtered by a [[TimeStampFilter]] + * kafka message is wrapped into gearpump [[io.gearpump.Message]] and further filtered by a + * [[io.gearpump.streaming.transaction.api.TimeStampFilter]] * such that obsolete messages are dropped. 
* * @param config kafka source config - * @param messageDecoder decodes [[Message]] from raw bytes + * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes * @param timestampFilter filters out message based on timestamp * @param fetchThread fetches messages and puts on a in-memory queue * @param offsetManagers manages offset-to-timestamp storage for each kafka.common.TopicAndPartition @@ -63,59 +66,66 @@ class KafkaSource( messageDecoder: MessageDecoder = new DefaultMessageDecoder, timestampFilter: TimeStampFilter = new DefaultTimeStampFilter, private var fetchThread: Option[FetchThread] = None, - private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = Map.empty[TopicAndPartition, KafkaOffsetManager]) - extends TimeReplayableSource { - import KafkaSource._ + private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = { + Map.empty[TopicAndPartition, KafkaOffsetManager] + }) extends TimeReplayableSource { + import io.gearpump.streaming.kafka.KafkaSource._ private var startTime: Option[TimeStamp] = None - /** + * Constructs a Kafka Source by... + * * @param topics comma-separated string of topics * @param properties kafka consumer config * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]] - * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] + * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] * */ def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory) = { this(KafkaSourceConfig(properties).withConsumerTopics(topics), offsetStorageFactory) } /** + * Constructs a Kafka Source by... 
+ * * @param topics comma-separated string of topics * @param properties kafka consumer config * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]] - * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] - * @param messageDecoder decodes [[Message]] from raw bytes + * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] + * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes * @param timestampFilter filters out message based on timestamp */ def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, - timestampFilter: TimeStampFilter) = { + messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = { this(KafkaSourceConfig(properties) .withConsumerTopics(topics), offsetStorageFactory, messageDecoder, timestampFilter) } /** + * Constructs a Kafka Source by... + * * @param topics comma-separated string of topics * @param zkConnect kafka consumer config `zookeeper.connect` * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]] - * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] + * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] */ def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory) = this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory) /** + * Constructs a Kafka Source by... 
+ * * @param topics comma-separated string of topics * @param zkConnect kafka consumer config `zookeeper.connect` * @param offsetStorageFactory [[io.gearpump.streaming.transaction.api.OffsetStorageFactory]] - * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] - * @param messageDecoder decodes [[Message]] from raw bytes + * that creates [[io.gearpump.streaming.transaction.api.OffsetStorage]] + * @param messageDecoder decodes [[io.gearpump.Message]] from raw bytes * @param timestampFilter filters out message based on timestamp */ def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory, - messageDecoder: MessageDecoder, - timestampFilter: TimeStampFilter) = { + messageDecoder: MessageDecoder, + timestampFilter: TimeStampFilter) = { this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory, messageDecoder, timestampFilter) } @@ -191,5 +201,4 @@ class KafkaSource( override def close(): Unit = { offsetManagers.foreach(_._2.close()) } - }
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala index eacd267..e50bf84 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,36 +19,37 @@ package io.gearpump.streaming.kafka import java.util.Properties +import scala.collection.mutable +import scala.util.{Failure, Success, Try} import com.twitter.bijection.Injection -import io.gearpump.streaming.kafka.lib.KafkaUtil -import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer -import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, OffsetStorage} import kafka.api.OffsetRequest import kafka.consumer.ConsumerConfig import org.I0Itec.zkclient.ZkClient -import io.gearpump.TimeStamp -import OffsetStorage.{Overflow, StorageEmpty, Underflow} -import io.gearpump.util.LogUtil import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.serialization.ByteArraySerializer import org.slf4j.Logger -import scala.collection.mutable -import scala.util.{Failure, Success, Try} +import io.gearpump.TimeStamp +import io.gearpump.streaming.kafka.lib.KafkaUtil +import io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer +import 
io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} +import io.gearpump.streaming.transaction.api.{OffsetStorage, OffsetStorageFactory} +import io.gearpump.util.LogUtil /** - * factory that builds [[KafkaStorage]] + * Factory that builds [[KafkaStorage]] * * @param consumerProps kafka consumer config * @param producerProps kafka producer config */ -class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties) extends OffsetStorageFactory { +class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties) + extends OffsetStorageFactory { /** - * - * this creates consumer config properties with `zookeeper.connect` set to zkConnect + * Creates consumer config properties with `zookeeper.connect` set to zkConnect * and producer config properties with `bootstrap.servers` set to bootstrapServers + * * @param zkConnect kafka consumer config `zookeeper.connect` * @param bootstrapServers kafka producer config `bootstrap.servers` */ @@ -70,7 +71,8 @@ object KafkaStorage { } /** - * this stores offset-timestamp mapping to kafka + * Stores offset-timestamp mapping to kafka + * * @param topic kafka store topic * @param producer kafka producer * @param getConsumer function to get kafka consumer @@ -83,11 +85,10 @@ class KafkaStorage private[kafka]( connectZk: => ZkClient) extends OffsetStorage { - private lazy val consumer = getConsumer private val dataByTime: List[(TimeStamp, Array[Byte])] = { - if (KafkaUtil.topicExists(connectZk, topic)){ + if (KafkaUtil.topicExists(connectZk, topic)) { load(consumer) } else { List.empty[(TimeStamp, Array[Byte])] @@ -95,8 +96,9 @@ class KafkaStorage private[kafka]( } /** - * offsets with timestamp < `time` have already been processed by the system - * so we look up the storage for the first offset with timestamp >= `time` on replay + * Offsets with timestamp less than `time` have already been processed by the system + * so we look up the storage for the first offset with 
timestamp greater than or equal to `time` + * on replay. * * @param time the timestamp to look up for the earliest unprocessed offset * @return the earliest unprocessed offset if `time` is in the range, otherwise failure @@ -143,5 +145,4 @@ class KafkaStorage private[kafka]( consumer.close() messagesBuilder.result().toList } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala index 5c700be..2a852e2 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -24,6 +24,8 @@ import io.gearpump.streaming.dsl import io.gearpump.streaming.kafka.KafkaSink class KafkaDSLSink[T](stream: dsl.Stream[T]) { + + /** Create a Kafka DSL Sink */ def writeToKafka( topic: String, bootstrapServers: String, @@ -42,6 +44,9 @@ class KafkaDSLSink[T](stream: dsl.Stream[T]) { } object KafkaDSLSink { + + import scala.language.implicitConversions + implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = { new KafkaDSLSink[T](stream) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala index 7f752e5..325b40f 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,10 +20,10 @@ package io.gearpump.streaming.kafka.dsl import java.util.Properties import io.gearpump.streaming.dsl -import io.gearpump.streaming.dsl.{StreamApp} +import io.gearpump.streaming.dsl.StreamApp import io.gearpump.streaming.kafka.KafkaSource import io.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig} -import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, TimeStampFilter, MessageDecoder} +import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter} object KafkaDSLUtil { def createStream[T]( @@ -33,7 +33,8 @@ object KafkaDSLUtil { kafkaConfig: KafkaSourceConfig, offsetStorageFactory: OffsetStorageFactory, messageDecoder: MessageDecoder = new DefaultMessageDecoder): dsl.Stream[T] = { - app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder), parallelism, description) + app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder), + parallelism, description) } def createStream[T]( @@ -43,8 +44,8 @@ object KafkaDSLUtil { topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory) - , parallelism, description) + app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory), + parallelism, description) } def createStream[T]( @@ -56,8 +57,8 @@ object KafkaDSLUtil { offsetStorageFactory: OffsetStorageFactory, messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory, messageDecoder, timestampFilter) - , parallelism, description) + app.source[T](new 
KafkaSource(topics, zkConnect, offsetStorageFactory, + messageDecoder, timestampFilter), parallelism, description) } def createStream[T]( @@ -67,7 +68,8 @@ object KafkaDSLUtil { topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, properties, offsetStorageFactory), parallelism, description) + app.source[T](new KafkaSource(topics, properties, offsetStorageFactory), + parallelism, description) } def createStream[T]( @@ -79,9 +81,8 @@ object KafkaDSLUtil { offsetStorageFactory: OffsetStorageFactory, messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter): dsl.Stream[T] = { - app.source[T](new KafkaSource(topics, properties, offsetStorageFactory, messageDecoder, timestampFilter), parallelism, description) + app.source[T](new KafkaSource(topics, properties, offsetStorageFactory, + messageDecoder, timestampFilter), parallelism, description) } } - - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala index 873e614..f846efe 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,11 +18,12 @@ package io.gearpump.streaming.kafka.lib +import scala.util.{Failure, Success} + import com.twitter.bijection.Injection -import io.gearpump.streaming.transaction.api.MessageDecoder -import io.gearpump.Message -import scala.util.{Failure, Success} +import io.gearpump.Message +import io.gearpump.streaming.transaction.api.MessageDecoder class DefaultMessageDecoder extends MessageDecoder { override def fromBytes(bytes: Array[Byte]): Message = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala index ecaa294..e9c95e3 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,23 +18,24 @@ package io.gearpump.streaming.kafka.lib +import scala.util.{Failure, Success, Try} + import com.twitter.bijection.Injection -import io.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage} -import io.gearpump._ -import OffsetStorage.{Overflow, StorageEmpty, Underflow} -import io.gearpump.util.LogUtil import org.slf4j.Logger -import scala.util.{Failure, Success, Try} +import io.gearpump._ +import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} +import io.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage} +import io.gearpump.util.LogUtil object KafkaOffsetManager { private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager]) } private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends OffsetManager { - import KafkaOffsetManager._ + import io.gearpump.streaming.kafka.lib.KafkaOffsetManager._ - var maxTime: TimeStamp = 0L + var maxTime: TimeStamp = 0L override def filter(messageAndOffset: (Message, Long)): Option[Message] = { val (message, offset) = messageAndOffset http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala index 50905fc..123f3ac 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala @@ 
-7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,13 +20,13 @@ package io.gearpump.streaming.kafka.lib import java.util.Properties -import io.gearpump.streaming.kafka.KafkaSource -import io.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper} -import io.gearpump.util.LogUtil import kafka.api.OffsetRequest import kafka.consumer.ConsumerConfig import org.slf4j.Logger +import io.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper} +import io.gearpump.util.LogUtil + object KafkaSourceConfig { val NAME = "kafka_config" @@ -45,12 +45,14 @@ object KafkaSourceConfig { } /** - * this class extends kafka kafka.consumer.ConsumerConfig with specific configs for [[KafkaSource]] + * Extends kafka.consumer.ConsumerConfig with specific config needed by + * [[io.gearpump.streaming.kafka.KafkaSource]] * * @param consumerProps kafka consumer config */ -class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends java.io.Serializable { - import KafkaSourceConfig._ +class KafkaSourceConfig(val consumerProps: Properties = new Properties) + extends java.io.Serializable { + import io.gearpump.streaming.kafka.lib.KafkaSourceConfig._ if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) { consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181") @@ -63,9 +65,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps) /** - * set kafka consumer topics + * Set kafka consumer topics, separated by commas. 
+ * @param topics comma-separated string - * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.CONSUMER_TOPICS]] set to given value + * @return new KafkaConfig based on this but with + * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]] + * set to given value */ def withConsumerTopics(topics: String): KafkaSourceConfig = { consumerProps.setProperty(CONSUMER_TOPICS, topics) @@ -73,18 +78,22 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends } /** - * @return a list of kafka consumer topics + * Returns a list of kafka consumer topics */ def getConsumerTopics: List[String] = { Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList } /** - * [[consumer.FetchThread]] will sleep for a while if no more messages or - * the incoming queue size is above the [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]] - * this is to set sleep interval + * Sets the sleep interval if there are no more messages or the message buffer is full. 
+ * + * Consumer.FetchThread will sleep for a while if no more messages or + * the incoming queue size is above the + * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] + * * @param sleepMS sleep interval in milliseconds - * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_SLEEP_MS]] set to given value + * @return new KafkaConfig based on this but with + * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]] set to given value */ def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = { consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS") @@ -92,9 +101,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends } /** - * [[consumer.FetchThread]] will sleep for a while if no more messages or - * the incoming queue size is above the [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]] - * this is to get sleep interval + * Gets the sleep interval + * + * Consumer.FetchThread sleeps for a while if no more messages or + * the incoming queue is full (size is bigger than the + * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]) + * * @return sleep interval in milliseconds */ def getFetchSleepMS: Int = { @@ -102,10 +114,14 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends } /** - * [[consumer.FetchThread]] stops fetching new messages if its incoming queue + * Sets the batch size we use for one fetch. 
+ * Consumer.FetchThread stops fetching new messages if its incoming queue * size is above the threshold and starts again when the queue size is below it + * * @param threshold queue size - * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.FETCH_THRESHOLD]] set to give value + * @return new KafkaConfig based on this but with + * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] set to given value */ def withFetchThreshold(threshold: Int): KafkaSourceConfig = { consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold") @@ -113,9 +129,11 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends } /** + * Returns fetch batch size. + * + * Consumer.FetchThread stops fetching new messages if + * its incoming queue size is above the threshold and starts again when the queue size is below it * - * [[io.gearpump.streaming.kafka.lib.consumer.FetchThread]] stops fetching new messages if its incoming queue - * size is above the threshold and starts again when the queue size is below it * @return fetch threshold */ def getFetchThreshold: Int = { @@ -123,11 +141,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends } /** - * set [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which - * defines how kafka.common.TopicAndPartitions are mapped to source tasks + * Sets [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which + * defines how kafka.common.TopicAndPartitions are mapped to source tasks. 
* * @param className name of the factory class - * @return new KafkaConfig based on this but with [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig.GROUPER_CLASS]] set to given value + * @return new KafkaConfig based on this but with + * [[io.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]] set to given value */ def withGrouper(className: String): KafkaSourceConfig = { consumerProps.setProperty(GROUPER_CLASS, className) @@ -135,13 +154,12 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends } /** - * get [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which + * Returns [[io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which * defines how kafka.common.TopicAndPartitions are mapped to source tasks - * @return */ def getGrouper: KafkaGrouper = { - Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS)).getOrElse(classOf[KafkaDefaultGrouper].getName)) - .newInstance().asInstanceOf[KafkaGrouper] + Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS)) + .getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper] } def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = { @@ -150,7 +168,8 @@ class KafkaSourceConfig(val consumerProps: Properties = new Properties) extends } def getConsumerStartOffset: Long = { - Option(consumerProps.getProperty(CONSUMER_START_OFFSET)).getOrElse(s"${OffsetRequest.EarliestTime}").toLong + Option(consumerProps.getProperty(CONSUMER_START_OFFSET)) + .getOrElse(s"${OffsetRequest.EarliestTime}").toLong } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala index 
54a66ae..2f7fcf7 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/KafkaUtil.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -27,11 +27,12 @@ import kafka.common.TopicAndPartition import kafka.consumer.ConsumerConfig import kafka.utils.{ZKStringSerializer, ZkUtils} import org.I0Itec.zkclient.ZkClient -import io.gearpump.util.LogUtil import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig} import org.apache.kafka.common.serialization.Serializer import org.slf4j.Logger +import io.gearpump.util.LogUtil + object KafkaUtil { private val LOG: Logger = LogUtil.getLogger(getClass) @@ -39,7 +40,8 @@ object KafkaUtil { val zkClient = connectZk try { val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) - .getOrElse(throw new RuntimeException(s"leader not available for TopicAndPartition($topic, $partition)")) + .getOrElse(throw new RuntimeException( + s"leader not available for TopicAndPartition($topic, $partition)")) ZkUtils.getBrokerInfo(zkClient, leader) .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader")) } catch { @@ -51,12 +53,13 @@ object KafkaUtil { } } - def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String]): Array[TopicAndPartition] = { + def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String]) + : Array[TopicAndPartition] = { val zkClient = connectZk try { - ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap { - case (topic, partitions) => partitions.map(TopicAndPartition(topic, _)) - }.toArray + 
ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap { + case (topic, partitions) => partitions.map(TopicAndPartition(topic, _)) + }.toArray } catch { case e: Exception => LOG.error(e.getMessage) @@ -80,10 +83,11 @@ object KafkaUtil { } /** - * create a new kafka topic - * return true if topic already exists, and false otherwise + * create a new kafka topic + * return true if topic already exists, and false otherwise */ - def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int): Boolean = { + def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int) + : Boolean = { val zkClient = connectZk try { if (AdminUtils.topicExists(zkClient, topic)) { @@ -132,15 +136,16 @@ object KafkaUtil { case e: Exception => LOG.error(s"$filename not found") } finally { - if(propStream != null) + if (propStream != null) { propStream.close() + } } props } def createKafkaProducer[K, V](properties: Properties, - keySerializer: Serializer[K], - valueSerializer: Serializer[V]): KafkaProducer[K, V] = { + keySerializer: Serializer[K], + valueSerializer: Serializer[V]): KafkaProducer[K, V] = { if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) { properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") } @@ -153,11 +158,10 @@ object KafkaUtil { properties } - def buildConsumerConfig(zkConnect: String): Properties = { - val properties = new Properties() - properties.setProperty("zookeeper.connect", zkConnect) - properties.setProperty("group.id", "gearpump") - properties + def buildConsumerConfig(zkConnect: String): Properties = { + val properties = new Properties() + properties.setProperty("zookeeper.connect", zkConnect) + properties.setProperty("group.id", "gearpump") + properties } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala 
---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala index b821f15..141ae98 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeper.scala @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -8,15 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ package io.gearpump.streaming.kafka.lib.consumer http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala index ee53151..b2b3f4f 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThread.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -23,17 +23,18 @@ import java.util.concurrent.LinkedBlockingQueue import kafka.common.TopicAndPartition import kafka.consumer.ConsumerConfig -import io.gearpump.util.LogUtil import org.slf4j.Logger +import io.gearpump.util.LogUtil + object FetchThread { private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread]) def apply(topicAndPartitions: Array[TopicAndPartition], - fetchThreshold: Int, - fetchSleepMS: Long, - startOffsetTime: Long, - consumerConfig: ConsumerConfig): FetchThread = { + fetchThreshold: Int, + fetchSleepMS: Long, + startOffsetTime: Long, + consumerConfig: ConsumerConfig): FetchThread = { val createConsumer = (tp: TopicAndPartition) => KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig) @@ -43,20 +44,22 @@ object FetchThread { } /** - * A thread to fetch messages from multiple kafka [[TopicAndPartition]]s and puts them + * A thread to fetch messages from multiple kafka 
kafka.common.TopicAndPartition and puts them * onto a queue, which is asynchronously polled by a consumer * - * @param createConsumer given a [[TopicAndPartition]], create a [[KafkaConsumer]] to connect to it + * @param createConsumer given a kafka.common.TopicAndPartition, create an + * [[io.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to + * connect to it * @param incomingQueue a queue to buffer incoming messages * @param fetchThreshold above which thread should stop fetching messages * @param fetchSleepMS interval to sleep when no more messages or hitting fetchThreshold */ private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition], - createConsumer: TopicAndPartition => KafkaConsumer, - incomingQueue: LinkedBlockingQueue[KafkaMessage], - fetchThreshold: Int, - fetchSleepMS: Long) extends Thread { - import FetchThread._ + createConsumer: TopicAndPartition => KafkaConsumer, + incomingQueue: LinkedBlockingQueue[KafkaMessage], + fetchThreshold: Int, + fetchSleepMS: Long) extends Thread { + import io.gearpump.streaming.kafka.lib.consumer.FetchThread._ private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala index 208a99d..77321b9 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,7 +18,6 @@ package io.gearpump.streaming.kafka.lib.consumer -import io.gearpump.streaming.kafka.lib.KafkaUtil import kafka.api.{FetchRequestBuilder, OffsetRequest} import kafka.common.ErrorMapping._ import kafka.common.TopicAndPartition @@ -26,8 +25,11 @@ import kafka.consumer.{ConsumerConfig, SimpleConsumer} import kafka.message.MessageAndOffset import kafka.utils.Utils +import io.gearpump.streaming.kafka.lib.KafkaUtil + object KafkaConsumer { - def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig): KafkaConsumer = { + def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig) + : KafkaConsumer = { val connectZk = KafkaUtil.connectZookeeper(config) val broker = KafkaUtil.getBroker(connectZk(), topic, partition) val soTimeout = config.socketTimeoutMs @@ -40,7 +42,7 @@ object KafkaConsumer { .addFetch(topic, partition, offset, fetchSize) .build() - val response = consumer.fetch(request) + val response = consumer.fetch(request) response.errorCode(topic, partition) match { case NoError => response.messageSet(topic, partition).iterator case error => throw exceptionFor(error) @@ -51,13 +53,14 @@ object KafkaConsumer { } /** - * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over messages from a kafka kafka.common.TopicAndPartition. + * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over + * messages from a kafka kafka.common.TopicAndPartition. 
*/ class KafkaConsumer(consumer: SimpleConsumer, - topic: String, - partition: Int, - getIterator: (Long) => Iterator[MessageAndOffset], - startOffsetTime: Long = OffsetRequest.EarliestTime) { + topic: String, + partition: Int, + getIterator: (Long) => Iterator[MessageAndOffset], + startOffsetTime: Long = OffsetRequest.EarliestTime) { private val earliestOffset = consumer .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1) private var nextOffset: Long = earliestOffset @@ -97,6 +100,4 @@ class KafkaConsumer(consumer: SimpleConsumer, def close(): Unit = { consumer.close() } - - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala index 18d1bf0..16330ed 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -28,9 +28,11 @@ import kafka.common.TopicAndPartition * @param msg message payload */ case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long, - key: Option[Array[Byte]], msg: Array[Byte]) { + key: Option[Array[Byte]], msg: Array[Byte]) { + def this(topic: String, partition: Int, offset: Long, - key: Option[Array[Byte]], msg: Array[Byte]) = + key: Option[Array[Byte]], msg: Array[Byte]) = { this(TopicAndPartition(topic, partition), offset, key, msg) + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala index 4a384fc..0f968e2 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,7 +30,9 @@ import kafka.common.TopicAndPartition * streamProducer1 gets (topicA, partition2), (topicB, partition2) */ class KafkaDefaultGrouper extends KafkaGrouper { - def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition] = { - topicAndPartitions.indices.filter(_ % taskNum == taskIndex).map(i => topicAndPartitions(i)).toArray + def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) + : Array[TopicAndPartition] = { + topicAndPartitions.indices.filter(_ % taskNum == taskIndex) + .map(i => topicAndPartitions(i)).toArray } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala index 035917b..6660a04 100644 --- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala +++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -24,7 +24,7 @@ import kafka.common.TopicAndPartition * this class dispatches kafka kafka.common.TopicAndPartition to gearpump tasks */ trait KafkaGrouper { - def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition] + def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) + : Array[TopicAndPartition] } - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala index 86bc099..2b00414 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSinkSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,8 +19,6 @@ package io.gearpump.streaming.kafka import com.twitter.bijection.Injection -import io.gearpump.Message -import io.gearpump.streaming.MockUtil import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.mockito.Mockito._ import org.scalacheck.Gen @@ -28,6 +26,9 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} +import io.gearpump.Message +import io.gearpump.streaming.MockUtil + class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { val dataGen = for { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala index e67b572..6cd78fc 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,16 +18,10 @@ package io.gearpump.streaming.kafka +import scala.util.{Failure, Success} + import com.twitter.bijection.Injection -import io.gearpump.streaming.kafka.lib.{KafkaSourceConfig, KafkaOffsetManager} -import io.gearpump.streaming.kafka.lib.consumer.{KafkaMessage, FetchThread} -import io.gearpump.streaming.transaction.api.{OffsetStorageFactory, TimeStampFilter, MessageDecoder, OffsetStorage} import kafka.common.TopicAndPartition -import io.gearpump.Message -import io.gearpump.streaming.kafka.lib.consumer.FetchThread -import io.gearpump.streaming.kafka.lib.KafkaOffsetManager -import OffsetStorage.StorageEmpty -import io.gearpump.streaming.transaction.api.OffsetStorageFactory import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalacheck.Gen @@ -35,7 +29,11 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.util.{Failure, Success} +import io.gearpump.Message +import io.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage} +import io.gearpump.streaming.kafka.lib.{KafkaOffsetManager, KafkaSourceConfig} +import io.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty +import io.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter} class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -50,8 +48,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo val timestampFilter = mock[TimeStampFilter] val offsetStorageFactory = mock[OffsetStorageFactory] val kafkaConfig = mock[KafkaSourceConfig] - val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder, timestampFilter, - Some(fetchThread), Map(topicAndPartition -> offsetManager)) + val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, + timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager)) kafkaSource.setStartTime(None) @@ -68,8 +66,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo val timestampFilter = mock[TimeStampFilter] val offsetStorageFactory = mock[OffsetStorageFactory] val kafkaConfig = mock[KafkaSourceConfig] - val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, - Some(fetchThread), Map(topicAndPartition -> offsetManager)) + val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, + timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager)) when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty)) @@ -95,8 +93,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo val timestampFilter = mock[TimeStampFilter] val offsetStorageFactory = mock[OffsetStorageFactory] val kafkaConfig = mock[KafkaSourceConfig] - val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, - Some(fetchThread), Map(topicAndPartition -> offsetManager)) + val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, + timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager)) when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset)) @@ -135,7 +133,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo val kafkaConfig = mock[KafkaSourceConfig] val offsetManagers = kafkaMsgList.map(_.topicAndPartition -> offsetManager).toMap - val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, Some(fetchThread), offsetManagers) + val source = new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder, + timestampFilter, Some(fetchThread), offsetManagers) if (number == 0) { verify(fetchThread, never()).poll @@ -171,10 +170,9 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo val messageDecoder = mock[MessageDecoder] val offsetStorageFactory = mock[OffsetStorageFactory] val kafkaConfig = mock[KafkaSourceConfig] - val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, timestampFilter, Some(fetchThread), - Map(topicAndPartition -> offsetManager)) + val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, + timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager)) source.close() verify(offsetManager).close() } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala index 2453adf..f3b5425 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,8 +20,8 @@ package io.gearpump.streaming.kafka.lib import com.twitter.bijection.Injection import org.scalacheck.Gen -import org.scalatest.{PropSpec, Matchers} import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers { property("DefaultMessageDecoder should keep the original bytes data in Message") { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala index e185a29..c762b06 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,17 +18,18 @@ package io.gearpump.streaming.kafka.lib +import scala.util.{Failure, Success} + import com.twitter.bijection.Injection -import io.gearpump.streaming.transaction.api.OffsetStorage -import io.gearpump.Message -import OffsetStorage.{Overflow, StorageEmpty, Underflow} import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import scala.util.{Failure, Success} +import io.gearpump.Message +import io.gearpump.streaming.transaction.api.OffsetStorage +import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -40,11 +41,12 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers val messageAndOffsetsGen = Gen.listOf[Message](messageGen).map(_.zipWithIndex) - property("KafkaOffsetManager should append offset to storage in monotonically increasing time order") { + property("KafkaOffsetManager should append offset to storage in monotonically" + + " increasing time order") { forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) => val offsetStorage = mock[OffsetStorage] val offsetManager = new KafkaOffsetManager(offsetStorage) - messageAndOffsets.foldLeft(0L){ (max, messageAndOffset) => + messageAndOffsets.foldLeft(0L) { (max, messageAndOffset) => val (message, offset) = messageAndOffset offsetManager.filter((message, offset.toLong)) shouldBe Option(message) if (message.timestamp > max) { @@ -62,7 +64,8 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers 
val minTimeStampGen = Gen.choose[Long](0L, 500L) val maxTimeStampGen = Gen.choose[Long](500L, 1000L) - property("KafkaOffsetManager resolveOffset should report StorageEmpty failure when storage is empty") { + property("KafkaOffsetManager resolveOffset should " + + "report StorageEmpty failure when storage is empty") { forAll(timeStampGen) { (time: Long) => val offsetStorage = mock[OffsetStorage] val offsetManager = new KafkaOffsetManager(offsetStorage) @@ -78,16 +81,19 @@ class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers } val offsetGen = Gen.choose[Long](0L, 1000L) - property("KafkaOffsetManager resolveOffset should return a valid offset when storage is not empty") { + property("KafkaOffsetManager resolveOffset should return a valid" + + " offset when storage is not empty") { forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) { (time: Long, min: Long, max: Long, offset: Long) => val offsetStorage = mock[OffsetStorage] val offsetManager = new KafkaOffsetManager(offsetStorage) if (time < min) { - when(offsetStorage.lookUp(time)).thenReturn(Failure(Underflow(Injection[Long, Array[Byte]](min)))) + when(offsetStorage.lookUp(time)).thenReturn(Failure( + Underflow(Injection[Long, Array[Byte]](min)))) offsetManager.resolveOffset(time) shouldBe Success(min) } else if (time > max) { - when(offsetStorage.lookUp(time)).thenReturn(Failure(Overflow(Injection[Long, Array[Byte]](max)))) + when(offsetStorage.lookUp(time)).thenReturn(Failure( + Overflow(Injection[Long, Array[Byte]](max)))) offsetManager.resolveOffset(time) shouldBe Success(max) } else { when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, Array[Byte]](offset))) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala ---------------------------------------------------------------------- diff --git 
a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala index bcc757b..af23c12 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,163 +18,170 @@ package io.gearpump.streaming.kafka.lib -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - val minTimeGen = Gen.choose[Long](1L, 500L) - val maxTimeGen = Gen.choose[Long](500L, 999L) - - /* property("KafkaStorage lookup time should report StorageEmpty if storage is empty") { - forAll { (time: Long, topic: String) => - val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] - val getConsumer = () => mock[KafkaConsumer] - val connectZk = () => mock[ZkClient] - val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), connectZk()) - storage.lookUp(time) shouldBe Failure(StorageEmpty) - } - } - - property("KafkaStorage lookup time should return data or report failure if storage is not empty") { - forAll(minTimeGen, maxTimeGen, Gen.alphaStr) { (minTime: Long, maxTime: Long, topic: String) => - val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) => - val offset = index.toLong - time -> offset - } - val timeAndOffsetsMap = timeAndOffsets.toMap - val 
data = timeAndOffsets.map { - case (time, offset) => - new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)), - Injection[Long, Array[Byte]](offset)) - }.toList - - val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] - val consumer = mock[KafkaConsumer] - val getConsumer = () => consumer - val connectZk = () => mock[ZkClient] - - val hasNexts = List.fill(data.tail.size)(true) :+ false - when(consumer.hasNext).thenReturn(true, hasNexts:_*) - when(consumer.next).thenReturn(data.head, data.tail:_*) - - val storage = new KafkaStorage(topic, topicExists = true, producer, getConsumer(), connectZk()) - forAll(Gen.choose[Long](minTime, maxTime)) { - time => - storage.lookUp(time) match { - case Success(array) => array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time))) - case Failure(e) => fail("time in range should return Success with value") - } - } - - forAll(Gen.choose[Long](0L, minTime - 1)) { - time => - storage.lookUp(time) match { - case Failure(e) => e shouldBe a [Underflow] - e.asInstanceOf[Underflow].min should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime))) - case Success(_) => fail("time less than min should return Underflow failure") - } - } - - forAll(Gen.choose[Long](maxTime + 1, 1000L)) { - time => - storage.lookUp(time) match { - case Failure(e) => e shouldBe a [Overflow] - e.asInstanceOf[Overflow].max should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime))) - case Success(_) => fail("time larger than max should return Overflow failure") - } - } - } - } - - property("KafkaStorage append should send data to Kafka") { - forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000), Gen.alphaStr, Gen.oneOf(true, false)) { - (time: Long, offset: Long, topic: String, topicExists: Boolean) => - val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] - val getConsumer = () => mock[KafkaConsumer] - val connectZk = () => mock[ZkClient] - val storage = new 
KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk()) - val offsetBytes = Injection[Long, Array[Byte]](offset) - storage.append(time, offsetBytes) - verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]()) - } - } - - val topicAndPartitionGen = for { - topic <- Gen.alphaStr - partition <- Gen.choose[Int](0, 100) - } yield TopicAndPartition(topic, partition) - property("KafkaStorage should load data from Kafka") { - val kafkaMsgGen = for { - timestamp <- Gen.choose[Long](1L, 1000L) - offset <- Gen.choose[Long](0L, 1000L) - } yield (timestamp, Injection[Long, Array[Byte]](offset)) - val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen) - - val topicExistsGen = Gen.oneOf(true, false) - - forAll(topicAndPartitionGen, msgListGen) { - (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) => - val producer= mock[KafkaProducer[Array[Byte], Array[Byte]]] - val consumer = mock[KafkaConsumer] - val getConsumer = () => consumer - val connectZk = () => mock[ZkClient] - val kafkaStorage = new KafkaStorage(topicAndPartition.topic, topicExists = true, producer, getConsumer(), connectZk()) - msgList match { - case Nil => - when(consumer.hasNext).thenReturn(false) - case list => - val hasNexts = List.fill(list.tail.size)(true) :+ false - val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) => - KafkaMessage(topicAndPartition, index.toLong, Some(Injection[Long, Array[Byte]](timestamp)), bytes) - } - when(consumer.hasNext).thenReturn(true, hasNexts: _*) - when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*) - } - kafkaStorage.load(consumer) shouldBe msgList - } - } - - property("KafkaStorage should not get consumer when topic doesn't exist") { - forAll(Gen.alphaStr) { (topic: String) => - val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] - val getConsumer = mock[() => KafkaConsumer] - val connectZk = () => mock[ZkClient] - val kafkaStorage = new KafkaStorage(topic, 
topicExists = false, producer, getConsumer(), connectZk()) - verify(getConsumer, never()).apply() - kafkaStorage.close() - } - } - - property("KafkaStorage should fail to load invalid KafkaMessage") { - val invalidKafkaMsgGen = for { - tp <- topicAndPartitionGen - offset <- Gen.choose[Long](1L, 1000L) - timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))), None) - msg <- Gen.alphaStr.map(Injection[String, Array[Byte]]) - } yield KafkaMessage(tp, offset, timestamp, msg) - forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) => - val consumer = mock[KafkaConsumer] - val getConsumer = () => consumer - val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] - val connectZk = () => mock[ZkClient] - val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic, topicExists = true, - producer, getConsumer(), connectZk()) - when(consumer.hasNext).thenReturn(true, false) - when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg) - Try(kafkaStorage.load(consumer)).isFailure shouldBe true - } - } - - property("KafkaStorage close should close kafka producer and delete topic") { - val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] - val getConsumer = () => mock[KafkaConsumer] - val zkClient = mock[ZkClient] - val connectZk = () => zkClient - val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk()) - kafkaStorage.close() - verify(producer).close() - verify(zkClient).createPersistent(anyString(), anyString()) - }*/ -} +// TODO: Fix the UT failure! 
+ +// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { +// val minTimeGen = Gen.choose[Long](1L, 500L) +// val maxTimeGen = Gen.choose[Long](500L, 999L) +// +// property("KafkaStorage lookup time should report StorageEmpty if storage is empty") { +// forAll { (time: Long, topic: String) => +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val getConsumer = () => mock[KafkaConsumer] +// val connectZk = () => mock[ZkClient] +// val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), +// connectZk()) +// storage.lookUp(time) shouldBe Failure(StorageEmpty) +// } +// } +// +// property("KafkaStorage lookup time should return data or report failure if storage not empty") { +// forAll(minTimeGen, maxTimeGen, Gen.alphaStr) {(minTime: Long, maxTime: Long, topic: String) => +// val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) => +// val offset = index.toLong +// time -> offset +// } +// val timeAndOffsetsMap = timeAndOffsets.toMap +// val data = timeAndOffsets.map { +// case (time, offset) => +// new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)), +// Injection[Long, Array[Byte]](offset)) +// }.toList +// +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val consumer = mock[KafkaConsumer] +// val getConsumer = () => consumer +// val connectZk = () => mock[ZkClient] +// +// val hasNexts = List.fill(data.tail.size)(true) :+ false +// when(consumer.hasNext).thenReturn(true, hasNexts:_*) +// when(consumer.next).thenReturn(data.head, data.tail:_*) +// +// val storage = new KafkaStorage(topic, topicExists = true, producer, +// getConsumer(), connectZk()) +// forAll(Gen.choose[Long](minTime, maxTime)) { +// time => +// storage.lookUp(time) match { +// case Success(array) => +// array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time))) +// case Failure(e) => fail("time in range should return 
Success with value") +// } +// } +// +// forAll(Gen.choose[Long](0L, minTime - 1)) { +// time => +// storage.lookUp(time) match { +// case Failure(e) => e shouldBe a [Underflow] +// e.asInstanceOf[Underflow].min should equal +// (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime))) +// case Success(_) => fail("time less than min should return Underflow failure") +// } +// } +// +// forAll(Gen.choose[Long](maxTime + 1, 1000L)) { +// time => +// storage.lookUp(time) match { +// case Failure(e) => e shouldBe a [Overflow] +// e.asInstanceOf[Overflow].max should equal +// (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime))) +// case Success(_) => fail("time larger than max should return Overflow failure") +// } +// } +// } +// } +// +// property("KafkaStorage append should send data to Kafka") { +// forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000), +// Gen.alphaStr, Gen.oneOf(true, false)) { +// (time: Long, offset: Long, topic: String, topicExists: Boolean) => +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val getConsumer = () => mock[KafkaConsumer] +// val connectZk = () => mock[ZkClient] +// val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk()) +// val offsetBytes = Injection[Long, Array[Byte]](offset) +// storage.append(time, offsetBytes) +// verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]()) +// } +// } +// +// val topicAndPartitionGen = for { +// topic <- Gen.alphaStr +// partition <- Gen.choose[Int](0, 100) +// } yield TopicAndPartition(topic, partition) +// property("KafkaStorage should load data from Kafka") { +// val kafkaMsgGen = for { +// timestamp <- Gen.choose[Long](1L, 1000L) +// offset <- Gen.choose[Long](0L, 1000L) +// } yield (timestamp, Injection[Long, Array[Byte]](offset)) +// val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen) +// +// val topicExistsGen = Gen.oneOf(true, false) +// +// forAll(topicAndPartitionGen, msgListGen) { 
+// (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) => +// val producer= mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val consumer = mock[KafkaConsumer] +// val getConsumer = () => consumer +// val connectZk = () => mock[ZkClient] +// val kafkaStorage = new KafkaStorage(topicAndPartition.topic, +// topicExists = true, producer, getConsumer(), connectZk()) +// msgList match { +// case Nil => +// when(consumer.hasNext).thenReturn(false) +// case list => +// val hasNexts = List.fill(list.tail.size)(true) :+ false +// val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) => +// KafkaMessage(topicAndPartition, index.toLong, +// Some(Injection[Long, Array[Byte]](timestamp)), bytes) +// } +// when(consumer.hasNext).thenReturn(true, hasNexts: _*) +// when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*) +// } +// kafkaStorage.load(consumer) shouldBe msgList +// } +// } +// +// property("KafkaStorage should not get consumer when topic doesn't exist") { +// forAll(Gen.alphaStr) { (topic: String) => +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val getConsumer = mock[() => KafkaConsumer] +// val connectZk = () => mock[ZkClient] +// val kafkaStorage = new KafkaStorage(topic, +// topicExists = false, producer, getConsumer(), connectZk()) +// verify(getConsumer, never()).apply() +// kafkaStorage.close() +// } +// } +// +// property("KafkaStorage should fail to load invalid KafkaMessage") { +// val invalidKafkaMsgGen = for { +// tp <- topicAndPartitionGen +// offset <- Gen.choose[Long](1L, 1000L) +// timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))), +// None) +// msg <- Gen.alphaStr.map(Injection[String, Array[Byte]]) +// } yield KafkaMessage(tp, offset, timestamp, msg) +// forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) => +// val consumer = mock[KafkaConsumer] +// val getConsumer = () => consumer +// val producer = 
mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val connectZk = () => mock[ZkClient] +// val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic, +// topicExists = true, producer, getConsumer(), connectZk()) +// when(consumer.hasNext).thenReturn(true, false) +// when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg) +// Try(kafkaStorage.load(consumer)).isFailure shouldBe true +// } +// } +// +// property("KafkaStorage close should close kafka producer and delete topic") { +// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] +// val getConsumer = () => mock[KafkaConsumer] +// val zkClient = mock[ZkClient] +// val connectZk = () => zkClient +// val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk()) +// kafkaStorage.close() +// verify(producer).close() +// verify(zkClient).createPersistent(anyString(), anyString()) +// } +// } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala index 24c1c5d..7447308 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -18,34 +18,36 @@ package io.gearpump.streaming.kafka.lib -import io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper -import io.gearpump.streaming.kafka.util.KafkaServerHarness import kafka.common.TopicAndPartition -import kafka.utils.{TestZKUtils, TestUtils} +import kafka.server.{KafkaConfig => KafkaServerConfig} +import kafka.utils.{TestUtils, TestZKUtils} import org.scalatest.prop.PropertyChecks -import org.scalatest.{PropSpec, Matchers, BeforeAndAfterEach} +import org.scalatest.{BeforeAndAfterEach, Matchers, PropSpec} -import kafka.server.{KafkaConfig => KafkaServerConfig} +import io.gearpump.streaming.kafka.lib.grouper.KafkaGrouper +import io.gearpump.streaming.kafka.util.KafkaServerHarness +class KafkaUtilSpec + extends PropSpec with PropertyChecks with BeforeAndAfterEach + with Matchers with KafkaServerHarness { -class KafkaUtilSpec extends PropSpec with PropertyChecks with BeforeAndAfterEach with Matchers with KafkaServerHarness { val numServers = 1 override val configs: List[KafkaServerConfig] = - for(props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false)) - yield new KafkaServerConfig(props) { - override val zkConnect = TestZKUtils.zookeeperConnect - override val numPartitions = 4 - } + for (props <- TestUtils.createBrokerConfigs(numServers, enableControlledShutdown = false)) + yield new KafkaServerConfig(props) { + override val zkConnect = TestZKUtils.zookeeperConnect + override val numPartitions = 4 + } - override def beforeEach: Unit = { + override def beforeEach(): Unit = { super.setUp() } - override def afterEach: Unit = { + override def afterEach(): Unit = { super.tearDown() } - import KafkaUtil._ + import 
io.gearpump.streaming.kafka.lib.KafkaUtil._ property("KafkaUtil should be able to create topic") { val args = { @@ -91,13 +93,15 @@ class KafkaUtilSpec extends PropSpec with PropertyChecks with BeforeAndAfterEach property("KafkaUtil should be able to get TopicAndPartitions info and group with KafkaGrouper") { val grouper: KafkaGrouper = new KafkaGrouper { - override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition] = + override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) + : Array[TopicAndPartition] = { topicAndPartitions + } } val topicNum = 3 val topics = List.fill(topicNum)(TestUtils.tempTopic()) topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, replicas = 1)) - KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe topics.map(t => TopicAndPartition(t, 0)).toSet + KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe + topics.map(t => TopicAndPartition(t, 0)).toSet } - } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala index a20e575..3983874 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -8,20 +7,18 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package io.gearpump.streaming.kafka.lib.consumer -import org.scalatest.{WordSpec, Matchers} +import org.scalatest.{Matchers, WordSpec} class ExponentialBackoffSleeperSpec extends WordSpec with Matchers { @@ -69,4 +66,3 @@ class ExponentialBackoffSleeperSpec extends WordSpec with Matchers { } } - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala index 92d3c31..d955dfa 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file 
except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -35,14 +35,14 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo property("FetchThread should set startOffset to iterators") { forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) { (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) => - val topicAndPartition = mock[TopicAndPartition] - val consumer = mock[KafkaConsumer] - val createConsumer = (tp: TopicAndPartition) => consumer - val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() - val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, - incomingQueue, fetchThreshold, fetchSleepMS) - fetchThread.setStartOffset(topicAndPartition, startOffset) - verify(consumer).setStartOffset(startOffset) + val topicAndPartition = mock[TopicAndPartition] + val consumer = mock[KafkaConsumer] + val createConsumer = (tp: TopicAndPartition) => consumer + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val fetchThread = new FetchThread(Array(topicAndPartition), createConsumer, + incomingQueue, fetchThreshold, fetchSleepMS) + fetchThread.setStartOffset(topicAndPartition, startOffset) + verify(consumer).setStartOffset(startOffset) } } @@ -50,10 +50,11 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo topic <- Gen.alphaStr partition <- Gen.choose[Int](0, Int.MaxValue) } yield TopicAndPartition(topic, partition) - property("FetchThread should only fetchMessage when the number of messages in queue is below the threshold") { + property("FetchThread should only fetchMessage when the number " + + "of messages in queue is below the threshold") { forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, 
topicAndPartitionGen) { (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int, - startOffset: Long, topicAndPartition: TopicAndPartition) => + startOffset: Long, topicAndPartition: TopicAndPartition) => val message = mock[KafkaMessage] val consumer = mock[KafkaConsumer] val createConsumer = (tp: TopicAndPartition) => consumer @@ -87,8 +88,12 @@ class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with Mo tp <- topicAndPartitionGen hasNext <- Gen.oneOf(true, false) } yield (tp, hasNext) - val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen).map(_.toMap) suchThat (_.nonEmpty) - property("FetchThread fetchMessage should return false when there are no more messages from any TopicAndPartition") { + + val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, Boolean)](tpAndHasNextGen) + .map(_.toMap) suchThat (_.nonEmpty) + + property("FetchThread fetchMessage should return false when there are no more messages " + + "from any TopicAndPartition") { forAll(tpHasNextMapGen, nonNegativeGen) { (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) => val createConsumer = (tp: TopicAndPartition) => { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/c176e448/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala index 2669f60..524fc9b 100644 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala +++ b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS,
