http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala deleted file mode 100644 index 55c327b..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka.lib.consumer - -import kafka.api.{FetchRequestBuilder, OffsetRequest} -import kafka.common.ErrorMapping._ -import kafka.common.TopicAndPartition -import kafka.consumer.{ConsumerConfig, SimpleConsumer} -import kafka.message.MessageAndOffset -import kafka.utils.Utils - -import org.apache.gearpump.streaming.kafka.lib.KafkaUtil - -object KafkaConsumer { - def apply(topic: String, partition: Int, startOffsetTime: Long, config: ConsumerConfig) - : KafkaConsumer = { - val connectZk = KafkaUtil.connectZookeeper(config) - val broker = KafkaUtil.getBroker(connectZk(), topic, partition) - val soTimeout = config.socketTimeoutMs - val soBufferSize = config.socketReceiveBufferBytes - val fetchSize = config.fetchMessageMaxBytes - val clientId = config.clientId - val consumer = new SimpleConsumer(broker.host, broker.port, soTimeout, soBufferSize, clientId) - val getIterator = (offset: Long) => { - val request = new FetchRequestBuilder() - .addFetch(topic, partition, offset, fetchSize) - .build() - - val response = consumer.fetch(request) - response.errorCode(topic, partition) match { - case NoError => response.messageSet(topic, partition).iterator - case error => throw exceptionFor(error) - } - } - new KafkaConsumer(consumer, topic, partition, getIterator, startOffsetTime) - } -} - -/** - * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over - * messages from a kafka kafka.common.TopicAndPartition. 
- */ -class KafkaConsumer(consumer: SimpleConsumer, - topic: String, - partition: Int, - getIterator: (Long) => Iterator[MessageAndOffset], - startOffsetTime: Long = OffsetRequest.EarliestTime) { - private val earliestOffset = consumer - .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1) - private var nextOffset: Long = earliestOffset - private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset) - - def setStartOffset(startOffset: Long): Unit = { - nextOffset = startOffset - iterator = getIterator(nextOffset) - } - - def next(): KafkaMessage = { - val mo = iterator.next() - val message = mo.message - - nextOffset = mo.nextOffset - - val offset = mo.offset - val payload = Utils.readBytes(message.payload) - new KafkaMessage(topic, partition, offset, Option(message.key).map(Utils.readBytes), payload) - } - - def hasNext: Boolean = { - @annotation.tailrec - def hasNextHelper(iter: Iterator[MessageAndOffset], newIterator: Boolean): Boolean = { - if (iter.hasNext) true - else if (newIterator) false - else { - iterator = getIterator(nextOffset) - hasNextHelper(iterator, newIterator = true) - } - } - hasNextHelper(iterator, newIterator = false) - } - - def getNextOffset: Long = nextOffset - - def close(): Unit = { - consumer.close() - } -}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala deleted file mode 100644 index e0813d9..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.kafka.lib.consumer - -import kafka.common.TopicAndPartition - -/** - * wrapper over messages from kafka - * @param topicAndPartition where message comes from - * @param offset message offset on kafka queue - * @param key message key, could be None - * @param msg message payload - */ -case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long, - key: Option[Array[Byte]], msg: Array[Byte]) { - - def this(topic: String, partition: Int, offset: Long, - key: Option[Array[Byte]], msg: Array[Byte]) = { - this(TopicAndPartition(topic, partition), offset, key, msg) - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala deleted file mode 100644 index b34bf09..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib.grouper - -import kafka.common.TopicAndPartition - -/** - * default grouper groups TopicAndPartitions among StreamProducers by partitions - * - * e.g. given 2 topics (topicA with 2 partitions and topicB with 3 partitions) and - * 2 streamProducers (streamProducer0 and streamProducer1) - * - * streamProducer0 gets (topicA, partition1), (topicB, partition1) and (topicA, partition3) - * streamProducer1 gets (topicA, partition2), (topicB, partition2) - */ -class KafkaDefaultGrouper extends KafkaGrouper { - def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) - : Array[TopicAndPartition] = { - topicAndPartitions.indices.filter(_ % taskNum == taskIndex) - .map(i => topicAndPartitions(i)).toArray - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala deleted file mode 100644 index e2f5203..0000000 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib.grouper - -import kafka.common.TopicAndPartition - -/** - * this class dispatches kafka kafka.common.TopicAndPartition to gearpump tasks - */ -trait KafkaGrouper { - def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) - : Array[TopicAndPartition] -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala new file mode 100644 index 0000000..e5534a6 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.sink + +import java.util.Properties + +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.kafka.lib.sink.AbstractKafkaSink.KafkaProducerFactory +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.LogUtil +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.ByteArraySerializer + +object AbstractKafkaSink { + private val LOG = LogUtil.getLogger(classOf[AbstractKafkaSink]) + + val producerFactory = new KafkaProducerFactory { + override def getKafkaProducer(config: KafkaConfig): KafkaProducer[Array[Byte], Array[Byte]] = { + new KafkaProducer[Array[Byte], Array[Byte]](config.getProducerConfig, + new ByteArraySerializer, new ByteArraySerializer) + } + } + + trait KafkaProducerFactory extends java.io.Serializable { + def getKafkaProducer(config: KafkaConfig): KafkaProducer[Array[Byte], Array[Byte]] + } +} +/** + * kafka sink connectors that invokes {{org.apache.kafka.clients.producer.KafkaProducer}} to send + * messages to kafka queue + */ +abstract class AbstractKafkaSink private[kafka]( + topic: String, + props: Properties, + kafkaConfigFactory: KafkaConfigFactory, + factory: KafkaProducerFactory) extends DataSink { + import org.apache.gearpump.streaming.kafka.lib.sink.AbstractKafkaSink._ + 
+ def this(topic: String, props: Properties) = { + this(topic, props, new KafkaConfigFactory, AbstractKafkaSink.producerFactory) + } + + private lazy val config = kafkaConfigFactory.getKafkaConfig(props) + // Lazily construct producer since KafkaProducer is not serializable + private lazy val producer = factory.getKafkaProducer(config) + + override def open(context: TaskContext): Unit = { + LOG.info("KafkaSink opened") + } + + override def write(message: Message): Unit = { + message.msg match { + case (k: Array[Byte], v: Array[Byte]) => + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, k, v) + producer.send(record) + LOG.debug("KafkaSink sent record {} to Kafka", record) + case v: Array[Byte] => + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, v) + producer.send(record) + LOG.debug("KafkaSink sent record {} to Kafka", record) + case m => + val errorMsg = s"unexpected message type ${m.getClass}; " + + s"Array[Byte] or (Array[Byte], Array[Byte]) required" + LOG.error(errorMsg) + } + } + + override def close(): Unit = { + producer.close() + LOG.info("KafkaSink closed") + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala new file mode 100644 index 0000000..da08b04 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source + +import java.util.Properties + +import com.twitter.bijection.Injection +import kafka.common.TopicAndPartition +import org.apache.gearpump.streaming.kafka.KafkaSource +import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import KafkaClient.KafkaClientFactory +import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, FetchThread} +import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.streaming.transaction.api._ +import org.apache.gearpump.util.LogUtil +import org.apache.gearpump.{Message, TimeStamp} +import org.slf4j.Logger + +object AbstractKafkaSource { + private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource]) +} + +/** + * Contains implementation for Kafka source connectors, users should use + * [[org.apache.gearpump.streaming.kafka.KafkaSource]]. 
+ * + * This is a TimeReplayableSource which is able to replay messages given a start time. + * Each kafka message is tagged with a timestamp by + * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the (timestamp, offset) + * mapping is stored to a [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]]. + * On recovery, we could retrieve the previously stored offset from the + * [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]] by timestamp and start to read + * from there. + * + * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]] and further filtered by a + * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]] + * such that obsolete messages are dropped. + */ +abstract class AbstractKafkaSource( + topic: String, + props: Properties, + kafkaConfigFactory: KafkaConfigFactory, + kafkaClientFactory: KafkaClientFactory, + fetchThreadFactory: FetchThreadFactory) + extends TimeReplayableSource { + import org.apache.gearpump.streaming.kafka.lib.source.AbstractKafkaSource._ + + def this(topic: String, properties: Properties) = { + this(topic, properties, new KafkaConfigFactory, KafkaClient.factory, FetchThread.factory) + } + + private lazy val config: KafkaConfig = kafkaConfigFactory.getKafkaConfig(props) + private lazy val kafkaClient: KafkaClient = kafkaClientFactory.getKafkaClient(config) + private lazy val fetchThread: FetchThread = fetchThreadFactory.getFetchThread(config, kafkaClient) + private lazy val messageDecoder = config.getConfiguredInstance( + KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[MessageDecoder]) + private lazy val timestampFilter = config.getConfiguredInstance( + KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG, classOf[TimeStampFilter]) + + private var startTime: Long = 0L + private var checkpointStoreFactory: Option[CheckpointStoreFactory] = None + private var checkpointStores: Map[TopicAndPartition, CheckpointStore] = + Map.empty[TopicAndPartition, CheckpointStore] + + 
override def setCheckpointStore(checkpointStoreFactory: CheckpointStoreFactory): Unit = { + this.checkpointStoreFactory = Some(checkpointStoreFactory) + } + + override def open(context: TaskContext, startTime: TimeStamp): Unit = { + import context.{parallelism, taskId} + + LOG.info("KafkaSource opened at start time {}", startTime) + this.startTime = startTime + val topicList = topic.split(",", -1).toList + val grouper = config.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG, + classOf[PartitionGrouper]) + val topicAndPartitions = grouper.group(parallelism, taskId.index, + kafkaClient.getTopicAndPartitions(topicList)) + LOG.info("assigned partitions {}", s"Array(${topicAndPartitions.mkString(",")})") + + fetchThread.setTopicAndPartitions(topicAndPartitions) + maybeSetupCheckpointStores(topicAndPartitions) + maybeRecover() + } + + /** + * Reads a record from incoming queue, decodes, filters and checkpoints offsets + * before returns a Message. Message can be null if the incoming queue is empty. + * @return a [[org.apache.gearpump.Message]] or null + */ + override def read(): Message = { + fetchThread.poll.flatMap(filterAndCheckpointMessage).orNull + } + + override def close(): Unit = { + kafkaClient.close() + checkpointStores.foreach(_._2.close()) + LOG.info("KafkaSource closed") + } + + /** + * 1. Decodes raw bytes into Message with timestamp + * 2. Filters message against start time + * 3. 
Checkpoints (timestamp, kafka_offset) + */ + private def filterAndCheckpointMessage(kafkaMsg: KafkaMessage): Option[Message] = { + val msg = messageDecoder.fromBytes(kafkaMsg.key.orNull, kafkaMsg.msg) + LOG.debug("read message {}", msg) + val filtered = timestampFilter.filter(msg, startTime) + filtered.foreach { m => + val time = m.timestamp + val offset = kafkaMsg.offset + LOG.debug("checkpoint message state ({}, {})", time, offset) + checkpointOffsets(kafkaMsg.topicAndPartition, time, offset) + } + filtered + } + + private def checkpointOffsets(tp: TopicAndPartition, time: TimeStamp, offset: Long): Unit = { + checkpointStores.get(tp).foreach(_.persist(time, Injection[Long, Array[Byte]](offset))) + } + + private def maybeSetupCheckpointStores(tps: Array[TopicAndPartition]): Unit = { + for { + f <- checkpointStoreFactory + tp <- tps + } { + val store = f.getCheckpointStore(KafkaConfig.getCheckpointStoreNameSuffix(tp)) + LOG.info("created checkpoint store for {}", tp) + checkpointStores += tp -> store + } + } + + private def maybeRecover(): Unit = { + checkpointStores.foreach { case (tp, store) => + for { + bytes <- store.recover(startTime) + offset <- Injection.invert[Long, Array[Byte]](bytes).toOption + } { + LOG.info("recovered offset {} for {}", offset, tp) + fetchThread.setStartOffset(tp, offset) + } + } + // let JVM exit when other threads are closed + fetchThread.setDaemon(true) + fetchThread.start() + } + + protected def addCheckpointStore(tp: TopicAndPartition, store: CheckpointStore): Unit = { + checkpointStores += tp -> store + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala new file mode 100644 index 0000000..1c1214d --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.lib.source + +import com.twitter.bijection.Injection +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.transaction.api.MessageDecoder + +import scala.util.{Failure, Success} + +class DefaultMessageDecoder extends MessageDecoder { + override def fromBytes(key: Array[Byte], value: Array[Byte]): Message = { + Message(value, System.currentTimeMillis()) + } +} + +class StringMessageDecoder extends MessageDecoder { + override def fromBytes(key: Array[Byte], value: Array[Byte]): Message = { + Message(Injection.invert[String, Array[Byte]](value).get, + System.currentTimeMillis()) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeper.scala new file mode 100644 index 0000000..62cd519 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeper.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source.consumer + +/** + * Sleeps for an exponentially increasing duration on each call, + * up to the given cap + * + * @param backOffMultiplier The factor by which the duration increases. + * @param initialDurationMs Time in milliseconds for initial sleep. + * @param maximumDurationMs Cap up to which we will increase the duration. + */ +private[consumer] class ExponentialBackoffSleeper( + backOffMultiplier: Double = 2.0, + initialDurationMs: Long = 100, + maximumDurationMs: Long = 10000) { + + require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1") + require(initialDurationMs > 0, "initialDurationMs must be positive") + require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be >= initialDurationMs") + + private var sleepDuration = initialDurationMs + + def reset(): Unit = { + sleepDuration = initialDurationMs + } + + def sleep(): Unit = { + sleep(sleepDuration) + setNextSleepDuration() + } + + def sleep(duration: Long): Unit = { + Thread.sleep(duration) + } + + def getSleepDuration: Long = sleepDuration + + def setNextSleepDuration(): Unit = { + val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long] + sleepDuration = math.min(math.max(initialDurationMs, next), maximumDurationMs) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala ---------------------------------------------------------------------- diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala new file mode 100644 index 0000000..3119f40 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.lib.source.consumer + +import java.nio.channels.ClosedByInterruptException +import java.util.concurrent.LinkedBlockingQueue + +import kafka.common.TopicAndPartition +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.slf4j.Logger + +import org.apache.gearpump.util.LogUtil + +object FetchThread { + private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread]) + + val factory = new FetchThreadFactory + + class FetchThreadFactory extends java.io.Serializable { + def getFetchThread(config: KafkaConfig, client: KafkaClient): FetchThread = { + val fetchThreshold = config.getInt(KafkaConfig.FETCH_THRESHOLD_CONFIG) + val fetchSleepMS = config.getInt(KafkaConfig.FETCH_SLEEP_MS_CONFIG) + val startOffsetTime = config.getLong(KafkaConfig.CONSUMER_START_OFFSET_CONFIG) + FetchThread(fetchThreshold, fetchSleepMS, startOffsetTime, client) + } + } + + def apply(fetchThreshold: Int, + fetchSleepMS: Long, + startOffsetTime: Long, + client: KafkaClient): FetchThread = { + val createConsumer = (tp: TopicAndPartition) => + client.createConsumer(tp.topic, tp.partition, startOffsetTime) + val incomingQueue = new LinkedBlockingQueue[KafkaMessage]() + val sleeper = new ExponentialBackoffSleeper( + backOffMultiplier = 2.0, + initialDurationMs = 100L, + maximumDurationMs = 10000L) + new FetchThread(createConsumer, incomingQueue, sleeper, fetchThreshold, fetchSleepMS) + } +} + +/** + * A thread that fetches messages from multiple kafka.common.TopicAndPartitions and puts them + * onto a queue, which is asynchronously polled by a consumer + * + * @param createConsumer given a kafka.common.TopicAndPartition, create a + * [[KafkaConsumer]] to + * connect to it + * @param incomingQueue a queue to buffer incoming messages + * @param fetchThreshold above which thread should stop fetching messages + * @param fetchSleepMS interval to sleep when no more 
messages or hitting fetchThreshold + */ +private[kafka] class FetchThread( + createConsumer: TopicAndPartition => KafkaConsumer, + incomingQueue: LinkedBlockingQueue[KafkaMessage], + sleeper: ExponentialBackoffSleeper, + fetchThreshold: Int, + fetchSleepMS: Long) extends Thread { + import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread._ + + private var consumers: Map[TopicAndPartition, KafkaConsumer] = + Map.empty[TopicAndPartition, KafkaConsumer] + private var topicAndPartitions: Array[TopicAndPartition] = + Array.empty[TopicAndPartition] + private var nextOffsets = Map.empty[TopicAndPartition, Long] + private var reset = false + + def setTopicAndPartitions(topicAndPartitions: Array[TopicAndPartition]): Unit = { + this.topicAndPartitions = topicAndPartitions + consumers = createAllConsumers + } + + def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = { + consumers.get(tp).foreach(_.setStartOffset(startOffset)) + } + + def poll: Option[KafkaMessage] = { + Option(incomingQueue.poll()) + } + + override def run(): Unit = { + try { + while (!Thread.currentThread().isInterrupted) { + runLoop() + } + } catch { + case e: InterruptedException => LOG.info("fetch thread got interrupted exception") + case e: ClosedByInterruptException => LOG.info("fetch thread closed by interrupt exception") + } finally { + consumers.values.foreach(_.close()) + } + } + + private[lib] def runLoop(): Unit = { + try { + if (reset) { + nextOffsets = consumers.mapValues(_.getNextOffset) + resetConsumers(nextOffsets) + reset = false + } + val hasMoreMessages = fetchMessage + sleeper.reset() + if (!hasMoreMessages) { + // sleep for given duration + sleeper.sleep(fetchSleepMS) + } + } catch { + case exception: Exception => + LOG.warn(s"resetting consumers due to $exception") + reset = true + // sleep for exponentially increasing duration + sleeper.sleep() + } + } + + /** + * fetch message from each TopicAndPartition in a round-robin way + */ + private def 
fetchMessage: Boolean = { + consumers.foldLeft(false) { (hasNext, tpAndConsumer) => + val (_, consumer) = tpAndConsumer + if (incomingQueue.size < fetchThreshold) { + if (consumer.hasNext) { + incomingQueue.put(consumer.next()) + true + } else { + hasNext + } + } else { + true + } + } + } + + private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] = { + topicAndPartitions.map(tp => tp -> createConsumer(tp)).toMap + } + + private def resetConsumers(nextOffsets: Map[TopicAndPartition, Long]): Unit = { + consumers.values.foreach(_.close()) + consumers = createAllConsumers + consumers.foreach { case (tp, consumer) => + consumer.setStartOffset(nextOffsets(tp)) + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumer.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumer.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumer.scala new file mode 100644 index 0000000..1e307cc --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumer.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source.consumer + +import kafka.api.{FetchRequestBuilder, OffsetRequest} +import kafka.common.ErrorMapping._ +import kafka.common.TopicAndPartition +import kafka.consumer.SimpleConsumer +import kafka.message.MessageAndOffset +import kafka.utils.Utils + +object KafkaConsumer { + def apply(topic: String, partition: Int, startOffsetTime: Long, + fetchSize: Int, consumer: SimpleConsumer): KafkaConsumer = { + val getIterator = (offset: Long) => { + val request = new FetchRequestBuilder() + .addFetch(topic, partition, offset, fetchSize) + .build() + + val response = consumer.fetch(request) + response.errorCode(topic, partition) match { + case NoError => response.messageSet(topic, partition).iterator + case error => throw exceptionFor(error) + } + } + new KafkaConsumer(consumer, topic, partition, getIterator, startOffsetTime) + } +} + +/** + * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over + * messages from a kafka kafka.common.TopicAndPartition. 
+ */ +class KafkaConsumer(consumer: SimpleConsumer, + topic: String, + partition: Int, + getIterator: (Long) => Iterator[MessageAndOffset], + startOffsetTime: Long = OffsetRequest.EarliestTime) { + private val earliestOffset = consumer + .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1) + private var nextOffset: Long = earliestOffset + private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset) + + def setStartOffset(startOffset: Long): Unit = { + nextOffset = startOffset + iterator = getIterator(nextOffset) + } + + def next(): KafkaMessage = { + val mo = iterator.next() + val message = mo.message + + nextOffset = mo.nextOffset + + val offset = mo.offset + val payload = Utils.readBytes(message.payload) + new KafkaMessage(topic, partition, offset, Option(message.key).map(Utils.readBytes), payload) + } + + def hasNext: Boolean = { + @annotation.tailrec + def hasNextHelper(iter: Iterator[MessageAndOffset], newIterator: Boolean): Boolean = { + if (iter.hasNext) true + else if (newIterator) false + else { + iterator = getIterator(nextOffset) + hasNextHelper(iterator, newIterator = true) + } + } + hasNextHelper(iterator, newIterator = false) + } + + def getNextOffset: Long = nextOffset + + def close(): Unit = { + consumer.close() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaMessage.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaMessage.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaMessage.scala new file mode 100644 index 0000000..d2a404d --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaMessage.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source.consumer + +import kafka.common.TopicAndPartition + +/** + * wrapper over messages from kafka + * @param topicAndPartition where message comes from + * @param offset message offset on kafka queue + * @param key message key, could be None + * @param msg message payload + */ +case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long, + key: Option[Array[Byte]], msg: Array[Byte]) { + + def this(topic: String, partition: Int, offset: Long, + key: Option[Array[Byte]], msg: Array[Byte]) = { + this(TopicAndPartition(topic, partition), offset, key, msg) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouper.scala new file mode 100644 index 0000000..f2baf2f --- /dev/null +++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouper.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source.grouper + +import kafka.common.TopicAndPartition + +/** + * default grouper groups TopicAndPartitions among StreamProducers by partitions + * + * e.g. 
given 2 topics (topicA with 2 partitions and topicB with 3 partitions) and + * 2 streamProducers (streamProducer0 and streamProducer1) + * + * streamProducer0 gets (topicA, partition1), (topicB, partition1) and (topicB, partition3) + * streamProducer1 gets (topicA, partition2), (topicB, partition2) + */ +class DefaultPartitionGrouper extends PartitionGrouper { + def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) + : Array[TopicAndPartition] = { + topicAndPartitions.indices.filter(_ % taskNum == taskIndex) + .map(i => topicAndPartitions(i)).toArray + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/PartitionGrouper.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/PartitionGrouper.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/PartitionGrouper.scala new file mode 100644 index 0000000..83660e5 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/PartitionGrouper.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.source.grouper + +import kafka.common.TopicAndPartition + +/** + * this class dispatches kafka kafka.common.TopicAndPartition to gearpump tasks + */ +trait PartitionGrouper { + def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition]) + : Array[TopicAndPartition] +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala new file mode 100644 index 0000000..e2450f4 --- /dev/null +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.kafka.lib.store + +import java.util.Properties + +import com.twitter.bijection.Injection +import kafka.api.OffsetRequest +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory +import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory} +import org.apache.gearpump.util.LogUtil +import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} +import org.apache.kafka.common.serialization.ByteArraySerializer + +/** + * Factory class that constructs a KafkaStore + * + * @param props configuration for kafka store + */ + +abstract class AbstractKafkaStoreFactory( + props: Properties, + configFactory: KafkaConfigFactory) + extends CheckpointStoreFactory { + + def this(props: Properties) { + this(props, new KafkaConfigFactory) + } + + private lazy val config: KafkaConfig = configFactory.getKafkaConfig(props) + + override def getCheckpointStore(name: String): CheckpointStore = { + val topic = config.getKafkaStoreTopic(name) + val client = config.getKafkaClientFactory.getKafkaClient(config) + val replicas = config.getInt(KafkaConfig.REPLICATION_FACTOR_CONFIG) + val topicExists = client.createTopic(topic, 1, replicas) + val consumer = if (topicExists) { + Some(client.createConsumer(topic, 0, OffsetRequest.EarliestTime)) + } else { + None + } + val producer = client.createProducer[Array[Byte], Array[Byte]](new ByteArraySerializer, + new ByteArraySerializer) + new KafkaStore(topic, producer, consumer) + } +} + +object KafkaStore { + private val LOG = LogUtil.getLogger(classOf[KafkaStore]) +} + +/** + * checkpoints (timestamp, state) to kafka + * + * @param topic kafka store topic + * @param producer kafka producer + * @param optConsumer kafka consumer + */ +class KafkaStore private[kafka]( + 
val topic: String, + val producer: KafkaProducer[Array[Byte], Array[Byte]], + val optConsumer: Option[KafkaConsumer]) + extends CheckpointStore { + import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore._ + + private var maxTime: TimeStamp = 0L + + override def persist(time: TimeStamp, checkpoint: Array[Byte]): Unit = { + // make sure checkpointed timestamp is monotonically increasing + // hence (1, 1), (3, 2), (2, 3) is checkpointed as (1, 1), (3, 2), (3, 3) + if (time > maxTime) { + maxTime = time + } + val key = maxTime + val value = checkpoint + val message = new ProducerRecord[Array[Byte], Array[Byte]]( + topic, 0, Injection[Long, Array[Byte]](key), value) + producer.send(message) + LOG.debug("KafkaStore persisted state ({}, {})", key, value) + } + + override def recover(time: TimeStamp): Option[Array[Byte]] = { + var checkpoint: Option[Array[Byte]] = None + optConsumer.foreach { consumer => + while (consumer.hasNext && checkpoint.isEmpty) { + val kafkaMsg = consumer.next() + checkpoint = for { + k <- kafkaMsg.key + t <- Injection.invert[TimeStamp, Array[Byte]](k).toOption + c = kafkaMsg.msg if t >= time + } yield c + } + consumer.close() + } + checkpoint match { + case Some(c) => + LOG.info(s"KafkaStore recovered checkpoint ($time, $c)") + case None => + LOG.info(s"no checkpoint existing for $time") + } + checkpoint + } + + override def close(): Unit = { + producer.close() + LOG.info("KafkaStore closed") + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala new file mode 100644 index 0000000..417b6de --- /dev/null +++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.kafka.lib.util + +import java.util.Properties + +import kafka.admin.AdminUtils +import kafka.cluster.Broker +import kafka.common.TopicAndPartition +import kafka.consumer.SimpleConsumer +import kafka.utils.{ZKStringSerializer, ZkUtils} +import org.I0Itec.zkclient.ZkClient +import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.util.LogUtil +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.common.serialization.Serializer + +object KafkaClient { + private val LOG = LogUtil.getLogger(classOf[KafkaClient]) + + val factory = new KafkaClientFactory + + class KafkaClientFactory extends java.io.Serializable { + def getKafkaClient(config: KafkaConfig): KafkaClient = { + val consumerConfig = config.getConsumerConfig + val zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, + consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) + new 
KafkaClient(config, zkClient) + } + } +} + +class KafkaClient(config: KafkaConfig, zkClient: ZkClient) { + import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient._ + + private val consumerConfig = config.getConsumerConfig + + def getTopicAndPartitions(consumerTopics: List[String]): Array[TopicAndPartition] = { + try { + ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap { + case (topic, partitions) => partitions.map(TopicAndPartition(topic, _)) + }.toArray + } catch { + case e: Exception => + LOG.error(e.getMessage) + throw e + } + } + + def getBroker(topic: String, partition: Int): Broker = { + try { + val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition) + .getOrElse(throw new RuntimeException( + s"leader not available for TopicAndPartition($topic, $partition)")) + ZkUtils.getBrokerInfo(zkClient, leader) + .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader")) + } catch { + case e: Exception => + LOG.error(e.getMessage) + throw e + } + } + + def createConsumer(topic: String, partition: Int, startOffsetTime: Long): KafkaConsumer = { + val broker = getBroker(topic, partition) + val soTimeout = consumerConfig.socketTimeoutMs + val soBufferSize = consumerConfig.socketReceiveBufferBytes + val clientId = consumerConfig.clientId + val fetchSize = consumerConfig.fetchMessageMaxBytes + val consumer = new SimpleConsumer(broker.host, broker.port, soTimeout, soBufferSize, clientId) + KafkaConsumer(topic, partition, startOffsetTime, fetchSize, consumer) + } + + def createProducer[K, V](keySerializer: Serializer[K], + valueSerializer: Serializer[V]): KafkaProducer[K, V] = { + new KafkaProducer[K, V](config.getProducerConfig, keySerializer, valueSerializer) + } + + /** + * create a new kafka topic + * return true if topic already exists, and false otherwise + */ + def createTopic(topic: String, partitions: Int, replicas: Int): Boolean = { + try { + if (AdminUtils.topicExists(zkClient, topic)) { + 
LOG.info(s"topic $topic exists") + true + } else { + AdminUtils.createTopic(zkClient, topic, partitions, replicas) + LOG.info(s"created topic $topic") + false + } + } catch { + case e: Exception => + LOG.error(e.getMessage) + throw e + } + } + + def close(): Unit = { + zkClient.close() + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala index 2f8a533..62a70bd 100644 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala @@ -18,7 +18,12 @@ package org.apache.gearpump.streaming.kafka +import java.util.Properties + import com.twitter.bijection.Injection +import org.apache.gearpump.streaming.kafka.lib.sink.AbstractKafkaSink.KafkaProducerFactory +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.mockito.Mockito._ import org.scalacheck.Gen @@ -40,9 +45,17 @@ class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock property("KafkaSink write should send producer record") { forAll(dataGen) { (data: (String, Array[Byte], Array[Byte])) => + val props = mock[Properties] val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] + val producerFactory = mock[KafkaProducerFactory] + val configFactory = mock[KafkaConfigFactory] + val config = mock[KafkaConfig] + + when(configFactory.getKafkaConfig(props)).thenReturn(config) + when(producerFactory.getKafkaProducer(config)).thenReturn(producer) 
+ val (topic, key, msg) = data - val kafkaSink = new KafkaSink(() => producer, topic) + val kafkaSink = new KafkaSink(topic, props, configFactory, producerFactory) kafkaSink.write(Message((key, msg))) verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]]( r => r.topic == topic && (r.key sameElements key) && (r.value sameElements msg))) @@ -55,8 +68,16 @@ class KafkaSinkSpec extends PropSpec with PropertyChecks with Matchers with Mock } property("KafkaSink close should close kafka producer") { + val props = mock[Properties] val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] - val kafkaSink = new KafkaSink(() => producer, "topic") + val producerFactory = mock[KafkaProducerFactory] + val configFactory = mock[KafkaConfigFactory] + val config = mock[KafkaConfig] + + when(configFactory.getKafkaConfig(props)).thenReturn(config) + when(producerFactory.getKafkaProducer(config)).thenReturn(producer) + + val kafkaSink = new KafkaSink("topic", props, configFactory, producerFactory) kafkaSink.close() verify(producer).close() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala index 8055d00..e40276f 100644 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala +++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala @@ -18,10 +18,21 @@ package org.apache.gearpump.streaming.kafka -import scala.util.{Failure, Success} +import java.util.Properties import com.twitter.bijection.Injection import kafka.common.TopicAndPartition +import org.apache.gearpump.streaming.MockUtil +import 
org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory +import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import KafkaClient.KafkaClientFactory +import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, FetchThread} +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory +import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory, MessageDecoder, TimeStampFilter} +import org.apache.gearpump.Message import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalacheck.Gen @@ -29,139 +40,168 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks import org.scalatest.{Matchers, PropSpec} -import org.apache.gearpump.Message -import org.apache.gearpump.streaming.kafka.lib.consumer.{FetchThread, KafkaMessage} -import org.apache.gearpump.streaming.kafka.lib.{KafkaOffsetManager, KafkaSourceConfig} -import org.apache.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty -import org.apache.gearpump.streaming.transaction.api.{MessageDecoder, OffsetStorageFactory, TimeStampFilter} - class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - val startTimeGen = Gen.choose[Long](0L, 1000L) - val offsetGen = Gen.choose[Long](0L, 1000L) - - property("KafkaSource open sets consumer to earliest offset") { - val topicAndPartition = mock[TopicAndPartition] - val fetchThread = mock[FetchThread] - val offsetManager = mock[KafkaOffsetManager] - val messageDecoder = mock[MessageDecoder] - val timestampFilter = mock[TimeStampFilter] - val offsetStorageFactory = mock[OffsetStorageFactory] - val kafkaConfig = mock[KafkaSourceConfig] - val kafkaSource = new KafkaSource(kafkaConfig, 
offsetStorageFactory, messageDecoder, - timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager)) - - kafkaSource.setStartTime(None) - - verify(fetchThread).start() - verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong()) - } - - property("KafkaSource open should not set consumer start offset if offset storage is empty") { - forAll(startTimeGen) { (startTime: Long) => - val offsetManager = mock[KafkaOffsetManager] - val topicAndPartition = mock[TopicAndPartition] + val startTimeGen = Gen.choose[Long](0L, 100L) + val offsetGen = Gen.choose[Long](0L, 100L) + val topicAndPartitionGen = for { + topic <- Gen.alphaStr suchThat (_.nonEmpty) + partition <- Gen.chooseNum[Int](1, 100) + } yield TopicAndPartition(topic, partition) + val tpsGen = Gen.listOf[TopicAndPartition](topicAndPartitionGen) suchThat (_.nonEmpty) + + property("KafkaSource open should not recover without checkpoint") { + forAll(startTimeGen, tpsGen) { (startTime: Long, tps: List[TopicAndPartition]) => + val taskContext = MockUtil.mockTaskContext val fetchThread = mock[FetchThread] - val messageDecoder = mock[MessageDecoder] - val timestampFilter = mock[TimeStampFilter] - val offsetStorageFactory = mock[OffsetStorageFactory] - val kafkaConfig = mock[KafkaSourceConfig] - val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, - timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager)) - - when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty)) + val kafkaClient = mock[KafkaClient] + val clientFactory = mock[KafkaClientFactory] + val threadFactory = mock[FetchThreadFactory] + val topics = tps.map(_.topic) + val properties = mock[Properties] + val kafkaConfig = mock[KafkaConfig] + val configFactory = mock[KafkaConfigFactory] + val partitionGrouper = mock[PartitionGrouper] + + when(configFactory.getKafkaConfig(properties)).thenReturn(kafkaConfig) + 
when(clientFactory.getKafkaClient(kafkaConfig)).thenReturn(kafkaClient) + val tpsArray = tps.toArray + when(kafkaClient.getTopicAndPartitions(topics)).thenReturn(tpsArray) + when(kafkaConfig.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG, + classOf[PartitionGrouper])).thenReturn(partitionGrouper) + when(partitionGrouper.group(taskContext.parallelism, taskContext.taskId.index, tpsArray)) + .thenReturn(tpsArray) + when(threadFactory.getFetchThread(kafkaConfig, kafkaClient)).thenReturn(fetchThread) + + val source = new KafkaSource(topics.mkString(","), properties, configFactory, + clientFactory, threadFactory) + + source.open(taskContext, startTime) - source.setStartTime(Some(startTime)) verify(fetchThread, never()).setStartOffset(anyObject[TopicAndPartition](), anyLong()) - verify(fetchThread).start() - - when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException)) - intercept[RuntimeException] { - source.setStartTime(Some(startTime)) - } - source.close() } } - property("KafkaSource open should set consumer start offset if offset storage is not empty") { - forAll(startTimeGen, offsetGen) { - (startTime: Long, offset: Long) => - val offsetManager = mock[KafkaOffsetManager] - val topicAndPartition = mock[TopicAndPartition] + property("KafkaSource open should recover with checkpoint") { + forAll(startTimeGen, offsetGen, tpsGen) { + (startTime: Long, offset: Long, tps: List[TopicAndPartition]) => + val taskContext = MockUtil.mockTaskContext + val checkpointStoreFactory = mock[CheckpointStoreFactory] + val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap + val topics = tps.map(_.topic) + val properties = mock[Properties] + val kafkaConfig = mock[KafkaConfig] + val configFactory = mock[KafkaConfigFactory] + val kafkaClient = mock[KafkaClient] + val clientFactory = mock[KafkaClientFactory] val fetchThread = mock[FetchThread] - val messageDecoder = mock[MessageDecoder] - val timestampFilter = mock[TimeStampFilter] - val 
offsetStorageFactory = mock[OffsetStorageFactory] - val kafkaConfig = mock[KafkaSourceConfig] - val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, - timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager)) - - when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset)) - - source.setStartTime(Some(startTime)) - verify(fetchThread).setStartOffset(topicAndPartition, offset) - verify(fetchThread).start() - - when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new RuntimeException)) - intercept[RuntimeException] { - source.setStartTime(Some(startTime)) + val threadFactory = mock[FetchThreadFactory] + val partitionGrouper = mock[PartitionGrouper] + + when(configFactory.getKafkaConfig(properties)).thenReturn(kafkaConfig) + when(clientFactory.getKafkaClient(kafkaConfig)).thenReturn(kafkaClient) + when(threadFactory.getFetchThread(kafkaConfig, kafkaClient)).thenReturn(fetchThread) + val tpsArray = tps.toArray + when(kafkaClient.getTopicAndPartitions(topics)).thenReturn(tps.toArray) + when(kafkaConfig.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG, + classOf[PartitionGrouper])).thenReturn(partitionGrouper) + when(partitionGrouper.group(taskContext.parallelism, taskContext.taskId.index, tpsArray)) + .thenReturn(tpsArray) + + val source = new KafkaSource(topics.mkString(","), properties, configFactory, + clientFactory, threadFactory) + checkpointStores.foreach{ case (tp, store) => source.addPartitionAndStore(tp, store) } + + checkpointStores.foreach { case (tp, store) => + when(checkpointStoreFactory.getCheckpointStore( + KafkaConfig.getCheckpointStoreNameSuffix(tp))).thenReturn(store) + when(store.recover(startTime)).thenReturn(Some(Injection[Long, Array[Byte]](offset))) } - source.close() + + source.setCheckpointStore(checkpointStoreFactory) + source.open(taskContext, startTime) + + tps.foreach(tp => verify(fetchThread).setStartOffset(tp, offset)) } } - property("KafkaSource read 
should return number of messages in best effort") { - val kafkaMsgGen = for { - topic <- Gen.alphaStr - partition <- Gen.choose[Int](0, 1000) + property("KafkaSource read checkpoints offset and returns a message or null") { + val msgGen = for { + tp <- topicAndPartitionGen offset <- Gen.choose[Long](0L, 1000L) - key = None + key = Some(Injection[Long, Array[Byte]](offset)) msg <- Gen.alphaStr.map(Injection[String, Array[Byte]]) - } yield KafkaMessage(TopicAndPartition(topic, partition), offset, key, msg) - val msgQueueGen = Gen.containerOf[Array, KafkaMessage](kafkaMsgGen) - forAll(msgQueueGen) { - (msgQueue: Array[KafkaMessage]) => - val offsetManager = mock[KafkaOffsetManager] - val fetchThread = mock[FetchThread] - val messageDecoder = mock[MessageDecoder] - - val timestampFilter = mock[TimeStampFilter] - val offsetStorageFactory = mock[OffsetStorageFactory] - val kafkaConfig = mock[KafkaSourceConfig] - val offsetManagers = msgQueue.map(_.topicAndPartition -> offsetManager).toMap - - val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, - timestampFilter, Some(fetchThread), offsetManagers) - - if (msgQueue.isEmpty) { - when(fetchThread.poll).thenReturn(None) - source.read() shouldBe null - } else { - msgQueue.indices.foreach { i => - val message = Message(msgQueue(i).msg) - when(fetchThread.poll).thenReturn(Option(msgQueue(i))) - when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message) - when(offsetManager.filter(anyObject[(Message, Long)])).thenReturn(Some(message)) - when(timestampFilter.filter(anyObject[Message], anyLong())).thenReturn(Some(message)) - - source.read shouldBe message - } + } yield KafkaMessage(tp, offset, key, msg) + val msgQueueGen = Gen.listOf[KafkaMessage](msgGen) + + forAll(msgQueueGen) { (msgQueue: List[KafkaMessage]) => + val properties = mock[Properties] + val config = mock[KafkaConfig] + val configFactory = mock[KafkaConfigFactory] + val timestampFilter = mock[TimeStampFilter] + val 
messageDecoder = mock[MessageDecoder] + val kafkaClient = mock[KafkaClient] + val clientFactory = mock[KafkaClientFactory] + val fetchThread = mock[FetchThread] + val threadFactory = mock[FetchThreadFactory] + val checkpointStoreFactory = mock[CheckpointStoreFactory] + + val checkpointStores = msgQueue.map(_.topicAndPartition -> mock[CheckpointStore]).toMap + val topics = checkpointStores.map(_._1.topic).mkString(",") + + checkpointStores.foreach { case (tp, store) => + when(checkpointStoreFactory.getCheckpointStore(KafkaConfig + .getCheckpointStoreNameSuffix(tp))).thenReturn(store) + } + when(configFactory.getKafkaConfig(properties)).thenReturn(config) + when(config.getConfiguredInstance(KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG, + classOf[TimeStampFilter])).thenReturn(timestampFilter) + when(config.getConfiguredInstance(KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, + classOf[MessageDecoder])).thenReturn(messageDecoder) + when(clientFactory.getKafkaClient(config)).thenReturn(kafkaClient) + when(threadFactory.getFetchThread(config, kafkaClient)).thenReturn(fetchThread) + + val source = new KafkaSource(topics, properties, configFactory, clientFactory, threadFactory) + checkpointStores.foreach{ case (tp, store) => source.addPartitionAndStore(tp, store) } + source.setCheckpointStore(checkpointStoreFactory) + + if (msgQueue.isEmpty) { + when(fetchThread.poll).thenReturn(None) + source.read() shouldBe null + } else { + msgQueue.foreach { kafkaMsg => + when(fetchThread.poll).thenReturn(Option(kafkaMsg)) + val message = Message(kafkaMsg.msg, kafkaMsg.offset) + when(messageDecoder.fromBytes(kafkaMsg.key.get, kafkaMsg.msg)).thenReturn(message) + when(timestampFilter.filter(message, 0)).thenReturn(Some(message)) + source.read() shouldBe Message(kafkaMsg.msg, kafkaMsg.offset) + verify(checkpointStores(kafkaMsg.topicAndPartition)).persist( + kafkaMsg.offset, Injection[Long, Array[Byte]](kafkaMsg.offset)) } - source.close() + } } } - property("KafkaSource close should close all 
offset managers") { - val offsetManager = mock[KafkaOffsetManager] - val topicAndPartition = mock[TopicAndPartition] - val fetchThread = mock[FetchThread] - val timestampFilter = mock[TimeStampFilter] - val messageDecoder = mock[MessageDecoder] - val offsetStorageFactory = mock[OffsetStorageFactory] - val kafkaConfig = mock[KafkaSourceConfig] - val source = new KafkaSource(kafkaConfig, offsetStorageFactory, messageDecoder, - timestampFilter, Some(fetchThread), Map(topicAndPartition -> offsetManager)) - source.close() - verify(offsetManager).close() + property("KafkaSource close should close all checkpoint stores") { + forAll(Gen.chooseNum[Int](1, 100)) { (num: Int) => + val tps = 0.until(num).map(id => TopicAndPartition("topic", id)) + val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap + val props = mock[Properties] + val kafkaConfig = mock[KafkaConfig] + val configFactory = mock[KafkaConfigFactory] + val threadFactory = mock[FetchThreadFactory] + val kafkaClient = mock[KafkaClient] + val clientFactory = mock[KafkaClientFactory] + + when(configFactory.getKafkaConfig(props)).thenReturn(kafkaConfig) + when(clientFactory.getKafkaClient(kafkaConfig)).thenReturn(kafkaClient) + + val source = new KafkaSource("topic", props, configFactory, clientFactory, threadFactory) + checkpointStores.foreach{ case (tp, store) => source.addPartitionAndStore(tp, store) } + source.close() + + verify(kafkaClient).close() + checkpointStores.foreach{ case (_, store) => verify(store).close() } + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala new file mode 100644 index 0000000..67c64c4 --- /dev/null +++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.kafka + +import java.util.Properties + +import com.twitter.bijection.Injection +import kafka.api.OffsetRequest +import kafka.common.TopicAndPartition +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, KafkaConsumer} +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import KafkaClient.KafkaClientFactory +import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore +import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient +import org.apache.gearpump.streaming.kafka.util.KafkaConfig +import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory +import org.apache.kafka.clients.producer.{Producer, ProducerRecord, KafkaProducer} +import org.apache.kafka.common.serialization.Serializer +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.{Matchers, PropSpec} +import 
org.scalatest.prop.PropertyChecks + + +class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + val timestampGen = Gen.chooseNum[Long](0L, 100L) + + property("KafkaStoreFactory should get KafkaStore given store name") { + forAll(Gen.alphaStr, Gen.alphaStr, Gen.oneOf(true, false)) { + (prefix: String, name: String, topicExists: Boolean) => + val props = mock[Properties] + val config = mock[KafkaConfig] + val configFactory = mock[KafkaConfigFactory] + val clientFactory = mock[KafkaClientFactory] + val client = mock[KafkaClient] + val consumer = mock[KafkaConsumer] + val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] + val topic = s"$prefix-$name" + val replica = 1 + + when(configFactory.getKafkaConfig(props)).thenReturn(config) + when(config.getKafkaStoreTopic(name)).thenReturn(topic) + when(config.getString(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG)).thenReturn(prefix) + when(config.getInt(KafkaConfig.REPLICATION_FACTOR_CONFIG)).thenReturn(replica) + when(config.getKafkaClientFactory).thenReturn(clientFactory) + when(clientFactory.getKafkaClient(config)).thenReturn(client) + when(client.createTopic(topic, 1, replica)).thenReturn(topicExists) + if (topicExists) { + when(client.createConsumer(topic, 0, OffsetRequest.EarliestTime)).thenReturn(consumer) + } + when(client.createProducer[Array[Byte], Array[Byte]](any[Serializer[Array[Byte]]], + any[Serializer[Array[Byte]]])).thenReturn(producer) + + val storeFactory = new KafkaStoreFactory(props, configFactory) + storeFactory.getCheckpointStore(name) shouldBe a [KafkaStore] + + if (topicExists) { + verify(client).createConsumer(topic, 0, OffsetRequest.EarliestTime) + } + } + } + + property("KafkaStore should close producer on close") { + forAll(Gen.alphaStr) { (topic: String) => + val consumer = mock[KafkaConsumer] + val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] + val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) + kafkaStore.close() + 
verify(producer).close() + } + } + + property("KafkaStore should read checkpoint from timestamp on recover") { + forAll(Gen.alphaStr, timestampGen) { + (topic: String, recoverTime: TimeStamp) => + val consumer = mock[KafkaConsumer] + val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] + val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) + + // case 1: no checkpoint available + when(consumer.hasNext).thenReturn(false) + kafkaStore.recover(recoverTime) shouldBe None + verify(consumer).close() + } + + forAll(Gen.alphaStr, timestampGen) { + (topic: String, recoverTime: TimeStamp) => + val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] + val kafkaStore = new KafkaStore(topic, producer, None) + + // case 2: no checkpoint store available + kafkaStore.recover(recoverTime) shouldBe None + } + + forAll(Gen.alphaStr, timestampGen, timestampGen) { + (topic: String, recoverTime: TimeStamp, checkpointTime: TimeStamp) => + val consumer = mock[KafkaConsumer] + val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]] + val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) + + val key = Injection[TimeStamp, Array[Byte]](checkpointTime) + val msg = key + val kafkaMsg = KafkaMessage(TopicAndPartition(topic, 0), 0, Some(key), msg) + + when(consumer.hasNext).thenReturn(true, false) + when(consumer.next()).thenReturn(kafkaMsg) + + if (checkpointTime < recoverTime) { + // case 3: checkpointTime is less than recoverTime + kafkaStore.recover(recoverTime) shouldBe None + } else { + // case 4: checkpoint time is equal to or larger than given timestamp + kafkaStore.recover(recoverTime) shouldBe Some(msg) + } + + verify(consumer).close() + } + } + + property("KafkaStore persist should write checkpoint with monotonically increasing timestamp") { + forAll(Gen.alphaStr, timestampGen, Gen.alphaStr) { + (topic: String, checkpointTime: TimeStamp, data: String) => + val consumer = mock[KafkaConsumer] + val producer = mock[KafkaProducer[Array[Byte], 
Array[Byte]]] + val kafkaStore = new KafkaStore(topic, producer, Some(consumer)) + + val value = Injection[String, Array[Byte]](data) + kafkaStore.persist(checkpointTime, value) + kafkaStore.persist(checkpointTime - 1, value) + kafkaStore.persist(checkpointTime + 1, value) + + verifyProducer(producer, count = 2, topic, 0, checkpointTime, data) + verifyProducer(producer, count = 1, topic, 0, checkpointTime + 1, data) + + } + + def verifyProducer(producer: Producer[Array[Byte], Array[Byte]], count: Int, + topic: String, partition: Int, time: TimeStamp, data: String): Unit = { + verify(producer, times(count)).send( + MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](record => + record.topic() == topic + && record.partition() == partition + && Injection.invert[TimeStamp, Array[Byte]](record.key()).get == time + && Injection.invert[String, Array[Byte]](record.value()).get == data + )) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala deleted file mode 100644 index e243eab..0000000 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib - -import com.twitter.bijection.Injection -import org.scalacheck.Gen -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers { - property("DefaultMessageDecoder should keep the original bytes data in Message") { - val decoder = new DefaultMessageDecoder() - forAll(Gen.alphaStr) { (s: String) => - val bytes = Injection[String, Array[Byte]](s) - decoder.fromBytes(bytes).msg shouldBe bytes - } - } -} - -class StringMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers { - property("StringMessageDecoder should decode original bytes data into string") { - val decoder = new StringMessageDecoder() - forAll(Gen.alphaStr) { (s: String) => - val bytes = Injection[String, Array[Byte]](s) - decoder.fromBytes(bytes).msg shouldBe s - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala deleted file mode 100644 index 987d975..0000000 --- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala +++ /dev/null @@ -1,117 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.kafka.lib - -import scala.util.{Failure, Success} - -import com.twitter.bijection.Injection -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -import org.apache.gearpump.Message -import org.apache.gearpump.streaming.transaction.api.OffsetStorage -import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, StorageEmpty, Underflow} - -class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - - val timeStampGen = Gen.choose[Long](0L, 1000L) - val messageGen = for { - msg <- Gen.alphaStr - time <- timeStampGen - } yield Message(msg, time) - - val messageAndOffsetsGen = Gen.listOf[Message](messageGen).map(_.zipWithIndex) - - property("KafkaOffsetManager should append offset to storage in monotonically" + - " increasing time order") { - forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) => - val offsetStorage = mock[OffsetStorage] - val offsetManager = new KafkaOffsetManager(offsetStorage) - messageAndOffsets.foldLeft(0L) { 
(max, messageAndOffset) => - val (message, offset) = messageAndOffset - offsetManager.filter((message, offset.toLong)) shouldBe Option(message) - if (message.timestamp > max) { - val newMax = message.timestamp - verify(offsetStorage).append(newMax, Injection[Long, Array[Byte]](offset.toLong)) - newMax - } else { - verifyZeroInteractions(offsetStorage) - max - } - } - offsetManager.close() - } - } - - val minTimeStampGen = Gen.choose[Long](0L, 500L) - val maxTimeStampGen = Gen.choose[Long](500L, 1000L) - property("KafkaOffsetManager resolveOffset should " + - "report StorageEmpty failure when storage is empty") { - forAll(timeStampGen) { (time: Long) => - val offsetStorage = mock[OffsetStorage] - val offsetManager = new KafkaOffsetManager(offsetStorage) - when(offsetStorage.lookUp(time)).thenReturn(Failure(StorageEmpty)) - offsetManager.resolveOffset(time) shouldBe Failure(StorageEmpty) - - doThrow(new RuntimeException).when(offsetStorage).lookUp(time) - intercept[RuntimeException] { - offsetManager.resolveOffset(time) - } - offsetManager.close() - } - } - - val offsetGen = Gen.choose[Long](0L, 1000L) - property("KafkaOffsetManager resolveOffset should return a valid" + - " offset when storage is not empty") { - forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) { - (time: Long, min: Long, max: Long, offset: Long) => - val offsetStorage = mock[OffsetStorage] - val offsetManager = new KafkaOffsetManager(offsetStorage) - if (time < min) { - when(offsetStorage.lookUp(time)).thenReturn(Failure( - Underflow(Injection[Long, Array[Byte]](min)))) - offsetManager.resolveOffset(time) shouldBe Success(min) - } else if (time > max) { - when(offsetStorage.lookUp(time)).thenReturn(Failure( - Overflow(Injection[Long, Array[Byte]](max)))) - offsetManager.resolveOffset(time) shouldBe Success(max) - } else { - when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, Array[Byte]](offset))) - offsetManager.resolveOffset(time) shouldBe Success(offset) - } - - 
doThrow(new RuntimeException).when(offsetStorage).lookUp(time) - intercept[RuntimeException] { - offsetManager.resolveOffset(time) - } - offsetManager.close() - } - } - - property("KafkaOffsetManager close should close offset storage") { - val offsetStorage = mock[OffsetStorage] - val offsetManager = new KafkaOffsetManager(offsetStorage) - offsetManager.close() - verify(offsetStorage).close() - } -}
