Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 232f527d6 -> 000e846ab
[GEARPUMP-307] Fix TransformTask$Transform invocation Author: manuzhang <[email protected]> Closes #182 from manuzhang/fix_transform. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/000e846a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/000e846a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/000e846a Branch: refs/heads/master Commit: 000e846ab227a2113a182b5d6917724f128cb63a Parents: 232f527 Author: manuzhang <[email protected]> Authored: Wed May 10 12:58:23 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed May 10 12:58:38 2017 +0800 ---------------------------------------------------------------------- .../streaming/dsl/task/TransformTask.scala | 22 ++---- .../streaming/source/DataSourceTask.scala | 2 - .../streaming/dsl/task/TransformTaskSpec.scala | 73 +++++++++----------- .../streaming/source/DataSourceTaskSpec.scala | 66 ++++++++++-------- 4 files changed, 76 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/000e846a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index ed48dc7..86ac933 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -32,25 +32,18 @@ object TransformTask { operator: Option[FunctionRunner[IN, OUT]], private var buffer: Vector[Message] = Vector.empty[Message]) { - def onStart(startTime: Instant): Unit = { - operator.foreach(_.setup()) - } - def 
onNext(msg: Message): Unit = { buffer +:= msg } - def onStop(): Unit = { - operator.foreach(_.teardown()) - } - def onWatermarkProgress(watermark: Instant): Unit = { val watermarkTime = watermark.toEpochMilli var nextBuffer = Vector.empty[Message] val processor = operator.map(FunctionRunner.withEmitFn(_, (out: OUT) => taskContext.output(Message(out, watermarkTime)))) + processor.foreach(_.setup()) buffer.foreach { case message@Message(in, time) => - if (time <= watermarkTime) { + if (time < watermarkTime) { processor match { case Some(p) => // .toList forces eager evaluation @@ -63,7 +56,8 @@ object TransformTask { } } // .toList forces eager evaluation - processor.map(_.finish()) + processor.map(_.finish().toList) + processor.foreach(_.teardown()) buffer = nextBuffer } } @@ -78,18 +72,10 @@ class TransformTask[IN, OUT](transform: Transform[IN, OUT], GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf) } - override def onStart(startTime: Instant): Unit = { - transform.onStart(startTime) - } - override def onNext(msg: Message): Unit = { transform.onNext(msg) } - override def onStop(): Unit = { - transform.onStop() - } - override def onWatermarkProgress(watermark: Instant): Unit = { transform.onWatermarkProgress(watermark) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/000e846a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 3fceb1a..ff1b2d4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -59,7 +59,6 @@ class DataSourceTask[IN, OUT] private[source]( override def onStart(startTime: Instant): Unit = { 
LOG.info(s"opening data source at $startTime") source.open(context, startTime) - transform.onStart(startTime) self ! Watermark(source.getWatermark) } @@ -78,7 +77,6 @@ class DataSourceTask[IN, OUT] private[source]( override def onStop(): Unit = { LOG.info("closing data source...") - transform.onStop() source.close() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/000e846a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index 67fa375..f0bccd7 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -25,7 +25,8 @@ import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform import org.apache.gearpump.streaming.source.Watermark -import org.mockito.Mockito.{verify, when} +import org.mockito.{Matchers => MockitoMatchers} +import org.mockito.Mockito.{times, verify, when} import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} import org.scalatest.mock.MockitoSugar @@ -33,50 +34,44 @@ import org.scalatest.prop.PropertyChecks class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - property("TransformTask should setup functions") { - forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) => - val taskContext = MockUtil.mockTaskContext - implicit val system = MockUtil.system - val config = UserConfig.empty - val operator = mock[FunctionRunner[Any, Any]] - val transform = new Transform[Any, Any](taskContext, 
Some(operator)) - val sourceTask = new TransformTask[Any, Any](transform, taskContext, config) - - sourceTask.onStart(startTime) - - verify(operator).setup() - } + private val timeGen = Gen.chooseNum[Long](Watermark.MIN.toEpochMilli, + Watermark.MAX.toEpochMilli - 1).map(Instant.ofEpochMilli) + private val runnerGen = { + val runner = mock[FunctionRunner[Any, Any]] + Gen.oneOf(Some(runner), None) } - property("TransformTask should process inputs") { - forAll(Gen.alphaStr) { (str: String) => - val taskContext = MockUtil.mockTaskContext - implicit val system = MockUtil.system - val config = UserConfig.empty - val operator = mock[FunctionRunner[Any, Any]] - val transform = new Transform[Any, Any](taskContext, Some(operator)) - val task = new TransformTask[Any, Any](transform, taskContext, config) - val msg = Message(str) - when(operator.process(str)).thenReturn(Some(str)) - when(operator.finish()).thenReturn(None) + property("TransformTask should emit on watermark") { + val msgGen = for { + str <- Gen.alphaStr.suchThat(!_.isEmpty) + t <- timeGen + } yield Message(s"$str:$t", t) + val msgsGen = Gen.listOfN(10, msgGen) - task.onNext(msg) - task.onWatermarkProgress(Watermark.MAX) + forAll(runnerGen, msgsGen) { + (runner: Option[FunctionRunner[Any, Any]], msgs: List[Message]) => + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val config = UserConfig.empty + val transform = new Transform[Any, Any](taskContext, runner) + val task = new TransformTask[Any, Any](transform, taskContext, config) - verify(taskContext).output(Message(str, Watermark.MAX)) - } - } + msgs.foreach(task.onNext) - property("TransformTask should teardown functions") { - val taskContext = MockUtil.mockTaskContext - implicit val system = MockUtil.system - val config = UserConfig.empty - val operator = mock[FunctionRunner[Any, Any]] - val transform = new Transform[Any, Any](taskContext, Some(operator)) - val task = new TransformTask[Any, Any](transform, taskContext, 
config) + runner.foreach(r => when(r.finish()).thenReturn(None)) + task.onWatermarkProgress(Watermark.MIN) + verify(taskContext, times(0)).output(MockitoMatchers.any[Message]) - task.onStop() + msgs.foreach { msg => + runner.foreach(r => + when(r.process(msg.msg)).thenReturn(Some(msg.msg))) + } + task.onWatermarkProgress(Watermark.MAX) - verify(operator).teardown() + msgs.foreach { msg => + verify(taskContext).output(MockitoMatchers.eq(Message(msg.msg, Watermark.MAX))) + } + } } + } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/000e846a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index 3c44c4c..7db9b15 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -33,55 +33,65 @@ import org.scalatest.prop.PropertyChecks class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - property("DataSourceTask should setup data source and Transform") { - forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) => + private val runnerGen = { + val runner = mock[FunctionRunner[Any, Any]] + Gen.oneOf(Some(runner), None) + } + + property("DataSourceTask should setup data source") { + forAll(runnerGen, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { + (runner: Option[FunctionRunner[Any, Any]], startTime: Instant) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val operator = mock[FunctionRunner[Any, 
Any]] - val transform = new Transform[Any, Any](taskContext, Some(operator)) + val transform = new Transform[Any, Any](taskContext, runner) val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) sourceTask.onStart(startTime) verify(dataSource).open(taskContext, startTime) - verify(operator).setup() } } property("DataSourceTask should read from DataSource and transform inputs") { - forAll(Gen.alphaStr) { (str: String) => + forAll(runnerGen, Gen.alphaStr) { + (runner: Option[FunctionRunner[Any, Any]], str: String) => + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val dataSource = mock[DataSource] + val config = UserConfig.empty + .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) + val transform = new Transform[Any, Any](taskContext, runner) + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) + val msg = Message(str) + when(dataSource.read()).thenReturn(msg) + runner.foreach(r => { + when(r.process(str)).thenReturn(Some(str)) + when(r.finish()).thenReturn(None) + }) + + sourceTask.onNext(Message("next")) + sourceTask.onWatermarkProgress(Watermark.MAX) + + verify(taskContext).output(Message(str, Watermark.MAX)) + } + } + + property("DataSourceTask should teardown DataSource") { + forAll(runnerGen) { (runner: Option[FunctionRunner[Any, Any]]) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val transform = new Transform[Any, Any](taskContext, None) + val transform = new Transform[Any, Any](taskContext, runner) val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) - val msg = Message(str) - when(dataSource.read()).thenReturn(msg) - - sourceTask.onNext(Message("next")) - sourceTask.onWatermarkProgress(Watermark.MAX) - verify(taskContext).output(Message(str, 
Watermark.MAX)) - } - } - - property("DataSourceTask should teardown DataSource and Transform") { - val taskContext = MockUtil.mockTaskContext - implicit val system = MockUtil.system - val dataSource = mock[DataSource] - val config = UserConfig.empty - .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val operator = mock[FunctionRunner[Any, Any]] - val transform = new Transform[Any, Any](taskContext, Some(operator)) - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) - sourceTask.onStop() + sourceTask.onStop() - verify(dataSource).close() - verify(operator).teardown() + verify(dataSource).close() + } } }
