fix #1641, add exactly-once it

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/313b6c45
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/313b6c45
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/313b6c45

Branch: refs/heads/master
Commit: 313b6c45e7e9950b445649832b2b4a7f977e8d5b
Parents: 269838e
Author: manuzhang <[email protected]>
Authored: Tue Feb 16 16:55:05 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Apr 26 14:23:04 2016 +0800

----------------------------------------------------------------------
 .../kafka/wordcount/KafkaWordCount.scala        |  4 +-
 .../examples/state/MessageCountApp.scala        | 39 ++++++++--
 .../state/processor/CountProcessor.scala        | 12 +--
 .../examples/state/MessageCountAppSpec.scala    | 17 ++++-
 .../state/processor/CountProcessorSpec.scala    | 33 +++++----
 .../processor/WindowAverageProcessorSpec.scala  | 31 ++++----
 .../checklist/MessageDeliverySpec.scala         | 78 ++++++++++++++++++++
 .../checklist/StormCompatibilitySpec.scala      | 26 +------
 .../suites/StandaloneModeSuite.scala            |  3 +-
 .../integrationtest/hadoop/HadoopCluster.scala  | 41 ++++++++++
 .../integrationtest/kafka/KafkaCluster.scala    | 26 +++++++
 .../kafka/NumericalDataProducer.scala           |  5 +-
 .../integrationtest/kafka/ResultVerifier.scala  |  9 ++-
 .../kafka/SimpleKafkaReader.scala               |  6 +-
 project/BuildExample.scala                      |  2 +-
 project/BuildIntegrationTest.scala              |  3 +-
 .../streaming/state/api/PersistentTask.scala    | 53 +++++++++----
 .../state/impl/CheckpointManager.scala          | 33 ++++-----
 .../state/impl/CheckpointManagerSpec.scala      | 18 ++---
 19 files changed, 318 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
 
b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
index 17cfd50..51bc3d6 100644
--- 
a/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
+++ 
b/examples/streaming/kafka/src/main/scala/io/gearpump/streaming/examples/kafka/wordcount/KafkaWordCount.scala
@@ -37,10 +37,10 @@ object KafkaWordCount extends AkkaApp with ArgumentsParser {
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "source" -> CLIOption[Int]("<hom many kafka producer tasks>", required = 
false, defaultValue = Some(1)),
+    "source" -> CLIOption[Int]("<how many kafka source tasks>", required = 
false, defaultValue = Some(1)),
     "split" -> CLIOption[Int]("<how many split tasks>", required = false, 
defaultValue = Some(1)),
     "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, 
defaultValue = Some(1)),
-    "sink" -> CLIOption[Int]("<hom many kafka processor tasks", required = 
false, defaultValue = Some(1))
+    "sink" -> CLIOption[Int]("<how many kafka sink tasks>", required = false, 
defaultValue = Some(1))
     )
 
   def application(config: ParseResult, system: ActorSystem) : 
StreamApplication = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
index 9d0980f..e9ceb8b 100644
--- 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
+++ 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/MessageCountApp.scala
@@ -19,8 +19,11 @@
 package io.gearpump.streaming.examples.state
 
 import akka.actor.ActorSystem
+import io.gearpump.streaming.kafka.{KafkaSink, KafkaSource, 
KafkaStorageFactory}
+import io.gearpump.streaming.sink.DataSinkProcessor
+import io.gearpump.streaming.source.DataSourceProcessor
 import io.gearpump.streaming.{StreamApplication, Processor}
-import io.gearpump.streaming.examples.state.processor.{CountProcessor, 
NumberGeneratorProcessor}
+import io.gearpump.streaming.examples.state.processor.CountProcessor
 import io.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
 import io.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
 import io.gearpump.streaming.state.impl.PersistentStateConfig
@@ -33,15 +36,29 @@ import io.gearpump.util.{AkkaApp, Graph}
 import org.apache.hadoop.conf.Configuration
 
 object MessageCountApp extends AkkaApp with ArgumentsParser {
+  val SOURCE_TASK = "sourceTask"
+  val COUNT_TASK = "countTask"
+  val SINK_TASK = "sinkTask"
+  val SOURCE_TOPIC = "sourceTopic"
+  val SINK_TOPIC = "sinkTopic"
+  val ZOOKEEPER_CONNECT = "zookeeperConnect"
+  val BROKER_LIST = "brokerList"
+  val DEFAULT_FS = "defaultFS"
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "gen" -> CLIOption("<how many gen tasks>", required = false, defaultValue 
= Some(1)),
-    "count" -> CLIOption("<how mange count tasks", required = false, 
defaultValue = Some(1))
+    SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = 
false, defaultValue = Some(1)),
+    COUNT_TASK -> CLIOption("<how many count tasks>", required = false, 
defaultValue = Some(1)),
+    SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = 
false, defaultValue = Some(1)),
+    SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true),
+    SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true),
+    ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. 
localhost:2181/kafka>", required = true),
+    BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. 
localhost:9092>", required = true),
+    DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. 
hdfs://localhost:9000>", required = true)
   )
 
   def application(config: ParseResult)(implicit system: ActorSystem) : 
StreamApplication = {
     val hadoopConfig = new Configuration
-    hadoopConfig.set("fs.defaultFS", "hdfs://localhost:9000")
+    hadoopConfig.set("fs.defaultFS", config.getString(DEFAULT_FS))
     val checkpointStoreFactory = new 
HadoopCheckpointStoreFactory("MessageCount", hadoopConfig,
       // rotate on 1KB
       new FileSizeRotation(1000))
@@ -50,10 +67,18 @@ object MessageCountApp extends AkkaApp with ArgumentsParser 
{
       .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
       .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY, 
checkpointStoreFactory)
 
-    val gen = Processor[NumberGeneratorProcessor](config.getInt("gen"))
-    val count = Processor[CountProcessor](config.getInt("count"), taskConf = 
taskConfig)
+    val zookeeperConnect = config.getString(ZOOKEEPER_CONNECT)
+    val brokerList = config.getString(BROKER_LIST)
+    val offsetStorageFactory = new KafkaStorageFactory(zookeeperConnect, 
brokerList)
+    val sourceTopic = config.getString(SOURCE_TOPIC)
+    val kafkaSource = new KafkaSource(sourceTopic, zookeeperConnect, 
offsetStorageFactory)
+    val sourceProcessor = DataSourceProcessor(kafkaSource, 
config.getInt(SOURCE_TASK))
+    val countProcessor = Processor[CountProcessor](config.getInt(COUNT_TASK), 
taskConf = taskConfig)
+    val kafkaSink = new KafkaSink(config.getString(SINK_TOPIC), brokerList)
+    val sinkProcessor = DataSinkProcessor(kafkaSink, config.getInt(SINK_TASK))
     val partitioner = new HashPartitioner()
-    val app = StreamApplication("MessageCount", Graph(gen ~ partitioner ~> 
count), UserConfig.empty)
+    val graph = Graph(sourceProcessor ~ partitioner ~> countProcessor ~ 
partitioner ~> sinkProcessor)
+    val app = StreamApplication("MessageCount", graph, UserConfig.empty)
     app
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
index ba16261..04cea69 100644
--- 
a/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
+++ 
b/examples/streaming/state/src/main/scala/io/gearpump/streaming/examples/state/processor/CountProcessor.scala
@@ -27,15 +27,15 @@ import io.gearpump.cluster.UserConfig
 import io.gearpump.Message
 
 class CountProcessor(taskContext: TaskContext, conf: UserConfig)
-  extends PersistentTask[Long](taskContext, conf) {
+  extends PersistentTask[Int](taskContext, conf) {
 
-  override def persistentState: PersistentState[Long] = {
-    import com.twitter.algebird.Monoid.longMonoid
-    new NonWindowState[Long](new AlgebirdMonoid(longMonoid), new 
ChillSerializer[Long])
+  override def persistentState: PersistentState[Int] = {
+    import com.twitter.algebird.Monoid.intMonoid
+    new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new 
ChillSerializer[Int])
   }
 
-  override def processMessage(state: PersistentState[Long], message: Message): 
Unit = {
-    state.update(message.timestamp, 1L)
+  override def processMessage(state: PersistentState[Int], message: Message): 
Unit = {
+    state.update(message.timestamp, 1)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
index eb0c4e9..8053f57 100644
--- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/MessageCountAppSpec.scala
@@ -18,12 +18,14 @@
 
 package io.gearpump.streaming.examples.state
 
+import io.gearpump.streaming.examples.state.MessageCountApp._
 import io.gearpump.cluster.ClientToMaster.SubmitApplication
 import io.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import io.gearpump.cluster.{MasterHarness, TestUtil}
 import org.scalatest.{Matchers, BeforeAndAfter, PropSpec}
 import org.scalatest.prop.PropertyChecks
 
+
 import scala.concurrent.Future
 import scala.util.Success
 
@@ -40,10 +42,17 @@ class MessageCountAppSpec extends PropSpec with 
PropertyChecks with Matchers wit
   override def config = TestUtil.DEFAULT_CONFIG
 
   property("MessageCount should succeed to submit application with required 
arguments") {
-    val requiredArgs = Array.empty[String]
+    val requiredArgs = Array(
+      s"-$SOURCE_TOPIC", "source",
+      s"-$SINK_TOPIC", "sink",
+      s"-$ZOOKEEPER_CONNECT", "localhost:2181",
+      s"-$BROKER_LIST", "localhost:9092",
+      s"-$DEFAULT_FS", "hdfs://localhost:9000"
+    )
     val optionalArgs = Array(
-      "-gen", "2",
-      "-count", "2"
+      s"-$SOURCE_TASK", "2",
+      s"-$COUNT_TASK", "2",
+      s"-$SINK_TASK", "2"
     )
 
     val args = {
@@ -51,9 +60,11 @@ class MessageCountAppSpec extends PropSpec with 
PropertyChecks with Matchers wit
         ("requiredArgs", "optionalArgs"),
         (requiredArgs, optionalArgs.take(0)),
         (requiredArgs, optionalArgs.take(2)),
+        (requiredArgs, optionalArgs.take(4)),
         (requiredArgs, optionalArgs)
       )
     }
+
     val masterReceiver = createMockMaster()
     forAll(args) { (requiredArgs: Array[String], optionalArgs: Array[String]) 
=>
       val args = requiredArgs ++ optionalArgs

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index c24dd14..cbc9cce 100644
--- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -21,8 +21,9 @@ package io.gearpump.streaming.examples.state.processor
 import akka.actor.ActorSystem
 import akka.testkit.TestProbe
 import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, 
PersistentStateConfig}
-import io.gearpump.streaming.task.{StartTime, ReportCheckpointClock}
+import io.gearpump.streaming.state.api.PersistentTask
+import io.gearpump.streaming.state.impl.PersistentStateConfig
+import io.gearpump.streaming.task.ReportCheckpointClock
 import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
@@ -33,6 +34,8 @@ import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
 
+import scala.concurrent.duration._
+
 class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
 
   property("CountProcessor should update state") {
@@ -42,13 +45,14 @@ class CountProcessorSpec extends PropSpec with 
PropertyChecks with Matchers {
     implicit val system = ActorSystem("test")
 
     val longGen = Gen.chooseNum[Long](1, 1000)
-    forAll(longGen, longGen) {
-      (data: Long, num: Long) =>
+    forAll(longGen) {
+      (num: Long) =>
 
         val conf = UserConfig.empty
           .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
           .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
           
.withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
 new InMemoryCheckpointStoreFactory)
+
         val count = new CountProcessor(taskContext, conf)
 
         val appMaster = TestProbe()(system)
@@ -57,19 +61,22 @@ class CountProcessorSpec extends PropSpec with 
PropertyChecks with Matchers {
         count.onStart(StartTime(0L))
         appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
 
-        for (i <- 1L to num) {
-          when(taskContext.upstreamMinClock).thenReturn(0L)
-          count.onNext(Message("" + data, 0L))
-        }
 
-        count.state.get shouldBe Some(num)
+        for (i <- 0L to num) {
+          count.onNext(Message("", i))
+          count.state.get shouldBe Some(i + 1)
+        }
+        // next checkpoint time is at num
+        // not yet
+        when(taskContext.upstreamMinClock).thenReturn(0L)
+        count.onNext(PersistentTask.CHECKPOINT)
+        appMaster.expectNoMsg(10 milliseconds)
 
+        // time to checkpoint
         when(taskContext.upstreamMinClock).thenReturn(num)
-        count.onNext(Message("" + data, num))
+        count.onNext(PersistentTask.CHECKPOINT)
+        // only the state before checkpoint time is checkpointed
         appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
-
-
-        count.state.get shouldBe Some(num + 1)
     }
 
     system.shutdown()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
index bfc1ad0..03bc8fb 100644
--- 
a/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ 
b/examples/streaming/state/src/test/scala/io/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -22,8 +22,9 @@ import akka.actor.ActorSystem
 import akka.testkit.TestProbe
 import com.twitter.algebird.AveragedValue
 import io.gearpump.streaming.MockUtil
-import io.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, 
WindowConfig, PersistentStateConfig}
-import io.gearpump.streaming.task.{StartTime, ReportCheckpointClock}
+import io.gearpump.streaming.state.api.PersistentTask
+import io.gearpump.streaming.state.impl.{WindowConfig, PersistentStateConfig}
+import io.gearpump.streaming.task.ReportCheckpointClock
 import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import io.gearpump.Message
 import io.gearpump.cluster.UserConfig
@@ -34,17 +35,18 @@ import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.prop.PropertyChecks
 
+import scala.concurrent.duration._
+
 
 class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with 
Matchers {
   property("WindowAverageProcessor should update state") {
 
-    val taskContext = MockUtil.mockTaskContext
-
     implicit val system = ActorSystem("test")
-
     val longGen = Gen.chooseNum[Long](1, 1000)
     forAll(longGen, longGen) {
       (data: Long, num: Long) =>
+        val taskContext = MockUtil.mockTaskContext
+
         val windowSize = num
         val windowStep = num
 
@@ -52,7 +54,6 @@ class WindowAverageProcessorSpec extends PropSpec with 
PropertyChecks with Match
             .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
             .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, num)
             
.withValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
 new InMemoryCheckpointStoreFactory)
-
             .withValue(WindowConfig.NAME, WindowConfig(windowSize, windowStep))
 
         val windowAverage = new WindowAverageProcessor(taskContext, conf)
@@ -63,19 +64,21 @@ class WindowAverageProcessorSpec extends PropSpec with 
PropertyChecks with Match
         windowAverage.onStart(StartTime(0L))
         appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, 0L))
 
-        for (i <- 1L to num) {
-          when(taskContext.upstreamMinClock).thenReturn(0L)
-          windowAverage.onNext(Message("" + data, 0L))
+        for (i <- 0L until num) {
+          windowAverage.onNext(Message("" + data, i))
+          windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data))
         }
 
-        windowAverage.state.get shouldBe Some(AveragedValue(num, data))
+        // next checkpoint time is at num
+        // not yet
+        when(taskContext.upstreamMinClock).thenReturn(0L)
+        windowAverage.onNext(PersistentTask.CHECKPOINT)
+        appMaster.expectNoMsg(10 milliseconds)
 
+        // time to checkpoint
         when(taskContext.upstreamMinClock).thenReturn(num)
-        windowAverage.onNext(Message("" + data, num))
+        windowAverage.onNext(PersistentTask.CHECKPOINT)
         appMaster.expectMsg(ReportCheckpointClock(taskContext.taskId, num))
-
-
-        windowAverage.state.get shouldBe Some(AveragedValue(1L, data))
     }
 
     system.shutdown()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
new file mode 100644
index 0000000..3f029ef
--- /dev/null
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/MessageDeliverySpec.scala
@@ -0,0 +1,78 @@
+package io.gearpump.integrationtest.checklist
+
+import io.gearpump.integrationtest.hadoop.HadoopCluster._
+import io.gearpump.integrationtest.{Util, TestSpecBase}
+import io.gearpump.integrationtest.kafka.{SimpleKafkaReader, 
MessageLossDetector, NumericalDataProducer}
+import io.gearpump.integrationtest.kafka.KafkaCluster._
+
+class MessageDeliverySpec extends TestSpecBase {
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+  }
+
+  override def afterEach() = {
+    super.afterEach()
+  }
+
+  "Gearpump" should {
+    "support exactly-once message delivery" in {
+      withKafkaCluster(cluster) { kafkaCluster =>
+        // setup
+        val sourcePartitionNum = 1
+        val sourceTopic = "topic1"
+        val sinkTopic = "topic2"
+
+        // Generate number sequence (1, 2, 3, ...) to the topic
+        kafkaCluster.createTopic(sourceTopic, sourcePartitionNum)
+
+        withDataProducer(sourceTopic, kafkaCluster.getBrokerListConnectString) 
{ producer =>
+
+          withHadoopCluster { hadoopCluster =>
+            // exercise
+            val args = 
Array("io.gearpump.streaming.examples.state.MessageCountApp",
+              "-defaultFS", hadoopCluster.getDefaultFS,
+              "-zookeeperConnect", kafkaCluster.getZookeeperConnectString,
+              "-brokerList", kafkaCluster.getBrokerListConnectString,
+              "-sourceTopic", sourceTopic,
+              "-sinkTopic", sinkTopic,
+              "-sourceTask", sourcePartitionNum).mkString(" ")
+            val appId = restClient.getNextAvailableAppId()
+
+            val stateJar = cluster.queryBuiltInExampleJars("state-").head
+            val success = restClient.submitApp(stateJar, args)
+            success shouldBe true
+
+            // verify #1
+            expectAppIsRunning(appId, "MessageCount")
+            Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 
0)
+
+            // wait for checkpoint to take place
+            Thread.sleep(1000)
+
+            // verify #2
+            val executorToKill = 
restClient.queryExecutorBrief(appId).map(_.executorId).max
+            restClient.killExecutor(appId, executorToKill) shouldBe true
+            
Util.retryUntil(restClient.queryExecutorBrief(appId).map(_.executorId).max > 
executorToKill)
+
+            producer.stop()
+
+            // verify #3
+            val count = kafkaCluster.getLatestOffset(sourceTopic) + 1
+            val detector = new MessageLossDetector(count)
+            val kafkaReader = new SimpleKafkaReader(detector, sinkTopic,
+              host = kafkaCluster.advertisedHost, port = 
kafkaCluster.advertisedPort)
+            Util.retryUntil({
+              kafkaReader.read()
+              detector.received(count)
+            })
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
index 3baabf6..ef737a0 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/StormCompatibilitySpec.scala
@@ -17,7 +17,7 @@
  */
 package io.gearpump.integrationtest.checklist
 
-import io.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, 
NumericalDataProducer, SimpleKafkaReader}
+import io.gearpump.integrationtest.kafka.{KafkaCluster, MessageLossDetector, 
SimpleKafkaReader}
 import io.gearpump.integrationtest.storm.StormClient
 import io.gearpump.integrationtest.{TestSpecBase, Util}
 
@@ -43,27 +43,6 @@ class StormCompatibilitySpec extends TestSpecBase {
     testCode("010")
   }
 
-  def withKafkaCluster(testCode: KafkaCluster => Unit): Unit = {
-    val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
-    try {
-      kafkaCluster.start()
-      testCode(kafkaCluster)
-    } finally {
-      kafkaCluster.shutDown()
-    }
-  }
-
-  def withDataProducer(topic: String, brokerList: String)
-      (testCode: NumericalDataProducer => Unit): Unit = {
-    val producer = new NumericalDataProducer(topic, brokerList)
-    try {
-      producer.start()
-      testCode(producer)
-    } finally {
-      producer.stop()
-    }
-  }
-
   def getTopologyName(name: String, stormVersion: String): String = {
     s"${name}_$stormVersion"
   }
@@ -146,7 +125,8 @@ class StormCompatibilitySpec extends TestSpecBase {
         val topologyName = getTopologyName("storm_kafka", stormVersion)
         val stormKafkaTopology = 
s"io.gearpump.integrationtest.storm.Storm${stormVersion}KafkaTopology"
 
-        withKafkaCluster {
+        import KafkaCluster._
+        withKafkaCluster(cluster) {
           kafkaCluster =>
             val sourcePartitionNum = 2
             val sinkPartitionNum = 1

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
index c7a2c3e..b202d01 100644
--- 
a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
+++ 
b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/suites/StandaloneModeSuite.scala
@@ -33,7 +33,8 @@ class StandaloneModeSuite extends Suites(
   new ExampleSpec,
   new DynamicDagSpec,
   new StabilitySpec,
-  new StormCompatibilitySpec
+  new StormCompatibilitySpec,
+  new MessageDeliverySpec
 
 ) with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
new file mode 100644
index 0000000..1f14094
--- /dev/null
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/hadoop/HadoopCluster.scala
@@ -0,0 +1,41 @@
+package io.gearpump.integrationtest.hadoop
+
+import io.gearpump.integrationtest.Docker
+import org.apache.log4j.Logger
+
+object HadoopCluster {
+
+  def withHadoopCluster(testCode: HadoopCluster => Unit): Unit = {
+    val hadoopCluster = new HadoopCluster
+    try {
+      hadoopCluster.start()
+      testCode(hadoopCluster)
+    } finally {
+      hadoopCluster.shutDown()
+    }
+  }
+}
+/**
+ * This class maintains a single node Hadoop cluster
+ */
+class HadoopCluster {
+
+  private val LOG = Logger.getLogger(getClass)
+  private val HADOOP_DOCKER_IMAGE = "sequenceiq/hadoop-docker:2.6.0"
+  private val HADOOP_HOST = "hadoop0"
+
+  def start(): Unit = {
+    Docker.createAndStartContainer(HADOOP_HOST, HADOOP_DOCKER_IMAGE, "")
+    LOG.info("Hadoop cluster is started.")
+  }
+
+  def getDefaultFS: String = {
+    val hostIPAddr = Docker.getContainerIPAddr(HADOOP_HOST)
+    s"hdfs://$hostIPAddr:9000"
+  }
+
+  def shutDown(): Unit = {
+    Docker.killAndRemoveContainer(HADOOP_HOST)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
index 4e3ebe2..b053ffe 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/KafkaCluster.scala
@@ -17,9 +17,35 @@
  */
 package io.gearpump.integrationtest.kafka
 
+import io.gearpump.integrationtest.minicluster.MiniCluster
 import io.gearpump.integrationtest.{Docker, Util}
 import org.apache.log4j.Logger
 
+object KafkaCluster {
+
+  def withKafkaCluster(cluster: MiniCluster)(testCode: KafkaCluster => Unit): 
Unit = {
+    val kafkaCluster = new KafkaCluster(cluster.getNetworkGateway, "kafka")
+    try {
+      kafkaCluster.start()
+      testCode(kafkaCluster)
+    } finally {
+      kafkaCluster.shutDown()
+    }
+  }
+
+  def withDataProducer(topic: String, brokerList: String)
+                      (testCode: NumericalDataProducer => Unit): Unit = {
+    val producer = new NumericalDataProducer(topic, brokerList)
+    try {
+      producer.start()
+      testCode(producer)
+    } finally {
+      producer.stop()
+    }
+  }
+
+}
+
 /**
  * This class maintains a single node Kafka cluster with integrated Zookeeper.
  */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
index db3fe3a..13de780 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/NumericalDataProducer.scala
@@ -19,7 +19,7 @@ package io.gearpump.integrationtest.kafka
 
 import java.util.Properties
 
-import com.twitter.bijection.Injection
+import io.gearpump.streaming.serializer.ChillSerializer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.log4j.Logger
@@ -29,6 +29,7 @@ class NumericalDataProducer(topic: String, bootstrapServers: 
String) {
   private val LOG = Logger.getLogger(getClass)
   private val producer = createProducer
   private val WRITE_SLEEP_NANOS = 10
+  private val serializer = new ChillSerializer[Int]
   var lastWriteNum = 0
 
   def start(): Unit = {
@@ -54,7 +55,7 @@ class NumericalDataProducer(topic: String, bootstrapServers: 
String) {
       try {
         while (!Thread.currentThread.isInterrupted) {
           lastWriteNum += 1
-          val msg = Injection[String, Array[Byte]](lastWriteNum.toString)
+          val msg = serializer.serialize(lastWriteNum)
           val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, msg)
           producer.send(record)
           Thread.sleep(0, WRITE_SLEEP_NANOS)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
index 58aa914..7330529 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/ResultVerifier.scala
@@ -20,14 +20,13 @@ package io.gearpump.integrationtest.kafka
 import scala.collection.mutable
 
 trait ResultVerifier {
-  def onNext(msg: String): Unit
+  def onNext(num: Int): Unit
 }
 
 class MessageLossDetector(totalNum: Int) extends ResultVerifier {
   private val bitSets = new mutable.BitSet(totalNum)
 
-  override def onNext(msg: String): Unit = {
-    val num = msg.toInt
+  override def onNext(num: Int): Unit = {
     bitSets.add(num)
   }
 
@@ -35,4 +34,8 @@ class MessageLossDetector(totalNum: Int) extends 
ResultVerifier {
     1.to(totalNum).forall(bitSets)
   }
 
+  def received(num: Int): Boolean = {
+    bitSets(num)
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
index 73c60e0..e2e1dbc 100644
--- 
a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
+++ 
b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/kafka/SimpleKafkaReader.scala
@@ -17,17 +17,17 @@
  */
 package io.gearpump.integrationtest.kafka
 
-import com.twitter.bijection.Injection
+import io.gearpump.streaming.serializer.ChillSerializer
 import kafka.api.FetchRequestBuilder
 import kafka.consumer.SimpleConsumer
 import kafka.utils.Utils
-
 import scala.util.{Failure, Success}
 
 class SimpleKafkaReader(verifier: ResultVerifier, topic: String, partition: 
Int = 0,
   host: String, port: Int) {
 
   private val consumer = new SimpleConsumer(host, port, 100000, 64 * 1024, "")
+  private val serializer = new ChillSerializer[Int]
   private var offset = 0L
 
   def read(): Unit = {
@@ -36,7 +36,7 @@ class SimpleKafkaReader(verifier: ResultVerifier, topic: 
String, partition: Int
     ).messageSet(topic, partition)
 
     for (messageAndOffset <- messageSet) {
-      Injection.invert[String, 
Array[Byte]](Utils.readBytes(messageAndOffset.message.payload)) match {
+      
serializer.deserialize(Utils.readBytes(messageAndOffset.message.payload)) match 
{
         case Success(msg) =>
           offset = messageAndOffset.nextOffset
           verifier.onNext(msg)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/project/BuildExample.scala
----------------------------------------------------------------------
diff --git a/project/BuildExample.scala b/project/BuildExample.scala
index e239c41..0bee08e 100644
--- a/project/BuildExample.scala
+++ b/project/BuildExample.scala
@@ -176,7 +176,7 @@ object BuildExample extends sbt.Build {
       target in assembly := baseDirectory.value.getParentFile.getParentFile / 
"target" /
           CrossVersion.binaryScalaVersion(scalaVersion.value)
     )
-  ) dependsOn (streaming % "test->test; provided", external_hadoopfs, 
external_monoid, external_serializer)
+  ) dependsOn (streaming % "test->test; provided", external_hadoopfs, 
external_monoid, external_serializer, external_kafka)
 
   lazy val pagerank = Project(
     id = "gearpump-examples-pagerank",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/project/BuildIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/project/BuildIntegrationTest.scala 
b/project/BuildIntegrationTest.scala
index acdb708..17c7a3a 100644
--- a/project/BuildIntegrationTest.scala
+++ b/project/BuildIntegrationTest.scala
@@ -34,7 +34,8 @@ object BuildIntegrationTest extends sbt.Build {
         streaming % "test->test; provided",
         services % "test->test; provided",
         external_kafka,
-        storm
+        storm,
+        external_serializer
       )
 
   // integration test for Storm 0.9.x

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala 
b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
index c8a5fd1..fbf507f 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/state/api/PersistentTask.scala
@@ -18,12 +18,22 @@
 
 package io.gearpump.streaming.state.api
 
+import java.util.concurrent.TimeUnit
+
 import io.gearpump.cluster.UserConfig
 import io.gearpump.streaming.state.impl.{PersistentStateConfig, 
CheckpointManager}
 import io.gearpump.streaming.task.{ReportCheckpointClock, StartTime, Task, 
TaskContext}
 import io.gearpump.streaming.transaction.api.CheckpointStoreFactory
+import io.gearpump.util.LogUtil
 import io.gearpump.{Message, TimeStamp}
 
+import scala.concurrent.duration.FiniteDuration
+
+object PersistentTask {
+  val CHECKPOINT = Message("checkpoint")
+  val LOG = LogUtil.getLogger(getClass)
+}
+
 /**
  * PersistentTask is part of the transaction API
  *
@@ -32,11 +42,15 @@ import io.gearpump.{Message, TimeStamp}
  */
 abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
+  import io.gearpump.streaming.state.api.PersistentTask._
+  import taskContext._
 
   val checkpointStoreFactory = 
conf.getValue[CheckpointStoreFactory](PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
   val checkpointStore = checkpointStoreFactory.getCheckpointStore(conf, 
taskContext)
   val checkpointInterval = 
conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
   val checkpointManager = new CheckpointManager(checkpointInterval, 
checkpointStore)
+  // system time interval to attempt checkpoint
+  private val checkpointAttemptInterval = 1000L
 
   /**
    * subclass should override this method to pass in
@@ -64,24 +78,29 @@ abstract class PersistentTask[T](taskContext: TaskContext, 
conf: UserConfig)
     checkpointManager
       .recover(timestamp)
       .foreach(state.recover(timestamp, _))
-    state.setNextCheckpointTime(checkpointManager.getCheckpointTime)
+
     reportCheckpointClock(timestamp)
+    scheduleCheckpoint(checkpointAttemptInterval)
   }
 
   final override def onNext(message: Message): Unit = {
-    val checkpointTime = checkpointManager.getCheckpointTime
-
-    processMessage(state, message)
-
-    checkpointManager.update(message.timestamp)
-    val upstreamMinClock = taskContext.upstreamMinClock
-    if (checkpointManager.shouldCheckpoint(upstreamMinClock)) {
-      val serialized = state.checkpoint()
-      checkpointManager.checkpoint(checkpointTime, serialized)
-      reportCheckpointClock(checkpointTime)
-
-      val nextCheckpointTime = checkpointManager.updateCheckpointTime()
-      state.setNextCheckpointTime(nextCheckpointTime)
+    message match {
+      case CHECKPOINT =>
+        val upstreamMinClock = taskContext.upstreamMinClock
+        if (checkpointManager.shouldCheckpoint(upstreamMinClock)) {
+          checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+            val serialized = state.checkpoint()
+            checkpointManager.checkpoint(checkpointTime, serialized)
+              .foreach(state.setNextCheckpointTime)
+            taskContext.output(Message(serialized, checkpointTime))
+            reportCheckpointClock(checkpointTime)
+          }
+        }
+        scheduleCheckpoint(checkpointAttemptInterval)
+      case _ =>
+        checkpointManager.update(message.timestamp)
+          .foreach(state.setNextCheckpointTime)
+        processMessage(state, message)
     }
   }
 
@@ -89,7 +108,11 @@ abstract class PersistentTask[T](taskContext: TaskContext, 
conf: UserConfig)
     checkpointManager.close()
   }
 
+  private def scheduleCheckpoint(interval: Long): Unit = {
+    scheduleOnce(new FiniteDuration(interval, TimeUnit.MILLISECONDS))(self ! 
CHECKPOINT)
+  }
+
   private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
-    taskContext.appMaster ! ReportCheckpointClock(taskContext.taskId, 
timestamp)
+    appMaster ! ReportCheckpointClock(taskContext.taskId, timestamp)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
index 5f7d86d..5fcbbcb 100644
--- 
a/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
+++ 
b/streaming/src/main/scala/io/gearpump/streaming/state/impl/CheckpointManager.scala
@@ -21,40 +21,39 @@ package io.gearpump.streaming.state.impl
 import io.gearpump.TimeStamp
 import io.gearpump.streaming.transaction.api.CheckpointStore
 
-
 class CheckpointManager(checkpointInterval: Long,
     checkpointStore: CheckpointStore) {
 
-  private var maxMessageTime = 0L
-  private var checkpointTime = checkpointInterval
-  private var lastCheckpointTime = 0L
+  private var maxMessageTime: Long = 0L
+  private var checkpointTime: Option[Long] = None
 
   def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
-    checkpointTime = (timestamp / checkpointInterval + 1) * checkpointInterval
     checkpointStore.recover(timestamp)
   }
 
-  def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+  def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): 
Option[TimeStamp] = {
     checkpointStore.persist(timestamp, checkpoint)
-    lastCheckpointTime = checkpointTime
+    checkpointTime = checkpointTime.collect { case time if maxMessageTime > 
time =>
+     time + (1 + (maxMessageTime - time) / checkpointInterval) * 
checkpointInterval
+    }
+
+    checkpointTime
   }
 
-  def update(messageTime: TimeStamp): Unit = {
+  def update(messageTime: TimeStamp): Option[TimeStamp] = {
     maxMessageTime = Math.max(maxMessageTime, messageTime)
+    if (checkpointTime.isEmpty) {
+      checkpointTime = Some((1 + messageTime / checkpointInterval) * 
checkpointInterval)
+    }
+
+    checkpointTime
   }
 
   def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = {
-    upstreamMinClock >= checkpointTime && checkpointTime > lastCheckpointTime
+    checkpointTime.exists(time => upstreamMinClock >= time)
   }
 
-  def getCheckpointTime: TimeStamp = checkpointTime
-
-  def updateCheckpointTime(): TimeStamp = {
-    if (maxMessageTime >= checkpointTime) {
-      checkpointTime += (1 + (maxMessageTime - checkpointTime) / 
checkpointInterval) * checkpointInterval
-    }
-    checkpointTime
-  }
+  def getCheckpointTime: Option[TimeStamp] = checkpointTime
 
   def close(): Unit = {
     checkpointStore.close()

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/313b6c45/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
index 6e6f915..483200f 100644
--- 
a/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
+++ 
b/streaming/src/test/scala/io/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
@@ -40,7 +40,6 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
         checkpointManager.recover(timestamp)
 
         verify(checkpointStore).recover(timestamp)
-        checkpointManager.getCheckpointTime - timestamp should be <= 
checkpointInterval
     }
   }
 
@@ -69,22 +68,21 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
   }
 
   property("CheckpointManager should update checkpoint time according to max 
message timestamp") {
-    val timestampListGen = Gen.containerOf[Array, TimeStamp](timestampGen) 
suchThat (_.nonEmpty)
-    forAll(timestampListGen, checkpointIntervalGen) {
-      (timestamps: Array[TimeStamp], checkpointInterval: Long) =>
+    forAll(timestampGen, checkpointIntervalGen) {
+      (timestamp: TimeStamp, checkpointInterval: Long) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)
-        timestamps.foreach(checkpointManager.update)
-        val maxTimestamp = timestamps.max
-        checkpointManager.getMaxMessageTime shouldBe maxTimestamp
+        checkpointManager.update(timestamp)
+        checkpointManager.getMaxMessageTime shouldBe timestamp
+
+        val checkpointTime = checkpointManager.getCheckpointTime.get
+        timestamp should (be < checkpointTime and be >= (checkpointTime - 
checkpointInterval))
 
-        val checkpointTime = checkpointManager.getCheckpointTime
         checkpointManager.checkpoint(checkpointTime, Array.empty[Byte])
         verify(checkpointStore).persist(MockitoMatchers.eq(checkpointTime),
           MockitoMatchers.anyObject[Array[Byte]]())
-        val newCheckpointTime = checkpointManager.updateCheckpointTime()
-        maxTimestamp should (be < newCheckpointTime and be >= 
(newCheckpointTime - checkpointInterval))
+        checkpointManager.getCheckpointTime shouldBe empty
     }
   }
 


Reply via email to