http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala new file mode 100644 index 0000000..339711b --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaSource.scala @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka + +import java.util.Properties +import scala.collection.mutable.ArrayBuffer +import scala.util.{Failure, Success} + +import kafka.common.TopicAndPartition +import org.slf4j.Logger + +import org.apache.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage} +import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaOffsetManager, KafkaSourceConfig, KafkaUtil} +import org.apache.gearpump.streaming.source.DefaultTimeStampFilter +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty +import org.apache.gearpump.streaming.transaction.api._ +import org.apache.gearpump.util.LogUtil +import org.apache.gearpump.{Message, TimeStamp} + +object KafkaSource { + private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource]) +} + +/** + * Kafka source connectors that pulls a batch of messages (`kafka.consumer.emit.batch.size`) + * from multiple Kafka TopicAndPartition in a round-robin way. + * + * This is a TimeReplayableSource which is able to replay messages given a start time. + * Each kafka message is tagged with a timestamp by + * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the (offset, timestamp) + * mapping is stored to a [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]]. + * On recovery, we could retrieve the previously stored offset from the + * [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] by timestamp and start to read + * from there. + * + * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]] and further filtered by a + * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]] + * such that obsolete messages are dropped. 
+ * + * @param config kafka source config + * @param offsetStorageFactory factory to build [[OffsetStorage]] + * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes + * @param timestampFilter filters out message based on timestamp + * @param fetchThread fetches messages and puts them on an in-memory queue + * @param offsetManagers manages offset-to-timestamp storage for each kafka.common.TopicAndPartition + */ +class KafkaSource( + config: KafkaSourceConfig, + offsetStorageFactory: OffsetStorageFactory, + messageDecoder: MessageDecoder = new DefaultMessageDecoder, + timestampFilter: TimeStampFilter = new DefaultTimeStampFilter, + private var fetchThread: Option[FetchThread] = None, + private var offsetManagers: Map[TopicAndPartition, KafkaOffsetManager] = { + Map.empty[TopicAndPartition, KafkaOffsetManager] + }) extends TimeReplayableSource { + import org.apache.gearpump.streaming.kafka.KafkaSource._ + + private var startTime: Option[TimeStamp] = None + + /** + * Constructs a Kafka Source by... + * + * @param topics comma-separated string of topics + * @param properties kafka consumer config + * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory + * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] + * + */ + def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory) = { + this(KafkaSourceConfig(properties).withConsumerTopics(topics), offsetStorageFactory) + } + /** + * Constructs a Kafka Source by... 
+ * + * @param topics comma-separated string of topics + * @param properties kafka consumer config + * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory + * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] + * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes + * @param timestampFilter filters out message based on timestamp + */ + def this(topics: String, properties: Properties, offsetStorageFactory: OffsetStorageFactory, + messageDecoder: MessageDecoder, timestampFilter: TimeStampFilter) = { + this(KafkaSourceConfig(properties) + .withConsumerTopics(topics), offsetStorageFactory, + messageDecoder, timestampFilter) + } + + /** + * Constructs a Kafka Source by... + * + * @param topics comma-separated string of topics + * @param zkConnect kafka consumer config `zookeeper.connect` + * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory + * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] + */ + def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory) = + this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory) + + /** + * Constructs a Kafka Source by... 
+ * + * @param topics comma-separated string of topics + * @param zkConnect kafka consumer config `zookeeper.connect` + * @param offsetStorageFactory org.apache.gearpump.streaming.transaction.api.OffsetStorageFactory + * that creates [[org.apache.gearpump.streaming.transaction.api.OffsetStorage]] + * @param messageDecoder decodes [[org.apache.gearpump.Message]] from raw bytes + * @param timestampFilter filters out message based on timestamp + */ + def this(topics: String, zkConnect: String, offsetStorageFactory: OffsetStorageFactory, + messageDecoder: MessageDecoder, + timestampFilter: TimeStampFilter) = { + this(topics, KafkaUtil.buildConsumerConfig(zkConnect), offsetStorageFactory, + messageDecoder, timestampFilter) + } + + LOG.debug(s"assigned ${offsetManagers.keySet}") + + private[kafka] def setStartTime(startTime: Option[TimeStamp]): Unit = { + this.startTime = startTime + fetchThread.foreach { fetch => + this.startTime.foreach { time => + offsetManagers.foreach { case (tp, offsetManager) => + offsetManager.resolveOffset(time) match { + case Success(offset) => + LOG.debug(s"set start offset to $offset for $tp") + fetch.setStartOffset(tp, offset) + case Failure(StorageEmpty) => + LOG.debug(s"no previous TimeStamp stored") + case Failure(e) => throw e + } + } + } + fetch.setDaemon(true) + fetch.start() + } + } + + override def open(context: TaskContext, startTime: TimeStamp): Unit = { + import context.{appId, appName, parallelism, taskId} + + val topics = config.getConsumerTopics + val grouper = config.getGrouper + val consumerConfig = config.consumerConfig + val topicAndPartitions = grouper.group(parallelism, taskId.index, + KafkaUtil.getTopicAndPartitions(KafkaUtil.connectZookeeper(consumerConfig)(), topics)) + this.fetchThread = Some(FetchThread(topicAndPartitions, config.getFetchThreshold, + config.getFetchSleepMS, config.getConsumerStartOffset, consumerConfig)) + this.offsetManagers = topicAndPartitions.map { tp => + val storageTopic = 
s"app${appId}_${appName}_${tp.topic}_${tp.partition}" + val storage = offsetStorageFactory.getOffsetStorage(storageTopic) + tp -> new KafkaOffsetManager(storage) + }.toMap + + setStartTime(Option(startTime)) + } + + override def read(): Message = { + fetchThread.flatMap(_.poll.flatMap(filterMessage)).orNull + } + + private def filterMessage(kafkaMsg: KafkaMessage): Option[Message] = { + val msgOpt = offsetManagers(kafkaMsg.topicAndPartition) + .filter(messageDecoder.fromBytes(kafkaMsg.msg) -> kafkaMsg.offset) + msgOpt.flatMap { msg => + startTime match { + case None => + Some(msg) + case Some(time) => + timestampFilter.filter(msg, time) + } + } + } + + override def close(): Unit = { + offsetManagers.foreach(_._2.close()) + } +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala new file mode 100644 index 0000000..8748999 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/KafkaStorage.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka + +import java.util.Properties +import scala.collection.mutable +import scala.util.{Failure, Success, Try} + +import com.twitter.bijection.Injection +import kafka.api.OffsetRequest +import kafka.consumer.ConsumerConfig +import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.slf4j.Logger + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.streaming.kafka.lib.KafkaUtil +import org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer +import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} +import org.apache.gearpump.streaming.transaction.api.{OffsetStorage, OffsetStorageFactory} +import org.apache.gearpump.util.LogUtil + +/** + * Factory that builds [[KafkaStorage]] + * + * @param consumerProps kafka consumer config + * @param producerProps kafka producer config + */ +class KafkaStorageFactory(consumerProps: Properties, producerProps: Properties) + extends OffsetStorageFactory { + + /** + * Creates consumer config properties with `zookeeper.connect` set to zkConnect + * and producer config properties with `bootstrap.servers` set to bootstrapServers + * + * @param zkConnect kafka consumer config `zookeeper.connect` + * @param bootstrapServers kafka producer config `bootstrap.servers` + */ + def this(zkConnect: String, bootstrapServers: String) = + this(KafkaUtil.buildConsumerConfig(zkConnect), KafkaUtil.buildProducerConfig(bootstrapServers)) + + override def getOffsetStorage(dir: String): OffsetStorage = { + val topic = dir + val consumerConfig = new ConsumerConfig(consumerProps) + val getConsumer = () => KafkaConsumer(topic, 0, OffsetRequest.EarliestTime, consumerConfig) + new KafkaStorage(topic, KafkaUtil.createKafkaProducer[Array[Byte], Array[Byte]]( + producerProps, new ByteArraySerializer, new 
ByteArraySerializer), + getConsumer(), KafkaUtil.connectZookeeper(consumerConfig)()) + } +} + +object KafkaStorage { + private val LOG: Logger = LogUtil.getLogger(classOf[KafkaStorage]) +} + +/** + * Stores offset-timestamp mapping to kafka + * + * @param topic kafka store topic + * @param producer kafka producer + * @param getConsumer function to get kafka consumer + * @param connectZk function to connect zookeeper + */ +class KafkaStorage private[kafka]( + topic: String, + producer: KafkaProducer[Array[Byte], Array[Byte]], + getConsumer: => KafkaConsumer, + connectZk: => ZkClient) + extends OffsetStorage { + + private lazy val consumer = getConsumer + + private val dataByTime: List[(TimeStamp, Array[Byte])] = { + if (KafkaUtil.topicExists(connectZk, topic)) { + load(consumer) + } else { + List.empty[(TimeStamp, Array[Byte])] + } + } + + /** + * Offsets with timestamp less than `time` have already been processed by the system + * so we look up the storage for the first offset with timestamp greater than or equal to `time` + * on replay. 
+ * + * @param time the timestamp to look up for the earliest unprocessed offset + * @return the earliest unprocessed offset if `time` is in the range, otherwise failure + */ + override def lookUp(time: TimeStamp): Try[Array[Byte]] = { + if (dataByTime.isEmpty) { + Failure(StorageEmpty) + } else { + val min = dataByTime.head + val max = dataByTime.last + if (time < min._1) { + Failure(Underflow(min._2)) + } else if (time > max._1) { + Failure(Overflow(max._2)) + } else { + Success(dataByTime.find(_._1 >= time).get._2) + } + } + } + + override def append(time: TimeStamp, offset: Array[Byte]): Unit = { + val message = new ProducerRecord[Array[Byte], Array[Byte]]( + topic, 0, Injection[Long, Array[Byte]](time), offset) + producer.send(message) + } + + override def close(): Unit = { + producer.close() + KafkaUtil.deleteTopic(connectZk, topic) + } + + private[kafka] def load(consumer: KafkaConsumer): List[(TimeStamp, Array[Byte])] = { + var messagesBuilder = new mutable.ArrayBuilder.ofRef[(TimeStamp, Array[Byte])] + while (consumer.hasNext) { + val kafkaMsg = consumer.next + kafkaMsg.key.map { k => + Injection.invert[TimeStamp, Array[Byte]](k) match { + case Success(time) => + messagesBuilder += (time -> kafkaMsg.msg) + case Failure(e) => throw e + } + } orElse (throw new RuntimeException("offset key should not be null")) + } + consumer.close() + messagesBuilder.result().toList + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala new file mode 100644 index 0000000..5f48b43 --- /dev/null +++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLSink.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.kafka.dsl + +import java.util.Properties + +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl +import org.apache.gearpump.streaming.kafka.KafkaSink + +class KafkaDSLSink[T](stream: dsl.Stream[T]) { + + /** Create a Kafka DSL Sink */ + def writeToKafka( + topic: String, + bootstrapServers: String, + parallism: Int, + description: String): dsl.Stream[T] = { + stream.sink(new KafkaSink(topic, bootstrapServers), parallism, UserConfig.empty, description) + } + + def writeToKafka( + topic: String, + properties: Properties, + parallism: Int, + description: String): dsl.Stream[T] = { + stream.sink(new KafkaSink(topic, properties), parallism, UserConfig.empty, description) + } +} + +object KafkaDSLSink { + + import scala.language.implicitConversions + + implicit def streamToKafkaDSLSink[T](stream: dsl.Stream[T]): KafkaDSLSink[T] = { + new KafkaDSLSink[T](stream) + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala new file mode 100644 index 0000000..0275966 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSLUtil.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.kafka.dsl + +import java.util.Properties + +import org.apache.gearpump.streaming.dsl +import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.streaming.kafka.KafkaSource +import org.apache.gearpump.streaming.kafka.lib.{DefaultMessageDecoder, KafkaSourceConfig} +import org.apache.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter} + +object KafkaDSLUtil { + def createStream[T]( + app: StreamApp, + parallelism: Int, + description: String, + kafkaConfig: KafkaSourceConfig, + offsetStorageFactory: OffsetStorageFactory, + messageDecoder: MessageDecoder = new DefaultMessageDecoder): dsl.Stream[T] = { + app.source[T](new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder), + parallelism, description) + } + + def createStream[T]( + app: StreamApp, + parallelism: Int, + description: String, + topics: String, + zkConnect: String, + offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { + app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory), + parallelism, description) + } + + def createStream[T]( + app: StreamApp, + parallelism: Int, + description: String, + topics: String, + zkConnect: String, + offsetStorageFactory: OffsetStorageFactory, + messageDecoder: MessageDecoder, + timestampFilter: TimeStampFilter): dsl.Stream[T] = { + app.source[T](new KafkaSource(topics, zkConnect, offsetStorageFactory, + messageDecoder, timestampFilter), parallelism, description) + } + + def createStream[T]( + app: StreamApp, + parallelism: Int, + description: String, + topics: String, + properties: Properties, + offsetStorageFactory: OffsetStorageFactory): dsl.Stream[T] = { + app.source[T](new KafkaSource(topics, properties, offsetStorageFactory), + parallelism, description) + } + + def createStream[T]( + app: StreamApp, + topics: String, + parallelism: Int, + description: String, + properties: Properties, + offsetStorageFactory: OffsetStorageFactory, + 
messageDecoder: MessageDecoder, + timestampFilter: TimeStampFilter): dsl.Stream[T] = { + app.source[T](new KafkaSource(topics, properties, offsetStorageFactory, + messageDecoder, timestampFilter), parallelism, description) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala new file mode 100644 index 0000000..ea7e8d1 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoder.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.lib + +import scala.util.{Failure, Success} + +import com.twitter.bijection.Injection + +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.transaction.api.MessageDecoder + +class DefaultMessageDecoder extends MessageDecoder { + override def fromBytes(bytes: Array[Byte]): Message = { + Message(bytes, System.currentTimeMillis()) + } +} + +class StringMessageDecoder extends MessageDecoder { + override def fromBytes(bytes: Array[Byte]): Message = { + Injection.invert[String, Array[Byte]](bytes) match { + case Success(s) => Message(s, System.currentTimeMillis()) + case Failure(e) => throw e + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala new file mode 100644 index 0000000..88f509b --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManager.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib + +import scala.util.{Failure, Success, Try} + +import com.twitter.bijection.Injection +import org.slf4j.Logger + +import org.apache.gearpump._ +import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} +import org.apache.gearpump.streaming.transaction.api.{OffsetManager, OffsetStorage} +import org.apache.gearpump.util.LogUtil + +object KafkaOffsetManager { + private val LOG: Logger = LogUtil.getLogger(classOf[KafkaOffsetManager]) +} + +private[kafka] class KafkaOffsetManager(storage: OffsetStorage) extends OffsetManager { + import org.apache.gearpump.streaming.kafka.lib.KafkaOffsetManager._ + + var maxTime: TimeStamp = 0L + + override def filter(messageAndOffset: (Message, Long)): Option[Message] = { + val (message, offset) = messageAndOffset + if (message.timestamp > maxTime) { + maxTime = message.timestamp + storage.append(maxTime, Injection[Long, Array[Byte]](offset)) + } + Some(message) + } + + override def resolveOffset(time: TimeStamp): Try[Long] = { + storage.lookUp(time) match { + case Success(offset) => Injection.invert[Long, Array[Byte]](offset) + case Failure(Overflow(max)) => + LOG.warn(s"start time larger than the max stored TimeStamp; set to max offset") + Injection.invert[Long, Array[Byte]](max) + case Failure(Underflow(min)) => + LOG.warn(s"start time less than the min stored TimeStamp; set to min offset") + Injection.invert[Long, Array[Byte]](min) + case Failure(StorageEmpty) => Failure(StorageEmpty) + case Failure(e) => throw e + } + 
} + + override def close(): Unit = { + storage.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala new file mode 100644 index 0000000..ade414e --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaSourceConfig.scala @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.lib + +import java.util.Properties + +import kafka.api.OffsetRequest +import kafka.consumer.ConsumerConfig +import org.slf4j.Logger + +import org.apache.gearpump.streaming.kafka.lib.grouper.{KafkaDefaultGrouper, KafkaGrouper} +import org.apache.gearpump.util.LogUtil + +object KafkaSourceConfig { + + val NAME = "kafka_config" + + val ZOOKEEPER_CONNECT = "zookeeper.connect" + val GROUP_ID = "group.id" + val CONSUMER_START_OFFSET = "kafka.consumer.start.offset" + val CONSUMER_TOPICS = "kafka.consumer.topics" + val FETCH_THRESHOLD = "kafka.consumer.fetch.threshold" + val FETCH_SLEEP_MS = "kafka.consumer.fetch.sleep.ms" + val GROUPER_CLASS = "kafka.grouper.class" + + private val LOG: Logger = LogUtil.getLogger(getClass) + + def apply(consumerProps: Properties): KafkaSourceConfig = new KafkaSourceConfig(consumerProps) +} + +/** + * Extends kafka.consumer.ConsumerConfig with specific config needed by + * [[org.apache.gearpump.streaming.kafka.KafkaSource]] + * + * @param consumerProps kafka consumer config + */ +class KafkaSourceConfig(val consumerProps: Properties = new Properties) + extends java.io.Serializable { + import org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig._ + + if (!consumerProps.containsKey(ZOOKEEPER_CONNECT)) { + consumerProps.setProperty(ZOOKEEPER_CONNECT, "localhost:2181") + } + + if (!consumerProps.containsKey(GROUP_ID)) { + consumerProps.setProperty(GROUP_ID, "gearpump") + } + + def consumerConfig: ConsumerConfig = new ConsumerConfig(consumerProps) + + /** + * Set kafka consumer topics, separated by comma. 
+ * + * @param topics comma-separated string + * @return new KafkaConfig based on this but with + * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#CONSUMER_TOPICS]] + * set to given value + */ + def withConsumerTopics(topics: String): KafkaSourceConfig = { + consumerProps.setProperty(CONSUMER_TOPICS, topics) + KafkaSourceConfig(consumerProps) + } + + /** + * Returns a list of kafka consumer topics + */ + def getConsumerTopics: List[String] = { + Option(consumerProps.getProperty(CONSUMER_TOPICS)).getOrElse("topic1").split(",").toList + } + + /** + * Sets the sleep interval if there are no more message or message buffer is full. + * + * Consumer.FetchThread will sleep for a while if no more messages or + * the incoming queue size is above the + * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] + * + * @param sleepMS sleep interval in milliseconds + * @return new KafkaConfig based on this but with + * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_SLEEP_MS]] + * set to given value + */ + def withFetchSleepMS(sleepMS: Int): KafkaSourceConfig = { + consumerProps.setProperty(FETCH_SLEEP_MS, s"$sleepMS") + KafkaSourceConfig(consumerProps) + } + + /** + * Gets the sleep interval + * + * Consumer.FetchThread sleeps for a while if no more messages or + * the incoming queue is full (size is bigger than the + * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]]) + * + * @return sleep interval in milliseconds + */ + def getFetchSleepMS: Int = { + Option(consumerProps.getProperty(FETCH_SLEEP_MS)).getOrElse("100").toInt + } + + /** + * Sets the batch size we use for one fetch. 
+ * + * Consumer.FetchThread stops fetching new messages if its incoming queue + * size is above the threshold and starts again when the queue size is below it + * + * @param threshold queue size + * @return new KafkaConfig based on this but with + * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#FETCH_THRESHOLD]] + * set to given value + */ + def withFetchThreshold(threshold: Int): KafkaSourceConfig = { + consumerProps.setProperty(FETCH_THRESHOLD, s"$threshold") + KafkaSourceConfig(consumerProps) + } + + /** + * Returns fetch batch size. + * + * Consumer.FetchThread stops fetching new messages if + * its incoming queue size is above the threshold and starts again when the queue size is below it + * + * @return fetch threshold + */ + def getFetchThreshold: Int = { + Option(consumerProps.getProperty(FETCH_THRESHOLD)).getOrElse("10000").toInt + } + + /** + * Sets [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]], which + * defines how kafka.common.TopicAndPartitions are mapped to source tasks. 
+ * + * @param className name of the factory class + * @return new KafkaConfig based on this but with + * [[org.apache.gearpump.streaming.kafka.lib.KafkaSourceConfig#GROUPER_CLASS]] + * set to given value + */ + def withGrouper(className: String): KafkaSourceConfig = { + consumerProps.setProperty(GROUPER_CLASS, className) + KafkaSourceConfig(consumerProps) + } + + /** + * Returns [[org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper]] instance, which + * defines how kafka.common.TopicAndPartitions are mapped to source tasks + */ + def getGrouper: KafkaGrouper = { + Class.forName(Option(consumerProps.getProperty(GROUPER_CLASS)) + .getOrElse(classOf[KafkaDefaultGrouper].getName)).newInstance().asInstanceOf[KafkaGrouper] + } + + def withConsumerStartOffset(earliestOrLatest: Long): KafkaSourceConfig = { + consumerProps.setProperty(CONSUMER_START_OFFSET, s"$earliestOrLatest") + KafkaSourceConfig(consumerProps) + } + + def getConsumerStartOffset: Long = { + Option(consumerProps.getProperty(CONSUMER_START_OFFSET)) + .getOrElse(s"${OffsetRequest.EarliestTime}").toLong + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala new file mode 100644 index 0000000..e8cf574 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtil.scala @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Helpers for querying and administering Kafka through ZooKeeper, plus small
 * factories for producer / consumer properties.
 *
 * Every method taking `connectZk` evaluates it exactly once to obtain a ZkClient and
 * closes that client before returning, whether the operation succeeded or failed.
 */
object KafkaUtil {
  private val LOG: Logger = LogUtil.getLogger(getClass)

  /**
   * Runs `op` against a freshly obtained ZkClient and always closes the client.
   * Any exception thrown by `op` is logged (with its stack trace) and rethrown.
   * The connection itself is established outside the guarded section, so a failure
   * to connect propagates without being logged here.
   */
  private def withZkClient[T](connectZk: => ZkClient)(op: ZkClient => T): T = {
    val zkClient = connectZk
    try {
      op(zkClient)
    } catch {
      case e: Exception =>
        LOG.error(e.getMessage, e)
        throw e
    } finally {
      zkClient.close()
    }
  }

  /**
   * Looks up the broker that currently leads the given topic partition.
   *
   * @param connectZk expression producing the ZkClient to use; closed before returning
   * @param topic topic name
   * @param partition partition id
   * @return broker info of the partition leader
   * @throws RuntimeException if no leader is registered for the partition, or no broker
   *                          info is found for the leader
   */
  def getBroker(connectZk: => ZkClient, topic: String, partition: Int): Broker = {
    withZkClient(connectZk) { zkClient =>
      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
        .getOrElse(throw new RuntimeException(
          s"leader not available for TopicAndPartition($topic, $partition)"))
      ZkUtils.getBrokerInfo(zkClient, leader)
        .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader"))
    }
  }

  /**
   * Lists every TopicAndPartition of the given topics.
   *
   * @param connectZk expression producing the ZkClient to use; closed before returning
   * @param consumerTopics topics to expand
   * @return one entry per (topic, partition) pair reported by ZooKeeper
   */
  def getTopicAndPartitions(connectZk: => ZkClient, consumerTopics: List[String])
    : Array[TopicAndPartition] = {
    withZkClient(connectZk) { zkClient =>
      ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
        case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
      }.toArray
    }
  }

  /**
   * Checks whether a topic exists.
   *
   * @param connectZk expression producing the ZkClient to use; closed before returning
   * @param topic topic name
   */
  def topicExists(connectZk: => ZkClient, topic: String): Boolean = {
    withZkClient(connectZk) { zkClient =>
      AdminUtils.topicExists(zkClient, topic)
    }
  }

  /**
   * Creates a new kafka topic unless it already exists.
   *
   * @param connectZk expression producing the ZkClient to use; closed before returning
   * @param topic topic name
   * @param partitions number of partitions for a newly created topic
   * @param replicas replication factor for a newly created topic
   * @return true if the topic already existed (nothing was created), false if it was created
   */
  def createTopic(connectZk: => ZkClient, topic: String, partitions: Int, replicas: Int)
    : Boolean = {
    withZkClient(connectZk) { zkClient =>
      if (AdminUtils.topicExists(zkClient, topic)) {
        LOG.info(s"topic $topic exists")
        true
      } else {
        AdminUtils.createTopic(zkClient, topic, partitions, replicas)
        LOG.info(s"created topic $topic")
        false
      }
    }
  }

  /**
   * Deletes a topic on a best-effort basis: unlike the other helpers, a failure is
   * only logged and NOT rethrown.
   *
   * @param connectZk expression producing the ZkClient to use; closed before returning
   * @param topic topic name
   */
  def deleteTopic(connectZk: => ZkClient, topic: String): Unit = {
    val zkClient = connectZk
    try {
      AdminUtils.deleteTopic(zkClient, topic)
    } catch {
      case e: Exception =>
        // deliberately swallowed; keep the stack trace so the failure is diagnosable
        LOG.error(e.getMessage, e)
    } finally {
      zkClient.close()
    }
  }

  /**
   * Builds a ZkClient factory from the ZooKeeper settings of a consumer config.
   * The settings are read once, up front; each invocation of the returned function
   * creates a new ZkClient.
   */
  def connectZookeeper(config: ConsumerConfig): () => ZkClient = {
    val zookeeperConnect = config.zkConnect
    val sessionTimeout = config.zkSessionTimeoutMs
    val connectionTimeout = config.zkConnectionTimeoutMs
    () => new ZkClient(zookeeperConnect, sessionTimeout, connectionTimeout, ZKStringSerializer)
  }

  /**
   * Loads a properties file from the classpath.
   *
   * Never throws for a missing or unreadable resource: the problem is logged and the
   * properties loaded so far (empty for a missing resource) are returned.
   *
   * @param filename classpath resource name
   * @return the loaded properties
   */
  def loadProperties(filename: String): Properties = {
    val props = new Properties()
    try {
      // getResourceAsStream signals a missing resource with null, not an exception
      Option(getClass.getClassLoader.getResourceAsStream(filename)) match {
        case Some(propStream) =>
          try {
            props.load(propStream)
          } finally {
            propStream.close()
          }
        case None =>
          LOG.error(s"$filename not found")
      }
    } catch {
      case e: Exception =>
        // the resource exists but could not be read or parsed
        LOG.error(s"failed to load properties from $filename", e)
    }
    props
  }

  /**
   * Creates a KafkaProducer from the given properties and serializers.
   *
   * If `properties` has no bootstrap servers entry, "localhost:9092" is written into the
   * passed-in `properties` object (the caller's instance is modified).
   */
  def createKafkaProducer[K, V](properties: Properties,
      keySerializer: Serializer[K],
      valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
    if (properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
      properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    }
    new KafkaProducer[K, V](properties, keySerializer, valueSerializer)
  }

  /**
   * Builds producer properties containing only the bootstrap servers entry.
   */
  def buildProducerConfig(bootstrapServers: String): Properties = {
    val properties = new Properties()
    // same key createKafkaProducer checks for
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    properties
  }

  /**
   * Builds consumer properties with the given ZooKeeper connect string and the
   * group id "gearpump".
   */
  def buildConsumerConfig(zkConnect: String): Properties = {
    val properties = new Properties()
    properties.setProperty("zookeeper.connect", zkConnect)
    properties.setProperty("group.id", "gearpump")
    properties
  }
}
/**
 * Sleeps for a duration that is multiplied by a constant factor after every sleep,
 * up to a cap.
 *
 * @param backOffMultiplier The factor by which the duration increases; must be > 1.
 * @param initialDurationMs Time in milliseconds for the initial sleep; must be positive.
 * @param maximumDurationMs Cap up to which the duration is increased; must be
 *                          >= initialDurationMs.
 */
private[consumer] class ExponentialBackoffSleeper(
    backOffMultiplier: Double = 2.0,
    initialDurationMs: Long = 100,
    maximumDurationMs: Long = 10000) {

  require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
  require(initialDurationMs > 0, "initialDurationMs must be positive")
  require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs")

  // duration the next call to sleep() will block for
  private var currentDurationMs: Long = initialDurationMs

  /** Restores the sleep duration to the initial value. */
  def reset(): Unit = {
    currentDurationMs = initialDurationMs
  }

  /** Blocks for the current duration, then advances the duration for the next call. */
  def sleep(): Unit = {
    Thread.sleep(currentDurationMs)
    setNextSleepDuration()
  }

  /** Duration in milliseconds the next `sleep()` will block for. */
  def getSleepDuration: Long = currentDurationMs

  /**
   * Multiplies the current duration by the back-off factor and clamps the result to
   * the range [initialDurationMs, maximumDurationMs].
   */
  def setNextSleepDuration(): Unit = {
    val scaled = (currentDurationMs * backOffMultiplier).toLong
    val atLeastInitial = if (scaled < initialDurationMs) initialDurationMs else scaled
    currentDurationMs =
      if (atLeastInitial > maximumDurationMs) maximumDurationMs else atLeastInitial
  }
}
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala new file mode 100644 index 0000000..8550207 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThread.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
object FetchThread {
  private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])

  /**
   * Creates a FetchThread whose consumers are built with
   * `KafkaConsumer(topic, partition, startOffsetTime, consumerConfig)` and whose
   * incoming queue is a new, unbounded LinkedBlockingQueue.
   *
   * @param topicAndPartitions partitions to fetch from
   * @param fetchThreshold queue size at or above which fetching is skipped
   * @param fetchSleepMS sleep interval in milliseconds when there is nothing to fetch
   * @param startOffsetTime offset time handed to every created KafkaConsumer
   * @param consumerConfig kafka consumer configuration handed to every created KafkaConsumer
   */
  def apply(topicAndPartitions: Array[TopicAndPartition],
      fetchThreshold: Int,
      fetchSleepMS: Long,
      startOffsetTime: Long,
      consumerConfig: ConsumerConfig): FetchThread = {
    val createConsumer = (tp: TopicAndPartition) =>
      KafkaConsumer(tp.topic, tp.partition, startOffsetTime, consumerConfig)

    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
    new FetchThread(topicAndPartitions, createConsumer, incomingQueue, fetchThreshold, fetchSleepMS)
  }
}

/**
 * A thread to fetch messages from multiple kafka TopicAndPartitions and put them
 * onto a queue, which is asynchronously polled by a consumer.
 *
 * One consumer per TopicAndPartition is created when this object is constructed.
 * The thread runs until it is interrupted; all consumers are closed on exit.
 *
 * @param topicAndPartitions partitions to fetch from
 * @param createConsumer given a TopicAndPartition, create a
 *                       [[org.apache.gearpump.streaming.kafka.lib.consumer.KafkaConsumer]] to
 *                       connect to it
 * @param incomingQueue a queue to buffer incoming messages
 * @param fetchThreshold queue size at or above which the thread stops fetching messages
 * @param fetchSleepMS interval to sleep when no consumer has more messages
 */
private[kafka] class FetchThread(topicAndPartitions: Array[TopicAndPartition],
    createConsumer: TopicAndPartition => KafkaConsumer,
    incomingQueue: LinkedBlockingQueue[KafkaMessage],
    fetchThreshold: Int,
    fetchSleepMS: Long) extends Thread {
  import org.apache.gearpump.streaming.kafka.lib.consumer.FetchThread._

  private var consumers: Map[TopicAndPartition, KafkaConsumer] = createAllConsumers

  /**
   * Sets the offset the consumer of `tp` reads from next.
   * Fails with NoSuchElementException if `tp` is not one of this thread's partitions.
   */
  def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
    consumers(tp).setStartOffset(startOffset)
  }

  /** Removes and returns the next buffered message, or None if the queue is empty. */
  def poll: Option[KafkaMessage] = {
    Option(incomingQueue.poll())
  }

  /**
   * Fetch loop. Runs until the thread is interrupted.
   *
   * Any failure other than an interrupt marks the consumers for re-creation (resuming
   * from each consumer's next offset) and backs off exponentially before retrying.
   */
  override def run(): Unit = {
    try {
      var reset = false
      val sleeper = new ExponentialBackoffSleeper(
        backOffMultiplier = 2.0,
        initialDurationMs = 100L,
        maximumDurationMs = 10000L)
      while (!Thread.currentThread().isInterrupted) {
        try {
          if (reset) {
            // take a strict snapshot of the offsets before the consumers are closed
            // and replaced; mapValues would only give a lazy view
            val nextOffsets = consumers.map { case (tp, consumer) =>
              tp -> consumer.getNextOffset
            }
            resetConsumers(nextOffsets)
            reset = false
          }
          val hasMoreMessages = fetchMessage
          sleeper.reset()
          // NOTE(review): fetchMessage also reports true when the queue is at the
          // threshold, so this loop does not sleep while the queue is full
          if (!hasMoreMessages) {
            Thread.sleep(fetchSleepMS)
          }
        } catch {
          // An InterruptedException clears the thread's interrupt flag. Treating it as
          // an ordinary failure would make the loop carry on and the thread could not
          // be stopped, so interrupt-related exceptions go to the outer handler.
          case e: InterruptedException => throw e
          case e: ClosedByInterruptException => throw e
          case exception: Exception =>
            LOG.warn(s"resetting consumers due to $exception")
            reset = true
            sleeper.sleep()
        }
      }
    } catch {
      case e: InterruptedException => LOG.info("fetch thread got interrupted exception")
      case e: ClosedByInterruptException => LOG.info("fetch thread closed by interrupt exception")
    } finally {
      consumers.values.foreach(_.close())
    }
  }

  /**
   * Makes one pass over all consumers, moving at most one message per consumer onto
   * the incoming queue. A consumer is skipped when the queue size is at or above
   * `fetchThreshold`.
   *
   * @return true if at least one message was queued or the threshold was hit during
   *         the pass; false if no consumer had a message
   */
  def fetchMessage: Boolean = {
    consumers.foldLeft(false) { (hasNext, tpAndConsumer) =>
      val (_, consumer) = tpAndConsumer
      if (incomingQueue.size < fetchThreshold) {
        if (consumer.hasNext) {
          incomingQueue.put(consumer.next())
          true
        } else {
          hasNext
        }
      } else {
        true
      }
    }
  }

  /** Creates one consumer per configured TopicAndPartition. */
  private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = {
    topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap
  }

  /**
   * Closes the current consumers, creates new ones and positions each at the given
   * offset of its partition.
   */
  private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = {
    consumers.values.foreach(_.close())
    consumers = createAllConsumers
    consumers.foreach { case (tp, consumer) =>
      consumer.setStartOffset(nextOffsets(tp))
    }
  }
}
---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala new file mode 100644 index 0000000..55c327b --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
object KafkaConsumer {
  /**
   * Creates a KafkaConsumer connected to the leader broker of the given topic partition.
   *
   * The leader is looked up through ZooKeeper (settings taken from `config`), then a
   * SimpleConsumer is opened against it using the socket timeout, receive buffer size
   * and client id from `config`. Fetches request up to `config.fetchMessageMaxBytes`.
   *
   * If constructing the KafkaConsumer fails, the SimpleConsumer is closed before the
   * exception is rethrown.
   *
   * @param topic topic name
   * @param partition partition id
   * @param startOffsetTime offset time used to resolve the initial offset
   * @param config kafka consumer configuration
   */
  def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig)
    : KafkaConsumer = {
    val connectZk = KafkaUtil.connectZookeeper(config)
    val broker = KafkaUtil.getBroker(connectZk(), topic, partition)
    val soTimeout = config.socketTimeoutMs
    val soBufferSize = config.socketReceiveBufferBytes
    val fetchSize = config.fetchMessageMaxBytes
    val clientId = config.clientId
    val consumer = new SimpleConsumer(broker.host, broker.port, soTimeout, soBufferSize, clientId)
    val getIterator = (offset: Long) => {
      val request = new FetchRequestBuilder()
        .addFetch(topic, partition, offset, fetchSize)
        .build()

      val response = consumer.fetch(request)
      response.errorCode(topic, partition) match {
        case NoError => response.messageSet(topic, partition).iterator
        case error => throw exceptionFor(error)
      }
    }
    try {
      new KafkaConsumer(consumer, topic, partition, getIterator, startOffsetTime)
    } catch {
      case e: Exception =>
        // the constructor resolves the start offset and performs an initial fetch;
        // do not leak the SimpleConsumer when either of them fails
        consumer.close()
        throw e
    }
  }
}

/**
 * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over
 * messages from a kafka kafka.common.TopicAndPartition.
 *
 * On construction the start offset is resolved with
 * `consumer.earliestOrLatestOffset(..., startOffsetTime, -1)` and an initial iterator is
 * obtained from `getIterator` at that offset.
 *
 * @param consumer underlying SimpleConsumer; closed by `close()`
 * @param topic topic name
 * @param partition partition id
 * @param getIterator given an offset, returns an iterator over messages from that offset
 * @param startOffsetTime offset time used to resolve the initial offset, defaults to
 *                        `OffsetRequest.EarliestTime`
 */
class KafkaConsumer(consumer: SimpleConsumer,
    topic: String,
    partition: Int,
    getIterator: (Long) => Iterator[MessageAndOffset],
    startOffsetTime: Long = OffsetRequest.EarliestTime) {
  // earliest or latest offset of the partition, depending on startOffsetTime
  private val initialOffset = consumer
    .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1)
  private var nextOffset: Long = initialOffset
  private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset)

  /** Repositions this consumer: subsequent messages are read starting at `startOffset`. */
  def setStartOffset(startOffset: Long): Unit = {
    nextOffset = startOffset
    iterator = getIterator(nextOffset)
  }

  /**
   * Returns the next message of the current iterator and advances the next offset.
   * Call `hasNext` first; this does not refill an exhausted iterator.
   */
  def next(): KafkaMessage = {
    val mo = iterator.next()
    val message = mo.message

    nextOffset = mo.nextOffset

    val offset = mo.offset
    val payload = Utils.readBytes(message.payload)
    // message.key may be null, in which case the key is None
    new KafkaMessage(topic, partition, offset, Option(message.key).map(Utils.readBytes), payload)
  }

  /**
   * Tells whether a message is available. When the current iterator is exhausted, a new
   * one is requested once from the next offset; false is returned if that one is
   * empty as well.
   */
  def hasNext: Boolean = {
    @annotation.tailrec
    def hasNextHelper(iter: Iterator[MessageAndOffset], newIterator: Boolean): Boolean = {
      if (iter.hasNext) true
      else if (newIterator) false
      else {
        iterator = getIterator(nextOffset)
        hasNextHelper(iterator, newIterator = true)
      }
    }
    hasNextHelper(iterator, newIterator = false)
  }

  /** Offset of the message that will be read next. */
  def getNextOffset: Long = nextOffset

  /** Closes the underlying SimpleConsumer. */
  def close(): Unit = {
    consumer.close()
  }
}
/**
 * A message read from kafka together with its origin.
 *
 * @param topicAndPartition topic and partition the message was read from
 * @param offset offset of the message within its partition
 * @param key message key, None when the message has no key
 * @param msg message payload
 */
case class KafkaMessage(
    topicAndPartition: TopicAndPartition,
    offset: Long,
    key: Option[Array[Byte]],
    msg: Array[Byte]) {

  /** Convenience constructor taking topic and partition separately. */
  def this(
      topic: String,
      partition: Int,
      offset: Long,
      key: Option[Array[Byte]],
      msg: Array[Byte]) =
    this(TopicAndPartition(topic, partition), offset, key, msg)
}
/**
 * Default grouper: deals TopicAndPartitions out to tasks by position in the input array.
 * The element at index i goes to the task whose index equals `i % taskNum`.
 *
 * e.g. given 5 TopicAndPartitions and 2 tasks,
 *
 * task 0 gets the elements at indices 0, 2 and 4
 * task 1 gets the elements at indices 1 and 3
 */
class KafkaDefaultGrouper extends KafkaGrouper {
  /**
   * @param taskNum total number of tasks
   * @param taskIndex index of the task to select partitions for
   * @param topicAndPartitions all TopicAndPartitions to distribute
   * @return the elements whose index modulo `taskNum` equals `taskIndex`, in input order
   */
  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
    : Array[TopicAndPartition] = {
    topicAndPartitions.zipWithIndex.collect {
      case (tp, position) if position % taskNum == taskIndex => tp
    }
  }
}
/**
 * Strategy for dispatching kafka kafka.common.TopicAndPartition to gearpump tasks.
 */
trait KafkaGrouper {
  /**
   * Selects the TopicAndPartitions assigned to one task.
   *
   * @param taskNum total number of tasks
   * @param taskIndex index of the task to select partitions for
   * @param topicAndPartitions all TopicAndPartitions to distribute
   * @return the TopicAndPartitions assigned to the task at `taskIndex`
   */
  def group(
      taskNum: Int,
      taskIndex: Int,
      topicAndPartitions: Array[TopicAndPartition]): Array[TopicAndPartition]
}
/**
 * Property checks for KafkaSink using a mocked KafkaProducer.
 */
class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  // (topic, key bytes, message bytes) built from random alphabetic strings
  val dataGen = for {
    topic <- Gen.alphaStr
    key <- Gen.alphaStr
    msg <- Gen.alphaStr
  } yield (topic, Injection[String, Array[Byte]](key), Injection[String, Array[Byte]](msg))

  property("KafkaSink write should send producer record") {
    forAll(dataGen) {
      (data: (String, Array[Byte], Array[Byte])) =>
        val (topic, key, msg) = data
        val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
        val sink = new KafkaSink(() => producer, topic)

        // a (key, value) tuple is sent as a keyed record
        sink.write(Message((key, msg)))
        verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](
          r => r.topic == topic && (r.key sameElements key) && (r.value sameElements msg)))

        // a bare payload is sent as a record without key
        sink.write(Message(msg))
        verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](
          r => r.topic() == topic && (r.key == null) && (r.value() sameElements msg)
        ))

        sink.close()
    }
  }

  property("KafkaSink close should close kafka producer") {
    val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
    val sink = new KafkaSink(() => producer, "topic")
    sink.close()
    verify(producer).close()
  }
}
b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala deleted file mode 100644 index 7c804f7..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/KafkaSourceSpec.scala +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Property checks for KafkaSource. Every collaborator (fetch thread, offset manager,
 * decoder, filter, storage factory, config) is a Mockito mock.
 */
class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {

  // generators for start times and offsets, both in [0, 1000]
  val startTimeGen = Gen.choose[Long](0L, 1000L)
  val offsetGen = Gen.choose[Long](0L, 1000L)

  // with no start time the fetch thread is started and no start offset is set
  property("KafkaSource open sets consumer to earliest offset") {
    val topicAndPartition = mock[TopicAndPartition]
    val fetchThread = mock[FetchThread]
    val offsetManager = mock[KafkaOffsetManager]
    val messageDecoder = mock[MessageDecoder]
    val timestampFilter = mock[TimeStampFilter]
    val offsetStorageFactory = mock[OffsetStorageFactory]
    val kafkaConfig = mock[KafkaSourceConfig]
    val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
      timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))

    kafkaSource.setStartTime(None)

    verify(fetchThread).start()
    verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
  }

  property("KafkaSource open should not set consumer start offset if offset storage is empty") {
    forAll(startTimeGen) { (startTime: Long) =>
      val offsetManager = mock[KafkaOffsetManager]
      val topicAndPartition = mock[TopicAndPartition]
      val fetchThread = mock[FetchThread]
      val messageDecoder = mock[MessageDecoder]
      val timestampFilter = mock[TimeStampFilter]
      val offsetStorageFactory = mock[OffsetStorageFactory]
      val kafkaConfig = mock[KafkaSourceConfig]
      val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
        timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))

      // StorageEmpty: the thread starts but no start offset is set
      when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty))

      source.setStartTime(Some(startTime))
      verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
      verify(fetchThread).start()

      // any other failure is expected to surface as an exception
      when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException))
      intercept[RuntimeException] {
        source.setStartTime(Some(startTime))
      }
      source.close()
    }
  }

  property("KafkaSource open should set consumer start offset if offset storage is not empty") {
    forAll(startTimeGen, offsetGen) {
      (startTime: Long, offset: Long) =>
        val offsetManager = mock[KafkaOffsetManager]
        val topicAndPartition = mock[TopicAndPartition]
        val fetchThread = mock[FetchThread]
        val messageDecoder = mock[MessageDecoder]
        val timestampFilter = mock[TimeStampFilter]
        val offsetStorageFactory = mock[OffsetStorageFactory]
        val kafkaConfig = mock[KafkaSourceConfig]
        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
          timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))

        // a resolved offset is handed to the fetch thread before it is started
        when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset))

        source.setStartTime(Some(startTime))
        verify(fetchThread).setStartOffset(topicAndPartition, offset)
        verify(fetchThread).start()

        // a failure other than StorageEmpty is expected to surface as an exception
        when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException))
        intercept[RuntimeException] {
          source.setStartTime(Some(startTime))
        }
        source.close()
    }
  }

  property("KafkaSource read should return number of messages in best effort") {
    // random kafka messages without key
    val kafkaMsgGen = for {
      topic <- Gen.alphaStr
      partition <- Gen.choose[Int](0, 1000)
      offset <- Gen.choose[Long](0L, 1000L)
      key = None
      msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
    } yield KafkaMessage(TopicAndPartition(topic, partition), offset, key, msg)
    val msgQueueGen = Gen.containerOf[Array, KafkaMessage](kafkaMsgGen)
    forAll(msgQueueGen) {
      (msgQueue: Array[KafkaMessage]) =>
        val offsetManager = mock[KafkaOffsetManager]
        val fetchThread = mock[FetchThread]
        val messageDecoder = mock[MessageDecoder]

        val timestampFilter = mock[TimeStampFilter]
        val offsetStorageFactory = mock[OffsetStorageFactory]
        val kafkaConfig = mock[KafkaSourceConfig]
        // the same mocked offset manager serves every partition in the queue
        val offsetManagers = msgQueue.map(_.topicAndPartition -> offsetManager).toMap

        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
          timestampFilter, Some(fetchThread), offsetManagers)

        if (msgQueue.isEmpty) {
          // nothing to poll: read yields null
          when(fetchThread.poll).thenReturn(None)
          source.read() shouldBe null
        } else {
          // stubs are re-armed per message; decoder and both filters pass it through
          msgQueue.indices.foreach { i =>
            val message = Message(msgQueue(i).msg)
            when(fetchThread.poll).thenReturn(Option(msgQueue(i)))
            when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message)
            when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message))
            when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message))

            source.read shouldBe message
          }
        }
        source.close()
    }
  }

  property("KafkaSource close should close all offset managers") {
    val offsetManager = mock[KafkaOffsetManager]
    val topicAndPartition = mock[TopicAndPartition]
    val fetchThread = mock[FetchThread]
    val timestampFilter = mock[TimeStampFilter]
    val messageDecoder = mock[MessageDecoder]
    val offsetStorageFactory = mock[OffsetStorageFactory]
    val kafkaConfig = mock[KafkaSourceConfig]
    val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder,
      timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager))
    source.close()
    verify(offsetManager).close()
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala deleted file mode 100644 index f3b5425..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package io.gearpump.streaming.kafka.lib - -import com.twitter.bijection.Injection -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers { - property("DefaultMessageDecoder should keep the original bytes data in Message") { - val decoder = new DefaultMessageDecoder() - forAll(Gen.alphaStr) { (s: String) => - val bytes = Injection[String, Array[Byte]](s) - decoder.fromBytes(bytes).msg shouldBe bytes - } - } -} - -class StringMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers { - property("StringMessageDecoder should decode original bytes data into string") { - val decoder = new StringMessageDecoder() - forAll(Gen.alphaStr) { (s: String) => - val bytes = Injection[String, Array[Byte]](s) - decoder.fromBytes(bytes).msg shouldBe s - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala deleted file mode 100644 index c762b06..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka.lib - -import scala.util.{Failure, Success} - -import com.twitter.bijection.Injection -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import io.gearpump.Message -import io.gearpump.streaming.transaction.api.OffsetStorage -import io.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} - -class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - val timeStampGen = Gen.choose[Long](0L, 1000L) - val messageGen = for { - msg <- Gen.alphaStr - time <- timeStampGen - } yield Message(msg, time) - - val messageAndOffsetsGen = Gen.listOf[Message](messageGen).map(_.zipWithIndex) - - property("KafkaOffsetManager should append offset to storage in monotonically" + - " increasing time order") { - forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) => - val offsetStorage = mock[OffsetStorage] - val offsetManager = new KafkaOffsetManager(offsetStorage) - messageAndOffsets.foldLeft(0L) { (max, messageAndOffset) => - val (message, offset) = messageAndOffset - offsetManager.filter((message, offset.toLong)) shouldBe Option(message) - if (message.timestamp > max) { - val newMax = message.timestamp - verify(offsetStorage).append(newMax, Injection[Long, Array[Byte]](offset.toLong)) - newMax - } else { - verifyZeroInteractions(offsetStorage) - max - } - } - offsetManager.close() - } - } - - val 
minTimeStampGen = Gen.choose[Long](0L, 500L) - val maxTimeStampGen = Gen.choose[Long](500L, 1000L) - property("KafkaOffsetManager resolveOffset should " + - "report StorageEmpty failure when storage is empty") { - forAll(timeStampGen) { (time: Long) => - val offsetStorage = mock[OffsetStorage] - val offsetManager = new KafkaOffsetManager(offsetStorage) - when(offsetStorage.lookUp(time)).thenReturn(Failure(StorageEmpty)) - offsetManager.resolveOffset(time) shouldBe Failure(StorageEmpty) - - doThrow(new RuntimeException).when(offsetStorage).lookUp(time) - intercept[RuntimeException] { - offsetManager.resolveOffset(time) - } - offsetManager.close() - } - } - - val offsetGen = Gen.choose[Long](0L, 1000L) - property("KafkaOffsetManager resolveOffset should return a valid" + - " offset when storage is not empty") { - forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) { - (time: Long, min: Long, max: Long, offset: Long) => - val offsetStorage = mock[OffsetStorage] - val offsetManager = new KafkaOffsetManager(offsetStorage) - if (time < min) { - when(offsetStorage.lookUp(time)).thenReturn(Failure( - Underflow(Injection[Long, Array[Byte]](min)))) - offsetManager.resolveOffset(time) shouldBe Success(min) - } else if (time > max) { - when(offsetStorage.lookUp(time)).thenReturn(Failure( - Overflow(Injection[Long, Array[Byte]](max)))) - offsetManager.resolveOffset(time) shouldBe Success(max) - } else { - when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, Array[Byte]](offset))) - offsetManager.resolveOffset(time) shouldBe Success(offset) - } - - doThrow(new RuntimeException).when(offsetStorage).lookUp(time) - intercept[RuntimeException] { - offsetManager.resolveOffset(time) - } - offsetManager.close() - } - } - - property("KafkaOffsetManager close should close offset storage") { - val offsetStorage = mock[OffsetStorage] - val offsetManager = new KafkaOffsetManager(offsetStorage) - offsetManager.close() - verify(offsetStorage).close() - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/83b36ef7/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala deleted file mode 100644 index af23c12..0000000 --- a/external/kafka/src/test/scala/io/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.gearpump.streaming.kafka.lib - -// TODO: Fix the UT failure! 
- -// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { -// val minTimeGen = Gen.choose[Long](1L, 500L) -// val maxTimeGen = Gen.choose[Long](500L, 999L) -// -// property("KafkaStorage lookup time should report StorageEmpty if storage is empty") { -// forAll { (time: Long, topic: String) => -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val getConsumer = () => mock[KafkaConsumer] -// val connectZk = () => mock[ZkClient] -// val storage = new KafkaStorage(topic, topicExists = false, producer, getConsumer(), -// connectZk()) -// storage.lookUp(time) shouldBe Failure(StorageEmpty) -// } -// } -// -// property("KafkaStorage lookup time should return data or report failure if storage not empty") { -// forAll(minTimeGen, maxTimeGen, Gen.alphaStr) {(minTime: Long, maxTime: Long, topic: String) => -// val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case (time, index) => -// val offset = index.toLong -// time -> offset -// } -// val timeAndOffsetsMap = timeAndOffsets.toMap -// val data = timeAndOffsets.map { -// case (time, offset) => -// new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, Array[Byte]](time)), -// Injection[Long, Array[Byte]](offset)) -// }.toList -// -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val consumer = mock[KafkaConsumer] -// val getConsumer = () => consumer -// val connectZk = () => mock[ZkClient] -// -// val hasNexts = List.fill(data.tail.size)(true) :+ false -// when(consumer.hasNext).thenReturn(true, hasNexts:_*) -// when(consumer.next).thenReturn(data.head, data.tail:_*) -// -// val storage = new KafkaStorage(topic, topicExists = true, producer, -// getConsumer(), connectZk()) -// forAll(Gen.choose[Long](minTime, maxTime)) { -// time => -// storage.lookUp(time) match { -// case Success(array) => -// array should equal (Injection[Long, Array[Byte]](timeAndOffsetsMap(time))) -// case Failure(e) => fail("time in range should return 
Success with value") -// } -// } -// -// forAll(Gen.choose[Long](0L, minTime - 1)) { -// time => -// storage.lookUp(time) match { -// case Failure(e) => e shouldBe a [Underflow] -// e.asInstanceOf[Underflow].min should equal -// (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime))) -// case Success(_) => fail("time less than min should return Underflow failure") -// } -// } -// -// forAll(Gen.choose[Long](maxTime + 1, 1000L)) { -// time => -// storage.lookUp(time) match { -// case Failure(e) => e shouldBe a [Overflow] -// e.asInstanceOf[Overflow].max should equal -// (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime))) -// case Success(_) => fail("time larger than max should return Overflow failure") -// } -// } -// } -// } -// -// property("KafkaStorage append should send data to Kafka") { -// forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000), -// Gen.alphaStr, Gen.oneOf(true, false)) { -// (time: Long, offset: Long, topic: String, topicExists: Boolean) => -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val getConsumer = () => mock[KafkaConsumer] -// val connectZk = () => mock[ZkClient] -// val storage = new KafkaStorage(topic, topicExists, producer, getConsumer(), connectZk()) -// val offsetBytes = Injection[Long, Array[Byte]](offset) -// storage.append(time, offsetBytes) -// verify(producer).send(anyObject[ProducerRecord[Array[Byte], Array[Byte]]]()) -// } -// } -// -// val topicAndPartitionGen = for { -// topic <- Gen.alphaStr -// partition <- Gen.choose[Int](0, 100) -// } yield TopicAndPartition(topic, partition) -// property("KafkaStorage should load data from Kafka") { -// val kafkaMsgGen = for { -// timestamp <- Gen.choose[Long](1L, 1000L) -// offset <- Gen.choose[Long](0L, 1000L) -// } yield (timestamp, Injection[Long, Array[Byte]](offset)) -// val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen) -// -// val topicExistsGen = Gen.oneOf(true, false) -// -// forAll(topicAndPartitionGen, msgListGen) { 
-// (topicAndPartition: TopicAndPartition, msgList: List[(Long, Array[Byte])]) => -// val producer= mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val consumer = mock[KafkaConsumer] -// val getConsumer = () => consumer -// val connectZk = () => mock[ZkClient] -// val kafkaStorage = new KafkaStorage(topicAndPartition.topic, -// topicExists = true, producer, getConsumer(), connectZk()) -// msgList match { -// case Nil => -// when(consumer.hasNext).thenReturn(false) -// case list => -// val hasNexts = List.fill(list.tail.size)(true) :+ false -// val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, bytes), index) => -// KafkaMessage(topicAndPartition, index.toLong, -// Some(Injection[Long, Array[Byte]](timestamp)), bytes) -// } -// when(consumer.hasNext).thenReturn(true, hasNexts: _*) -// when(consumer.next).thenReturn(kafkaMsgList.head, kafkaMsgList.tail: _*) -// } -// kafkaStorage.load(consumer) shouldBe msgList -// } -// } -// -// property("KafkaStorage should not get consumer when topic doesn't exist") { -// forAll(Gen.alphaStr) { (topic: String) => -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val getConsumer = mock[() => KafkaConsumer] -// val connectZk = () => mock[ZkClient] -// val kafkaStorage = new KafkaStorage(topic, -// topicExists = false, producer, getConsumer(), connectZk()) -// verify(getConsumer, never()).apply() -// kafkaStorage.close() -// } -// } -// -// property("KafkaStorage should fail to load invalid KafkaMessage") { -// val invalidKafkaMsgGen = for { -// tp <- topicAndPartitionGen -// offset <- Gen.choose[Long](1L, 1000L) -// timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, Array[Byte]](ByteBuffer.allocate(0))), -// None) -// msg <- Gen.alphaStr.map(Injection[String, Array[Byte]]) -// } yield KafkaMessage(tp, offset, timestamp, msg) -// forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) => -// val consumer = mock[KafkaConsumer] -// val getConsumer = () => consumer -// val producer = 
mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val connectZk = () => mock[ZkClient] -// val kafkaStorage = new KafkaStorage(invalidKafkaMsg.topicAndPartition.topic, -// topicExists = true, producer, getConsumer(), connectZk()) -// when(consumer.hasNext).thenReturn(true, false) -// when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg) -// Try(kafkaStorage.load(consumer)).isFailure shouldBe true -// } -// } -// -// property("KafkaStorage close should close kafka producer and delete topic") { -// val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] -// val getConsumer = () => mock[KafkaConsumer] -// val zkClient = mock[ZkClient] -// val connectZk = () => zkClient -// val kafkaStorage = new KafkaStorage("topic", false, producer, getConsumer(), connectZk()) -// kafkaStorage.close() -// verify(producer).close() -// verify(zkClient).createPersistent(anyString(), anyString()) -// } -// }
