Repository: incubator-gearpump Updated Branches: refs/heads/master 385a612bb -> a23a40f5e
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index 98bf24f..f0920de 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -25,7 +25,8 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GroupByFn import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource @@ -145,7 +146,6 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val chainedOp = chainableOp1.chain(chainableOp2) - verify(fn1).andThen(fn2) chainedOp shouldBe a[ChainableOp[_, _]] unchainableOps.foreach { op => @@ -156,12 +156,9 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } "get Processor" in { - val fn = new SingleInputFunction[Any, Any] { - override def process(value: Any): TraversableOnce[Any] = null - - override def description: String = null - } - val chainableOp = ChainableOp[Any, Any](fn) + val fn = mock[FlatMapFunction[Any, Any]] + val flatMapper = new FlatMapper(fn, "flatMap") + val chainableOp = ChainableOp[Any, Any](flatMapper) val processor = chainableOp.getProcessor processor shouldBe a[Processor[_]] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index 1610f0e..3f23fa9 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -23,10 +23,12 @@ import java.time.Instant import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GroupByFn import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource @@ -56,8 +58,8 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc val graph = Graph.empty[Op, OpEdge] val sourceOp = DataSourceOp(new AnySource) val groupByOp = GroupByOp(new AnyGroupByFn) - val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction) - val reduceOp = ChainableOp[Any, Any](anyReduceFunction) + val flatMapOp = ChainableOp[Any, Any](anyFlatMapper) + val reduceOp = ChainableOp[Any, Any](anyReducer) val processorOp = new ProcessorOp[AnyTask] val sinkOp = DataSinkOp(new AnySink) val directEdge = Direct @@ -92,9 +94,10 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc object PlannerSpec { private val anyParallelism = 1 - private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap") - private val anyReduceFunction = new ReduceFunction[Any]( - (left: Any, right: Any) => (left, right), "reduce") + private val anyFlatMapper = new FlatMapper[Any, Any]( + FlatMapFunction(Option(_)), "flatMap") + private val anyReducer = new Reducer[Any]( + ReduceFunction((left: Any, right: Any) => (left, right)), "reduce") class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala index ad12e33..2c03e1c 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala @@ -23,9 +23,11 @@ import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.CollectionDataSource import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} import org.apache.gearpump.streaming.dsl.window.api.CountWindow import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow @@ -77,165 +79,164 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { andThen.finish().toList shouldBe List(secondResult) } - "clear both states on clearState" in { - andThen.clearState() + "set up both functions on setup" in { + andThen.setup() - verify(first).clearState() - verify(second).clearState() + verify(first).setup() + verify(second).setup() } - "return AndThen on andThen" in { - val third = mock[SingleInputFunction[T, Any]] - when(second.andThen(third)).thenReturn(AndThen(second, third)) + "tear down both functions on teardown" in { + andThen.teardown() - andThen.andThen[Any](third) + verify(first).teardown() + verify(second).teardown() + } + + "chain multiple single input function" in { + val split = new FlatMapper[String, String](FlatMapFunction(_.split("\\s")), "split") + + val filter = new FlatMapper[String, String]( + FlatMapFunction(word => if (word.isEmpty) None else Some(word)), "filter") + + val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map") + + val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum") + + val all = AndThen(split, AndThen(filter, AndThen(map, sum))) - verify(first).andThen(AndThen(second, third)) + assert(all.description == "split.filter.map.sum") + + val data = + """ + five four three two one + five four three two + five four three + five four + five + """ + // force eager evaluation + all.process(data).toList + val result = all.finish().toList + assert(result.nonEmpty) + assert(result.last == 15) } } - "FlatMapFunction" should { + "FlatMapper" should { - val flatMap = mock[R => TraversableOnce[S]] - val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap") + val flatMapFunction = mock[FlatMapFunction[R, S]] + val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap") "call flatMap function when processing input value" in { val input = mock[R] - flatMapFunction.process(input) - verify(flatMap).apply(input) + flatMapper.process(input) + verify(flatMapFunction).apply(input) } "return passed in description" in { - flatMapFunction.description shouldBe "flatMap" + flatMapper.description shouldBe "flatMap" } "return None on finish" in { - flatMapFunction.finish() shouldBe List.empty[S] + flatMapper.finish() shouldBe List.empty[S] } - "do nothing on clearState" in { - flatMapFunction.clearState() - verifyZeroInteractions(flatMap) + "set up FlatMapFunction on setup" in { + flatMapper.setup() + + verify(flatMapFunction).setup() } - "return AndThen on andThen" in { - val other = mock[SingleInputFunction[S, T]] - flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]] + "tear down FlatMapFunction on teardown" in { + flatMapper.teardown() + + verify(flatMapFunction).teardown() } } "ReduceFunction" should { - "call reduce function when processing input value" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") val input1 = mock[T] val input2 = mock[T] val output = mock[T] - when(reduce.apply(input1, input2)).thenReturn(output, output) + when(reduceFunction.apply(input1, input2)).thenReturn(output, output) - reduceFunction.process(input1) shouldBe List.empty[T] - reduceFunction.process(input2) shouldBe List.empty[T] - reduceFunction.finish() shouldBe List(output) + reducer.process(input1) shouldBe List.empty[T] + reducer.process(input2) shouldBe List.empty[T] + reducer.finish() shouldBe List(output) - reduceFunction.clearState() - reduceFunction.process(input1) shouldBe List.empty[T] - reduceFunction.clearState() - reduceFunction.process(input2) shouldBe List.empty[T] - reduceFunction.finish() shouldBe List(input2) + reducer.teardown() + reducer.process(input1) shouldBe List.empty[T] + reducer.teardown() + reducer.process(input2) shouldBe List.empty[T] + reducer.finish() shouldBe List(input2) } "return passed in description" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.description shouldBe "reduce" + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.description shouldBe "reduce" } "return None on finish" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.finish() shouldBe List.empty[T] + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.finish() shouldBe List.empty[T] } - "do nothing on clearState" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.clearState() - verifyZeroInteractions(reduce) + "set up reduce function on setup" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.setup() + + verify(reduceFunction).setup() } - "return AndThen on andThen" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - val other = mock[SingleInputFunction[T, Any]] - reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]] + "tear down reduce function on teardown" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.teardown() + + verify(reduceFunction).teardown() } } - "EmitFunction" should { + "Emit" should { - val emit = mock[T => Unit] - val emitFunction = new EmitFunction[T](emit) + val emitFunction = mock[T => Unit] + val emit = new Emit[T](emitFunction) "emit input value when processing input value" in { val input = mock[T] - emitFunction.process(input) shouldBe List.empty[Unit] + emit.process(input) shouldBe List.empty[Unit] - verify(emit).apply(input) + verify(emitFunction).apply(input) } "return empty description" in { - emitFunction.description shouldBe "" + emit.description shouldBe "" } "return None on finish" in { - emitFunction.finish() shouldBe List.empty[Unit] + emit.finish() shouldBe List.empty[Unit] } - "do nothing on clearState" in { - emitFunction.clearState() - verifyZeroInteractions(emit) - } + "do nothing on setup" in { + emit.setup() - "throw exception on andThen" in { - val other = mock[SingleInputFunction[Unit, Any]] - intercept[UnsupportedOperationException] { - emitFunction.andThen(other) - } + verifyZeroInteractions(emitFunction) } - } - - "andThen" should { - "chain multiple single input function" in { - val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") - val filter = new FlatMapFunction[String, String](word => - if (word.isEmpty) None else Some(word), "filter") + "do nothing on teardown" in { + emit.teardown() - val map = new FlatMapFunction[String, Int](word => Some(1), "map") - - val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") - - val all = split.andThen(filter).andThen(map).andThen(sum) - - assert(all.description == "split.filter.map.sum") - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - // force eager evaluation - all.process(data).toList - val result = all.finish().toList - assert(result.nonEmpty) - assert(result.last == 15) + verifyZeroInteractions(emitFunction) } } @@ -261,7 +262,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { // Source with transformer val anotherTaskContext = MockUtil.mockTaskContext - val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val double = new FlatMapper[String, String](FlatMapFunction( + word => List(word, word)), "double") val another = new DataSourceTask(anotherTaskContext, conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) another.onStart(Instant.EPOCH) @@ -279,9 +281,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { val data = "1 2 2 3 3 3" - val concat = new ReduceFunction[String]({ (left, right) => - left + right - }, "concat") + val concat = new Reducer[String](ReduceFunction({ (left, right) => + left + right}), "concat") implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( @@ -315,7 +316,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { // Source with transformer val taskContext = MockUtil.mockTaskContext val conf = UserConfig.empty - val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val double = new FlatMapper[String, String](FlatMapFunction( + word => List(word, word)), "double") val task = new TransformTask[String, String](Some(double), taskContext, conf) task.onStart(Instant.EPOCH) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala new file mode 100644 index 0000000..5b90a3e --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.scalaapi + +import akka.actor.ActorSystem +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.dsl.scalaapi +import org.apache.gearpump.streaming.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} +import org.apache.gearpump.util.Graph +import org.mockito.Mockito.when +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + it should "be able to generate multiple new streams" in { + val context: ClientContext = mock[ClientContext] + when(context.system).thenReturn(system) + + val dsl = StreamApp("dsl", context) + dsl.source(List("A"), 2, "A") shouldBe a [scalaapi.Stream[_]] + dsl.source(List("B"), 3, "B") shouldBe a [scalaapi.Stream[_]] + + val application = dsl.plan() + application shouldBe a [StreamApplication] + application.name shouldBe "dsl" + val dag = application.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get + dag.vertices.size shouldBe 2 + dag.vertices.foreach { processor => + processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName + if (processor.description == "A") { + processor.parallelism shouldBe 2 + } else if (processor.description == "B") { + processor.parallelism shouldBe 3 + } else { + fail(s"undefined source ${processor.description}") + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala new file mode 100644 index 0000000..62a3bcb --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.scalaapi + +import akka.actor._ +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner +import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} +import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph._ +import org.mockito.Mockito.when +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.{Either, Left, Right} + +class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + it should "translate the DSL to a DAG" in { + val context: ClientContext = mock[ClientContext] + when(context.system).thenReturn(system) + + val dsl = StreamApp("dsl", context) + + val data = + """ + five four three two one + five four three two + five four three + five four + five + """ + val stream = dsl.source(data.lines.toList, 1, ""). + flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty). + map(word => (word, 1)). + groupBy(_._1, parallelism = 2). + reduce((left, right) => (left._1, left._2 + right._2)). + map[Either[(String, Int), String]]({t: (String, Int) => Left(t)}) + + val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]]( + {s: String => Right(s)}) + stream.merge(query).process[(String, Int)](classOf[Join], 1) + + val app: StreamApplication = dsl.plan() + val dag = app.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get + + val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => + edge.partitionerFactory.partitioner.getClass.getName + } + val expectedDagTopology = getExpectedDagTopology + + dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet + dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet + } + + private def getExpectedDagTopology: Graph[String, String] = { + val source = classOf[DataSourceTask[_, _]].getName + val group = classOf[CountTriggerTask[_, _]].getName + val merge = classOf[TransformTask[_, _]].getName + val join = classOf[Join].getName + + val hash = classOf[HashPartitioner].getName + val groupBy = classOf[GroupByPartitioner[_, _]].getName + val colocation = classOf[CoLocationPartitioner].getName + + val expectedDagTopology = Graph( + source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join, + source ~ hash ~> merge + ) + expectedDagTopology + } +} + +object StreamSpec { + + class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { + + var query: String = _ + + override def onNext(msg: Message): Unit = { + msg.msg match { + case Left(wordCount: (String @unchecked, Int @unchecked)) => + if (query != null && wordCount._1 == query) { + taskContext.output(new Message(wordCount)) + } + + case Right(query: String) => + this.query = query + } + } + } +}
