Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 6d919ec97 -> 23daf0cf9


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
index 2efce45..89018a1 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
+++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java
@@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.javaapi;
 import akka.actor.ActorRef;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
-import org.apache.gearpump.streaming.task.StartTime;
 import org.apache.gearpump.streaming.task.TaskContext;
 
+import java.time.Instant;
+
 /**
  * Java version of Task.
  *
@@ -45,7 +46,7 @@ public class Task extends 
org.apache.gearpump.streaming.task.Task {
   }
 
   @Override
-  public void onStart(StartTime startTime) {
+  public void onStart(Instant startTime) {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index 5027500..b6c087e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.dsl
 
+import java.time.Instant
+
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
@@ -27,7 +29,7 @@ import org.apache.gearpump.streaming.dsl.plan.Planner
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
 
 import scala.language.implicitConversions
 
@@ -69,36 +71,36 @@ object StreamApp {
   }
 
   implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication 
= {
-    streamApp.plan
+    streamApp.plan()
   }
 
   implicit class Source(app: StreamApp) extends java.io.Serializable {
 
-    def source[T](dataSource: DataSource, parallism: Int): Stream[T] = {
-      source(dataSource, parallism, UserConfig.empty)
+    def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = {
+      source(dataSource, parallelism, UserConfig.empty)
     }
 
-    def source[T](dataSource: DataSource, parallism: Int, description: 
String): Stream[T] = {
-      source(dataSource, parallism, UserConfig.empty, description)
+    def source[T](dataSource: DataSource, parallelism: Int, description: 
String): Stream[T] = {
+      source(dataSource, parallelism, UserConfig.empty, description)
     }
 
-    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): 
Stream[T] = {
-      source(dataSource, parallism, conf, description = null)
+    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): 
Stream[T] = {
+      source(dataSource, parallelism, conf, description = null)
     }
 
-    def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, 
description: String)
+    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, 
description: String)
       : Stream[T] = {
-      implicit val sourceOp = DataSourceOp(dataSource, parallism, conf, 
description)
+      implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, 
description)
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
     }
-    def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] 
= {
-      this.source(new CollectionDataSource[T](seq), parallism, 
UserConfig.empty, description)
+    def source[T](seq: Seq[T], parallelism: Int, description: String): 
Stream[T] = {
+      this.source(new CollectionDataSource[T](seq), parallelism, 
UserConfig.empty, description)
     }
 
-    def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, 
description: String)
+    def source[T](source: Class[_ <: Task], parallelism: Int, conf: 
UserConfig, description: String)
       : Stream[T] = {
-      val sourceOp = ProcessorOp(source, parallism, conf, 
Option(description).getOrElse("source"))
+      val sourceOp = ProcessorOp(source, parallelism, conf, 
Option(description).getOrElse("source"))
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
     }
@@ -119,5 +121,5 @@ class CollectionDataSource[T](seq: Seq[T]) extends 
DataSource {
 
   override def close(): Unit = {}
 
-  override def open(context: TaskContext, startTime: TimeStamp): Unit = {}
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
index 56d31db..6bd0da2 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
@@ -18,11 +18,11 @@
 
 package org.apache.gearpump.streaming.dsl.plan
 
-import scala.collection.TraversableOnce
+import java.time.Instant
 
+import scala.collection.TraversableOnce
 import akka.actor.ActorSystem
 import org.slf4j.Logger
-
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
@@ -32,7 +32,7 @@ import org.apache.gearpump.streaming.dsl.op._
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.LogUtil
 
 /**
@@ -116,7 +116,7 @@ object OpTranslator {
 
   class DummyInputFunction[T] extends SingleInputFunction[T, T] {
     override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
-      : SingleInputFunction[T, OUTER] = {
+    : SingleInputFunction[T, OUTER] = {
       other
     }
 
@@ -131,13 +131,13 @@ object OpTranslator {
     extends SingleInputFunction[IN, OUT] {
 
     override def process(value: IN): TraversableOnce[OUT] = {
-      first.process(value).flatMap(second.process(_))
+      first.process(value).flatMap(second.process)
     }
 
     override def description: String = {
       Option(first.description).flatMap { description =>
         Option(second.description).map(description + "." + _)
-      }.getOrElse(null)
+      }.orNull
     }
   }
 
@@ -182,9 +182,6 @@ object OpTranslator {
 
     private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
 
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
     override def onNext(msg: Message): Unit = {
       val time = msg.timestamp
 
@@ -216,8 +213,8 @@ object OpTranslator {
         taskContext, userConf)
     }
 
-    override def onStart(startTime: StartTime): Unit = {
-      source.open(taskContext, startTime.startTime)
+    override def onStart(startTime: Instant): Unit = {
+      source.open(taskContext, startTime)
       self ! Message("start", System.currentTimeMillis())
     }
 
@@ -256,9 +253,6 @@ object OpTranslator {
         GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, 
userConf)
     }
 
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
     override def onNext(msg: Message): Unit = {
       val time = msg.timestamp
 
@@ -281,7 +275,7 @@ object OpTranslator {
         taskContext, userConf)
     }
 
-    override def onStart(startTime: StartTime): Unit = {
+    override def onStart(startTime: Instant): Unit = {
       dataSink.open(taskContext)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
index f8bc0ab..0db44f2 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
@@ -18,9 +18,11 @@
 
 package org.apache.gearpump.streaming.sink
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 object DataSinkTask {
   val DATA_SINK = "data_sink"
@@ -32,11 +34,12 @@ object DataSinkTask {
 class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: 
DataSink)
   extends Task(context, conf) {
 
+
   def this(context: TaskContext, conf: UserConfig) = {
     this(context, conf, 
conf.getValue[DataSink](DataSinkTask.DATA_SINK)(context.system).get)
   }
 
-  override def onStart(startTime: StartTime): Unit = {
+  override def onStart(startTime: Instant): Unit = {
     LOG.info("opening data sink...")
     sink.open(context)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
index 0fb6db4..f55d102 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
@@ -18,11 +18,11 @@
 
 package org.apache.gearpump.streaming.source
 
+import java.time.Instant
+
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.Message
 
-import scala.util.Random
-
 /**
  * Interface to implement custom source where data is read into the system.
  * a DataSource could be a message queue like kafka or simply data generation 
source.
@@ -52,7 +52,7 @@ trait DataSource extends java.io.Serializable {
    * @param context is the task context at runtime
    * @param startTime is the start time of system
    */
-  def open(context: TaskContext, startTime: Long): Unit
+  def open(context: TaskContext, startTime: Instant): Unit
 
   /**
    * Reads next message from data source and

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index f845628..468ae3b 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -18,9 +18,11 @@
 
 package org.apache.gearpump.streaming.source
 
+import java.time.Instant
+
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 object DataSourceTask {
   val DATA_SOURCE = "data_source"
@@ -45,10 +47,8 @@ class DataSourceTask private[source](context: TaskContext, 
conf: UserConfig, sou
     this(context, conf, 
conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get)
   }
   private val batchSize = 
conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
-  private var startTime = 0L
 
-  override def onStart(newStartTime: StartTime): Unit = {
-    startTime = newStartTime.startTime
+  override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
     self ! Message("start", System.currentTimeMillis())

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index c7b503e..aceff4a 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.streaming.state.api
 
+import java.time.Instant
 import java.util.concurrent.TimeUnit
 import scala.concurrent.duration.FiniteDuration
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, 
PersistentStateConfig}
-import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime, 
Task, TaskContext}
+import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, Task, 
TaskContext}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
@@ -70,8 +71,8 @@ abstract class PersistentTask[T](taskContext: TaskContext, 
conf: UserConfig)
   /** Persistent state that will be stored (by checkpointing) automatically to 
storage like HDFS */
   val state = persistentState
 
-  final override def onStart(startTime: StartTime): Unit = {
-    val timestamp = startTime.startTime
+  final override def onStart(startTime: Instant): Unit = {
+    val timestamp = startTime.toEpochMilli
     checkpointManager
       .recover(timestamp)
       .foreach(state.recover(timestamp, _))
@@ -101,6 +102,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, 
conf: UserConfig)
     }
   }
 
+
   final override def onStop(): Unit = {
     checkpointManager.close()
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
deleted file mode 100644
index fb097d3..0000000
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.task
-
-import org.apache.gearpump.TimeStamp
-
-/** Start time of streaming application. All message older than start time 
will be dropped */
-case class StartTime(startTime: TimeStamp = 0)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index 9c76a40..c94dec4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.task
 
+import java.time.Instant
+
 import scala.concurrent.duration.FiniteDuration
 
 import akka.actor.Actor.Receive
@@ -133,7 +135,7 @@ trait TaskInterface {
    *                  replay the data source, or from when a processor task 
should recover its
    *                  checkpoint data in to in-memory state.
    */
-  def onStart(startTime: StartTime): Unit
+  def onStart(startTime: Instant): Unit
 
   /**
    * Method called for each message received.
@@ -176,7 +178,7 @@ abstract class Task(taskContext: TaskContext, userConf: 
UserConfig) extends Task
    */
   protected def sender: ActorRef = taskContext.sender
 
-  def onStart(startTime: StartTime): Unit = {}
+  def onStart(startTime: Instant): Unit = {}
 
   def onNext(msg: Message): Unit = {}
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index d12aac1..30a24fa 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -18,12 +18,12 @@
 
 package org.apache.gearpump.streaming.task
 
+import java.time.Instant
 import java.util
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
 import org.slf4j.Logger
-
 import org.apache.gearpump.cluster.UserConfig
 import 
org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
 import org.apache.gearpump.metrics.Metrics
@@ -101,7 +101,7 @@ class TaskActor(
 
   task.setTaskActor(this)
 
-  def onStart(startTime: StartTime): Unit = {
+  def onStart(startTime: Instant): Unit = {
     task.onStart(startTime)
   }
 
@@ -111,6 +111,7 @@ class TaskActor(
 
   def onStop(): Unit = task.onStop()
 
+
   /**
    * output to a downstream by specifying a arrayIndex
    * @param arrayIndex this is not same as ProcessorId
@@ -193,7 +194,7 @@ class TaskActor(
     // Put this as the last step so that the subscription is already 
initialized.
     // Message sending in current Task before onStart will not be delivered to
     // target
-    onStart(new StartTime(upstreamMinClock))
+    onStart(Instant.ofEpochMilli(upstreamMinClock))
 
     appMaster ! GetUpstreamMinClock(taskId)
     context.become(handleMessages(sender))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index cd33f7e..31c991e 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -18,15 +18,15 @@
 
 package org.apache.gearpump.streaming.task
 
-import scala.concurrent.duration.FiniteDuration
+import java.time.Instant
 
+import scala.concurrent.duration.FiniteDuration
 import akka.actor.Actor._
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
 import org.slf4j.Logger
-
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.{TimeStamp, Message}
 
 /**
  * This provides TaskContext for user defined tasks
@@ -41,7 +41,7 @@ class TaskWrapper(
 
   private val LOG = LogUtil.getLogger(taskClass, task = taskId)
 
-  private var actor: TaskActor = null
+  private var actor: TaskActor = _
 
   private var task: Option[Task] = None
 
@@ -87,8 +87,8 @@ class TaskWrapper(
 
   override def actorOf(props: Props, name: String): ActorRef = 
actor.context.actorOf(props, name)
 
-  override def onStart(startTime: StartTime): Unit = {
-    if (None != task) {
+  override def onStart(startTime: Instant): Unit = {
+    if (task.isDefined) {
       LOG.error(s"Task.onStart should not be called multiple times... 
${task.getClass}")
     }
     val constructor = taskClass.getConstructor(classOf[TaskContext], 
classOf[UserConfig])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index c9f1b89..647ad0a 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -38,7 +38,7 @@ import org.apache.gearpump.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask
 import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, 
UnRegisterTask}
 import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, 
LookupTaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
+import org.apache.gearpump.streaming.task.{TaskContext, _}
 import org.apache.gearpump.streaming.{Constants, DAG, Processor, 
StreamApplication}
 import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
 import org.apache.gearpump.util.{ActorUtil, Graph}
@@ -300,17 +300,9 @@ object AppMasterSpec {
 }
 
 class TaskA(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
-
-  override def onStart(startTime: StartTime): Unit = {
-  }
-
   override def onNext(msg: Message): Unit = {}
 }
 
 class TaskB(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
-
-  override def onStart(startTime: StartTime): Unit = {
-  }
-
   override def onNext(msg: Message): Unit = {}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 0dd3e5b..bb495a7 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -36,7 +36,7 @@ import 
org.apache.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetai
 import org.apache.gearpump.streaming.appmaster.TaskManager.ApplicationReady
 import org.apache.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, 
Task2}
 import org.apache.gearpump.streaming.executor.Executor.RestartTasks
-import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _}
+import org.apache.gearpump.streaming.task.{TaskContext, _}
 import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, 
ProcessorId}
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util.Graph
@@ -270,13 +270,11 @@ object TaskManagerSpec {
 
   class Task1(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = {}
     override def onNext(msg: Message): Unit = {}
   }
 
   class Task2(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = {}
     override def onNext(msg: Message): Unit = {}
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index 4a532dd..864aa93 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -18,14 +18,13 @@
 package org.apache.gearpump.streaming.appmaster
 
 import com.typesafe.config.ConfigFactory
-import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, 
ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities
 import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, 
TestTask2}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, 
TaskId}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}
 import org.apache.gearpump.streaming.{DAG, ProcessorDescription}
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
@@ -108,8 +107,8 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
 
       taskScheduler.setDAG(dag)
       val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, 
Resource(4))
-      assert(tasks.filter(_.processorId == 0).length == 2)
-      assert(tasks.filter(_.processorId == 1).length == 2)
+      assert(tasks.count(_.processorId == 0) == 2)
+      assert(tasks.count(_.processorId == 1) == 2)
     }
   }
 }
@@ -117,13 +116,9 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
 object TaskSchedulerSpec {
   class TestTask1(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = Unit
-    override def onNext(msg: Message): Unit = Unit
   }
 
   class TestTask2(taskContext: TaskContext, userConf: UserConfig)
     extends Task(taskContext, userConf) {
-    override def onStart(startTime: StartTime): Unit = Unit
-    override def onNext(msg: Message): Unit = Unit
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
index 6bdd8aa..82979e0 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
@@ -26,7 +26,7 @@ import 
org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner}
 import org.apache.gearpump.streaming.dsl.StreamSpec.Join
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
 import org.mockito.Mockito.when
@@ -39,6 +39,7 @@ import scala.util.{Either, Left, Right}
 
 class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with 
MockitoSugar {
 
+
   implicit var system: ActorSystem = null
 
   override def beforeAll(): Unit = {
@@ -108,7 +109,6 @@ object StreamSpec {
   class Join(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
 
     var query: String = null
-    override def onStart(startTime: StartTime): Unit = {}
 
     override def onNext(msg: Message): Unit = {
       msg.msg match {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
index ecc5352..144df0f 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala
@@ -18,6 +18,8 @@
 
 package org.apache.gearpump.streaming.dsl.plan
 
+import java.time.Instant
+
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -33,10 +35,10 @@ import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.CollectionDataSource
 import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.task.StartTime
 
 class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
 
+
   "andThen" should "chain multiple single input function" in {
     val dummy = new DummyInputFunction[String]
     val split = new FlatMapFunction[String, String](line => line.split("\\s"), 
"split")
@@ -74,7 +76,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     // Source with no transformer
     val source = new SourceTask[String, String](new 
CollectionDataSource[String](data), None,
       taskContext, conf)
-    source.onStart(StartTime(0))
+    source.onStart(Instant.EPOCH)
     source.onNext(Message("next"))
     verify(taskContext, times(1)).output(anyObject())
 
@@ -83,7 +85,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
     val double = new FlatMapFunction[String, String](word => List(word, word), 
"double")
     val another = new SourceTask(new CollectionDataSource[String](data), 
Some(double),
       anotherTaskContext, conf)
-    another.onStart(StartTime(0))
+    another.onStart(Instant.EPOCH)
     another.onNext(Message("next"))
     verify(anotherTaskContext, times(2)).output(anyObject())
   }
@@ -106,7 +108,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
     val taskContext = MockUtil.mockTaskContext
 
     val task = new GroupByTask[String, String, String](input => input, taskContext, config)
-    task.onStart(StartTime(0))
+    task.onStart(Instant.EPOCH)
 
     val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
 
@@ -130,7 +132,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
     val conf = UserConfig.empty
     val double = new FlatMapFunction[String, String](word => List(word, word), "double")
     val task = new TransformTask[String, String](Some(double), taskContext, conf)
-    task.onStart(StartTime(0))
+    task.onStart(Instant.EPOCH)
 
     val data = "1 2  2  3 3  3".split("\\s+")
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
index 7a2c2d1..55e59e0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala
@@ -18,10 +18,11 @@
 
 package org.apache.gearpump.streaming.sink
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.StartTime
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -30,13 +31,14 @@ import org.scalatest.{PropSpec, Matchers}
 
 class DataSinkTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
+
   property("DataSinkTask.onStart should call DataSink.open" ) {
-    forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) =>
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       val config = UserConfig.empty
       val dataSink = mock[DataSink]
       val sinkTask = new DataSinkTask(taskContext, config, dataSink)
-      sinkTask.onStart(StartTime(startTime))
+      sinkTask.onStart(startTime)
       verify(dataSink).open(taskContext)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index d4d580f..ae9bf37 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -18,10 +18,11 @@
 
 package org.apache.gearpump.streaming.source
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.task.{TaskContext, StartTime}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -31,7 +32,7 @@ import org.scalatest.prop.PropertyChecks
 class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
   property("DataSourceTask.onStart should call DataSource.open") {
-    forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) =>
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val dataSource = mock[DataSource]
@@ -40,7 +41,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
 
       val sourceTask = new DataSourceTask(taskContext, config, dataSource)
 
-      sourceTask.onStart(StartTime(startTime))
+      sourceTask.onStart(startTime)
       verify(dataSource).open(taskContext, startTime)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 4afee8b..258a5ff 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -31,6 +31,7 @@ import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
 import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
 
 class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
+
   val appId = 0
   val executorId = 0
   val taskId = TaskId(0, 0)
@@ -132,11 +133,5 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
 object SubscriptionSpec {
 
   class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-
-    override def onStart(startTime: StartTime): Unit = {
-    }
-
-    override def onNext(msg: Message): Unit = {
-    }
   }
 }
\ No newline at end of file

Reply via email to