Repository: incubator-gearpump Updated Branches: refs/heads/master 6d919ec97 -> 23daf0cf9
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java index 2efce45..89018a1 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Task.java @@ -21,9 +21,10 @@ package org.apache.gearpump.streaming.javaapi; import akka.actor.ActorRef; import org.apache.gearpump.Message; import org.apache.gearpump.cluster.UserConfig; -import org.apache.gearpump.streaming.task.StartTime; import org.apache.gearpump.streaming.task.TaskContext; +import java.time.Instant; + /** * Java version of Task. * @@ -45,7 +46,7 @@ public class Task extends org.apache.gearpump.streaming.task.Task { } @Override - public void onStart(StartTime startTime) { + public void onStart(Instant startTime) { } @Override http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala index 5027500..b6c087e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.streaming.dsl +import java.time.Instant + import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext @@ -27,7 +29,7 @@ import org.apache.gearpump.streaming.dsl.plan.Planner import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.Message import scala.language.implicitConversions @@ -69,36 +71,36 @@ object StreamApp { } implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = { - streamApp.plan + streamApp.plan() } implicit class Source(app: StreamApp) extends java.io.Serializable { - def source[T](dataSource: DataSource, parallism: Int): Stream[T] = { - source(dataSource, parallism, UserConfig.empty) + def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = { + source(dataSource, parallelism, UserConfig.empty) } - def source[T](dataSource: DataSource, parallism: Int, description: String): Stream[T] = { - source(dataSource, parallism, UserConfig.empty, description) + def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = { + source(dataSource, parallelism, UserConfig.empty, description) } - def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig): Stream[T] = { - source(dataSource, parallism, conf, description = null) + def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = { + source(dataSource, parallelism, conf, description = null) } - def source[T](dataSource: DataSource, parallism: Int, conf: UserConfig, description: String) + def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) : Stream[T] = { - implicit val sourceOp = DataSourceOp(dataSource, parallism, conf, description) + implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description) app.graph.addVertex(sourceOp) new Stream[T](app.graph, sourceOp) } - def source[T](seq: Seq[T], parallism: Int, description: String): Stream[T] = { - this.source(new CollectionDataSource[T](seq), parallism, UserConfig.empty, description) + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { + this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) } - def source[T](source: Class[_ <: Task], parallism: Int, conf: UserConfig, description: String) + def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String) : Stream[T] = { - val sourceOp = ProcessorOp(source, parallism, conf, Option(description).getOrElse("source")) + val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source")) app.graph.addVertex(sourceOp) new Stream[T](app.graph, sourceOp) } @@ -119,5 +121,5 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource { override def close(): Unit = {} - override def open(context: TaskContext, startTime: TimeStamp): Unit = {} + override def open(context: TaskContext, startTime: Instant): Unit = {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala index 56d31db..6bd0da2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala @@ -18,11 +18,11 @@ package org.apache.gearpump.streaming.dsl.plan -import scala.collection.TraversableOnce +import java.time.Instant +import scala.collection.TraversableOnce import akka.actor.ActorSystem import org.slf4j.Logger - import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ @@ -32,7 +32,7 @@ import org.apache.gearpump.streaming.dsl.op._ import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.LogUtil /** @@ -116,7 +116,7 @@ object OpTranslator { class DummyInputFunction[T] extends SingleInputFunction[T, T] { override def andThen[OUTER](other: SingleInputFunction[T, OUTER]) - : SingleInputFunction[T, OUTER] = { + : SingleInputFunction[T, OUTER] = { other } @@ -131,13 +131,13 @@ object OpTranslator { extends SingleInputFunction[IN, OUT] { override def process(value: IN): TraversableOnce[OUT] = { - first.process(value).flatMap(second.process(_)) + first.process(value).flatMap(second.process) } override def description: String = { Option(first.description).flatMap { description => Option(second.description).map(description + "." + _) - }.getOrElse(null) + }.orNull } } @@ -182,9 +182,6 @@ object OpTranslator { private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]] - override def onStart(startTime: StartTime): Unit = { - } - override def onNext(msg: Message): Unit = { val time = msg.timestamp @@ -216,8 +213,8 @@ object OpTranslator { taskContext, userConf) } - override def onStart(startTime: StartTime): Unit = { - source.open(taskContext, startTime.startTime) + override def onStart(startTime: Instant): Unit = { + source.open(taskContext, startTime) self ! Message("start", System.currentTimeMillis()) } @@ -256,9 +253,6 @@ object OpTranslator { GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) } - override def onStart(startTime: StartTime): Unit = { - } - override def onNext(msg: Message): Unit = { val time = msg.timestamp @@ -281,7 +275,7 @@ object OpTranslator { taskContext, userConf) } - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { dataSink.open(taskContext) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala index f8bc0ab..0db44f2 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala @@ -18,9 +18,11 @@ package org.apache.gearpump.streaming.sink +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} object DataSinkTask { val DATA_SINK = "data_sink" @@ -32,11 +34,12 @@ object DataSinkTask { class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: DataSink) extends Task(context, conf) { + def this(context: TaskContext, conf: UserConfig) = { this(context, conf, conf.getValue[DataSink](DataSinkTask.DATA_SINK)(context.system).get) } - override def onStart(startTime: StartTime): Unit = { + override def onStart(startTime: Instant): Unit = { LOG.info("opening data sink...") sink.open(context) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala index 0fb6db4..f55d102 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala @@ -18,11 +18,11 @@ package org.apache.gearpump.streaming.source +import java.time.Instant + import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.Message -import scala.util.Random - /** * Interface to implement custom source where data is read into the system. * a DataSource could be a message queue like kafka or simply data generation source. @@ -52,7 +52,7 @@ trait DataSource extends java.io.Serializable { * @param context is the task context at runtime * @param startTime is the start time of system */ - def open(context: TaskContext, startTime: Long): Unit + def open(context: TaskContext, startTime: Instant): Unit /** * Reads next message from data source and http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index f845628..468ae3b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -18,9 +18,11 @@ package org.apache.gearpump.streaming.source +import java.time.Instant + import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} object DataSourceTask { val DATA_SOURCE = "data_source" @@ -45,10 +47,8 @@ class DataSourceTask private[source](context: TaskContext, conf: UserConfig, sou this(context, conf, conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get) } private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000) - private var startTime = 0L - override def onStart(newStartTime: StartTime): Unit = { - startTime = newStartTime.startTime + override def onStart(startTime: Instant): Unit = { LOG.info(s"opening data source at $startTime") source.open(context, startTime) self ! Message("start", System.currentTimeMillis()) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala index c7b503e..aceff4a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala @@ -18,12 +18,13 @@ package org.apache.gearpump.streaming.state.api +import java.time.Instant import java.util.concurrent.TimeUnit import scala.concurrent.duration.FiniteDuration import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig} -import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{UpdateCheckpointClock, Task, TaskContext} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory import org.apache.gearpump.util.LogUtil import org.apache.gearpump.{Message, TimeStamp} @@ -70,8 +71,8 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) /** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */ val state = persistentState - final override def onStart(startTime: StartTime): Unit = { - val timestamp = startTime.startTime + final override def onStart(startTime: Instant): Unit = { + val timestamp = startTime.toEpochMilli checkpointManager .recover(timestamp) .foreach(state.recover(timestamp, _)) @@ -101,6 +102,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig) } } + final override def onStop(): Unit = { checkpointManager.close() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala deleted file mode 100644 index fb097d3..0000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/StartTime.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.task - -import org.apache.gearpump.TimeStamp - -/** Start time of streaming application. All message older than start time will be dropped */ -case class StartTime(startTime: TimeStamp = 0) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala index 9c76a40..c94dec4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.streaming.task +import java.time.Instant + import scala.concurrent.duration.FiniteDuration import akka.actor.Actor.Receive @@ -133,7 +135,7 @@ trait TaskInterface { * replay the data source, or from when a processor task should recover its * checkpoint data in to in-memory state. */ - def onStart(startTime: StartTime): Unit + def onStart(startTime: Instant): Unit /** * Method called for each message received. @@ -176,7 +178,7 @@ abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends Task */ protected def sender: ActorRef = taskContext.sender - def onStart(startTime: StartTime): Unit = {} + def onStart(startTime: Instant): Unit = {} def onNext(msg: Message): Unit = {} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index d12aac1..30a24fa 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -18,12 +18,12 @@ package org.apache.gearpump.streaming.task +import java.time.Instant import java.util import java.util.concurrent.TimeUnit import akka.actor._ import org.slf4j.Logger - import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap import org.apache.gearpump.metrics.Metrics @@ -101,7 +101,7 @@ class TaskActor( task.setTaskActor(this) - def onStart(startTime: StartTime): Unit = { + def onStart(startTime: Instant): Unit = { task.onStart(startTime) } @@ -111,6 +111,7 @@ class TaskActor( def onStop(): Unit = task.onStop() + /** * output to a downstream by specifying a arrayIndex * @param arrayIndex this is not same as ProcessorId @@ -193,7 +194,7 @@ class TaskActor( // Put this as the last step so that the subscription is already initialized. // Message sending in current Task before onStart will not be delivered to // target - onStart(new StartTime(upstreamMinClock)) + onStart(Instant.ofEpochMilli(upstreamMinClock)) appMaster ! GetUpstreamMinClock(taskId) context.become(handleMessages(sender)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala index cd33f7e..31c991e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala @@ -18,15 +18,15 @@ package org.apache.gearpump.streaming.task -import scala.concurrent.duration.FiniteDuration +import java.time.Instant +import scala.concurrent.duration.FiniteDuration import akka.actor.Actor._ import akka.actor.{ActorRef, ActorSystem, Cancellable, Props} import org.slf4j.Logger - import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.util.LogUtil -import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.{TimeStamp, Message} /** * This provides TaskContext for user defined tasks @@ -41,7 +41,7 @@ class TaskWrapper( private val LOG = LogUtil.getLogger(taskClass, task = taskId) - private var actor: TaskActor = null + private var actor: TaskActor = _ private var task: Option[Task] = None @@ -87,8 +87,8 @@ class TaskWrapper( override def actorOf(props: Props, name: String): ActorRef = actor.context.actorOf(props, name) - override def onStart(startTime: StartTime): Unit = { - if (None != task) { + override def onStart(startTime: Instant): Unit = { + if (task.isDefined) { LOG.error(s"Task.onStart should not be called multiple times... ${task.getClass}") } val constructor = taskClass.getConstructor(classOf[TaskContext], classOf[UserConfig]) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index c9f1b89..647ad0a 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -38,7 +38,7 @@ import org.apache.gearpump.partitioner.HashPartitioner import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask} import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef} -import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _} +import org.apache.gearpump.streaming.task.{TaskContext, _} import org.apache.gearpump.streaming.{Constants, DAG, Processor, StreamApplication} import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem import org.apache.gearpump.util.{ActorUtil, Graph} @@ -300,17 +300,9 @@ object AppMasterSpec { } class TaskA(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - - override def onStart(startTime: StartTime): Unit = { - } - override def onNext(msg: Message): Unit = {} } class TaskB(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - - override def onStart(startTime: StartTime): Unit = { - } - override def onNext(msg: Message): Unit = {} } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala index 0dd3e5b..bb495a7 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala @@ -36,7 +36,7 @@ import org.apache.gearpump.streaming.appmaster.JarScheduler.ResourceRequestDetai import org.apache.gearpump.streaming.appmaster.TaskManager.ApplicationReady import org.apache.gearpump.streaming.appmaster.TaskManagerSpec.{Env, Task1, Task2} import org.apache.gearpump.streaming.executor.Executor.RestartTasks -import org.apache.gearpump.streaming.task.{StartTime, TaskContext, _} +import org.apache.gearpump.streaming.task.{TaskContext, _} import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, ProcessorId} import org.apache.gearpump.transport.HostPort import org.apache.gearpump.util.Graph @@ -270,13 +270,11 @@ object TaskManagerSpec { class Task1(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = {} override def onNext(msg: Message): Unit = {} } class Task2(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = {} override def onNext(msg: Message): Unit = {} } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala index 4a532dd..864aa93 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala @@ -18,14 +18,13 @@ package org.apache.gearpump.streaming.appmaster import com.typesafe.config.ConfigFactory -import org.apache.gearpump.Message import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2} -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId} +import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId} import org.apache.gearpump.streaming.{DAG, ProcessorDescription} import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ @@ -108,8 +107,8 @@ class TaskSchedulerSpec extends WordSpec with Matchers { taskScheduler.setDAG(dag) val tasks = taskScheduler.schedule(WorkerId(1, 0L), executorId = 0, Resource(4)) - assert(tasks.filter(_.processorId == 0).length == 2) - assert(tasks.filter(_.processorId == 1).length == 2) + assert(tasks.count(_.processorId == 0) == 2) + assert(tasks.count(_.processorId == 1) == 2) } } } @@ -117,13 +116,9 @@ class TaskSchedulerSpec extends WordSpec with Matchers { object TaskSchedulerSpec { class TestTask1(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = Unit - override def onNext(msg: Message): Unit = Unit } class TestTask2(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - override def onStart(startTime: StartTime): Unit = Unit - override def onNext(msg: Message): Unit = Unit } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala index 6bdd8aa..82979e0 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala @@ -26,7 +26,7 @@ import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner} import org.apache.gearpump.streaming.dsl.StreamSpec.Join import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext} +import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ import org.mockito.Mockito.when @@ -39,6 +39,7 @@ import scala.util.{Either, Left, Right} class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + implicit var system: ActorSystem = null override def beforeAll(): Unit = { @@ -108,7 +109,6 @@ object StreamSpec { class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { var query: String = null - override def onStart(startTime: StartTime): Unit = {} override def onNext(msg: Message): Unit = { msg.msg match { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala index ecc5352..144df0f 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala @@ -18,6 +18,8 @@ package org.apache.gearpump.streaming.dsl.plan +import java.time.Instant + import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -33,10 +35,10 @@ import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.dsl.CollectionDataSource import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.task.StartTime class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + "andThen" should "chain multiple single input function" in { val dummy = new DummyInputFunction[String] val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") @@ -74,7 +76,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { // Source with no transformer val source = new SourceTask[String, String](new CollectionDataSource[String](data), None, taskContext, conf) - source.onStart(StartTime(0)) + source.onStart(Instant.EPOCH) source.onNext(Message("next")) verify(taskContext, times(1)).output(anyObject()) @@ -83,7 +85,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val double = new FlatMapFunction[String, String](word => List(word, word), "double") val another = new SourceTask(new CollectionDataSource[String](data), Some(double), anotherTaskContext, conf) - another.onStart(StartTime(0)) + another.onStart(Instant.EPOCH) another.onNext(Message("next")) verify(anotherTaskContext, times(2)).output(anyObject()) } @@ -106,7 +108,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val taskContext = MockUtil.mockTaskContext val task = new GroupByTask[String, String, String](input => input, taskContext, config) - task.onStart(StartTime(0)) + task.onStart(Instant.EPOCH) val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) @@ -130,7 +132,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val conf = UserConfig.empty val double = new FlatMapFunction[String, String](word => List(word, word), "double") val task = new TransformTask[String, String](Some(double), taskContext, conf) - task.onStart(StartTime(0)) + task.onStart(Instant.EPOCH) val data = "1 2 2 3 3 3".split("\\s+") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala index 7a2c2d1..55e59e0 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/sink/DataSinkTaskSpec.scala @@ -18,10 +18,11 @@ package org.apache.gearpump.streaming.sink +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.StartTime import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -30,13 +31,14 @@ import org.scalatest.{PropSpec, Matchers} class DataSinkTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + property("DataSinkTask.onStart should call DataSink.open" ) { - forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) => + forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) => val taskContext = MockUtil.mockTaskContext val config = UserConfig.empty val dataSink = mock[DataSink] val sinkTask = new DataSinkTask(taskContext, config, dataSink) - sinkTask.onStart(StartTime(startTime)) + sinkTask.onStart(startTime) verify(dataSink).open(taskContext) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index d4d580f..ae9bf37 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -18,10 +18,11 @@ package org.apache.gearpump.streaming.source +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.task.{TaskContext, StartTime} import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -31,7 +32,7 @@ import org.scalatest.prop.PropertyChecks class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { property("DataSourceTask.onStart should call DataSource.open") { - forAll(Gen.chooseNum[Long](0L, 1000L)) { (startTime: Long) => + forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val dataSource = mock[DataSource] @@ -40,7 +41,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val sourceTask = new DataSourceTask(taskContext, config, dataSource) - sourceTask.onStart(StartTime(startTime)) + sourceTask.onStart(startTime) verify(dataSource).open(taskContext, startTime) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/23daf0cf/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index 4afee8b..258a5ff 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -31,6 +31,7 @@ import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription} class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { + val appId = 0 val executorId = 0 val taskId = TaskId(0, 0) @@ -132,11 +133,5 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { object SubscriptionSpec { class NextTask(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) { - - override def onStart(startTime: StartTime): Unit = { - } - - override def onNext(msg: Message): Unit = { - } } } \ No newline at end of file
