Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 176d82763 -> fc8006cea
[GEARPUMP-308] Fix TransformTask output time Author: manuzhang <owenzhang1...@gmail.com> Closes #184 from manuzhang/fix_source_output_time. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fc8006ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fc8006ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fc8006ce Branch: refs/heads/master Commit: fc8006cead6f05aeda2b835deef1f843f6b9a926 Parents: 176d827 Author: manuzhang <owenzhang1...@gmail.com> Authored: Fri May 19 10:46:32 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Fri May 19 10:47:02 2017 +0800 ---------------------------------------------------------------------- .../streaming/dsl/scalaapi/Stream.scala | 6 +- .../streaming/dsl/task/TransformTask.scala | 30 ++++---- .../dsl/window/impl/WindowRunner.scala | 4 +- .../streaming/dsl/task/TransformTaskSpec.scala | 2 +- .../window/impl/DefaultWindowRunnerSpec.scala | 73 ++++++++++++++++++++ .../streaming/source/DataSourceTaskSpec.scala | 8 +-- 6 files changed, 99 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fc8006ce/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index 9c5e347..82d6beb 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -108,7 +108,11 @@ class Stream[T]( * @return a new stream after fold */ def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = { - 
transform(new FoldRunner(fn, description)) + if (graph.vertices.exists(_.isInstanceOf[GroupByOp[_, _]])) { + transform(new FoldRunner(fn, description)) + } else { + throw new UnsupportedOperationException("fold operation can only be applied on window") + } } /** http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fc8006ce/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index 9571697..572df94 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -29,7 +29,7 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext} object TransformTask { class Transform[IN, OUT](taskContext: TaskContext, - operator: Option[FunctionRunner[IN, OUT]], + processor: Option[FunctionRunner[IN, OUT]], private var buffer: Vector[Message] = Vector.empty[Message]) { def onNext(msg: Message): Unit = { @@ -37,27 +37,23 @@ object TransformTask { } def onWatermarkProgress(watermark: Instant): Unit = { - val watermarkTime = watermark.toEpochMilli var nextBuffer = Vector.empty[Message] - val processor = operator.map(FunctionRunner.withEmitFn(_, - (out: OUT) => taskContext.output(Message(out, watermarkTime)))) processor.foreach(_.setup()) - buffer.foreach { - message: Message => - if (message.timestamp.toEpochMilli < watermarkTime) { - processor match { - case Some(p) => + buffer.foreach { message: Message => + if (message.timestamp.isBefore(watermark)) { + processor match { + case Some(p) => + FunctionRunner + .withEmitFn(p, (out: OUT) => taskContext.output(Message(out, message.timestamp))) // .toList forces eager evaluation - 
p.process(message.value.asInstanceOf[IN]).toList - case None => - taskContext.output(Message(message.value, watermarkTime)) - } - } else { - nextBuffer +:= message + .process(message.value.asInstanceOf[IN]).toList + case None => + taskContext.output(message) } + } else { + nextBuffer +:= message + } } - // .toList forces eager evaluation - processor.map(_.finish().toList) processor.foreach(_.teardown()) buffer = nextBuffer } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fc8006ce/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index 74749b9..f392f70 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -116,7 +116,9 @@ class DefaultWindowRunner[IN, GROUP, OUT]( val inputs = windowInputs.remove(firstWin) if (groupedFnRunners.containsKey(group)) { val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group), - (output: OUT) => taskContext.output(Message(output, time))) + (output: OUT) => { + taskContext.output(Message(output, time)) + }) val setup = groupedRunnerSetups.get(group) if (!setup) { runner.setup() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fc8006ce/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index 281d69a..481925a 100644 --- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -69,7 +69,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with task.onWatermarkProgress(Watermark.MAX) msgs.foreach { msg => - verify(taskContext).output(MockitoMatchers.eq(Message(msg.value, Watermark.MAX))) + verify(taskContext).output(MockitoMatchers.eq(msg)) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fc8006ce/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala new file mode 100644 index 0000000..fbbee3e --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.dsl.window.impl + +import java.time.{Duration, Instant} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.{Constants, MockUtil} +import org.apache.gearpump.streaming.dsl.plan.functions.FoldRunner +import org.apache.gearpump.streaming.dsl.window.api.SessionWindows +import org.apache.gearpump.streaming.source.Watermark +import org.mockito.Mockito.{times, verify} +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks + + +class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks + with Matchers with MockitoSugar { + + property("DefaultWindowRunner should handle SessionWindow") { + + val data = List( + Message(("foo", 1L), Instant.ofEpochMilli(1L)), + Message(("bar", 1L), Instant.ofEpochMilli(8L)), + Message(("foo", 1L), Instant.ofEpochMilli(15L)), + Message(("bar", 1L), Instant.ofEpochMilli(17L)), + Message(("bar", 1L), Instant.ofEpochMilli(18L)), + Message(("foo", 1L), Instant.ofEpochMilli(25L)), + Message(("foo", 1L), Instant.ofEpochMilli(26L)), + Message(("bar", 1L), Instant.ofEpochMilli(30L)), + Message(("bar", 1L), Instant.ofEpochMilli(31L)) + ) + + type KV = (String, Long) + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2)) + val operator = new FoldRunner(reduce, "reduce") + val userConfig = UserConfig.empty.withValue( + Constants.GEARPUMP_STREAMING_OPERATOR, operator) + val windows = SessionWindows.apply[KV](Duration.ofMillis(4L)) + val groupBy = GroupAlsoByWindow[KV, String](_._1, windows) + val windowRunner = new DefaultWindowRunner(taskContext, userConfig, groupBy) + + data.foreach(windowRunner.process) + windowRunner.trigger(Watermark.MAX) + + verify(taskContext, 
times(2)).output(Message(Some(("foo", 1)), Watermark.MAX)) + verify(taskContext).output(Message(Some(("foo", 2)), Watermark.MAX)) + verify(taskContext, times(2)).output(Message(Some(("bar", 2)), Watermark.MAX)) + verify(taskContext).output(Message(Some(("bar", 1)), Watermark.MAX)) + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fc8006ce/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index 7db9b15..7651251 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -56,8 +56,8 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with } property("DataSourceTask should read from DataSource and transform inputs") { - forAll(runnerGen, Gen.alphaStr) { - (runner: Option[FunctionRunner[Any, Any]], str: String) => + forAll(runnerGen, Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { + (runner: Option[FunctionRunner[Any, Any]], str: String, timestamp: Instant) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val dataSource = mock[DataSource] @@ -65,7 +65,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) val transform = new Transform[Any, Any](taskContext, runner) val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) - val msg = Message(str) + val msg = Message(str, timestamp) when(dataSource.read()).thenReturn(msg) runner.foreach(r => { when(r.process(str)).thenReturn(Some(str)) @@ -75,7 +75,7 @@ class 
DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with sourceTask.onNext(Message("next")) sourceTask.onWatermarkProgress(Watermark.MAX) - verify(taskContext).output(Message(str, Watermark.MAX)) + verify(taskContext).output(msg) } }