Repository: incubator-gearpump Updated Branches: refs/heads/master 5bf7c7cb6 -> 584a2ca23
[GEARPUMP-203] Use DataSourceTask and DataSinkTask for DSL Author: manuzhang <[email protected]> Closes #80 from manuzhang/window_dsl. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/584a2ca2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/584a2ca2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/584a2ca2 Branch: refs/heads/master Commit: 584a2ca23180add9f7454334907a10c8144565de Parents: 5bf7c7c Author: manuzhang <[email protected]> Authored: Tue Sep 6 15:19:00 2016 +0800 Committer: manuzhang <[email protected]> Committed: Tue Sep 6 15:19:00 2016 +0800 ---------------------------------------------------------------------- .../gearpump/streaming/javaapi/Processor.java | 3 +- .../apache/gearpump/streaming/Constants.scala | 3 +- .../streaming/dsl/plan/OpTranslator.scala | 92 +++----------------- .../gearpump/streaming/dsl/plan/Planner.scala | 4 +- .../streaming/source/DataSourceProcessor.scala | 9 +- .../streaming/source/DataSourceTask.scala | 46 +++++++--- .../gearpump/streaming/dsl/StreamAppSpec.scala | 4 +- .../gearpump/streaming/dsl/StreamSpec.scala | 9 +- .../streaming/dsl/plan/OpTranslatorSpec.scala | 25 +++--- .../streaming/source/DataSourceTaskSpec.scala | 6 +- 10 files changed, 77 insertions(+), 124 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java index 8757081..59b375f 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java +++ b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/Processor.java @@ -73,7 +73,8 @@ public class Processor<T extends org.apache.gearpump.streaming.task.Task> implem * @return the new created source processor */ public static Processor<DataSourceTask> source(DataSource source, int parallelism, String description, UserConfig taskConf, ActorSystem system) { - org.apache.gearpump.streaming.Processor<DataSourceTask> p = DataSourceProcessor.apply(source, parallelism, description, taskConf, system); + org.apache.gearpump.streaming.Processor<DataSourceTask<Object, Object>> p = + DataSourceProcessor.apply(source, parallelism, description, taskConf, system); return new Processor(p); } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index 320e46f..cd33b50 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -20,8 +20,7 @@ package org.apache.gearpump.streaming object Constants { val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" - val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.dsl.source" - val GEARPUMP_STREAMING_SINK = "gearpump.streaming.dsl.sink" + val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source" val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function" val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities" http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala index 6bd0da2..8de291c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala @@ -18,8 +18,6 @@ package org.apache.gearpump.streaming.dsl.plan -import java.time.Instant - import scala.collection.TraversableOnce import akka.actor.ActorSystem import org.slf4j.Logger @@ -30,8 +28,8 @@ import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.op._ import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.sink.DataSinkProcessor +import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.LogUtil @@ -52,26 +50,24 @@ class OpTranslator extends java.io.Serializable { val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) op match { - case DataSourceOp(dataSource, parallism, conf, description) => - Processor[SourceTask[Object, Object]](parallism, + case DataSourceOp(dataSource, parallelism, conf, description) => + Processor[DataSourceTask[Any, Any]](parallelism, description = description + "." + func.description, userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) - case groupby@GroupByOp(_, parallism, description, _) => - Processor[GroupByTask[Object, Object, Object]](parallism, + case groupby@GroupByOp(_, parallelism, description, _) => + Processor[GroupByTask[Object, Object, Object]](parallelism, description = description + "." + func.description, userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby)) case merge: MergeOp => Processor[TransformTask[Object, Object]](1, description = op.description + "." + func.description, userConfig) - case ProcessorOp(processor, parallism, conf, description) => - DefaultProcessor(parallism, + case ProcessorOp(processor, parallelism, conf, description) => + DefaultProcessor(parallelism, description = description + "." + func.description, userConfig, processor) case DataSinkOp(dataSink, parallelism, conf, description) => - Processor[SinkTask[Object]](parallelism, - description = description + func.description, - userConfig.withValue(GEARPUMP_STREAMING_SINK, dataSink)) + DataSinkProcessor(dataSink, parallelism, description + func.description) } case op: SlaveOp[_] => val func = toFunction(ops.ops) @@ -156,7 +152,7 @@ object OpTranslator { class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String) extends SingleInputFunction[T, T] { - private var state: Any = null + private var state: Any = _ override def process(value: T): TraversableOnce[T] = { if (state == null) { @@ -200,50 +196,6 @@ object OpTranslator { } } - class SourceTask[T, OUT]( - source: DataSource, operator: Option[SingleInputFunction[T, OUT]], taskContext: TaskContext, - userConf: UserConfig) - extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this( - userConf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(taskContext.system).get, - userConf.getValue[SingleInputFunction[T, OUT]](GEARPUMP_STREAMING_OPERATOR)( - taskContext.system), - taskContext, userConf) - } - - override def onStart(startTime: Instant): Unit = { - source.open(taskContext, startTime) - self ! Message("start", System.currentTimeMillis()) - } - - override def onNext(msg: Message): Unit = { - val time = System.currentTimeMillis() - Option(source.read()).foreach { msg => - operator match { - case Some(operator) => - operator match { - case bad: DummyInputFunction[T] => - taskContext.output(msg) - case _ => - operator.process(msg.msg.asInstanceOf[T]).foreach(msg => { - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - }) - } - case None => - taskContext.output(msg) - } - } - - self ! Message("next", System.currentTimeMillis()) - } - - override def onStop(): Unit = { - source.close() - } - } - class TransformTask[IN, OUT]( operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { @@ -257,8 +209,8 @@ object OpTranslator { val time = msg.timestamp operator match { - case Some(operator) => - operator.process(msg.msg.asInstanceOf[IN]).foreach { msg => + case Some(op) => + op.process(msg.msg.asInstanceOf[IN]).foreach { msg => taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) } case None => @@ -267,24 +219,4 @@ object OpTranslator { } } - class SinkTask[T](dataSink: DataSink, taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[DataSink](GEARPUMP_STREAMING_SINK)(taskContext.system).get, - taskContext, userConf) - } - - override def onStart(startTime: Instant): Unit = { - dataSink.open(taskContext) - } - - override def onNext(msg: Message): Unit = { - dataSink.write(msg) - } - - override def onStop(): Unit = { - dataSink.close() - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index 3af5e97..f5bbd65 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -29,7 +29,7 @@ import org.apache.gearpump.util.Graph class Planner { - /* + /** * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low * level Graph API. */ @@ -74,7 +74,7 @@ class Planner { dag.inDegreeOf(node2) == 1 && // For processor node, we don't allow it to merge with downstream operators !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) { - val (_, edge, _) = dag.outgoingEdgesOf(node1)(0) + val (_, edge, _) = dag.outgoingEdgesOf(node1).head if (edge == Direct) { val opList = OpChain(node1.ops ++ node2.ops) dag.addVertex(opList) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala index 4e3600f..d1cc5c8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala @@ -19,9 +19,8 @@ package org.apache.gearpump.streaming.source import akka.actor.ActorSystem - import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.{Constants, Processor} /** * Utility that helps user to create a DAG starting with [[DataSourceTask]] @@ -42,8 +41,8 @@ object DataSourceProcessor { parallelism: Int = 1, description: String = "", taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem) - : Processor[DataSourceTask] = { - Processor[DataSourceTask](parallelism, description = description, - taskConf.withValue[DataSource](DataSourceTask.DATA_SOURCE, dataSource)) + : Processor[DataSourceTask[Any, Any]] = { + Processor[DataSourceTask[Any, Any]](parallelism, description, + taskConf.withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 5d1a11e..fb2d898 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -19,17 +19,12 @@ package org.apache.gearpump.streaming.source import java.time.Instant -import java.util.concurrent.TimeUnit import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.task.{UpstreamMinClock, Task, TaskContext} - -import scala.concurrent.duration._ - -object DataSourceTask { - val DATA_SOURCE = "data_source" -} +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.plan.OpTranslator.{DummyInputFunction, SingleInputFunction} +import org.apache.gearpump.streaming.task.{Task, TaskContext} /** * Default Task container for [[org.apache.gearpump.streaming.source.DataSource]] that @@ -43,14 +38,39 @@ object DataSourceTask { * - `DataSource.read()` in each `onNext`, which reads a batch of messages * - `DataSource.close()` in `onStop` */ -class DataSourceTask private[source](context: TaskContext, conf: UserConfig, source: DataSource) +class DataSourceTask[IN, OUT] private[source]( + context: TaskContext, + conf: UserConfig, + source: DataSource, + operator: Option[SingleInputFunction[IN, OUT]]) extends Task(context, conf) { def this(context: TaskContext, conf: UserConfig) = { - this(context, conf, conf.getValue[DataSource](DataSourceTask.DATA_SOURCE)(context.system).get) + this(context, conf, + conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get, + conf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system) + ) } + private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000) + private val processMessage: Message => Unit = + operator match { + case Some(op) => + op match { + case bad: DummyInputFunction[IN] => + (message: Message) => context.output(message) + case _ => + (message: Message) => { + op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT => + context.output(Message(m, message.timestamp)) + } + } + } + case None => + (message: Message) => context.output(message) + } + override def onStart(startTime: Instant): Unit = { LOG.info(s"opening data source at $startTime") source.open(context, startTime) @@ -58,11 +78,9 @@ class DataSourceTask private[source](context: TaskContext, conf: UserConfig, sou self ! Watermark(source.getWatermark) } - override def onNext(message: Message): Unit = { + override def onNext(m: Message): Unit = { 0.until(batchSize).foreach { _ => - Option(source.read()).foreach { msg => - context.output(msg) - } + Option(source.read()).foreach(processMessage) } self ! Watermark(source.getWatermark) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala index dd286de..e919a34 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.dsl import akka.actor.ActorSystem import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.dsl.plan.OpTranslator.SourceTask +import org.apache.gearpump.streaming.source.DataSourceTask import org.mockito.Mockito.when import org.scalatest._ import org.scalatest.mock.MockitoSugar @@ -60,7 +60,7 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M val parallism = 3 app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _) val task = app.plan.dag.vertices.iterator.next() - assert(task.taskClass == classOf[SourceTask[_, _]].getName) + assert(task.taskClass == classOf[DataSourceTask[_, _]].getName) assert(task.parallelism == parallism) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala index 82979e0..816feef 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala @@ -26,6 +26,7 @@ import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner} import org.apache.gearpump.streaming.dsl.StreamSpec.Join import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ +import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ @@ -40,7 +41,7 @@ import scala.util.{Either, Left, Right} class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - implicit var system: ActorSystem = null + implicit var system: ActorSystem = _ override def beforeAll(): Unit = { system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) @@ -75,7 +76,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) stream.merge(query).process[(String, Int)](classOf[Join], 1) - val appDescription = app.plan + val appDescription = app.plan() val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => edge.partitionerFactory.partitioner.getClass.getName @@ -87,7 +88,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock } private def getExpectedDagTopology: Graph[String, String] = { - val source = classOf[SourceTask[_, _]].getName + val source = classOf[DataSourceTask[_, _]].getName val group = classOf[GroupByTask[_, _, _]].getName val merge = classOf[TransformTask[_, _]].getName val join = classOf[Join].getName @@ -108,7 +109,7 @@ object StreamSpec { class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { - var query: String = null + var query: String = _ override def onNext(msg: Message): Unit = { msg.msg match { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala index 144df0f..2112fd0 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala @@ -22,19 +22,18 @@ import java.time.Instant import scala.concurrent.Await import scala.concurrent.duration.Duration - import akka.actor.ActorSystem import org.mockito.ArgumentCaptor import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest._ - import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.dsl.CollectionDataSource import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ +import org.apache.gearpump.streaming.source.DataSourceTask class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { @@ -69,25 +68,31 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { "Source" should "iterate over input source and apply attached operator" in { val taskContext = MockUtil.mockTaskContext + implicit val actorSystem = MockUtil.system - val conf = UserConfig.empty val data = "one two three".split("\\s") + val dataSource = new CollectionDataSource[String](data) + val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) // Source with no transformer - val source = new SourceTask[String, String](new CollectionDataSource[String](data), None, + val source = new DataSourceTask[String, String]( taskContext, conf) source.onStart(Instant.EPOCH) source.onNext(Message("next")) - verify(taskContext, times(1)).output(anyObject()) + data.foreach { s => + verify(taskContext, times(1)).output(Message(s)) + } // Source with transformer val anotherTaskContext = MockUtil.mockTaskContext val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val another = new SourceTask(new CollectionDataSource[String](data), Some(double), - anotherTaskContext, conf) + val another = new DataSourceTask(anotherTaskContext, + conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) another.onStart(Instant.EPOCH) another.onNext(Message("next")) - verify(anotherTaskContext, times(2)).output(anyObject()) + data.foreach { s => + verify(anotherTaskContext, times(2)).output(Message(s)) + } } "GroupByTask" should "group input by groupBy Function and " + @@ -95,8 +100,6 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { val data = "1 2 2 3 3 3" - var map = Map.empty[String, Int] - val concat = new ReduceFunction[String]({ (left, right) => left + right }, "concat") @@ -119,7 +122,7 @@ class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { import scala.collection.JavaConverters._ - val values = peopleCaptor.getAllValues().asScala.map(input => input.msg.asInstanceOf[String]) + val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) assert(values.mkString(",") == "1,2,22,3,33,333") system.terminate() Await.result(system.whenTerminated, Duration.Inf) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/584a2ca2/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index ae9bf37..c786047 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -39,7 +39,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val sourceTask = new DataSourceTask(taskContext, config, dataSource) + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None) sourceTask.onStart(startTime) verify(dataSource).open(taskContext, startTime) @@ -54,7 +54,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val sourceTask = new DataSourceTask(taskContext, config, dataSource) + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None) val msg = Message(str) when(dataSource.read()).thenReturn(msg) @@ -69,7 +69,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val sourceTask = new DataSourceTask(taskContext, config, dataSource) + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None) sourceTask.onStop() verify(dataSource).close()
