Repository: incubator-gearpump Updated Branches: refs/heads/master 215531cd8 -> 1bf6a9ba8
[GEARPUMP-249] Fix function chains to enforce execution Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the commit message is formatted like: `[GEARPUMP-<Jira issue #>] Meaningful description of pull request` - [x] Make sure tests pass via `sbt clean test`. - [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. Author: manuzhang <[email protected]> Closes #123 from manuzhang/GEARPUMP-249. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1bf6a9ba Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1bf6a9ba Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1bf6a9ba Branch: refs/heads/master Commit: 1bf6a9ba8fa774b985fcd90a5d6bef129a4fb9a5 Parents: 215531c Author: manuzhang <[email protected]> Authored: Wed Dec 14 19:12:52 2016 +0800 Committer: manuzhang <[email protected]> Committed: Wed Dec 14 19:12:52 2016 +0800 ---------------------------------------------------------------------- .../dsl/plan/functions/SingleInputFunction.scala | 9 +++++++-- .../dsl/plan/functions/SingleInputFunctionSpec.scala | 10 +++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1bf6a9ba/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala index 609fbb0..5322648 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala @@ -20,17 +20,22 @@ package org.apache.gearpump.streaming.dsl.plan.functions trait SingleInputFunction[IN, OUT] extends Serializable { def process(value: IN): TraversableOnce[OUT] def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - new AndThen(this, other) + AndThen(this, other) } def finish(): TraversableOnce[OUT] = None def clearState(): Unit = {} def description: String } -class AndThen[IN, MIDDLE, OUT]( +case class AndThen[IN, MIDDLE, OUT]( first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) extends SingleInputFunction[IN, OUT] { + override def andThen[OUTER]( + other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { + first.andThen(second.andThen(other)) + } + override def process(value: IN): TraversableOnce[OUT] = { first.process(value).flatMap(second.process) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1bf6a9ba/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala index 94feae4..ad12e33 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala @@ -45,7 +45,7 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { val first = mock[SingleInputFunction[R, S]] val second = mock[SingleInputFunction[S, T]] - val andThen = new AndThen(first, second) + val andThen = AndThen(first, second) "chain first and second functions when processing input value" in { val input = mock[R] @@ -86,7 +86,11 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { "return AndThen on andThen" in { val third = mock[SingleInputFunction[T, Any]] - andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]] + when(second.andThen(third)).thenReturn(AndThen(second, third)) + + andThen.andThen[Any](third) + + verify(first).andThen(AndThen(second, third)) } } @@ -241,7 +245,7 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { val taskContext = MockUtil.mockTaskContext implicit val actorSystem = MockUtil.system - val data = "one two three".split("\\s") + val data = "one two three".split("\\s+") val dataSource = new CollectionDataSource[String](data) val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
