http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
deleted file mode 100644
index 55c327b..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumer.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.consumer
-
-import kafka.api.{FetchRequestBuilder, OffsetRequest}
-import kafka.common.ErrorMapping._
-import kafka.common.TopicAndPartition
-import kafka.consumer.{ConsumerConfig, SimpleConsumer}
-import kafka.message.MessageAndOffset
-import kafka.utils.Utils
-
-import org.apache.gearpump.streaming.kafka.lib.KafkaUtil
-
-object KafkaConsumer {
-  def apply(topic: String, partition: Int, startOffsetTime: Long, config: 
ConsumerConfig)
-    : KafkaConsumer = {
-    val connectZk = KafkaUtil.connectZookeeper(config)
-    val broker = KafkaUtil.getBroker(connectZk(), topic, partition)
-    val soTimeout = config.socketTimeoutMs
-    val soBufferSize = config.socketReceiveBufferBytes
-    val fetchSize = config.fetchMessageMaxBytes
-    val clientId = config.clientId
-    val consumer = new SimpleConsumer(broker.host, broker.port, soTimeout, 
soBufferSize, clientId)
-    val getIterator = (offset: Long) => {
-      val request = new FetchRequestBuilder()
-        .addFetch(topic, partition, offset, fetchSize)
-        .build()
-
-      val response = consumer.fetch(request)
-      response.errorCode(topic, partition) match {
-        case NoError => response.messageSet(topic, partition).iterator
-        case error => throw exceptionFor(error)
-      }
-    }
-    new KafkaConsumer(consumer, topic, partition, getIterator, startOffsetTime)
-  }
-}
-
-/**
- * uses kafka kafka.consumer.SimpleConsumer to consume and iterate over
- * messages from a kafka kafka.common.TopicAndPartition.
- */
-class KafkaConsumer(consumer: SimpleConsumer,
-    topic: String,
-    partition: Int,
-    getIterator: (Long) => Iterator[MessageAndOffset],
-    startOffsetTime: Long = OffsetRequest.EarliestTime) {
-  private val earliestOffset = consumer
-    .earliestOrLatestOffset(TopicAndPartition(topic, partition), 
startOffsetTime, -1)
-  private var nextOffset: Long = earliestOffset
-  private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset)
-
-  def setStartOffset(startOffset: Long): Unit = {
-    nextOffset = startOffset
-    iterator = getIterator(nextOffset)
-  }
-
-  def next(): KafkaMessage = {
-    val mo = iterator.next()
-    val message = mo.message
-
-    nextOffset = mo.nextOffset
-
-    val offset = mo.offset
-    val payload = Utils.readBytes(message.payload)
-    new KafkaMessage(topic, partition, offset, 
Option(message.key).map(Utils.readBytes), payload)
-  }
-
-  def hasNext: Boolean = {
-    @annotation.tailrec
-    def hasNextHelper(iter: Iterator[MessageAndOffset], newIterator: Boolean): 
Boolean = {
-      if (iter.hasNext) true
-      else if (newIterator) false
-      else {
-        iterator = getIterator(nextOffset)
-        hasNextHelper(iterator, newIterator = true)
-      }
-    }
-    hasNextHelper(iterator, newIterator = false)
-  }
-
-  def getNextOffset: Long = nextOffset
-
-  def close(): Unit = {
-    consumer.close()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
deleted file mode 100644
index e0813d9..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaMessage.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.consumer
-
-import kafka.common.TopicAndPartition
-
-/**
- * wrapper over messages from kafka
- * @param topicAndPartition where message comes from
- * @param offset message offset on kafka queue
- * @param key message key, could be None
- * @param msg message payload
- */
-case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long,
-    key: Option[Array[Byte]], msg: Array[Byte]) {
-
-  def this(topic: String, partition: Int, offset: Long,
-      key: Option[Array[Byte]], msg: Array[Byte]) = {
-    this(TopicAndPartition(topic, partition), offset, key, msg)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
deleted file mode 100644
index b34bf09..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouper.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.grouper
-
-import kafka.common.TopicAndPartition
-
-/**
- * default grouper groups TopicAndPartitions among StreamProducers by 
partitions
- *
- * e.g. given 2 topics (topicA with 2 partitions and topicB with 3 partitions) 
and
- * 2 streamProducers (streamProducer0 and streamProducer1)
- *
- * streamProducer0 gets (topicA, partition1), (topicB, partition1) and 
(topicA, partition3)
- * streamProducer1 gets (topicA, partition2), (topicB, partition2)
- */
-class KafkaDefaultGrouper extends KafkaGrouper {
-  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: 
Array[TopicAndPartition])
-    : Array[TopicAndPartition] = {
-    topicAndPartitions.indices.filter(_ % taskNum == taskIndex)
-      .map(i => topicAndPartitions(i)).toArray
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
deleted file mode 100644
index e2f5203..0000000
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaGrouper.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.grouper
-
-import kafka.common.TopicAndPartition
-
-/**
- * this class dispatches kafka kafka.common.TopicAndPartition to gearpump tasks
- */
-trait KafkaGrouper {
-  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: 
Array[TopicAndPartition])
-    : Array[TopicAndPartition]
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
new file mode 100644
index 0000000..e5534a6
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/sink/AbstractKafkaSink.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.sink
+
+import java.util.Properties
+
+import org.apache.gearpump.Message
+import 
org.apache.gearpump.streaming.kafka.lib.sink.AbstractKafkaSink.KafkaProducerFactory
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+
+object AbstractKafkaSink {
+  private val LOG = LogUtil.getLogger(classOf[AbstractKafkaSink])
+
+  val producerFactory = new KafkaProducerFactory {
+    override def getKafkaProducer(config: KafkaConfig): 
KafkaProducer[Array[Byte], Array[Byte]] = {
+      new KafkaProducer[Array[Byte], Array[Byte]](config.getProducerConfig,
+        new ByteArraySerializer, new ByteArraySerializer)
+    }
+  }
+
+  trait KafkaProducerFactory extends java.io.Serializable {
+    def getKafkaProducer(config: KafkaConfig): KafkaProducer[Array[Byte], 
Array[Byte]]
+  }
+}
+/**
+ * kafka sink connectors that invokes 
{{org.apache.kafka.clients.producer.KafkaProducer}} to send
+ * messages to kafka queue
+ */
+abstract class AbstractKafkaSink private[kafka](
+    topic: String,
+    props: Properties,
+    kafkaConfigFactory: KafkaConfigFactory,
+    factory: KafkaProducerFactory) extends DataSink {
+  import org.apache.gearpump.streaming.kafka.lib.sink.AbstractKafkaSink._
+
+  def this(topic: String, props: Properties) = {
+    this(topic, props, new KafkaConfigFactory, 
AbstractKafkaSink.producerFactory)
+  }
+
+  private lazy val config = kafkaConfigFactory.getKafkaConfig(props)
+  // Lazily construct producer since KafkaProducer is not serializable
+  private lazy val producer = factory.getKafkaProducer(config)
+
+  override def open(context: TaskContext): Unit = {
+    LOG.info("KafkaSink opened")
+  }
+
+  override def write(message: Message): Unit = {
+    message.msg match {
+      case (k: Array[Byte], v: Array[Byte]) =>
+        val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, k, v)
+        producer.send(record)
+        LOG.debug("KafkaSink sent record {} to Kafka", record)
+      case v: Array[Byte] =>
+        val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, v)
+        producer.send(record)
+        LOG.debug("KafkaSink sent record {} to Kafka", record)
+      case m =>
+        val errorMsg = s"unexpected message type ${m.getClass}; " +
+          s"Array[Byte] or (Array[Byte], Array[Byte]) required"
+        LOG.error(errorMsg)
+    }
+  }
+
+  override def close(): Unit = {
+    producer.close()
+    LOG.info("KafkaSink closed")
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
new file mode 100644
index 0000000..da08b04
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source
+
+import java.util.Properties
+
+import com.twitter.bijection.Injection
+import kafka.common.TopicAndPartition
+import org.apache.gearpump.streaming.kafka.KafkaSource
+import 
org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import KafkaClient.KafkaClientFactory
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, 
FetchThread}
+import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.streaming.transaction.api._
+import org.apache.gearpump.util.LogUtil
+import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
+
+object AbstractKafkaSource {
+  private val LOG: Logger = LogUtil.getLogger(classOf[KafkaSource])
+}
+
+/**
+ * Contains implementation for Kafka source connectors, users should use
+ * [[org.apache.gearpump.streaming.kafka.KafkaSource]].
+ *
+ * This is a TimeReplayableSource which is able to replay messages given a 
start time.
+ * Each kafka message is tagged with a timestamp by
+ * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the 
(timestamp, offset)
+ * mapping is stored to a 
[[org.apache.gearpump.streaming.transaction.api.CheckpointStore]].
+ * On recovery, we could retrieve the previously stored offset from the
+ * [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]] by 
timestamp and start to read
+ * from there.
+ *
+ * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]] and 
further filtered by a
+ * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]]
+ * such that obsolete messages are dropped.
+ */
+abstract class AbstractKafkaSource(
+    topic: String,
+    props: Properties,
+    kafkaConfigFactory: KafkaConfigFactory,
+    kafkaClientFactory: KafkaClientFactory,
+    fetchThreadFactory: FetchThreadFactory)
+  extends TimeReplayableSource {
+  import org.apache.gearpump.streaming.kafka.lib.source.AbstractKafkaSource._
+
+  def this(topic: String, properties: Properties) = {
+    this(topic, properties, new KafkaConfigFactory, KafkaClient.factory, 
FetchThread.factory)
+  }
+
+  private lazy val config: KafkaConfig = 
kafkaConfigFactory.getKafkaConfig(props)
+  private lazy val kafkaClient: KafkaClient = 
kafkaClientFactory.getKafkaClient(config)
+  private lazy val fetchThread: FetchThread = 
fetchThreadFactory.getFetchThread(config, kafkaClient)
+  private lazy val messageDecoder = config.getConfiguredInstance(
+    KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[MessageDecoder])
+  private lazy val timestampFilter = config.getConfiguredInstance(
+    KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG, classOf[TimeStampFilter])
+
+  private var startTime: Long = 0L
+  private var checkpointStoreFactory: Option[CheckpointStoreFactory] = None
+  private var checkpointStores: Map[TopicAndPartition, CheckpointStore] =
+    Map.empty[TopicAndPartition, CheckpointStore]
+
+  override def setCheckpointStore(checkpointStoreFactory: 
CheckpointStoreFactory): Unit = {
+    this.checkpointStoreFactory = Some(checkpointStoreFactory)
+  }
+
+  override def open(context: TaskContext, startTime: TimeStamp): Unit = {
+    import context.{parallelism, taskId}
+
+    LOG.info("KafkaSource opened at start time {}", startTime)
+    this.startTime = startTime
+    val topicList = topic.split(",", -1).toList
+    val grouper = 
config.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG,
+      classOf[PartitionGrouper])
+    val topicAndPartitions = grouper.group(parallelism, taskId.index,
+      kafkaClient.getTopicAndPartitions(topicList))
+    LOG.info("assigned partitions {}", 
s"Array(${topicAndPartitions.mkString(",")})")
+
+    fetchThread.setTopicAndPartitions(topicAndPartitions)
+    maybeSetupCheckpointStores(topicAndPartitions)
+    maybeRecover()
+  }
+
+  /**
+   * Reads a record from incoming queue, decodes, filters and checkpoints 
offsets
+   * before returns a Message. Message can be null if the incoming queue is 
empty.
+   * @return a [[org.apache.gearpump.Message]] or null
+   */
+  override def read(): Message = {
+    fetchThread.poll.flatMap(filterAndCheckpointMessage).orNull
+  }
+
+  override def close(): Unit = {
+    kafkaClient.close()
+    checkpointStores.foreach(_._2.close())
+    LOG.info("KafkaSource closed")
+  }
+
+  /**
+   * 1. Decodes raw bytes into Message with timestamp
+   * 2. Filters message against start time
+   * 3. Checkpoints (timestamp, kafka_offset)
+   */
+  private def filterAndCheckpointMessage(kafkaMsg: KafkaMessage): 
Option[Message] = {
+    val msg = messageDecoder.fromBytes(kafkaMsg.key.orNull, kafkaMsg.msg)
+    LOG.debug("read message {}", msg)
+    val filtered = timestampFilter.filter(msg, startTime)
+    filtered.foreach { m =>
+      val time = m.timestamp
+      val offset = kafkaMsg.offset
+      LOG.debug("checkpoint message state ({}, {})", time, offset)
+      checkpointOffsets(kafkaMsg.topicAndPartition, time, offset)
+    }
+    filtered
+  }
+
+  private def checkpointOffsets(tp: TopicAndPartition, time: TimeStamp, 
offset: Long): Unit = {
+    checkpointStores.get(tp).foreach(_.persist(time, Injection[Long, 
Array[Byte]](offset)))
+  }
+
+  private def maybeSetupCheckpointStores(tps: Array[TopicAndPartition]): Unit 
= {
+    for {
+      f <- checkpointStoreFactory
+      tp <- tps
+    } {
+      val store = 
f.getCheckpointStore(KafkaConfig.getCheckpointStoreNameSuffix(tp))
+      LOG.info("created checkpoint store for {}", tp)
+      checkpointStores += tp -> store
+    }
+  }
+
+  private def maybeRecover(): Unit = {
+    checkpointStores.foreach { case (tp, store) =>
+      for {
+        bytes <- store.recover(startTime)
+        offset <- Injection.invert[Long, Array[Byte]](bytes).toOption
+      } {
+        LOG.info("recovered offset {} for {}", offset, tp)
+        fetchThread.setStartOffset(tp, offset)
+      }
+    }
+    // let JVM exit when other threads are closed
+    fetchThread.setDaemon(true)
+    fetchThread.start()
+  }
+
+  protected def addCheckpointStore(tp: TopicAndPartition, store: 
CheckpointStore): Unit = {
+    checkpointStores += tp -> store
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala
new file mode 100644
index 0000000..1c1214d
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source
+
+import com.twitter.bijection.Injection
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.transaction.api.MessageDecoder
+
+import scala.util.{Failure, Success}
+
+class DefaultMessageDecoder extends MessageDecoder {
+  override def fromBytes(key: Array[Byte], value: Array[Byte]): Message = {
+    Message(value, System.currentTimeMillis())
+  }
+}
+
+class StringMessageDecoder extends MessageDecoder {
+  override def fromBytes(key: Array[Byte], value: Array[Byte]): Message = {
+    Message(Injection.invert[String, Array[Byte]](value).get,
+      System.currentTimeMillis())
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeper.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeper.scala
new file mode 100644
index 0000000..62cd519
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeper.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.consumer
+
+/**
+ * someone sleeps for exponentially increasing duration each time
+ * until the cap
+ *
+ * @param backOffMultiplier The factor by which the duration increases.
+ * @param initialDurationMs Time in milliseconds for initial sleep.
+ * @param maximumDurationMs Cap up to which we will increase the duration.
+ */
+private[consumer] class ExponentialBackoffSleeper(
+    backOffMultiplier: Double = 2.0,
+    initialDurationMs: Long = 100,
+    maximumDurationMs: Long = 10000) {
+
+  require(backOffMultiplier > 1.0, "backOffMultiplier must be greater than 1")
+  require(initialDurationMs > 0, "initialDurationMs must be positive")
+  require(maximumDurationMs >= initialDurationMs, "maximumDurationMs must be 
>= initialDurationMs")
+
+  private var sleepDuration = initialDurationMs
+
+  def reset(): Unit = {
+    sleepDuration = initialDurationMs
+  }
+
+  def sleep(): Unit = {
+    sleep(sleepDuration)
+    setNextSleepDuration()
+  }
+
+  def sleep(duration: Long): Unit = {
+    Thread.sleep(duration)
+  }
+
+  def getSleepDuration: Long = sleepDuration
+
+  def setNextSleepDuration(): Unit = {
+    val next = (sleepDuration * backOffMultiplier).asInstanceOf[Long]
+    sleepDuration = math.min(math.max(initialDurationMs, next), 
maximumDurationMs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
new file mode 100644
index 0000000..3119f40
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThread.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.consumer
+
+import java.nio.channels.ClosedByInterruptException
+import java.util.concurrent.LinkedBlockingQueue
+
+import kafka.common.TopicAndPartition
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.slf4j.Logger
+
+import org.apache.gearpump.util.LogUtil
+
object FetchThread {
  private val LOG: Logger = LogUtil.getLogger(classOf[FetchThread])

  val factory = new FetchThreadFactory

  class FetchThreadFactory extends java.io.Serializable {
    /** Builds a [[FetchThread]] configured from the given KafkaConfig. */
    def getFetchThread(config: KafkaConfig, client: KafkaClient): FetchThread = {
      FetchThread(
        fetchThreshold = config.getInt(KafkaConfig.FETCH_THRESHOLD_CONFIG),
        fetchSleepMS = config.getInt(KafkaConfig.FETCH_SLEEP_MS_CONFIG),
        startOffsetTime = config.getLong(KafkaConfig.CONSUMER_START_OFFSET_CONFIG),
        client = client)
    }
  }

  /** Wires up a FetchThread with a fresh queue, backoff sleeper and consumer factory. */
  def apply(fetchThreshold: Int,
      fetchSleepMS: Long,
      startOffsetTime: Long,
      client: KafkaClient): FetchThread = {
    val consumerFor = (tp: TopicAndPartition) =>
      client.createConsumer(tp.topic, tp.partition, startOffsetTime)
    val backoff = new ExponentialBackoffSleeper(
      backOffMultiplier = 2.0,
      initialDurationMs = 100L,
      maximumDurationMs = 10000L)
    new FetchThread(consumerFor, new LinkedBlockingQueue[KafkaMessage](), backoff,
      fetchThreshold, fetchSleepMS)
  }
}
+
/**
 * A thread that fetches messages from multiple Kafka topic-partitions and
 * buffers them onto a queue, which is polled asynchronously by a consumer.
 *
 * @param createConsumer factory that connects a [[KafkaConsumer]] to a given
 *                       kafka.common.TopicAndPartition
 * @param incomingQueue buffer for incoming messages
 * @param sleeper backoff strategy applied when a fetch attempt fails
 * @param fetchThreshold queue size above which this thread stops fetching
 * @param fetchSleepMS interval to sleep when there are no more messages or the
 *                     threshold has been hit
 */
private[kafka] class FetchThread(
    createConsumer: TopicAndPartition => KafkaConsumer,
    incomingQueue: LinkedBlockingQueue[KafkaMessage],
    sleeper: ExponentialBackoffSleeper,
    fetchThreshold: Int,
    fetchSleepMS: Long) extends Thread {
  import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread._

  // One consumer per assigned topic-partition; rebuilt on assignment or reset.
  private var consumers: Map[TopicAndPartition, KafkaConsumer] =
    Map.empty[TopicAndPartition, KafkaConsumer]
  private var topicAndPartitions: Array[TopicAndPartition] =
    Array.empty[TopicAndPartition]
  // Offsets to resume from after a reset.
  private var nextOffsets = Map.empty[TopicAndPartition, Long]
  // Set when a fetch failed; makes the next loop iteration rebuild consumers.
  private var reset = false

  /** Assigns the partitions to fetch from and (re)creates their consumers. */
  def setTopicAndPartitions(topicAndPartitions: Array[TopicAndPartition]): Unit = {
    this.topicAndPartitions = topicAndPartitions
    consumers = createAllConsumers
  }

  /** Repositions the consumer of `tp` (if any) to `startOffset`. */
  def setStartOffset(tp: TopicAndPartition, startOffset: Long): Unit = {
    consumers.get(tp).foreach(_.setStartOffset(startOffset))
  }

  /** Non-blocking removal of the next buffered message, if one is available. */
  def poll: Option[KafkaMessage] = Option(incomingQueue.poll())

  override def run(): Unit = {
    try {
      while (!Thread.currentThread().isInterrupted) {
        runLoop()
      }
    } catch {
      case _: InterruptedException =>
        LOG.info("fetch thread got interrupted exception")
      case _: ClosedByInterruptException =>
        LOG.info("fetch thread closed by interrupt exception")
    } finally {
      consumers.values.foreach(_.close())
    }
  }

  private[lib] def runLoop(): Unit = {
    try {
      if (reset) {
        // Remember where each consumer was, then rebuild them at those offsets.
        nextOffsets = consumers.mapValues(_.getNextOffset)
        resetConsumers(nextOffsets)
        reset = false
      }
      val moreMessages = fetchMessage
      sleeper.reset()
      if (!moreMessages) {
        // Nothing to do right now; sleep for the fixed idle duration.
        sleeper.sleep(fetchSleepMS)
      }
    } catch {
      case exception: Exception =>
        LOG.warn(s"resetting consumers due to $exception")
        reset = true
        // Back off for an exponentially increasing duration.
        sleeper.sleep()
    }
  }

  /**
   * Fetches one message from each TopicAndPartition in a round-robin way.
   * Returns true when there may be more work to do immediately.
   */
  private def fetchMessage: Boolean = {
    consumers.foldLeft(false) { (fetched, entry) =>
      val consumer = entry._2
      if (incomingQueue.size >= fetchThreshold) {
        // Queue is full; report "more available" so we don't sleep the long idle interval.
        true
      } else if (consumer.hasNext) {
        incomingQueue.put(consumer.next())
        true
      } else {
        fetched
      }
    }
  }

  // Builds a consumer for every currently assigned topic-partition.
  private def createAllConsumers: Map[TopicAndPartition, KafkaConsumer] =
    (for (tp <- topicAndPartitions) yield tp -> createConsumer(tp)).toMap

  // Closes all consumers and recreates them, seeking each to its saved offset.
  private def resetConsumers(offsets: Map[TopicAndPartition, Long]): Unit = {
    consumers.values.foreach(_.close())
    consumers = createAllConsumers
    consumers.foreach { case (tp, consumer) =>
      consumer.setStartOffset(offsets(tp))
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumer.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumer.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumer.scala
new file mode 100644
index 0000000..1e307cc
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumer.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.consumer
+
+import kafka.api.{FetchRequestBuilder, OffsetRequest}
+import kafka.common.ErrorMapping._
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.MessageAndOffset
+import kafka.utils.Utils
+
object KafkaConsumer {
  /** Builds a [[KafkaConsumer]] for one topic-partition on top of a SimpleConsumer. */
  def apply(topic: String, partition: Int, startOffsetTime: Long,
      fetchSize: Int, consumer: SimpleConsumer): KafkaConsumer = {
    // Iterator factory: issues a fetch request starting at `offset` and
    // fails fast on any broker-reported error code.
    val iteratorFrom = (offset: Long) => {
      val request = new FetchRequestBuilder()
        .addFetch(topic, partition, offset, fetchSize)
        .build()
      val response = consumer.fetch(request)
      response.errorCode(topic, partition) match {
        case NoError => response.messageSet(topic, partition).iterator
        case error => throw exceptionFor(error)
      }
    }
    new KafkaConsumer(consumer, topic, partition, iteratorFrom, startOffsetTime)
  }
}
+
/**
 * Consumes and iterates over messages from a single Kafka topic-partition
 * using kafka.consumer.SimpleConsumer.
 */
class KafkaConsumer(consumer: SimpleConsumer,
    topic: String,
    partition: Int,
    getIterator: (Long) => Iterator[MessageAndOffset],
    startOffsetTime: Long = OffsetRequest.EarliestTime) {
  // Starting offset resolved once at construction time from startOffsetTime.
  private val earliestOffset = consumer
    .earliestOrLatestOffset(TopicAndPartition(topic, partition), startOffsetTime, -1)
  // Offset of the next message to hand out.
  private var nextOffset: Long = earliestOffset
  private var iterator: Iterator[MessageAndOffset] = getIterator(nextOffset)

  /** Repositions this consumer so iteration continues from `startOffset`. */
  def setStartOffset(startOffset: Long): Unit = {
    nextOffset = startOffset
    iterator = getIterator(nextOffset)
  }

  /** Returns the next message and advances the internal offset. */
  def next(): KafkaMessage = {
    val messageAndOffset = iterator.next()
    nextOffset = messageAndOffset.nextOffset
    val payload = Utils.readBytes(messageAndOffset.message.payload)
    // A message without a key has a null key buffer; Option(...) maps that to None.
    new KafkaMessage(topic, partition, messageAndOffset.offset,
      Option(messageAndOffset.message.key).map(Utils.readBytes), payload)
  }

  /**
   * True when a message is available. If the current iterator is exhausted,
   * one fresh iterator is fetched from the current offset before giving up.
   */
  def hasNext: Boolean = {
    @annotation.tailrec
    def advance(iter: Iterator[MessageAndOffset], refreshed: Boolean): Boolean = {
      if (iter.hasNext) {
        true
      } else if (refreshed) {
        false
      } else {
        iterator = getIterator(nextOffset)
        advance(iterator, refreshed = true)
      }
    }
    advance(iterator, refreshed = false)
  }

  /** Offset of the next message to be consumed. */
  def getNextOffset: Long = nextOffset

  /** Closes the underlying SimpleConsumer. */
  def close(): Unit = {
    consumer.close()
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaMessage.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaMessage.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaMessage.scala
new file mode 100644
index 0000000..d2a404d
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaMessage.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.consumer
+
+import kafka.common.TopicAndPartition
+
/**
 * Wrapper over a message consumed from Kafka.
 *
 * @param topicAndPartition the topic-partition the message came from
 * @param offset the message's offset on the Kafka queue
 * @param key optional message key bytes; None when the message has no key
 * @param msg message payload bytes
 */
case class KafkaMessage(topicAndPartition: TopicAndPartition, offset: Long,
    key: Option[Array[Byte]], msg: Array[Byte]) {

  /** Convenience constructor taking topic and partition separately. */
  def this(topic: String, partition: Int, offset: Long,
      key: Option[Array[Byte]], msg: Array[Byte]) = {
    this(TopicAndPartition(topic, partition), offset, key, msg)
  }
}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouper.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouper.scala
new file mode 100644
index 0000000..f2baf2f
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouper.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.grouper
+
+import kafka.common.TopicAndPartition
+
/**
 * The default grouper; assigns TopicAndPartitions to StreamProducers
 * round-robin by index.
 *
 * e.g. given 2 topics (topicA with 2 partitions and topicB with 3 partitions)
 * and 2 streamProducers (streamProducer0 and streamProducer1)
 *
 * streamProducer0 gets (topicA, partition1), (topicB, partition1) and (topicA, partition3)
 * streamProducer1 gets (topicA, partition2), (topicB, partition2)
 */
class DefaultPartitionGrouper extends PartitionGrouper {
  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
    : Array[TopicAndPartition] = {
    // Keep every taskNum-th element, starting at position taskIndex.
    topicAndPartitions.zipWithIndex.collect {
      case (tp, index) if index % taskNum == taskIndex => tp
    }
  }
}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/PartitionGrouper.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/PartitionGrouper.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/PartitionGrouper.scala
new file mode 100644
index 0000000..83660e5
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/PartitionGrouper.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.grouper
+
+import kafka.common.TopicAndPartition
+
/**
 * Strategy for dispatching kafka.common.TopicAndPartitions among gearpump tasks.
 *
 * Implementations must be deterministic so that every task, given the same
 * inputs, computes a disjoint and complete partition assignment.
 */
trait PartitionGrouper {
  /**
   * Returns the subset of `topicAndPartitions` assigned to the task at
   * `taskIndex` out of `taskNum` parallel tasks.
   */
  def group(taskNum: Int, taskIndex: Int, topicAndPartitions: Array[TopicAndPartition])
    : Array[TopicAndPartition]
}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
new file mode 100644
index 0000000..e2450f4
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.store
+
+import java.util.Properties
+
+import com.twitter.bijection.Injection
+import kafka.api.OffsetRequest
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
+import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, 
CheckpointStoreFactory}
+import org.apache.gearpump.util.LogUtil
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.apache.kafka.common.serialization.ByteArraySerializer
+
/**
 * Factory class that constructs a [[KafkaStore]] backed by a Kafka topic.
 *
 * @param props configuration for the kafka store
 * @param configFactory turns the properties into a KafkaConfig
 */
abstract class AbstractKafkaStoreFactory(
    props: Properties,
    configFactory: KafkaConfigFactory)
  extends CheckpointStoreFactory {

  def this(props: Properties) {
    this(props, new KafkaConfigFactory)
  }

  private lazy val config: KafkaConfig = configFactory.getKafkaConfig(props)

  override def getCheckpointStore(name: String): CheckpointStore = {
    val topic = config.getKafkaStoreTopic(name)
    val client = config.getKafkaClientFactory.getKafkaClient(config)
    val replicas = config.getInt(KafkaConfig.REPLICATION_FACTOR_CONFIG)
    // createTopic returns true when the topic already existed; only then is
    // there prior state worth recovering through a consumer.
    val existedBefore = client.createTopic(topic, 1, replicas)
    val optConsumer =
      if (existedBefore) {
        Some(client.createConsumer(topic, 0, OffsetRequest.EarliestTime))
      } else {
        None
      }
    val producer = client.createProducer[Array[Byte], Array[Byte]](
      new ByteArraySerializer, new ByteArraySerializer)
    new KafkaStore(topic, producer, optConsumer)
  }
}
+
object KafkaStore {
  // Shared logger for all KafkaStore instances.
  private val LOG = LogUtil.getLogger(classOf[KafkaStore])
}
+
/**
 * Checkpoints (timestamp, state) pairs to a Kafka topic.
 *
 * @param topic kafka store topic
 * @param producer kafka producer used to write checkpoints
 * @param optConsumer kafka consumer used to recover checkpoints; None when the
 *                    backing topic did not exist before this store was created
 */
class KafkaStore private[kafka](
    val topic: String,
    val producer: KafkaProducer[Array[Byte], Array[Byte]],
    val optConsumer: Option[KafkaConsumer])
  extends CheckpointStore {
  import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore._

  // Largest timestamp checkpointed so far; keys are forced to be monotonic.
  private var maxTime: TimeStamp = 0L

  override def persist(time: TimeStamp, checkpoint: Array[Byte]): Unit = {
    // make sure checkpointed timestamp is monotonically increasing
    // hence (1, 1), (3, 2), (2, 3) is checkpointed as (1, 1), (3, 2), (3, 3)
    maxTime = math.max(maxTime, time)
    val key = maxTime
    val value = checkpoint
    val record = new ProducerRecord[Array[Byte], Array[Byte]](
      topic, 0, Injection[Long, Array[Byte]](key), value)
    producer.send(record)
    LOG.debug("KafkaStore persisted state ({}, {})", key, value)
  }

  override def recover(time: TimeStamp): Option[Array[Byte]] = {
    var checkpoint: Option[Array[Byte]] = None
    optConsumer.foreach { consumer =>
      // Scan forward until the first checkpoint whose timestamp is >= time.
      // Note: hasNext is evaluated first on purpose — it may trigger a fetch.
      while (consumer.hasNext && checkpoint.isEmpty) {
        val kafkaMsg = consumer.next()
        checkpoint = for {
          k <- kafkaMsg.key
          t <- Injection.invert[TimeStamp, Array[Byte]](k).toOption
          c = kafkaMsg.msg if t >= time
        } yield c
      }
      consumer.close()
    }
    checkpoint match {
      case Some(c) =>
        LOG.info(s"KafkaStore recovered checkpoint ($time, $c)")
      case None =>
        LOG.info(s"no checkpoint existing for $time")
    }
    checkpoint
  }

  override def close(): Unit = {
    producer.close()
    LOG.info("KafkaStore closed")
  }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala
new file mode 100644
index 0000000..417b6de
--- /dev/null
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.util
+
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.cluster.Broker
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.utils.{ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.util.LogUtil
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.common.serialization.Serializer
+
object KafkaClient {
  private val LOG = LogUtil.getLogger(classOf[KafkaClient])

  val factory = new KafkaClientFactory

  class KafkaClientFactory extends java.io.Serializable {
    /** Creates a KafkaClient whose ZkClient is built from the consumer config. */
    def getKafkaClient(config: KafkaConfig): KafkaClient = {
      val consumerConfig = config.getConsumerConfig
      val zkClient = new ZkClient(
        consumerConfig.zkConnect,
        consumerConfig.zkSessionTimeoutMs,
        consumerConfig.zkConnectionTimeoutMs,
        ZKStringSerializer)
      new KafkaClient(config, zkClient)
    }
  }
}
+
/** Thin wrapper around ZooKeeper/Kafka admin operations used by the connector. */
class KafkaClient(config: KafkaConfig, zkClient: ZkClient) {
  import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient._

  private val consumerConfig = config.getConsumerConfig

  // Runs `body`, logging any Exception's message before rethrowing it.
  private def logOnFailure[T](body: => T): T = {
    try {
      body
    } catch {
      case e: Exception =>
        LOG.error(e.getMessage)
        throw e
    }
  }

  /** All partitions of the given topics, read from ZooKeeper. */
  def getTopicAndPartitions(consumerTopics: List[String]): Array[TopicAndPartition] = {
    logOnFailure {
      ZkUtils.getPartitionsForTopics(zkClient, consumerTopics).flatMap {
        case (topic, partitions) => partitions.map(TopicAndPartition(topic, _))
      }.toArray
    }
  }

  /** The broker currently leading the given topic-partition. */
  def getBroker(topic: String, partition: Int): Broker = {
    logOnFailure {
      val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
        .getOrElse(throw new RuntimeException(
          s"leader not available for TopicAndPartition($topic, $partition)"))
      ZkUtils.getBrokerInfo(zkClient, leader)
        .getOrElse(throw new RuntimeException(s"broker info not found for leader $leader"))
    }
  }

  /** Creates a [[KafkaConsumer]] connected to the partition's leader broker. */
  def createConsumer(topic: String, partition: Int, startOffsetTime: Long): KafkaConsumer = {
    val broker = getBroker(topic, partition)
    val consumer = new SimpleConsumer(
      broker.host,
      broker.port,
      consumerConfig.socketTimeoutMs,
      consumerConfig.socketReceiveBufferBytes,
      consumerConfig.clientId)
    KafkaConsumer(topic, partition, startOffsetTime,
      consumerConfig.fetchMessageMaxBytes, consumer)
  }

  /** Creates a KafkaProducer from this client's producer configuration. */
  def createProducer[K, V](keySerializer: Serializer[K],
      valueSerializer: Serializer[V]): KafkaProducer[K, V] = {
    new KafkaProducer[K, V](config.getProducerConfig, keySerializer, valueSerializer)
  }

  /**
   * create a new kafka topic
   * return true if topic already exists, and false otherwise
   */
  def createTopic(topic: String, partitions: Int, replicas: Int): Boolean = {
    logOnFailure {
      if (AdminUtils.topicExists(zkClient, topic)) {
        LOG.info(s"topic $topic exists")
        true
      } else {
        AdminUtils.createTopic(zkClient, topic, partitions, replicas)
        LOG.info(s"created topic $topic")
        false
      }
    }
  }

  /** Releases the underlying ZooKeeper connection. */
  def close(): Unit = {
    zkClient.close()
  }
}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala
index 2f8a533..62a70bd 100644
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSinkSpec.scala
@@ -18,7 +18,12 @@
 
 package org.apache.gearpump.streaming.kafka
 
+import java.util.Properties
+
 import com.twitter.bijection.Injection
+import 
org.apache.gearpump.streaming.kafka.lib.sink.AbstractKafkaSink.KafkaProducerFactory
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -40,9 +45,17 @@ class KafkaSinkSpec extends PropSpec with PropertyChecks 
with Matchers with Mock
   property("KafkaSink write should send producer record") {
     forAll(dataGen) {
       (data: (String, Array[Byte], Array[Byte])) =>
+        val props = mock[Properties]
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+        val producerFactory = mock[KafkaProducerFactory]
+        val configFactory = mock[KafkaConfigFactory]
+        val config = mock[KafkaConfig]
+
+        when(configFactory.getKafkaConfig(props)).thenReturn(config)
+        when(producerFactory.getKafkaProducer(config)).thenReturn(producer)
+
         val (topic, key, msg) = data
-        val kafkaSink = new KafkaSink(() => producer, topic)
+        val kafkaSink = new KafkaSink(topic, props, configFactory, 
producerFactory)
         kafkaSink.write(Message((key, msg)))
         verify(producer).send(MockUtil.argMatch[ProducerRecord[Array[Byte], 
Array[Byte]]](
           r => r.topic == topic && (r.key sameElements key) && (r.value 
sameElements msg)))
@@ -55,8 +68,16 @@ class KafkaSinkSpec extends PropSpec with PropertyChecks 
with Matchers with Mock
   }
 
   property("KafkaSink close should close kafka producer") {
+    val props = mock[Properties]
     val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-    val kafkaSink = new KafkaSink(() => producer, "topic")
+    val producerFactory = mock[KafkaProducerFactory]
+    val configFactory = mock[KafkaConfigFactory]
+    val config = mock[KafkaConfig]
+
+    when(configFactory.getKafkaConfig(props)).thenReturn(config)
+    when(producerFactory.getKafkaProducer(config)).thenReturn(producer)
+
+    val kafkaSink = new KafkaSink("topic", props, configFactory, 
producerFactory)
     kafkaSink.close()
     verify(producer).close()
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
index 8055d00..e40276f 100644
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -18,10 +18,21 @@
 
 package org.apache.gearpump.streaming.kafka
 
-import scala.util.{Failure, Success}
+import java.util.Properties
 
 import com.twitter.bijection.Injection
 import kafka.common.TopicAndPartition
+import org.apache.gearpump.streaming.MockUtil
+import 
org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory
+import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import KafkaClient.KafkaClientFactory
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, 
FetchThread}
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
+import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, 
CheckpointStoreFactory, MessageDecoder, TimeStampFilter}
+import org.apache.gearpump.Message
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -29,139 +40,168 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.kafka.lib.consumer.{FetchThread, 
KafkaMessage}
-import org.apache.gearpump.streaming.kafka.lib.{KafkaOffsetManager, 
KafkaSourceConfig}
-import org.apache.gearpump.streaming.transaction.api.OffsetStorage.StorageEmpty
-import org.apache.gearpump.streaming.transaction.api.{MessageDecoder, 
OffsetStorageFactory, TimeStampFilter}
-
 class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
 
-  val startTimeGen = Gen.choose[Long](0L, 1000L)
-  val offsetGen = Gen.choose[Long](0L, 1000L)
-
-  property("KafkaSource open sets consumer to earliest offset") {
-    val topicAndPartition = mock[TopicAndPartition]
-    val fetchThread = mock[FetchThread]
-    val offsetManager = mock[KafkaOffsetManager]
-    val messageDecoder = mock[MessageDecoder]
-    val timestampFilter = mock[TimeStampFilter]
-    val offsetStorageFactory = mock[OffsetStorageFactory]
-    val kafkaConfig = mock[KafkaSourceConfig]
-    val kafkaSource = new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder,
-      timestampFilter, Some(fetchThread), Map(topicAndPartition -> 
offsetManager))
-
-    kafkaSource.setStartTime(None)
-
-    verify(fetchThread).start()
-    verify(fetchThread, 
never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
-  }
-
-  property("KafkaSource open should not set consumer start offset if offset 
storage is empty") {
-    forAll(startTimeGen) { (startTime: Long) =>
-      val offsetManager = mock[KafkaOffsetManager]
-      val topicAndPartition = mock[TopicAndPartition]
+  val startTimeGen = Gen.choose[Long](0L, 100L)
+  val offsetGen = Gen.choose[Long](0L, 100L)
+  val topicAndPartitionGen = for {
+    topic <- Gen.alphaStr suchThat (_.nonEmpty)
+    partition <- Gen.chooseNum[Int](1, 100)
+  } yield TopicAndPartition(topic, partition)
+  val tpsGen = Gen.listOf[TopicAndPartition](topicAndPartitionGen) suchThat 
(_.nonEmpty)
+
+  property("KafkaSource open should not recover without checkpoint") {
+    forAll(startTimeGen, tpsGen) { (startTime: Long, tps: 
List[TopicAndPartition]) =>
+      val taskContext = MockUtil.mockTaskContext
       val fetchThread = mock[FetchThread]
-      val messageDecoder = mock[MessageDecoder]
-      val timestampFilter = mock[TimeStampFilter]
-      val offsetStorageFactory = mock[OffsetStorageFactory]
-      val kafkaConfig = mock[KafkaSourceConfig]
-      val source = new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder,
-        timestampFilter, Some(fetchThread), Map(topicAndPartition -> 
offsetManager))
-
-      
when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(StorageEmpty))
+      val kafkaClient = mock[KafkaClient]
+      val clientFactory = mock[KafkaClientFactory]
+      val threadFactory = mock[FetchThreadFactory]
+      val topics = tps.map(_.topic)
+      val properties = mock[Properties]
+      val kafkaConfig = mock[KafkaConfig]
+      val configFactory = mock[KafkaConfigFactory]
+      val partitionGrouper = mock[PartitionGrouper]
+
+      when(configFactory.getKafkaConfig(properties)).thenReturn(kafkaConfig)
+      when(clientFactory.getKafkaClient(kafkaConfig)).thenReturn(kafkaClient)
+      val tpsArray = tps.toArray
+      when(kafkaClient.getTopicAndPartitions(topics)).thenReturn(tpsArray)
+      
when(kafkaConfig.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG,
+        classOf[PartitionGrouper])).thenReturn(partitionGrouper)
+      when(partitionGrouper.group(taskContext.parallelism, 
taskContext.taskId.index, tpsArray))
+        .thenReturn(tpsArray)
+      when(threadFactory.getFetchThread(kafkaConfig, 
kafkaClient)).thenReturn(fetchThread)
+
+      val source = new KafkaSource(topics.mkString(","), properties, 
configFactory,
+        clientFactory, threadFactory)
+
+      source.open(taskContext, startTime)
 
-      source.setStartTime(Some(startTime))
       verify(fetchThread, 
never()).setStartOffset(anyObject[TopicAndPartition](), anyLong())
-      verify(fetchThread).start()
-
-      when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new 
RuntimeException))
-      intercept[RuntimeException] {
-        source.setStartTime(Some(startTime))
-      }
-      source.close()
     }
   }
 
-  property("KafkaSource open should set consumer start offset if offset 
storage is not empty") {
-    forAll(startTimeGen, offsetGen) {
-      (startTime: Long, offset: Long) =>
-        val offsetManager = mock[KafkaOffsetManager]
-        val topicAndPartition = mock[TopicAndPartition]
+  property("KafkaSource open should recover with checkpoint") {
+    forAll(startTimeGen, offsetGen, tpsGen) {
+      (startTime: Long, offset: Long, tps: List[TopicAndPartition]) =>
+        val taskContext = MockUtil.mockTaskContext
+        val checkpointStoreFactory = mock[CheckpointStoreFactory]
+        val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap
+        val topics = tps.map(_.topic)
+        val properties = mock[Properties]
+        val kafkaConfig = mock[KafkaConfig]
+        val configFactory = mock[KafkaConfigFactory]
+        val kafkaClient = mock[KafkaClient]
+        val clientFactory = mock[KafkaClientFactory]
         val fetchThread = mock[FetchThread]
-        val messageDecoder = mock[MessageDecoder]
-        val timestampFilter = mock[TimeStampFilter]
-        val offsetStorageFactory = mock[OffsetStorageFactory]
-        val kafkaConfig = mock[KafkaSourceConfig]
-        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder,
-          timestampFilter, Some(fetchThread), Map(topicAndPartition -> 
offsetManager))
-
-        
when(offsetManager.resolveOffset(startTime)).thenReturn(Success(offset))
-
-        source.setStartTime(Some(startTime))
-        verify(fetchThread).setStartOffset(topicAndPartition, offset)
-        verify(fetchThread).start()
-
-        when(offsetManager.resolveOffset(startTime)).thenReturn(Failure(new 
RuntimeException))
-        intercept[RuntimeException] {
-          source.setStartTime(Some(startTime))
+        val threadFactory = mock[FetchThreadFactory]
+        val partitionGrouper = mock[PartitionGrouper]
+
+        when(configFactory.getKafkaConfig(properties)).thenReturn(kafkaConfig)
+        when(clientFactory.getKafkaClient(kafkaConfig)).thenReturn(kafkaClient)
+        when(threadFactory.getFetchThread(kafkaConfig, 
kafkaClient)).thenReturn(fetchThread)
+        val tpsArray = tps.toArray
+        when(kafkaClient.getTopicAndPartitions(topics)).thenReturn(tps.toArray)
+        
when(kafkaConfig.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG,
+          classOf[PartitionGrouper])).thenReturn(partitionGrouper)
+        when(partitionGrouper.group(taskContext.parallelism, 
taskContext.taskId.index, tpsArray))
+          .thenReturn(tpsArray)
+
+        val source = new KafkaSource(topics.mkString(","), properties, 
configFactory,
+          clientFactory, threadFactory)
+        checkpointStores.foreach{ case (tp, store) => 
source.addPartitionAndStore(tp, store) }
+
+        checkpointStores.foreach { case (tp, store) =>
+          when(checkpointStoreFactory.getCheckpointStore(
+            KafkaConfig.getCheckpointStoreNameSuffix(tp))).thenReturn(store)
+          when(store.recover(startTime)).thenReturn(Some(Injection[Long, 
Array[Byte]](offset)))
         }
-        source.close()
+
+        source.setCheckpointStore(checkpointStoreFactory)
+        source.open(taskContext, startTime)
+
+        tps.foreach(tp => verify(fetchThread).setStartOffset(tp, offset))
     }
   }
 
-  property("KafkaSource read should return number of messages in best effort") 
{
-    val kafkaMsgGen = for {
-      topic <- Gen.alphaStr
-      partition <- Gen.choose[Int](0, 1000)
+  property("KafkaSource read checkpoints offset and returns a message or 
null") {
+    val msgGen = for {
+      tp <- topicAndPartitionGen
       offset <- Gen.choose[Long](0L, 1000L)
-      key = None
+      key = Some(Injection[Long, Array[Byte]](offset))
       msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
-    } yield KafkaMessage(TopicAndPartition(topic, partition), offset, key, msg)
-    val msgQueueGen = Gen.containerOf[Array, KafkaMessage](kafkaMsgGen)
-    forAll(msgQueueGen) {
-      (msgQueue: Array[KafkaMessage]) =>
-        val offsetManager = mock[KafkaOffsetManager]
-        val fetchThread = mock[FetchThread]
-        val messageDecoder = mock[MessageDecoder]
-
-        val timestampFilter = mock[TimeStampFilter]
-        val offsetStorageFactory = mock[OffsetStorageFactory]
-        val kafkaConfig = mock[KafkaSourceConfig]
-        val offsetManagers = msgQueue.map(_.topicAndPartition -> 
offsetManager).toMap
-
-        val source = new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder,
-          timestampFilter, Some(fetchThread), offsetManagers)
-
-        if (msgQueue.isEmpty) {
-          when(fetchThread.poll).thenReturn(None)
-          source.read() shouldBe null
-        } else {
-          msgQueue.indices.foreach { i =>
-            val message = Message(msgQueue(i).msg)
-            when(fetchThread.poll).thenReturn(Option(msgQueue(i)))
-            
when(messageDecoder.fromBytes(anyObject[Array[Byte]])).thenReturn(message)
-            when(offsetManager.filter(anyObject[(Message, 
Long)])).thenReturn(Some(message))
-            when(timestampFilter.filter(anyObject[Message], 
anyLong())).thenReturn(Some(message))
-
-            source.read shouldBe message
-          }
+    } yield KafkaMessage(tp, offset, key, msg)
+    val msgQueueGen = Gen.listOf[KafkaMessage](msgGen)
+
+    forAll(msgQueueGen) { (msgQueue: List[KafkaMessage]) =>
+      val properties = mock[Properties]
+      val config = mock[KafkaConfig]
+      val configFactory = mock[KafkaConfigFactory]
+      val timestampFilter = mock[TimeStampFilter]
+      val messageDecoder = mock[MessageDecoder]
+      val kafkaClient = mock[KafkaClient]
+      val clientFactory = mock[KafkaClientFactory]
+      val fetchThread = mock[FetchThread]
+      val threadFactory = mock[FetchThreadFactory]
+      val checkpointStoreFactory = mock[CheckpointStoreFactory]
+
+      val checkpointStores = msgQueue.map(_.topicAndPartition -> 
mock[CheckpointStore]).toMap
+      val topics = checkpointStores.map(_._1.topic).mkString(",")
+
+      checkpointStores.foreach { case (tp, store) =>
+        when(checkpointStoreFactory.getCheckpointStore(KafkaConfig
+          .getCheckpointStoreNameSuffix(tp))).thenReturn(store)
+      }
+      when(configFactory.getKafkaConfig(properties)).thenReturn(config)
+      
when(config.getConfiguredInstance(KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG,
+        classOf[TimeStampFilter])).thenReturn(timestampFilter)
+      
when(config.getConfiguredInstance(KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG,
+        classOf[MessageDecoder])).thenReturn(messageDecoder)
+      when(clientFactory.getKafkaClient(config)).thenReturn(kafkaClient)
+      when(threadFactory.getFetchThread(config, 
kafkaClient)).thenReturn(fetchThread)
+
+      val source = new KafkaSource(topics, properties, configFactory, 
clientFactory, threadFactory)
+      checkpointStores.foreach{ case (tp, store) => 
source.addPartitionAndStore(tp, store) }
+      source.setCheckpointStore(checkpointStoreFactory)
+
+      if (msgQueue.isEmpty) {
+        when(fetchThread.poll).thenReturn(None)
+        source.read() shouldBe null
+      } else {
+        msgQueue.foreach { kafkaMsg =>
+          when(fetchThread.poll).thenReturn(Option(kafkaMsg))
+          val message = Message(kafkaMsg.msg, kafkaMsg.offset)
+          when(messageDecoder.fromBytes(kafkaMsg.key.get, 
kafkaMsg.msg)).thenReturn(message)
+          when(timestampFilter.filter(message, 0)).thenReturn(Some(message))
+          source.read() shouldBe Message(kafkaMsg.msg, kafkaMsg.offset)
+          verify(checkpointStores(kafkaMsg.topicAndPartition)).persist(
+            kafkaMsg.offset, Injection[Long, Array[Byte]](kafkaMsg.offset))
         }
-        source.close()
+      }
     }
   }
 
-  property("KafkaSource close should close all offset managers") {
-    val offsetManager = mock[KafkaOffsetManager]
-    val topicAndPartition = mock[TopicAndPartition]
-    val fetchThread = mock[FetchThread]
-    val timestampFilter = mock[TimeStampFilter]
-    val messageDecoder = mock[MessageDecoder]
-    val offsetStorageFactory = mock[OffsetStorageFactory]
-    val kafkaConfig = mock[KafkaSourceConfig]
-    val source = new KafkaSource(kafkaConfig, offsetStorageFactory, 
messageDecoder,
-      timestampFilter, Some(fetchThread), Map(topicAndPartition -> 
offsetManager))
-    source.close()
-    verify(offsetManager).close()
+  property("KafkaSource close should close all checkpoint stores") {
+    forAll(Gen.chooseNum[Int](1, 100)) { (num: Int) =>
+      val tps = 0.until(num).map(id => TopicAndPartition("topic", id))
+      val checkpointStores = tps.map(_ -> mock[CheckpointStore]).toMap
+      val props = mock[Properties]
+      val kafkaConfig = mock[KafkaConfig]
+      val configFactory = mock[KafkaConfigFactory]
+      val threadFactory = mock[FetchThreadFactory]
+      val kafkaClient = mock[KafkaClient]
+      val clientFactory = mock[KafkaClientFactory]
+
+      when(configFactory.getKafkaConfig(props)).thenReturn(kafkaConfig)
+      when(clientFactory.getKafkaClient(kafkaConfig)).thenReturn(kafkaClient)
+
+      val source = new KafkaSource("topic", props, configFactory, 
clientFactory, threadFactory)
+      checkpointStores.foreach{ case (tp, store) => 
source.addPartitionAndStore(tp, store) }
+      source.close()
+
+      verify(kafkaClient).close()
+      checkpointStores.foreach{ case (_, store) => verify(store).close() }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
new file mode 100644
index 0000000..67c64c4
--- /dev/null
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka
+
+import java.util.Properties
+
+import com.twitter.bijection.Injection
+import kafka.api.OffsetRequest
+import kafka.common.TopicAndPartition
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, 
KafkaConsumer}
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import KafkaClient.KafkaClientFactory
+import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig
+import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
+import org.apache.kafka.clients.producer.{Producer, ProducerRecord, 
KafkaProducer}
+import org.apache.kafka.common.serialization.Serializer
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.prop.PropertyChecks
+
+
+class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
+
+  val timestampGen = Gen.chooseNum[Long](0L, 100L)
+
+  property("KafkaStoreFactory should get KafkaStore given store name") {
+    forAll(Gen.alphaStr, Gen.alphaStr, Gen.oneOf(true, false)) {
+      (prefix: String, name: String, topicExists: Boolean) =>
+        val props = mock[Properties]
+        val config = mock[KafkaConfig]
+        val configFactory = mock[KafkaConfigFactory]
+        val clientFactory = mock[KafkaClientFactory]
+        val client = mock[KafkaClient]
+        val consumer = mock[KafkaConsumer]
+        val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+        val topic = s"$prefix-$name"
+        val replica = 1
+
+        when(configFactory.getKafkaConfig(props)).thenReturn(config)
+        when(config.getKafkaStoreTopic(name)).thenReturn(topic)
+        
when(config.getString(KafkaConfig.CHECKPOINT_STORE_NAME_PREFIX_CONFIG)).thenReturn(prefix)
+        
when(config.getInt(KafkaConfig.REPLICATION_FACTOR_CONFIG)).thenReturn(replica)
+        when(config.getKafkaClientFactory).thenReturn(clientFactory)
+        when(clientFactory.getKafkaClient(config)).thenReturn(client)
+        when(client.createTopic(topic, 1, replica)).thenReturn(topicExists)
+        if (topicExists) {
+          when(client.createConsumer(topic, 0, 
OffsetRequest.EarliestTime)).thenReturn(consumer)
+        }
+        when(client.createProducer[Array[Byte], 
Array[Byte]](any[Serializer[Array[Byte]]],
+          any[Serializer[Array[Byte]]])).thenReturn(producer)
+
+        val storeFactory = new KafkaStoreFactory(props, configFactory)
+        storeFactory.getCheckpointStore(name) shouldBe a [KafkaStore]
+
+        if (topicExists) {
+          verify(client).createConsumer(topic, 0, OffsetRequest.EarliestTime)
+        }
+    }
+  }
+
+  property("KafkaStore should close producer on close") {
+    forAll(Gen.alphaStr) { (topic: String) =>
+      val consumer = mock[KafkaConsumer]
+      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+      val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
+      kafkaStore.close()
+      verify(producer).close()
+    }
+  }
+
+  property("KafkaStore should read checkpoint from timestamp on recover") {
+    forAll(Gen.alphaStr, timestampGen) {
+      (topic: String, recoverTime: TimeStamp) =>
+        val consumer = mock[KafkaConsumer]
+        val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+        val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
+
+        // case 1: no checkpoint available
+        when(consumer.hasNext).thenReturn(false)
+        kafkaStore.recover(recoverTime) shouldBe None
+        verify(consumer).close()
+    }
+
+    forAll(Gen.alphaStr, timestampGen) {
+      (topic: String, recoverTime: TimeStamp) =>
+        val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+        val kafkaStore = new KafkaStore(topic, producer, None)
+
+        // case 2: no checkpoint store available
+        kafkaStore.recover(recoverTime) shouldBe None
+    }
+
+    forAll(Gen.alphaStr, timestampGen, timestampGen) {
+      (topic: String, recoverTime: TimeStamp, checkpointTime: TimeStamp) =>
+        val consumer = mock[KafkaConsumer]
+        val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+        val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
+
+        val key = Injection[TimeStamp, Array[Byte]](checkpointTime)
+        val msg = key
+        val kafkaMsg = KafkaMessage(TopicAndPartition(topic, 0), 0, Some(key), 
msg)
+
+        when(consumer.hasNext).thenReturn(true, false)
+        when(consumer.next()).thenReturn(kafkaMsg)
+
+        if (checkpointTime < recoverTime) {
+          // case 3: checkpointTime is less than recoverTime
+          kafkaStore.recover(recoverTime) shouldBe None
+        } else {
+          // case 4: checkpoint time is equal to or larger than given timestamp
+          kafkaStore.recover(recoverTime) shouldBe Some(msg)
+        }
+
+        verify(consumer).close()
+    }
+  }
+
+  property("KafkaStore persist should write checkpoint with monotonically 
increasing timestamp") {
+    forAll(Gen.alphaStr, timestampGen, Gen.alphaStr) {
+      (topic: String, checkpointTime: TimeStamp, data: String) =>
+        val consumer = mock[KafkaConsumer]
+        val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
+        val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
+
+        val value = Injection[String, Array[Byte]](data)
+        kafkaStore.persist(checkpointTime, value)
+        kafkaStore.persist(checkpointTime - 1, value)
+        kafkaStore.persist(checkpointTime + 1, value)
+
+        verifyProducer(producer, count = 2, topic, 0, checkpointTime, data)
+        verifyProducer(producer, count = 1, topic, 0, checkpointTime + 1, data)
+
+    }
+
+    def verifyProducer(producer: Producer[Array[Byte], Array[Byte]], count: 
Int,
+        topic: String, partition: Int, time: TimeStamp, data: String): Unit = {
+      verify(producer, times(count)).send(
+        MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](record =>
+          record.topic() == topic
+          && record.partition() == partition
+          && Injection.invert[TimeStamp, Array[Byte]](record.key()).get == time
+          && Injection.invert[String, Array[Byte]](record.value()).get == data
+        ))
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
deleted file mode 100644
index e243eab..0000000
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/DefaultMessageDecoderSpec.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib
-
-import com.twitter.bijection.Injection
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with 
Matchers {
-  property("DefaultMessageDecoder should keep the original bytes data in 
Message") {
-    val decoder = new DefaultMessageDecoder()
-    forAll(Gen.alphaStr) { (s: String) =>
-      val bytes = Injection[String, Array[Byte]](s)
-      decoder.fromBytes(bytes).msg shouldBe bytes
-    }
-  }
-}
-
-class StringMessageDecoderSpec extends PropSpec with PropertyChecks with 
Matchers {
-  property("StringMessageDecoder should decode original bytes data into 
string") {
-    val decoder = new StringMessageDecoder()
-    forAll(Gen.alphaStr) { (s: String) =>
-      val bytes = Injection[String, Array[Byte]](s)
-      decoder.fromBytes(bytes).msg shouldBe s
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
deleted file mode 100644
index 987d975..0000000
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaOffsetManagerSpec.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib
-
-import scala.util.{Failure, Success}
-
-import com.twitter.bijection.Injection
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.transaction.api.OffsetStorage
-import org.apache.gearpump.streaming.transaction.api.OffsetStorage.{Overflow, 
StorageEmpty, Underflow}
-
-class KafkaOffsetManagerSpec extends PropSpec with PropertyChecks with 
Matchers with MockitoSugar {
-
-  val timeStampGen = Gen.choose[Long](0L, 1000L)
-  val messageGen = for {
-    msg <- Gen.alphaStr
-    time <- timeStampGen
-  } yield Message(msg, time)
-
-  val messageAndOffsetsGen = 
Gen.listOf[Message](messageGen).map(_.zipWithIndex)
-
-  property("KafkaOffsetManager should append offset to storage in 
monotonically" +
-    " increasing time order") {
-    forAll(messageAndOffsetsGen) { (messageAndOffsets: List[(Message, Int)]) =>
-      val offsetStorage = mock[OffsetStorage]
-      val offsetManager = new KafkaOffsetManager(offsetStorage)
-      messageAndOffsets.foldLeft(0L) { (max, messageAndOffset) =>
-        val (message, offset) = messageAndOffset
-        offsetManager.filter((message, offset.toLong)) shouldBe Option(message)
-        if (message.timestamp > max) {
-          val newMax = message.timestamp
-          verify(offsetStorage).append(newMax, Injection[Long, 
Array[Byte]](offset.toLong))
-          newMax
-        } else {
-          verifyZeroInteractions(offsetStorage)
-          max
-        }
-      }
-      offsetManager.close()
-    }
-  }
-
-  val minTimeStampGen = Gen.choose[Long](0L, 500L)
-  val maxTimeStampGen = Gen.choose[Long](500L, 1000L)
-  property("KafkaOffsetManager resolveOffset should " +
-    "report StorageEmpty failure when storage is empty") {
-    forAll(timeStampGen) { (time: Long) =>
-      val offsetStorage = mock[OffsetStorage]
-      val offsetManager = new KafkaOffsetManager(offsetStorage)
-      when(offsetStorage.lookUp(time)).thenReturn(Failure(StorageEmpty))
-      offsetManager.resolveOffset(time) shouldBe Failure(StorageEmpty)
-
-      doThrow(new RuntimeException).when(offsetStorage).lookUp(time)
-      intercept[RuntimeException] {
-        offsetManager.resolveOffset(time)
-      }
-      offsetManager.close()
-    }
-  }
-
-  val offsetGen = Gen.choose[Long](0L, 1000L)
-  property("KafkaOffsetManager resolveOffset should return a valid" +
-    " offset when storage is not empty") {
-    forAll(timeStampGen, minTimeStampGen, maxTimeStampGen, offsetGen) {
-      (time: Long, min: Long, max: Long, offset: Long) =>
-        val offsetStorage = mock[OffsetStorage]
-        val offsetManager = new KafkaOffsetManager(offsetStorage)
-        if (time < min) {
-          when(offsetStorage.lookUp(time)).thenReturn(Failure(
-            Underflow(Injection[Long, Array[Byte]](min))))
-          offsetManager.resolveOffset(time) shouldBe Success(min)
-        } else if (time > max) {
-          when(offsetStorage.lookUp(time)).thenReturn(Failure(
-            Overflow(Injection[Long, Array[Byte]](max))))
-          offsetManager.resolveOffset(time) shouldBe Success(max)
-        } else {
-          when(offsetStorage.lookUp(time)).thenReturn(Success(Injection[Long, 
Array[Byte]](offset)))
-          offsetManager.resolveOffset(time) shouldBe Success(offset)
-        }
-
-        doThrow(new RuntimeException).when(offsetStorage).lookUp(time)
-        intercept[RuntimeException] {
-          offsetManager.resolveOffset(time)
-        }
-        offsetManager.close()
-    }
-  }
-
-  property("KafkaOffsetManager close should close offset storage") {
-    val offsetStorage = mock[OffsetStorage]
-    val offsetManager = new KafkaOffsetManager(offsetStorage)
-    offsetManager.close()
-    verify(offsetStorage).close()
-  }
-}


Reply via email to