Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 32f150717 -> 04c3975d6


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
deleted file mode 100644
index 6e4a1c4..0000000
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaStorageSpec.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib
-
-// TODO: Fix the UT failure!
-
-// class KafkaStorageSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-//  val minTimeGen = Gen.choose[Long](1L, 500L)
-//  val maxTimeGen = Gen.choose[Long](500L, 999L)
-//
-//  property("KafkaStorage lookup time should report StorageEmpty if storage is empty") {
-//    forAll { (time: Long, topic: String) =>
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val getConsumer = () => mock[KafkaConsumer]
-//      val connectZk = () => mock[ZkClient]
-//      val storage = new KafkaStorage(topic, topicExists = false, producer, 
getConsumer(),
-//        connectZk())
-//      storage.lookUp(time) shouldBe Failure(StorageEmpty)
-//    }
-//  }
-//
-//  property("KafkaStorage lookup time should return data or report failure if 
storage not empty") {
-//    forAll(minTimeGen, maxTimeGen, Gen.alphaStr) {(minTime: Long, maxTime: 
Long, topic: String) =>
-//      val timeAndOffsets = minTime.to(maxTime).zipWithIndex.map { case 
(time, index) =>
-//        val offset = index.toLong
-//        time -> offset
-//      }
-//      val timeAndOffsetsMap = timeAndOffsets.toMap
-//      val data = timeAndOffsets.map {
-//        case (time, offset) =>
-//          new KafkaMessage(topic, 0, offset.toLong, Some(Injection[Long, 
Array[Byte]](time)),
-//            Injection[Long, Array[Byte]](offset))
-//      }.toList
-//
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val consumer = mock[KafkaConsumer]
-//      val getConsumer = () => consumer
-//      val connectZk = () => mock[ZkClient]
-//
-//      val hasNexts = List.fill(data.tail.size)(true) :+ false
-//      when(consumer.hasNext).thenReturn(true, hasNexts:_*)
-//      when(consumer.next).thenReturn(data.head, data.tail:_*)
-//
-//      val storage = new KafkaStorage(topic, topicExists = true, producer,
-//        getConsumer(), connectZk())
-//      forAll(Gen.choose[Long](minTime, maxTime)) {
-//        time =>
-//          storage.lookUp(time) match {
-//            case Success(array) =>
-//              array should equal (Injection[Long, 
Array[Byte]](timeAndOffsetsMap(time)))
-//            case Failure(e)     => fail("time in range should return Success 
with value")
-//          }
-//      }
-//
-//      forAll(Gen.choose[Long](0L, minTime - 1)) {
-//        time =>
-//          storage.lookUp(time) match {
-//            case Failure(e) => e shouldBe a [Underflow]
-//              e.asInstanceOf[Underflow].min should equal
-//                (Injection[Long, Array[Byte]](timeAndOffsetsMap(minTime)))
-//            case Success(_) => fail("time less than min should return 
Underflow failure")
-//          }
-//      }
-//
-//      forAll(Gen.choose[Long](maxTime + 1, 1000L)) {
-//        time =>
-//          storage.lookUp(time) match {
-//            case Failure(e) => e shouldBe a [Overflow]
-//              e.asInstanceOf[Overflow].max should equal
-//                (Injection[Long, Array[Byte]](timeAndOffsetsMap(maxTime)))
-//            case Success(_) => fail("time larger than max should return 
Overflow failure")
-//          }
-//      }
-//    }
-//  }
-//
-//  property("KafkaStorage append should send data to Kafka") {
-//    forAll(Gen.chooseNum[Long](1, 1000), Gen.chooseNum[Long](0, 1000),
-//      Gen.alphaStr, Gen.oneOf(true, false)) {
-//      (time: Long, offset: Long, topic: String, topicExists: Boolean) =>
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val getConsumer = () => mock[KafkaConsumer]
-//      val connectZk = () => mock[ZkClient]
-//      val storage = new KafkaStorage(topic, topicExists, producer, 
getConsumer(), connectZk())
-//      val offsetBytes = Injection[Long, Array[Byte]](offset)
-//      storage.append(time, offsetBytes)
-//      verify(producer).send(anyObject[ProducerRecord[Array[Byte], 
Array[Byte]]]())
-//    }
-//  }
-//
-//  val topicAndPartitionGen = for {
-//    topic <- Gen.alphaStr
-//    partition <- Gen.choose[Int](0, 100)
-//  } yield TopicAndPartition(topic, partition)
-//  property("KafkaStorage should load data from Kafka") {
-//    val kafkaMsgGen = for {
-//      timestamp <- Gen.choose[Long](1L, 1000L)
-//      offset    <- Gen.choose[Long](0L, 1000L)
-//    } yield (timestamp, Injection[Long, Array[Byte]](offset))
-//    val msgListGen = Gen.listOf[(Long, Array[Byte])](kafkaMsgGen)
-//
-//    val topicExistsGen = Gen.oneOf(true, false)
-//
-//    forAll(topicAndPartitionGen, msgListGen) {
-//      (topicAndPartition: TopicAndPartition, msgList: List[(Long, 
Array[Byte])]) =>
-//        val producer=  mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//        val consumer = mock[KafkaConsumer]
-//        val getConsumer = () => consumer
-//        val connectZk = () => mock[ZkClient]
-//        val kafkaStorage = new KafkaStorage(topicAndPartition.topic,
-//          topicExists = true, producer, getConsumer(), connectZk())
-//          msgList match {
-//            case Nil =>
-//              when(consumer.hasNext).thenReturn(false)
-//            case list =>
-//              val hasNexts = List.fill(list.tail.size)(true) :+ false
-//              val kafkaMsgList = list.zipWithIndex.map { case ((timestamp, 
bytes), index) =>
-//                KafkaMessage(topicAndPartition, index.toLong,
-//                  Some(Injection[Long, Array[Byte]](timestamp)), bytes)
-//              }
-//              when(consumer.hasNext).thenReturn(true, hasNexts: _*)
-//              when(consumer.next).thenReturn(kafkaMsgList.head, 
kafkaMsgList.tail: _*)
-//          }
-//          kafkaStorage.load(consumer) shouldBe msgList
-//    }
-//  }
-//
-//  property("KafkaStorage should not get consumer when topic doesn't exist") {
-//    forAll(Gen.alphaStr) { (topic: String) =>
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val getConsumer = mock[() => KafkaConsumer]
-//      val connectZk = () => mock[ZkClient]
-//      val kafkaStorage = new KafkaStorage(topic,
-//        topicExists = false, producer, getConsumer(), connectZk())
-//      verify(getConsumer, never()).apply()
-//      kafkaStorage.close()
-//    }
-//  }
-//
-//  property("KafkaStorage should fail to load invalid KafkaMessage") {
-//    val invalidKafkaMsgGen = for {
-//      tp <- topicAndPartitionGen
-//      offset <- Gen.choose[Long](1L, 1000L)
-//      timestamp <- Gen.oneOf(Some(Injection[ByteBuffer, 
Array[Byte]](ByteBuffer.allocate(0))),
-//        None)
-//      msg <- Gen.alphaStr.map(Injection[String, Array[Byte]])
-//    } yield KafkaMessage(tp, offset, timestamp, msg)
-//    forAll(invalidKafkaMsgGen) { (invalidKafkaMsg: KafkaMessage) =>
-//      val consumer = mock[KafkaConsumer]
-//      val getConsumer = () => consumer
-//      val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//      val connectZk = () => mock[ZkClient]
-//      val kafkaStorage = new 
KafkaStorage(invalidKafkaMsg.topicAndPartition.topic,
-//        topicExists = true, producer, getConsumer(), connectZk())
-//      when(consumer.hasNext).thenReturn(true, false)
-//      when(consumer.next).thenReturn(invalidKafkaMsg, invalidKafkaMsg)
-//      Try(kafkaStorage.load(consumer)).isFailure shouldBe true
-//    }
-//  }
-//
-//  property("KafkaStorage close should close kafka producer and delete 
topic") {
-//    val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
-//    val getConsumer = () => mock[KafkaConsumer]
-//    val zkClient = mock[ZkClient]
-//    val connectZk = () => zkClient
-//    val kafkaStorage = new KafkaStorage("topic", false, producer, 
getConsumer(), connectZk())
-//    kafkaStorage.close()
-//    verify(producer).close()
-//    verify(zkClient).createPersistent(anyString(), anyString())
-//  }
-// }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
deleted file mode 100644
index 099be3d..0000000
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/KafkaUtilSpec.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib
-
-import kafka.common.TopicAndPartition
-import kafka.server.{KafkaConfig => KafkaServerConfig}
-import kafka.utils.{TestUtils, TestZKUtils}
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{BeforeAndAfterEach, Matchers, PropSpec}
-
-import org.apache.gearpump.streaming.kafka.lib.grouper.KafkaGrouper
-import org.apache.gearpump.streaming.kafka.util.KafkaServerHarness
-
-class KafkaUtilSpec
-  extends PropSpec with PropertyChecks with BeforeAndAfterEach
-  with Matchers with KafkaServerHarness {
-
-  val numServers = 1
-  override val configs: List[KafkaServerConfig] =
-    for (props <- TestUtils.createBrokerConfigs(numServers, 
enableControlledShutdown = false))
-      yield new KafkaServerConfig(props) {
-        override val zkConnect = TestZKUtils.zookeeperConnect
-        override val numPartitions = 4
-      }
-
-  override def beforeEach(): Unit = {
-    super.setUp()
-  }
-
-  override def afterEach(): Unit = {
-    super.tearDown()
-  }
-
-  import org.apache.gearpump.streaming.kafka.lib.KafkaUtil._
-
-  property("KafkaUtil should be able to create topic") {
-    val args = {
-      Table(
-        ("topic", "partitions", "replicas"),
-        (TestUtils.tempTopic, 1, numServers),
-        (TestUtils.tempTopic, 2, numServers + 1),
-        ("", 1, numServers),
-        (TestUtils.tempTopic, 0, numServers),
-        (TestUtils.tempTopic, 1, 0)
-      )
-    }
-    forAll(args) {
-      (topic: String, partitions: Int, replicas: Int) =>
-        if (topic.nonEmpty && partitions > 0 && replicas > 0 && replicas <= 
numServers) {
-          createTopic(connectZk(), topic, partitions, replicas) shouldBe false
-          createTopic(connectZk(), topic, partitions, replicas) shouldBe true
-        } else {
-          intercept[RuntimeException] {
-            createTopic(connectZk(), topic, partitions, replicas)
-          }
-        }
-    }
-  }
-
-  property("KafkaUtil should be able to get broker info") {
-    val brokerList = getBrokerList
-    val partitions = 2
-    val replicas = numServers
-    val topic = TestUtils.tempTopic()
-    intercept[RuntimeException] {
-      getBroker(connectZk(), topic, partitions)
-    }
-    val partitionsToBrokers = createTopicUntilLeaderIsElected(topic, 
partitions, replicas)
-    0.until(partitions).foreach { part =>
-      val hostPorts = brokerList(partitionsToBrokers(part).get).split(":")
-      val broker = getBroker(connectZk(), topic, part)
-
-      broker.host shouldBe hostPorts(0)
-      broker.port.toString shouldBe hostPorts(1)
-    }
-  }
-
-  property("KafkaUtil should be able to get TopicAndPartitions info and group 
with KafkaGrouper") {
-    val grouper: KafkaGrouper = new KafkaGrouper {
-      override def group(taskNum: Int, taskIndex: Int, topicAndPartitions: 
Array[TopicAndPartition])
-        : Array[TopicAndPartition] = {
-        topicAndPartitions
-      }
-    }
-    val topicNum = 3
-    val topics = List.fill(topicNum)(TestUtils.tempTopic())
-    topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, 
replicas = 1))
-    KafkaUtil.getTopicAndPartitions(connectZk(), topics).toSet shouldBe
-      topics.map(t => TopicAndPartition(t, 0)).toSet
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
deleted file mode 100644
index cc83b51..0000000
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/ExponentialBackoffSleeperSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.consumer
-
-import org.scalatest.{Matchers, WordSpec}
-
-class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
-
-  "ExponentialBackOffSleeper" should {
-    "sleep for increasing duration" in {
-      val sleeper = new ExponentialBackoffSleeper(
-        backOffMultiplier = 2.0,
-        initialDurationMs = 100,
-        maximumDurationMs = 10000
-      )
-      sleeper.getSleepDuration shouldBe 100
-      sleeper.setNextSleepDuration()
-      sleeper.getSleepDuration shouldBe 200
-      sleeper.setNextSleepDuration()
-      sleeper.getSleepDuration shouldBe 400
-      sleeper.setNextSleepDuration()
-      sleeper.getSleepDuration shouldBe 800
-    }
-
-    "sleep for no more than maximum duration" in {
-      val sleeper = new ExponentialBackoffSleeper(
-        backOffMultiplier = 2.0,
-        initialDurationMs = 6400,
-        maximumDurationMs = 10000
-      )
-      sleeper.getSleepDuration shouldBe 6400
-      sleeper.setNextSleepDuration()
-      sleeper.getSleepDuration shouldBe 10000
-      sleeper.setNextSleepDuration()
-      sleeper.getSleepDuration shouldBe 10000
-    }
-
-    "sleep for initial duration after reset" in {
-      val sleeper = new ExponentialBackoffSleeper(
-        backOffMultiplier = 2.0,
-        initialDurationMs = 100,
-        maximumDurationMs = 10000
-      )
-      sleeper.getSleepDuration shouldBe 100
-      sleeper.setNextSleepDuration()
-      sleeper.getSleepDuration shouldBe 200
-      sleeper.reset()
-      sleeper.getSleepDuration shouldBe 100
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
deleted file mode 100644
index b6659a1..0000000
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/FetchThreadSpec.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.consumer
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import kafka.common.TopicAndPartition
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-
-  val nonNegativeGen = Gen.choose[Int](0, 1000)
-  val positiveGen = Gen.choose[Int](1, 1000)
-  val startOffsetGen = Gen.choose[Long](0L, 1000L)
-  property("FetchThread should set startOffset to iterators") {
-    forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
-      (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
-        val topicAndPartition = mock[TopicAndPartition]
-        val consumer = mock[KafkaConsumer]
-        val createConsumer = (tp: TopicAndPartition) => consumer
-        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-        val fetchThread = new FetchThread(Array(topicAndPartition), 
createConsumer,
-          incomingQueue, fetchThreshold, fetchSleepMS)
-        fetchThread.setStartOffset(topicAndPartition, startOffset)
-        verify(consumer).setStartOffset(startOffset)
-    }
-  }
-
-  val topicAndPartitionGen = for {
-    topic <- Gen.alphaStr
-    partition <- Gen.choose[Int](0, Int.MaxValue)
-  } yield TopicAndPartition(topic, partition)
-  property("FetchThread should only fetchMessage when the number " +
-    "of messages in queue is below the threshold") {
-    forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, 
topicAndPartitionGen) {
-      (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
-        startOffset: Long, topicAndPartition: TopicAndPartition) =>
-        val message = mock[KafkaMessage]
-        val consumer = mock[KafkaConsumer]
-        val createConsumer = (tp: TopicAndPartition) => consumer
-        when(consumer.hasNext).thenReturn(true)
-        when(consumer.next).thenReturn(message)
-        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-        val fetchThread = new FetchThread(Array(topicAndPartition), 
createConsumer,
-          incomingQueue, fetchThreshold, fetchSleepMS)
-
-        0.until(messageNum) foreach { _ =>
-          fetchThread.fetchMessage
-        }
-
-        incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold)
-    }
-  }
-
-  property("FetchThread poll should try to retrieve and remove the head of 
incoming queue") {
-    val topicAndPartition = mock[TopicAndPartition]
-    val consumer = mock[KafkaConsumer]
-    val createConsumer = (tp: TopicAndPartition) => consumer
-    val kafkaMsg = mock[KafkaMessage]
-    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-    incomingQueue.put(kafkaMsg)
-    val fetchThread = new FetchThread(Array(topicAndPartition), 
createConsumer, incomingQueue, 0, 0)
-    fetchThread.poll shouldBe Some(kafkaMsg)
-    fetchThread.poll shouldBe None
-  }
-
-  val tpAndHasNextGen = for {
-    tp <- topicAndPartitionGen
-    hasNext <- Gen.oneOf(true, false)
-  } yield (tp, hasNext)
-
-  val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, 
Boolean)](tpAndHasNextGen)
-    .map(_.toMap) suchThat (_.nonEmpty)
-
-  property("FetchThread fetchMessage should return false when there are no 
more messages " +
-    "from any TopicAndPartition") {
-    forAll(tpHasNextMapGen, nonNegativeGen) {
-      (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) =>
-        val createConsumer = (tp: TopicAndPartition) => {
-          val consumer = mock[KafkaConsumer]
-          val kafkaMsg = mock[KafkaMessage]
-          val hasNext = tpHasNextMap(tp)
-          when(consumer.hasNext).thenReturn(hasNext)
-          when(consumer.next).thenReturn(kafkaMsg)
-          consumer
-        }
-        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
-        val fetchThread = new FetchThread(tpHasNextMap.keys.toArray,
-          createConsumer, incomingQueue, tpHasNextMap.size + 1, fetchSleepMS)
-        fetchThread.fetchMessage shouldBe tpHasNextMap.values.reduce(_ || _)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
deleted file mode 100644
index 3c6a663..0000000
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/consumer/KafkaConsumerSpec.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.consumer
-
-import com.twitter.bijection.Injection
-import kafka.api.OffsetRequest
-import kafka.common.TopicAndPartition
-import kafka.consumer.SimpleConsumer
-import kafka.message.{Message, MessageAndOffset}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
-  val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, 
Array[Byte]](msg)))
-  val messageNumGen = Gen.choose[Int](0, 1000)
-  val topicAndPartitionGen = for {
-    topic <- Gen.alphaStr
-    partition <- Gen.choose[Int](0, Int.MaxValue)
-  } yield (topic, partition)
-
-  property("KafkaConsumer should iterate MessageAndOffset calling hasNext and 
next") {
-    forAll(messageGen, messageNumGen, topicAndPartitionGen) {
-      (message: Message, num: Int, topicAndPartition: (String, Int)) =>
-        val (topic, partition) = topicAndPartition
-        val consumer = mock[SimpleConsumer]
-        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, 
partition),
-          OffsetRequest.EarliestTime, -1)).thenReturn(0)
-        val iterator = 0.until(num).map(index => MessageAndOffset(message, 
index.toLong)).iterator
-        val getIterator = (offset: Long) => iterator
-        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, 
getIterator)
-        0.until(num).foreach { i =>
-          kafkaConsumer.hasNext shouldBe true
-          val kafkaMessage = kafkaConsumer.next
-          kafkaMessage.offset shouldBe i.toLong
-          kafkaMessage.key shouldBe None
-        }
-        kafkaConsumer.hasNext shouldBe false
-    }
-  }
-
-  val startOffsetGen = Gen.choose[Long](1L, 1000L)
-  property("KafkaConsumer setStartOffset should reset internal iterator") {
-    forAll(topicAndPartitionGen, startOffsetGen) {
-      (topicAndPartition: (String, Int), startOffset: Long) =>
-        val (topic, partition) = topicAndPartition
-        val consumer = mock[SimpleConsumer]
-        val getIterator = mock[Long => Iterator[MessageAndOffset]]
-        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, 
partition),
-          OffsetRequest.EarliestTime, -1)).thenReturn(0)
-        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, 
getIterator)
-        kafkaConsumer.setStartOffset(startOffset)
-        verify(getIterator).apply(startOffset)
-    }
-  }
-
-  property("KafkaConsumer close should close SimpleConsumer") {
-    forAll(topicAndPartitionGen) {
-      (topicAndPartition: (String, Int)) =>
-        val (topic, partition) = topicAndPartition
-        val consumer = mock[SimpleConsumer]
-        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, 
partition),
-          OffsetRequest.EarliestTime, -1)).thenReturn(0)
-        val getIterator = mock[Long => Iterator[MessageAndOffset]]
-        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, 
getIterator)
-        kafkaConsumer.close()
-        verify(consumer).close()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
deleted file mode 100644
index 32b8685..0000000
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/grouper/KafkaDefaultGrouperSpec.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.grouper
-
-import kafka.common.TopicAndPartition
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class KafkaDefaultGrouperSpec extends PropSpec with PropertyChecks with Matchers {
-  property("KafkaDefaultGrouper should group TopicAndPartitions in a 
round-robin way") {
-    forAll(Gen.posNum[Int], Gen.posNum[Int], Gen.posNum[Int]) {
-      (topicNum: Int, partitionNum: Int, taskNum: Int) => {
-        val topicAndPartitions = for {
-          t <- 0.until(topicNum)
-          p <- 0.until(partitionNum)
-        } yield TopicAndPartition("topic" + t, p)
-        0.until(taskNum).foreach { taskIndex =>
-          val grouper = new KafkaDefaultGrouper
-          grouper.group(taskNum, taskIndex, topicAndPartitions.toArray).forall(
-            tp => topicAndPartitions.indexOf(tp) % taskNum == taskIndex)
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala
new file mode 100644
index 0000000..843aab7
--- /dev/null
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source
+
+import com.twitter.bijection.Injection
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with 
Matchers {
+  property("DefaultMessageDecoder should keep the original bytes data in 
Message") {
+    val decoder = new DefaultMessageDecoder()
+    forAll(Gen.chooseNum[Int](0, 100), Gen.alphaStr) { (k: Int, v: String) =>
+      val kbytes = Injection[Int, Array[Byte]](k)
+      val vbytes = Injection[String, Array[Byte]](v)
+      val timestamp = System.currentTimeMillis()
+      val message = decoder.fromBytes(kbytes, vbytes)
+      message.msg shouldBe vbytes
+      message.timestamp should be >= timestamp
+    }
+  }
+}
+
+class StringMessageDecoderSpec extends PropSpec with PropertyChecks with 
Matchers {
+  property("StringMessageDecoder should decode original bytes data into 
string") {
+    val decoder = new StringMessageDecoder()
+    forAll(Gen.alphaStr, Gen.alphaStr) { (k: String, v: String) =>
+      val kbytes = Injection[String, Array[Byte]](k)
+      val vbytes = Injection[String, Array[Byte]](v)
+      val timestamp = System.currentTimeMillis()
+      val message = decoder.fromBytes(kbytes, vbytes)
+      message.msg shouldBe v
+      message.timestamp should be >= timestamp
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeperSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeperSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeperSpec.scala
new file mode 100644
index 0000000..83c4584
--- /dev/null
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/ExponentialBackoffSleeperSpec.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.consumer
+
+import org.scalatest.{Matchers, WordSpec}
+
+class ExponentialBackoffSleeperSpec extends WordSpec with Matchers {
+
+  "ExponentialBackOffSleeper" should {
+    "sleep for increasing duration" in {
+      val sleeper = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0,
+        initialDurationMs = 100,
+        maximumDurationMs = 10000
+      )
+      sleeper.getSleepDuration shouldBe 100
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 200
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 400
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 800
+    }
+
+    "sleep for no more than maximum duration" in {
+      val sleeper = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0,
+        initialDurationMs = 6400,
+        maximumDurationMs = 10000
+      )
+      sleeper.getSleepDuration shouldBe 6400
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 10000
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 10000
+    }
+
+    "sleep for initial duration after reset" in {
+      val sleeper = new ExponentialBackoffSleeper(
+        backOffMultiplier = 2.0,
+        initialDurationMs = 100,
+        maximumDurationMs = 10000
+      )
+      sleeper.getSleepDuration shouldBe 100
+      sleeper.setNextSleepDuration()
+      sleeper.getSleepDuration shouldBe 200
+      sleeper.reset()
+      sleeper.getSleepDuration shouldBe 100
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThreadSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThreadSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThreadSpec.scala
new file mode 100644
index 0000000..0569060
--- /dev/null
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/FetchThreadSpec.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.consumer
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import kafka.common.TopicAndPartition
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class FetchThreadSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
+
+  val nonNegativeGen = Gen.choose[Int](0, 1000)
+  val positiveGen = Gen.choose[Int](1, 1000)
+  val startOffsetGen = Gen.choose[Long](0L, 1000L)
+  property("FetchThread should set startOffset to iterators") {
+    forAll(nonNegativeGen, nonNegativeGen, startOffsetGen) {
+      (fetchThreshold: Int, fetchSleepMS: Int, startOffset: Long) =>
+        val topicAndPartition = mock[TopicAndPartition]
+        val consumer = mock[KafkaConsumer]
+        val createConsumer = (tp: TopicAndPartition) => consumer
+        val sleeper = mock[ExponentialBackoffSleeper]
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        val fetchThread = new FetchThread(createConsumer,
+          incomingQueue, sleeper, fetchThreshold, fetchSleepMS)
+        fetchThread.setTopicAndPartitions(Array(topicAndPartition))
+        fetchThread.setStartOffset(topicAndPartition, startOffset)
+        verify(consumer).setStartOffset(startOffset)
+    }
+  }
+
+  val topicAndPartitionGen = for {
+    topic <- Gen.alphaStr
+    partition <- Gen.choose[Int](0, Int.MaxValue)
+  } yield TopicAndPartition(topic, partition)
+  property("FetchThread should only fetchMessage when the number " +
+    "of messages in queue is below the threshold") {
+    forAll(positiveGen, nonNegativeGen, nonNegativeGen, startOffsetGen, 
topicAndPartitionGen) {
+      (messageNum: Int, fetchThreshold: Int, fetchSleepMS: Int,
+        startOffset: Long, topicAndPartition: TopicAndPartition) =>
+        val message = mock[KafkaMessage]
+        val consumer = mock[KafkaConsumer]
+        val createConsumer = (tp: TopicAndPartition) => consumer
+
+        0.until(messageNum) foreach { _ =>
+          when(consumer.hasNext).thenReturn(true)
+          when(consumer.next()).thenReturn(message)
+        }
+
+        val sleeper = mock[ExponentialBackoffSleeper]
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        val fetchThread = new FetchThread(createConsumer,
+          incomingQueue, sleeper, fetchThreshold, fetchSleepMS)
+
+        fetchThread.setTopicAndPartitions(Array(topicAndPartition))
+
+        0.until(messageNum) foreach { _ =>
+          fetchThread.runLoop()
+        }
+
+        verify(sleeper, times(messageNum)).reset()
+        incomingQueue.size() shouldBe Math.min(messageNum, fetchThreshold)
+    }
+  }
+
+  property("FetchThread poll should try to retrieve and remove the head of 
incoming queue") {
+    val topicAndPartition = mock[TopicAndPartition]
+    val consumer = mock[KafkaConsumer]
+    val createConsumer = (tp: TopicAndPartition) => consumer
+    val kafkaMsg = mock[KafkaMessage]
+    val sleeper = mock[ExponentialBackoffSleeper]
+    val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+    incomingQueue.put(kafkaMsg)
+    val fetchThread = new FetchThread(createConsumer, incomingQueue, sleeper, 
0, 0)
+    fetchThread.setTopicAndPartitions(Array(topicAndPartition))
+    fetchThread.poll shouldBe Some(kafkaMsg)
+    fetchThread.poll shouldBe None
+  }
+
+  val tpAndHasNextGen = for {
+    tp <- topicAndPartitionGen
+    hasNext <- Gen.oneOf(true, false)
+  } yield (tp, hasNext)
+
+  val tpHasNextMapGen = Gen.listOf[(TopicAndPartition, 
Boolean)](tpAndHasNextGen)
+    .map(_.toMap) suchThat (_.nonEmpty)
+
+  property("FetchThread fetchMessage should return false when there are no 
more messages " +
+    "from any TopicAndPartition") {
+    forAll(tpHasNextMapGen, nonNegativeGen) {
+      (tpHasNextMap: Map[TopicAndPartition, Boolean], fetchSleepMS: Int) =>
+        val createConsumer = (tp: TopicAndPartition) => {
+          val consumer = mock[KafkaConsumer]
+          val kafkaMsg = mock[KafkaMessage]
+          val hasNext = tpHasNextMap(tp)
+          when(consumer.hasNext).thenReturn(hasNext)
+          when(consumer.next()).thenReturn(kafkaMsg)
+          consumer
+        }
+        val sleeper = mock[ExponentialBackoffSleeper]
+        val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+        val fetchThread = new FetchThread(createConsumer, incomingQueue, 
sleeper,
+          tpHasNextMap.size + 1, fetchSleepMS)
+        fetchThread.setTopicAndPartitions(tpHasNextMap.keys.toArray)
+        fetchThread.runLoop()
+        val hasMoreMessages = tpHasNextMap.values.reduce(_ || _)
+        if (!hasMoreMessages) {
+          verify(sleeper).sleep(fetchSleepMS)
+        }
+    }
+  }
+
+  property("FetchThread should reset consumers on exception") {
+    forAll(startOffsetGen) { (offset: Long) =>
+      val topicAndPartition = mock[TopicAndPartition]
+      val consumer = mock[KafkaConsumer]
+      val createConsumer = (tp: TopicAndPartition) => consumer
+      val sleeper = mock[ExponentialBackoffSleeper]
+      val incomingQueue = new LinkedBlockingQueue[KafkaMessage]()
+      val fetchThread = new FetchThread(createConsumer, incomingQueue, 
sleeper, 1, 0)
+      fetchThread.setTopicAndPartitions(Array(topicAndPartition))
+
+      when(consumer.hasNext).thenReturn(true)
+      when(consumer.next()).thenThrow(new RuntimeException)
+      fetchThread.runLoop()
+      // sleep on exception
+      verify(sleeper).sleep()
+
+      // reset on previous exception
+      when(consumer.getNextOffset).thenReturn(offset)
+      when(consumer.hasNext).thenReturn(false)
+      fetchThread.runLoop()
+      verify(consumer).close()
+      // consumer is reset
+      verify(consumer).setStartOffset(offset)
+      // reset sleeper on successful fetch
+      verify(sleeper).reset()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumerSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumerSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumerSpec.scala
new file mode 100644
index 0000000..4f8a239
--- /dev/null
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/consumer/KafkaConsumerSpec.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.consumer
+
+import com.twitter.bijection.Injection
+import kafka.api.OffsetRequest
+import kafka.common.TopicAndPartition
+import kafka.consumer.SimpleConsumer
+import kafka.message.{Message, MessageAndOffset}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class KafkaConsumerSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
+  val messageGen = Gen.alphaStr map (msg => new Message(Injection[String, 
Array[Byte]](msg)))
+  val messageNumGen = Gen.choose[Int](0, 1000)
+  val topicAndPartitionGen = for {
+    topic <- Gen.alphaStr
+    partition <- Gen.choose[Int](0, Int.MaxValue)
+  } yield (topic, partition)
+
+  property("KafkaConsumer should iterate MessageAndOffset calling hasNext and 
next") {
+    forAll(messageGen, messageNumGen, topicAndPartitionGen) {
+      (message: Message, num: Int, topicAndPartition: (String, Int)) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, 
partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        val iterator = 0.until(num).map(index => MessageAndOffset(message, 
index.toLong)).iterator
+        val getIterator = (offset: Long) => iterator
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, 
getIterator)
+        0.until(num).foreach { i =>
+          kafkaConsumer.hasNext shouldBe true
+          val kafkaMessage = kafkaConsumer.next
+          kafkaMessage.offset shouldBe i.toLong
+          kafkaMessage.key shouldBe None
+        }
+        kafkaConsumer.hasNext shouldBe false
+    }
+  }
+
+  val startOffsetGen = Gen.choose[Long](1L, 1000L)
+  property("KafkaConsumer setStartOffset should reset internal iterator") {
+    forAll(topicAndPartitionGen, startOffsetGen) {
+      (topicAndPartition: (String, Int), startOffset: Long) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        val getIterator = mock[Long => Iterator[MessageAndOffset]]
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, 
partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, 
getIterator)
+        kafkaConsumer.setStartOffset(startOffset)
+        verify(getIterator).apply(startOffset)
+    }
+  }
+
+  property("KafkaConsumer close should close SimpleConsumer") {
+    forAll(topicAndPartitionGen) {
+      (topicAndPartition: (String, Int)) =>
+        val (topic, partition) = topicAndPartition
+        val consumer = mock[SimpleConsumer]
+        when(consumer.earliestOrLatestOffset(TopicAndPartition(topic, 
partition),
+          OffsetRequest.EarliestTime, -1)).thenReturn(0)
+        val getIterator = mock[Long => Iterator[MessageAndOffset]]
+        val kafkaConsumer = new KafkaConsumer(consumer, topic, partition, 
getIterator)
+        kafkaConsumer.close()
+        verify(consumer).close()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouperSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouperSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouperSpec.scala
new file mode 100644
index 0000000..bd80826
--- /dev/null
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/grouper/DefaultPartitionGrouperSpec.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source.grouper
+
+import kafka.common.TopicAndPartition
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class DefaultPartitionGrouperSpec extends PropSpec with PropertyChecks with 
Matchers {
+  property("KafkaDefaultGrouper should group TopicAndPartitions in a 
round-robin way") {
+    forAll(Gen.posNum[Int], Gen.posNum[Int], Gen.posNum[Int]) {
+      (topicNum: Int, partitionNum: Int, taskNum: Int) => {
+        val topicAndPartitions = for {
+          t <- 0.until(topicNum)
+          p <- 0.until(partitionNum)
+        } yield TopicAndPartition("topic" + t, p)
+        0.until(taskNum).foreach { taskIndex =>
+          val grouper = new DefaultPartitionGrouper
+          grouper.group(taskNum, taskIndex, topicAndPartitions.toArray).forall(
+            tp => topicAndPartitions.indexOf(tp) % taskNum == taskIndex)
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala
new file mode 100644
index 0000000..b2db243
--- /dev/null
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka.lib.util
+
+import java.util.Properties
+
+import kafka.common.TopicAndPartition
+import kafka.server.{KafkaConfig => KafkaServerConfig}
+import kafka.utils.{TestZKUtils, TestUtils}
+import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer
+import org.apache.gearpump.streaming.kafka.util.{KafkaConfig, 
KafkaServerHarness}
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, BeforeAndAfterEach, PropSpec}
+
+class KafkaClientSpec extends PropSpec with PropertyChecks
+  with BeforeAndAfterEach with Matchers with KafkaServerHarness {
+
+  val numServers = 1
+  override val configs: List[KafkaServerConfig] =
+    for (props <- TestUtils.createBrokerConfigs(numServers, 
enableControlledShutdown = false))
+      yield new KafkaServerConfig(props) {
+        override val zkConnect = TestZKUtils.zookeeperConnect
+        override val numPartitions = 4
+      }
+
+  override def beforeEach(): Unit = {
+    super.setUp()
+  }
+
+  override def afterEach(): Unit = {
+    super.tearDown()
+  }
+
+
+  property("KafkaClient should be able to create topic") {
+    val args = {
+      Table(
+        ("topic", "partitions", "replicas"),
+        (TestUtils.tempTopic(), 1, numServers),
+        (TestUtils.tempTopic(), 2, numServers + 1),
+        ("", 1, numServers),
+        (TestUtils.tempTopic(), 0, numServers),
+        (TestUtils.tempTopic(), 1, 0)
+      )
+    }
+    forAll(args) {
+      (topic: String, partitions: Int, replicas: Int) =>
+        val props = new Properties
+        props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect)
+        props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList)
+        val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient)
+        if (topic.nonEmpty && partitions > 0 && replicas > 0 && replicas <= 
numServers) {
+
+          kafkaClient.createTopic(topic, partitions, replicas) shouldBe false
+          kafkaClient.createTopic(topic, partitions, replicas) shouldBe true
+        } else {
+          intercept[RuntimeException] {
+            kafkaClient.createTopic(topic, partitions, replicas)
+          }
+        }
+    }
+  }
+
+  property("KafkaClient should be able to get broker info") {
+    val brokerList = getBrokerList.split(",", -1)
+    val partitions = 2
+    val replicas = numServers
+    val topic = TestUtils.tempTopic()
+    val props = new Properties
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList)
+    val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient)
+    intercept[RuntimeException] {
+      kafkaClient.getBroker(topic, partitions)
+    }
+    val partitionsToBrokers = createTopicUntilLeaderIsElected(topic, 
partitions, replicas)
+    0.until(partitions).foreach { part =>
+      val hostPorts = brokerList(partitionsToBrokers(part).get).split(":")
+      val broker = kafkaClient.getBroker(topic, part)
+
+      broker.host shouldBe hostPorts(0)
+      broker.port.toString shouldBe hostPorts(1)
+    }
+  }
+
+  property("KafkaClient should be able to get TopicAndPartitions info") {
+    val props = new Properties
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList)
+    val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient)
+    val topicNum = 3
+    val topics = List.fill(topicNum)(TestUtils.tempTopic())
+    topics.foreach(t => createTopicUntilLeaderIsElected(t, partitions = 1, 
replicas = 1))
+    val actual = kafkaClient.getTopicAndPartitions(topics).toSet
+    val expected = topics.map(t => TopicAndPartition(t, 0)).toSet
+    actual shouldBe expected
+  }
+
+  property("KafkaClient should be able to create consumer") {
+    val props = new Properties
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList)
+    val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient)
+    val partitions = 2
+    val topic = TestUtils.tempTopic()
+    createTopicUntilLeaderIsElected(topic, partitions, replicas = 1)
+    0.until(partitions).foreach { par =>
+      kafkaClient.createConsumer(topic, par,
+        startOffsetTime = -2) shouldBe a [KafkaConsumer]
+    }
+
+  }
+
+  property("KafkaClient should be able to create producer") {
+    val props = new Properties
+    props.put(KafkaConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect)
+    props.put(KafkaConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerList)
+    val kafkaClient = new KafkaClient(new KafkaConfig(props), getZkClient)
+    kafkaClient.createProducer(new ByteArraySerializer,
+      new ByteArraySerializer) shouldBe a [KafkaProducer[_, _]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala
index 99231f5..b6124e5 100644
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/KafkaServerHarness.scala
@@ -31,10 +31,10 @@ trait KafkaServerHarness extends ZookeeperHarness {
   private var brokerList: String = null
 
   def getServers: List[KafkaServer] = servers
-  def getBrokerList: Array[String] = brokerList.split(",")
+  def getBrokerList: String = brokerList
 
   override def setUp() {
-    super.setUp
+    super.setUp()
     if (configs.size <= 0) {
       throw new KafkaException("Must supply at least one server config.")
     }
@@ -43,15 +43,14 @@ trait KafkaServerHarness extends ZookeeperHarness {
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => server.config.logDirs.map(Utils.rm(_)))
-    super.tearDown
+    servers.foreach(_.shutdown())
+    servers.foreach(_.config.logDirs.foreach(Utils.rm))
+    super.tearDown()
   }
 
-  def createTopicUntilLeaderIsElected(
-      topic: String, partitions: Int, replicas: Int, timeout: Long = 10000)
-    : Map[Int, Option[Int]] = {
-    val zkClient = connectZk()
+  def createTopicUntilLeaderIsElected(topic: String, partitions: Int,
+      replicas: Int, timeout: Long = 10000): Map[Int, Option[Int]] = {
+    val zkClient = getZkClient
     try {
       // Creates topic
       AdminUtils.createTopic(zkClient, topic, partitions, replicas, new 
Properties)
@@ -62,8 +61,6 @@ trait KafkaServerHarness extends ZookeeperHarness {
       }.toMap
     } catch {
       case e: Exception => throw e
-    } finally {
-      zkClient.close()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala
 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala
index 8fb23c2..9121ea4 100644
--- 
a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala
+++ 
b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/util/ZookeeperHarness.scala
@@ -27,17 +27,19 @@ trait ZookeeperHarness {
   val zkConnectionTimeout = 60000
   val zkSessionTimeout = 60000
   private var zookeeper: EmbeddedZookeeper = null
+  private var zkClient: ZkClient = null
 
   def getZookeeper: EmbeddedZookeeper = zookeeper
-  def connectZk: () => ZkClient = () => {
-    new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
ZKStringSerializer)
-  }
+
+  def getZkClient: ZkClient = zkClient
 
   def setUp() {
     zookeeper = new EmbeddedZookeeper(zkConnect)
+    zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, 
ZKStringSerializer)
   }
 
   def tearDown() {
+    zkClient.close()
     Utils.swallow(zookeeper.shutdown())
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index e40d8cd..f04dc9b 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -47,7 +47,8 @@ abstract class PersistentTask[T](taskContext: TaskContext, 
conf: UserConfig)
 
   val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
     PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
-  val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, 
taskContext)
+  val checkpointStore = checkpointStoreFactory.getCheckpointStore(
+    s"app$appId-task${taskId.processorId}_${taskId.index}")
   val checkpointInterval = 
conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
   val checkpointManager = new CheckpointManager(checkpointInterval, 
checkpointStore)
   // System time interval to attempt checkpoint

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
index 753050e..2591856 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
@@ -44,7 +44,7 @@ class InMemoryCheckpointStore extends CheckpointStore {
 }
 
 class InMemoryCheckpointStoreFactory extends CheckpointStoreFactory {
-  override def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): 
CheckpointStore = {
+  override def getCheckpointStore(name: String): CheckpointStore = {
     new InMemoryCheckpointStore
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
index 2ef8610..4650ac2 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
@@ -36,7 +36,14 @@ trait CheckpointStore {
   def close(): Unit
 }
 
+/**
+ * Creates CheckpointStore instance at runtime
+ */
 trait CheckpointStoreFactory extends java.io.Serializable {
-  def getCheckpointStore(conf: UserConfig, taskContext: TaskContext): 
CheckpointStore
+  /**
+   * @param name a unique name which maps to a unique path in checkpoint store
+   * @return a CheckpointStore instance
+   */
+  def getCheckpointStore(name: String): CheckpointStore
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
index 0615078..3ea33a5 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
@@ -21,9 +21,14 @@ package org.apache.gearpump.streaming.transaction.api
 import org.apache.gearpump.Message
 
 /**
- * MessageDecoder decodes raw bytes to Message It is usually written by end 
user and
- * passed into TimeReplayableSource
+ * Decodes raw bytes to Message.
+ * It is usually written by end user and passed into TimeReplayableSource
  */
 trait MessageDecoder extends java.io.Serializable {
-  def fromBytes(bytes: Array[Byte]): Message
+  /**
+   * @param key key of a kafka message, can be NULL
+   * @param value value of a kafka message
+   * @return a gearpump Message
+   */
+  def fromBytes(key: Array[Byte], value: Array[Byte]): Message
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
deleted file mode 100644
index 616894a..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetManager.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.transaction.api
-
-import org.apache.gearpump.{Message, TimeStamp}
-
-import scala.util.Try
-
-/**
- * Filters offsets and store the mapping from timestamp to offset
- */
-trait MessageFilter {
-  def filter(messageAndOffset: (Message, Long)): Option[Message]
-}
-
-/**
- * Resolves timestamp to offset by look up the underlying storage
- */
-trait OffsetTimeStampResolver {
-  def resolveOffset(time: TimeStamp): Try[Long]
-}
-
-/**
- * Manages message's offset on TimeReplayableSource and timestamp
- */
-trait OffsetManager extends MessageFilter with OffsetTimeStampResolver {
-  def close(): Unit
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
deleted file mode 100644
index 40fc088..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/OffsetStorage.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.transaction.api
-
-import org.apache.gearpump.TimeStamp
-
-import scala.util.Try
-
-object OffsetStorage {
-
-  /**
-   * StorageEmpty means no data has been stored
-   */
-  case object StorageEmpty extends Throwable
-
-  /**
-   * Overflow means the looked up time is
-   * larger than the maximum stored TimeStamp
-   */
-  case class Overflow(maxTimestamp: Array[Byte]) extends Throwable
-
-  /**
-   * Underflow means the looked up time is
-   * smaller than the minimum stored TimeStamp
-   */
-  case class Underflow(minTimestamp: Array[Byte]) extends Throwable
-}
-
-/**
- * OffsetStorage stores the mapping from TimeStamp to Offset
- */
-trait OffsetStorage {
-  /**
-   * Tries to look up the time in the OffsetStorage return the corresponding 
Offset if the time is
-   * in the range of stored TimeStamps or one of the failure info 
(StorageEmpty, Overflow,
-   * Underflow)
-   *
-   * @param time the time to look for
-   * @return the corresponding offset if the time is in the range, otherwise 
failure
-   */
-  def lookUp(time: TimeStamp): Try[Array[Byte]]
-
-  def append(time: TimeStamp, offset: Array[Byte]): Unit
-
-  def close(): Unit
-}
-
-trait OffsetStorageFactory extends java.io.Serializable {
-  def getOffsetStorage(dir: String): OffsetStorage
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/04c3975d/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
index 16f98d5..c2d47e3 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeReplayableSource.scala
@@ -26,5 +26,11 @@ import org.apache.gearpump.streaming.source.DataSource
  * Subclass should be able to replay messages on recovery from the time
  * when an application crashed.
  */
-trait TimeReplayableSource extends DataSource
+trait TimeReplayableSource extends DataSource {
+  /**
+   * Sets store factory which creates a checkpoint store
+   * at runtime to checkpoint (timestamp, source_offsets)
+   */
+  def setCheckpointStore(factory: CheckpointStoreFactory)
+}
 

Reply via email to