Repository: incubator-gearpump Updated Branches: refs/heads/master 6a65dcbd7 -> 07d8d51e7
[GEARPUMP-287] Trigger data process on watermark Author: manuzhang <[email protected]> Closes #165 from manuzhang/GEARPUMP-287. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/07d8d51e Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/07d8d51e Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/07d8d51e Branch: refs/heads/master Commit: 07d8d51e7f0ce39564aeb919a61cee35845deacc Parents: 6a65dcb Author: manuzhang <[email protected]> Authored: Thu Mar 2 13:54:33 2017 +0800 Committer: manuzhang <[email protected]> Committed: Thu Mar 2 13:54:38 2017 +0800 ---------------------------------------------------------------------- .../dsl/plan/functions/FunctionRunner.scala | 8 +++ .../streaming/dsl/task/TransformTask.scala | 70 +++++++++++++++----- .../dsl/window/impl/WindowRunner.scala | 10 +-- .../streaming/source/DataSourceTask.scala | 28 +++----- .../dsl/plan/functions/FunctionRunnerSpec.scala | 12 +++- .../streaming/dsl/task/TransformTaskSpec.scala | 21 ++++-- .../streaming/source/DataSourceTaskSpec.scala | 20 +++--- 7 files changed, 112 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala index 36821e4..9dfa6ad 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala @@ -20,6 +20,13 @@ package org.apache.gearpump.streaming.dsl.plan.functions import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +object FunctionRunner { + def withEmitFn[IN, OUT](runner: FunctionRunner[IN, OUT], + fn: OUT => Unit): FunctionRunner[IN, Unit] = { + AndThen(runner, new Emit(fn)) + } +} + /** * Interface to invoke SerializableFunction methods * @@ -126,3 +133,4 @@ class Emit[T](emit: T => Unit) extends FunctionRunner[T, Unit] { override def description: String = "" } + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index c4278dd..ed48dc7 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -23,34 +23,74 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner +import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform import org.apache.gearpump.streaming.task.{Task, TaskContext} -class TransformTask[IN, OUT](operator: Option[FunctionRunner[IN, OUT]], +object TransformTask { + + class Transform[IN, OUT](taskContext: TaskContext, + operator: Option[FunctionRunner[IN, OUT]], + private var buffer: Vector[Message] = Vector.empty[Message]) { + + def onStart(startTime: Instant): Unit = { + operator.foreach(_.setup()) + } + + def onNext(msg: Message): Unit = { + buffer +:= msg + } + + def onStop(): Unit = { + operator.foreach(_.teardown()) + } + + def onWatermarkProgress(watermark: Instant): Unit = { + val watermarkTime = watermark.toEpochMilli + var nextBuffer = Vector.empty[Message] + val processor = operator.map(FunctionRunner.withEmitFn(_, + (out: OUT) => taskContext.output(Message(out, watermarkTime)))) + buffer.foreach { case message@Message(in, time) => + if (time <= watermarkTime) { + processor match { + case Some(p) => + // .toList forces eager evaluation + p.process(in.asInstanceOf[IN]).toList + case None => + taskContext.output(Message(in, watermarkTime)) + } + } else { + nextBuffer +:= message + } + } + // .toList forces eager evaluation + processor.map(_.finish()) + buffer = nextBuffer + } + } + +} + +class TransformTask[IN, OUT](transform: Transform[IN, OUT], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[FunctionRunner[IN, OUT]]( - GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) + this(new Transform(taskContext, userConf.getValue[FunctionRunner[IN, OUT]]( + GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf) } override def onStart(startTime: Instant): Unit = { - operator.foreach(_.setup()) + transform.onStart(startTime) } override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - operator match { - case Some(op) => - op.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(Message(msg, time)) - } - case None => - taskContext.output(Message(msg.msg, time)) - } + transform.onNext(msg) } override def onStop(): Unit = { - operator.foreach(_.teardown()) + transform.onStop() + } + + override def onWatermarkProgress(watermark: Instant): Unit = { + transform.onWatermarkProgress(watermark) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 91edd73..44d724d 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -28,7 +28,7 @@ import com.gs.collections.impl.list.mutable.FastList import com.gs.collections.impl.map.mutable.UnifiedMap import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, FunctionRunner} +import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner import org.apache.gearpump.streaming.dsl.window.api.Discarding import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.LogUtil @@ -114,8 +114,8 @@ class DefaultWindowRunner[IN, GROUP, OUT]( if (!time.isBefore(firstWin.endTime)) { val inputs = windowInputs.remove(firstWin) if (groupedFnRunners.containsKey(group)) { - val reduceFn = AndThen(groupedFnRunners.get(group), - new Emit[OUT](output => emitResult(output, time))) + val reduceFn = FunctionRunner.withEmitFn(groupedFnRunners.get(group), + (output: OUT) => taskContext.output(Message(output, time))) inputs.forEach(new Procedure[IN] { override def value(t: IN): Unit = { // .toList forces eager evaluation @@ -134,9 +134,5 @@ class DefaultWindowRunner[IN, GROUP, OUT]( } } } - - def emitResult(result: OUT, time: Instant): Unit = { - taskContext.output(Message(result, time)) - } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 0d0dfa2..3fceb1a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -24,6 +24,7 @@ import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner +import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform import org.apache.gearpump.streaming.task.{Task, TaskContext} /** @@ -42,49 +43,42 @@ class DataSourceTask[IN, OUT] private[source]( context: TaskContext, conf: UserConfig, source: DataSource, - operator: Option[FunctionRunner[IN, OUT]]) + transform: Transform[IN, OUT]) extends Task(context, conf) { def this(context: TaskContext, conf: UserConfig) = { this(context, conf, conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get, - conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system) + new Transform[IN, OUT](context, + conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system)) ) } private val batchSize = conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000) - private val processMessage: Message => Unit = - operator match { - case Some(op) => - (message: Message) => { - op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT => - context.output(Message(m, message.timestamp)) - } - } - case None => - (message: Message) => context.output(message) - } - override def onStart(startTime: Instant): Unit = { LOG.info(s"opening data source at $startTime") source.open(context, startTime) - operator.foreach(_.setup()) + transform.onStart(startTime) self ! Watermark(source.getWatermark) } override def onNext(m: Message): Unit = { 0.until(batchSize).foreach { _ => - Option(source.read()).foreach(processMessage) + Option(source.read()).foreach(transform.onNext) } self ! Watermark(source.getWatermark) } + override def onWatermarkProgress(watermark: Instant): Unit = { + transform.onWatermarkProgress(watermark) + } + override def onStop(): Unit = { - operator.foreach(_.teardown()) LOG.info("closing data source...") + transform.onStop() source.close() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala index 08a259a..d26b7d9 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala @@ -23,11 +23,12 @@ import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark} import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} import org.apache.gearpump.streaming.dsl.window.api.CountWindows import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow @@ -255,6 +256,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { taskContext, conf) source.onStart(Instant.EPOCH) source.onNext(Message("next")) + source.onWatermarkProgress(Watermark.MAX) data.foreach { s => verify(taskContext, times(1)).output(MockUtil.argMatch[Message]( message => message.msg == s)) @@ -268,6 +270,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) another.onStart(Instant.EPOCH) another.onNext(Message("next")) + another.onWatermarkProgress(Watermark.MAX) data.foreach { s => verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message]( message => message.msg == s)) @@ -311,7 +314,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { } } - "MergeTask" should { + "TransformTask" should { "accept two stream and apply the attached operator" in { // Source with transformer @@ -319,7 +322,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val conf = UserConfig.empty val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") - val task = new TransformTask[String, String](Some(double), taskContext, conf) + val transform = new Transform[String, String](taskContext, Some(double)) + val task = new TransformTask[String, String](transform, taskContext, conf) task.onStart(Instant.EPOCH) val data = "1 2 2 3 3 3".split("\\s+") @@ -328,6 +332,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { task.onNext(Message(input)) } + task.onWatermarkProgress(Watermark.MAX) + verify(taskContext, times(data.length * 2)).output(anyObject()) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index 8266df5..67fa375 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -23,6 +23,8 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner +import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform +import org.apache.gearpump.streaming.source.Watermark import org.mockito.Mockito.{verify, when} import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} @@ -31,13 +33,14 @@ import org.scalatest.prop.PropertyChecks class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - property("TransformTask.onStart should call SingleInputFunction.setup") { + property("TransformTask should setup functions") { forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val config = UserConfig.empty val operator = mock[FunctionRunner[Any, Any]] - val sourceTask = new TransformTask[Any, Any](Some(operator), taskContext, config) + val transform = new Transform[Any, Any](taskContext, Some(operator)) + val sourceTask = new TransformTask[Any, Any](transform, taskContext, config) sourceTask.onStart(startTime) @@ -45,28 +48,32 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with } } - property("TransformTask.onNext should call SingleInputFunction.process") { + property("TransformTask should process inputs") { forAll(Gen.alphaStr) { (str: String) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val config = UserConfig.empty val operator = mock[FunctionRunner[Any, Any]] - val task = new TransformTask[Any, Any](Some(operator), taskContext, config) + val transform = new Transform[Any, Any](taskContext, Some(operator)) + val task = new TransformTask[Any, Any](transform, taskContext, config) val msg = Message(str) when(operator.process(str)).thenReturn(Some(str)) + when(operator.finish()).thenReturn(None) task.onNext(msg) + task.onWatermarkProgress(Watermark.MAX) - verify(taskContext).output(msg) + verify(taskContext).output(Message(str, Watermark.MAX)) } } - property("DataSourceTask.onStop should call SingleInputFunction.setup") { + property("TransformTask should teardown functions") { val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val config = UserConfig.empty val operator = mock[FunctionRunner[Any, Any]] - val task = new TransformTask[Any, Any](Some(operator), taskContext, config) + val transform = new Transform[Any, Any](taskContext, Some(operator)) + val task = new TransformTask[Any, Any](transform, taskContext, config) task.onStop() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/07d8d51e/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index f7f6fd9..3c44c4c 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -24,6 +24,7 @@ import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner +import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -32,7 +33,7 @@ import org.scalatest.prop.PropertyChecks class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - property("DataSourceTask.onStart should call DataSource.open") { + property("DataSourceTask should setup data source and Transform") { forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system @@ -40,7 +41,8 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) val operator = mock[FunctionRunner[Any, Any]] - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator)) + val transform = new Transform[Any, Any](taskContext, Some(operator)) + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) sourceTask.onStart(startTime) @@ -49,31 +51,33 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with } } - property("DataSourceTask.onNext should call DataSource.read") { + property("DataSourceTask should read from DataSource and transform inputs") { forAll(Gen.alphaStr) { (str: String) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None) + val transform = new Transform[Any, Any](taskContext, None) + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) val msg = Message(str) when(dataSource.read()).thenReturn(msg) sourceTask.onNext(Message("next")) - verify(taskContext).output(msg) + sourceTask.onWatermarkProgress(Watermark.MAX) + verify(taskContext).output(Message(str, Watermark.MAX)) } } - property("DataSourceTask.onStop should call DataSource.close") { + property("DataSourceTask should teardown DataSource and Transform") { val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) val operator = mock[FunctionRunner[Any, Any]] - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator)) + val transform = new Transform[Any, Any](taskContext, Some(operator)) + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) sourceTask.onStop()
