Repository: incubator-gearpump
Updated Branches:
  refs/heads/sql 54686e0e2 -> 1cf87bf77


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
index 2591856..cecf127 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
@@ -18,9 +18,7 @@
 
 package org.apache.gearpump.streaming.state.impl
 
-import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, 
CheckpointStoreFactory}
 
 /**
@@ -28,18 +26,18 @@ import 
org.apache.gearpump.streaming.transaction.api.{CheckpointStore, Checkpoin
  * should not be used in real cases
  */
 class InMemoryCheckpointStore extends CheckpointStore {
-  private var checkpoints = Map.empty[TimeStamp, Array[Byte]]
+  private var checkpoints = Map.empty[MilliSeconds, Array[Byte]]
 
-  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+  override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit 
= {
     checkpoints += timestamp -> checkpoint
   }
 
-  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+  override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
     checkpoints.get(timestamp)
   }
 
   override def close(): Unit = {
-    checkpoints = Map.empty[TimeStamp, Array[Byte]]
+    checkpoints = Map.empty[MilliSeconds, Array[Byte]]
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
index b161713..1393f4a 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.state.impl
 
 import org.slf4j.Logger
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.state.api.{Monoid, MonoidState, 
Serializer}
 import org.apache.gearpump.streaming.state.impl.NonWindowState._
 import org.apache.gearpump.util.LogUtil
@@ -35,11 +35,11 @@ object NonWindowState {
 class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
   extends MonoidState[T](monoid) {
 
-  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
+  override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = {
     serializer.deserialize(bytes).foreach(left = _)
   }
 
-  override def update(timestamp: TimeStamp, t: T): Unit = {
+  override def update(timestamp: MilliSeconds, t: T): Unit = {
     updateState(timestamp, t)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
index c1f647e..0318d3d 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.gearpump.streaming.state.impl
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * Used in window applications
@@ -29,10 +29,10 @@ class Window(val windowSize: Long, val windowStep: Long) {
     this(windowConfig.windowSize, windowConfig.windowStep)
   }
 
-  private var clock: TimeStamp = 0L
+  private var clock: MilliSeconds = 0L
   private var startTime = 0L
 
-  def update(clock: TimeStamp): Unit = {
+  def update(clock: MilliSeconds): Unit = {
     this.clock = clock
   }
 
@@ -40,7 +40,7 @@ class Window(val windowSize: Long, val windowStep: Long) {
     startTime += windowStep
   }
 
-  def slideTo(timestamp: TimeStamp): Unit = {
+  def slideTo(timestamp: MilliSeconds): Unit = {
     startTime = timestamp / windowStep * windowStep
   }
 
@@ -48,7 +48,7 @@ class Window(val windowSize: Long, val windowStep: Long) {
     clock >= (startTime + windowSize)
   }
 
-  def range: (TimeStamp, TimeStamp) = {
+  def range: (MilliSeconds, MilliSeconds) = {
     startTime -> (startTime + windowSize)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
index 348f09e..a73b6db 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
@@ -22,7 +22,7 @@ import scala.collection.immutable.TreeMap
 
 import org.slf4j.Logger
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
 import org.apache.gearpump.streaming.state.impl.WindowState._
 import org.apache.gearpump.streaming.task.TaskContext
@@ -31,7 +31,7 @@ import org.apache.gearpump.util.LogUtil
 /**
  * an interval is a dynamic time range that is divided by window boundary and 
checkpoint time
  */
-case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends 
Ordered[Interval] {
+case class Interval(startTime: MilliSeconds, endTime: MilliSeconds) extends 
Ordered[Interval] {
   override def compare(that: Interval): Int = {
     if (startTime < that.startTime) -1
     else if (startTime > that.startTime) 1
@@ -63,7 +63,7 @@ class WindowState[T](group: Group[T],
 
   private var lastCheckpointTime = 0L
 
-  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
+  override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = {
     window.slideTo(timestamp)
     serializer.deserialize(bytes)
       .foreach { states =>
@@ -74,7 +74,7 @@ class WindowState[T](group: Group[T],
       }
   }
 
-  override def update(timestamp: TimeStamp, t: T): Unit = {
+  override def update(timestamp: MilliSeconds, t: T): Unit = {
     val (startTime, endTime) = window.range
     if (timestamp >= startTime && timestamp < endTime) {
       updateState(timestamp, t)
@@ -127,7 +127,7 @@ class WindowState[T](group: Group[T],
    * upperBound2 = step * Nmin2 + size > t
    * }}}
    */
-  private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: 
TimeStamp): Interval = {
+  private[impl] def getInterval(timestamp: MilliSeconds, checkpointTime: 
MilliSeconds): Interval = {
     val windowSize = window.windowSize
     val windowStep = window.windowStep
     val lowerBound1 = timestamp / windowStep * windowStep
@@ -147,8 +147,8 @@ class WindowState[T](group: Group[T],
     }
   }
 
-  private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, 
checkpointTime: TimeStamp)
-  : Unit = {
+  private[impl] def updateIntervalStates(timestamp: MilliSeconds, t: T,
+      checkpointTime: MilliSeconds): Unit = {
     val interval = getInterval(timestamp, checkpointTime)
     intervalStates.get(interval) match {
       case Some(st) =>
@@ -158,7 +158,7 @@ class WindowState[T](group: Group[T],
     }
   }
 
-  private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
+  private[impl] def getIntervalStates(startTime: MilliSeconds, endTime: 
MilliSeconds)
   : TreeMap[Interval, T] = {
     intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime 
<= endTime)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
index 675d5cc..c3e3b14 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
@@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.task
 
 import java.io.{DataInput, DataOutput}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
-case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte])
+case class SerializedMessage(timeStamp: MilliSeconds, bytes: Array[Byte])
 
 class SerializedMessageSerializer extends 
TaskMessageSerializer[SerializedMessage] {
   override def getLength(obj: SerializedMessage): Int = 12 + obj.bytes.length

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 8a6f04f..24f1763 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -20,13 +20,14 @@ package org.apache.gearpump.streaming.task
 
 import org.slf4j.Logger
 import com.google.common.primitives.Shorts
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, 
Partitioner, UnicastPartitioner}
 import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
 import org.apache.gearpump.streaming.LifeTime
 import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.Subscription._
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
 
 /**
  * Manages the output and message clock for single downstream processor
@@ -59,9 +60,9 @@ class Subscription(
   private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
   private val processingWatermarkSince: Array[Short] = new 
Array[Short](parallelism)
 
-  private val outputWatermark: Array[TimeStamp] = Array.fill(parallelism)(
+  private val outputWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
     Watermark.MIN.toEpochMilli)
-  private val processingWatermark: Array[TimeStamp] = Array.fill(parallelism)(
+  private val processingWatermark: Array[MilliSeconds] = 
Array.fill(parallelism)(
     Watermark.MIN.toEpochMilli)
 
   private var maxPendingCount: Short = 0
@@ -135,7 +136,7 @@ class Subscription(
     }
   }
 
-  private var lastFlushTime: Long = MIN_TIME_MILLIS
+  private var lastFlushTime: Long = Time.MIN_TIME_MILLIS
   private val FLUSH_INTERVAL = 5 * 1000 // ms
   private def needFlush: Boolean = {
     System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
@@ -181,7 +182,7 @@ class Subscription(
     }
   }
 
-  def watermark: TimeStamp = {
+  def watermark: MilliSeconds = {
     outputWatermark.min
   }
 
@@ -189,7 +190,7 @@ class Subscription(
     maxPendingCount < maxPendingMessageCount
   }
 
-  def onStallingTime(stallingTime: TimeStamp): Unit = {
+  def onStallingTime(stallingTime: MilliSeconds): Unit = {
     outputWatermark.indices.foreach { i =>
       if (outputWatermark(i) == stallingTime &&
         pendingMessageCount(i) > 0 &&

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index dc80511..b587cc7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -23,10 +23,11 @@ import java.time.Instant
 import scala.concurrent.duration.FiniteDuration
 import akka.actor.Actor.Receive
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
 
 /**
  * This provides context information for a task.
@@ -113,7 +114,7 @@ trait TaskContext {
    *
    * @return the min clock
    */
-  def upstreamMinClock: TimeStamp
+  def upstreamMinClock: MilliSeconds
 
   /**
    * Update TaskActor with the processing progress (watermark)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 1fb61bd..fb2aaed 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit
 
 import akka.actor._
 import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.source.Watermark
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
@@ -35,7 +37,6 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.streaming.task.TaskActor._
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{Message, TimeStamp}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
@@ -141,7 +142,7 @@ class TaskActor(
       context.become(waitForStartTask(startClock))
   }
 
-  def waitForStartTask(startClock: TimeStamp): Receive = {
+  def waitForStartTask(startClock: MilliSeconds): Receive = {
     case start@StartTask(tid) =>
       assert(tid == this.taskId, s"$start sent to the wrong task 
${this.taskId}")
       onStartTask(startClock)
@@ -227,7 +228,7 @@ class TaskActor(
   /**
    * Returns min clock of upstream task
    */
-  def getUpstreamMinClock: TimeStamp = upstreamWatermark.toEpochMilli
+  def getUpstreamMinClock: MilliSeconds = upstreamWatermark.toEpochMilli
 
   def getProcessingWatermark: Instant = processingWatermark
 
@@ -265,7 +266,7 @@ class TaskActor(
     count
   }
 
-  private def onStartTask(startClock: TimeStamp): Unit = {
+  private def onStartTask(startClock: MilliSeconds): Unit = {
     LOG.info(s"received start, clock: $startClock, sessionId: $sessionId")
     subscriptions = taskContextData.subscribers.map { subscriber =>
       (subscriber.processorId,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index c2e2faa..4ba9315 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.task
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.ProcessorId
 
 /*
@@ -42,25 +42,25 @@ case class Ack(taskId: TaskId, seq: Short, 
actualReceivedNum: Short, sessionId:
 
 sealed trait ClockEvent
 
-case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent
+case class UpdateClock(taskId: TaskId, time: MilliSeconds) extends ClockEvent
 
 object GetLatestMinClock extends ClockEvent
 
 case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent
 
-case class UpdateCheckpointClock(taskId: TaskId, clock: TimeStamp) extends 
ClockEvent
+case class UpdateCheckpointClock(taskId: TaskId, clock: MilliSeconds) extends 
ClockEvent
 
 case object GetCheckpointClock extends ClockEvent
 
-case class CheckpointClock(clock: Option[TimeStamp])
+case class CheckpointClock(clock: Option[MilliSeconds])
 
-case class UpstreamMinClock(latestMinClock: Option[TimeStamp])
+case class UpstreamMinClock(latestMinClock: Option[MilliSeconds])
 
-case class LatestMinClock(clock: TimeStamp)
+case class LatestMinClock(clock: MilliSeconds)
 
 case object GetStartClock
 
-case class StartClock(clock: TimeStamp)
+case class StartClock(clock: MilliSeconds)
 
 case object EndingClock
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index 82cae96..1e4430b 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -23,10 +23,11 @@ import java.time.Instant
 import scala.concurrent.duration.FiniteDuration
 import akka.actor.Actor._
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import org.slf4j.Logger
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
 
 /**
  * This provides TaskContext for user defined tasks
@@ -107,7 +108,7 @@ class TaskWrapper(
     task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler)
   }
 
-  override def upstreamMinClock: TimeStamp = {
+  override def upstreamMinClock: MilliSeconds = {
     actor.getUpstreamMinClock
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
index 4650ac2..8d026db 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
@@ -18,9 +18,7 @@
 
 package org.apache.gearpump.streaming.transaction.api
 
-import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * CheckpointStore persistently stores mapping of timestamp to checkpoint
@@ -29,9 +27,9 @@ import org.apache.gearpump.streaming.task.TaskContext
  */
 trait CheckpointStore {
 
-  def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit
+  def persist(timeStamp: MilliSeconds, checkpoint: Array[Byte]): Unit
 
-  def recover(timestamp: TimeStamp): Option[Array[Byte]]
+  def recover(timestamp: MilliSeconds): Option[Array[Byte]]
 
   def close(): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
index 2ddca3a..856b1c5 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.streaming.transaction.api
 
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
- * TimeStampFilter filters out messages that are obsolete.
+ * MilliSecondsFilter filters out messages that are obsolete.
  */
 trait TimeStampFilter extends java.io.Serializable {
-  def filter(msg: Message, predicate: TimeStamp): Option[Message]
+  def filter(msg: Message, predicate: MilliSeconds): Option[Message]
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
index c8478f6..d75a1a0 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
@@ -20,7 +20,6 @@ package org.apache.gearpump.streaming
 import akka.actor._
 import akka.testkit.TestActorRef
 import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import org.apache.gearpump.cluster.appmaster.ApplicationRuntimeInfo
 import org.apache.gearpump.cluster.scheduler.Resource
 import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, 
MiniCluster, UserConfig}
 import org.apache.gearpump.streaming.appmaster.AppMaster

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index 4faa058..8819c0c 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -17,10 +17,8 @@
  */
 package org.apache.gearpump.streaming.appmaster
 
-
 import akka.actor.{ActorRef, ActorSystem, Props}
 import akka.testkit.{TestActorRef, TestProbe}
-import com.typesafe.config.ConfigFactory
 import org.apache.gearpump.cluster.AppMasterToMaster._
 import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import org.apache.gearpump.cluster.ClientToMaster.GetLastFailure
@@ -40,7 +38,7 @@ import 
org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, Ta
 import org.apache.gearpump.streaming.task.{TaskContext, _}
 import org.apache.gearpump.streaming.{DAG, Processor, StreamApplication}
 import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
-import org.apache.gearpump.util.{ActorUtil, Constants, Graph}
+import org.apache.gearpump.util.{ActorUtil, Graph}
 import org.apache.gearpump.util.Graph._
 import org.scalatest._
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
index adde927..7020210 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -18,7 +18,6 @@
 
 package org.apache.gearpump.streaming.appmaster
 
-
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
@@ -28,7 +27,6 @@ import org.apache.gearpump.streaming.task.{Subscriber, 
TaskActor}
 import org.apache.gearpump.streaming._
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
-import org.scalatest.mock.MockitoSugar
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}
 
 import scala.concurrent.Await

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index c223a53..d3bd51b 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, 
ProcessorDescription, Proce
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
 
@@ -238,7 +238,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach {
 
     // Step11: Tell ClockService to update DAG.
     clockService.expectMsgType[ChangeToNewDAG]
-    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, 
TimeStamp]))
+    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, 
MilliSeconds]))
 
     // Step12: start all tasks
     import scala.concurrent.duration._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
index 0f87a1c..9e6bf59 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
@@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
 import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
 import org.apache.gearpump.streaming.{Constants, MockUtil}
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, 
WindowRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner
 import org.apache.gearpump.streaming.source.Watermark
 import org.mockito.Mockito._
 import org.scalacheck.Gen

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 0bb4d6a..e38c5a3 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -23,7 +23,6 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, 
TriggeredOutputs, WindowRunner}
-import org.mockito.{Matchers => MockitoMatchers}
 import org.mockito.Mockito.{verify, when}
 import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
index 9e42e85..bf50fad 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.transaction.api.CheckpointStore
 
 class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
@@ -34,7 +34,7 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
   val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L)
   property("CheckpointManager should recover from CheckpointStore") {
     forAll(timestampGen, checkpointIntervalGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)
@@ -47,7 +47,7 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
   property("CheckpointManager should write checkpoint to CheckpointStore") {
     val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
     forAll(timestampGen, checkpointIntervalGen, checkpointGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long, checkpoint: 
Array[Byte]) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long, checkpoint: 
Array[Byte]) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)
@@ -70,7 +70,7 @@ class CheckpointManagerSpec extends PropSpec with 
PropertyChecks with Matchers w
 
   property("CheckpointManager should update checkpoint time according to max 
message timestamp") {
     forAll(timestampGen, checkpointIntervalGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
index 4cdff95..41b0624 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
@@ -26,7 +26,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.state.api.{Monoid, Serializer}
 
 class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers 
with MockitoSugar {
@@ -35,7 +35,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with
 
   property("NonWindowState should recover checkpointed state at given 
timestamp") {
     forAll(longGen) {
-      (timestamp: TimeStamp) =>
+      (timestamp: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
         val bytes = Array.empty[Byte]
@@ -61,7 +61,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with
 
   property("NonWindowState checkpoints state") {
     forAll(longGen) {
-      (checkpointTime: TimeStamp) =>
+      (checkpointTime: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
 
@@ -95,7 +95,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with
 
   property("NonWindowState updates state") {
     forAll(longGen) {
-      (checkpointTime: TimeStamp) =>
+      (checkpointTime: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
index d9282ae..9975f49 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
@@ -23,7 +23,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class WindowSpec extends PropSpec with PropertyChecks with Matchers with 
MockitoSugar {
 
@@ -32,7 +32,7 @@ class WindowSpec extends PropSpec with PropertyChecks with 
Matchers with Mockito
   val timestampGen = Gen.chooseNum[Long](0L, 1000L)
   property("Window should only slide when time passes window end") {
     forAll(timestampGen, windowSizeGen, windowStepGen) {
-      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+      (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) =>
         val window = new Window(windowSize, windowStep)
         window.shouldSlide shouldBe false
         window.update(timestamp)
@@ -42,7 +42,7 @@ class WindowSpec extends PropSpec with PropertyChecks with 
Matchers with Mockito
 
   property("Window should slide by one or to given timestamp") {
     forAll(timestampGen, windowSizeGen, windowStepGen) {
-      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+      (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) =>
         val window = new Window(windowSize, windowStep)
         window.range shouldBe(0L, windowSize)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
index 299a626..2b784bf 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
@@ -18,16 +18,15 @@
 
 package org.apache.gearpump.streaming.state.impl
 
+import org.apache.gearpump.Time.MilliSeconds
+
 import scala.collection.immutable.TreeMap
 import scala.util.Success
-
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-
-import org.apache.gearpump._
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.{Group, Serializer}
 
@@ -74,7 +73,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
   }
 
   property("WindowState checkpoints") {
-    forAll(longGen) { (checkpointTime: TimeStamp) =>
+    forAll(longGen) { (checkpointTime: MilliSeconds) =>
       val window = mock[Window]
       val taskContext = MockUtil.mockTaskContext
       val group = mock[Group[AnyRef]]
@@ -120,7 +119,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
   }
 
   property("WindowState updates state") {
-    forAll(longGen) { (checkpointTime: TimeStamp) =>
+    forAll(longGen) { (checkpointTime: MilliSeconds) =>
       val window = mock[Window]
       val taskContext = MockUtil.mockTaskContext
       val group = mock[Group[AnyRef]]
@@ -205,7 +204,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
 
   property("WindowState gets interval for timestamp") {
     forAll(longGen, longGen, longGen, longGen) {
-      (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, 
windowStep: Long) =>
+      (timestamp: MilliSeconds, checkpointTime: MilliSeconds, windowSize: 
Long, windowStep: Long) =>
         val windowManager = new Window(windowSize, windowStep)
         val taskContext = MockUtil.mockTaskContext
         val group = mock[Group[AnyRef]]
@@ -225,8 +224,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks 
with Matchers with Mo
         interval.endTime shouldBe nextInterval.startTime
     }
 
-    def intervalSpec(interval: Interval, timestamp: TimeStamp,
-        checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = 
{
+    def intervalSpec(interval: Interval, timestamp: MilliSeconds,
+        checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long): 
Unit = {
       interval.startTime should be <= interval.endTime
       timestamp / windowStep * windowStep should (be <= interval.startTime)
       (timestamp - windowSize) / windowStep * windowStep should (be <= 
interval.startTime)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 285bf44..b05befa 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@ import java.util.Random
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message}
+import org.apache.gearpump.{Message, Time}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.source.Watermark
@@ -73,14 +73,14 @@ class SubscriptionSpec extends FlatSpec with Matchers with 
MockitoSugar {
     subscription.sendMessage(msg1)
 
     verify(sender, times(1)).transport(msg1, TaskId(1, 1))
-    assert(subscription.watermark == MIN_TIME_MILLIS)
+    assert(subscription.watermark == Time.MIN_TIME_MILLIS)
 
     val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
     when(sender.getProcessingWatermark).thenReturn(msg2.timestamp)
     subscription.sendMessage(msg2)
 
     verify(sender, times(1)).transport(msg2, TaskId(1, 0))
-    assert(subscription.watermark == MIN_TIME_MILLIS)
+    assert(subscription.watermark == Time.MIN_TIME_MILLIS)
 
     val initialMinClock = subscription.watermark
 


Reply via email to