Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 385a612bb -> a23a40f5e


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index 98bf24f..f0920de 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,7 +25,8 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
@@ -145,7 +146,6 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
 
       val chainedOp = chainableOp1.chain(chainableOp2)
 
-      verify(fn1).andThen(fn2)
       chainedOp shouldBe a[ChainableOp[_, _]]
 
       unchainableOps.foreach { op =>
@@ -156,12 +156,9 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
     }
 
     "get Processor" in {
-      val fn = new SingleInputFunction[Any, Any] {
-        override def process(value: Any): TraversableOnce[Any] = null
-
-        override def description: String = null
-      }
-      val chainableOp = ChainableOp[Any, Any](fn)
+      val fn = mock[FlatMapFunction[Any, Any]]
+      val flatMapper = new FlatMapper(fn, "flatMap")
+      val chainableOp = ChainableOp[Any, Any](flatMapper)
 
       val processor = chainableOp.getProcessor
       processor shouldBe a[Processor[_]]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 1610f0e..3f23fa9 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -23,10 +23,12 @@ import java.time.Instant
 import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
 import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
@@ -56,8 +58,8 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
     val graph = Graph.empty[Op, OpEdge]
     val sourceOp = DataSourceOp(new AnySource)
     val groupByOp = GroupByOp(new AnyGroupByFn)
-    val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction)
-    val reduceOp = ChainableOp[Any, Any](anyReduceFunction)
+    val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
+    val reduceOp = ChainableOp[Any, Any](anyReducer)
     val processorOp = new ProcessorOp[AnyTask]
     val sinkOp = DataSinkOp(new AnySink)
     val directEdge = Direct
@@ -92,9 +94,10 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
 object PlannerSpec {
 
   private val anyParallelism = 1
-  private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap")
-  private val anyReduceFunction = new ReduceFunction[Any](
-    (left: Any, right: Any) => (left, right), "reduce")
+  private val anyFlatMapper = new FlatMapper[Any, Any](
+    FlatMapFunction(Option(_)), "flatMap")
+  private val anyReducer = new Reducer[Any](
+    ReduceFunction((left: Any, right: Any) => (left, right)), "reduce")
 
   class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
index ad12e33..2c03e1c 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -23,9 +23,11 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
 import org.apache.gearpump.streaming.dsl.window.api.CountWindow
 import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
@@ -77,165 +79,164 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
       andThen.finish().toList shouldBe List(secondResult)
     }
 
-    "clear both states on clearState" in {
-      andThen.clearState()
+    "set up both functions on setup" in {
+      andThen.setup()
 
-      verify(first).clearState()
-      verify(second).clearState()
+      verify(first).setup()
+      verify(second).setup()
     }
 
-    "return AndThen on andThen" in {
-      val third = mock[SingleInputFunction[T, Any]]
-      when(second.andThen(third)).thenReturn(AndThen(second, third))
+    "tear down both functions on teardown" in {
+      andThen.teardown()
 
-      andThen.andThen[Any](third)
+      verify(first).teardown()
+      verify(second).teardown()
+    }
+
+    "chain multiple single input function" in {
+      val split = new FlatMapper[String, String](FlatMapFunction(_.split("\\s")), "split")
+
+      val filter = new FlatMapper[String, String](
+        FlatMapFunction(word => if (word.isEmpty) None else Some(word)), "filter")
+
+      val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map")
+
+      val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum")
+
+      val all = AndThen(split, AndThen(filter, AndThen(map, sum)))
 
-      verify(first).andThen(AndThen(second, third))
+      assert(all.description == "split.filter.map.sum")
+
+      val data =
+        """
+      five  four three  two    one
+      five  four three  two
+      five  four three
+      five  four
+      five
+        """
+      // force eager evaluation
+      all.process(data).toList
+      val result = all.finish().toList
+      assert(result.nonEmpty)
+      assert(result.last == 15)
     }
   }
 
-  "FlatMapFunction" should {
+  "FlatMapper" should {
 
-    val flatMap = mock[R => TraversableOnce[S]]
-    val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")
+    val flatMapFunction = mock[FlatMapFunction[R, S]]
+    val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap")
 
     "call flatMap function when processing input value" in {
       val input = mock[R]
-      flatMapFunction.process(input)
-      verify(flatMap).apply(input)
+      flatMapper.process(input)
+      verify(flatMapFunction).apply(input)
     }
 
     "return passed in description" in {
-      flatMapFunction.description shouldBe "flatMap"
+      flatMapper.description shouldBe "flatMap"
     }
 
     "return None on finish" in {
-      flatMapFunction.finish() shouldBe List.empty[S]
+      flatMapper.finish() shouldBe List.empty[S]
     }
 
-    "do nothing on clearState" in {
-      flatMapFunction.clearState()
-      verifyZeroInteractions(flatMap)
+    "set up FlatMapFunction on setup" in {
+      flatMapper.setup()
+
+      verify(flatMapFunction).setup()
     }
 
-    "return AndThen on andThen" in {
-      val other = mock[SingleInputFunction[S, T]]
-      flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
+    "tear down FlatMapFunction on teardown" in {
+      flatMapper.teardown()
+
+      verify(flatMapFunction).teardown()
     }
   }
 
   "ReduceFunction" should {
 
-
     "call reduce function when processing input value" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
       val input1 = mock[T]
       val input2 = mock[T]
       val output = mock[T]
 
-      when(reduce.apply(input1, input2)).thenReturn(output, output)
+      when(reduceFunction.apply(input1, input2)).thenReturn(output, output)
 
-      reduceFunction.process(input1) shouldBe List.empty[T]
-      reduceFunction.process(input2) shouldBe List.empty[T]
-      reduceFunction.finish() shouldBe List(output)
+      reducer.process(input1) shouldBe List.empty[T]
+      reducer.process(input2) shouldBe List.empty[T]
+      reducer.finish() shouldBe List(output)
 
-      reduceFunction.clearState()
-      reduceFunction.process(input1) shouldBe List.empty[T]
-      reduceFunction.clearState()
-      reduceFunction.process(input2) shouldBe List.empty[T]
-      reduceFunction.finish() shouldBe List(input2)
+      reducer.teardown()
+      reducer.process(input1) shouldBe List.empty[T]
+      reducer.teardown()
+      reducer.process(input2) shouldBe List.empty[T]
+      reducer.finish() shouldBe List(input2)
     }
 
     "return passed in description" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.description shouldBe "reduce"
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.description shouldBe "reduce"
     }
 
     "return None on finish" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.finish() shouldBe List.empty[T]
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.finish() shouldBe List.empty[T]
     }
 
-    "do nothing on clearState" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      reduceFunction.clearState()
-      verifyZeroInteractions(reduce)
+    "set up reduce function on setup" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.setup()
+
+      verify(reduceFunction).setup()
     }
 
-    "return AndThen on andThen" in {
-      val reduce = mock[(T, T) => T]
-      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
-      val other = mock[SingleInputFunction[T, Any]]
-      reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
+    "tear down reduce function on teardown" in {
+      val reduceFunction = mock[ReduceFunction[T]]
+      val reducer = new Reducer[T](reduceFunction, "reduce")
+      reducer.teardown()
+
+      verify(reduceFunction).teardown()
     }
   }
 
-  "EmitFunction" should {
+  "Emit" should {
 
-    val emit = mock[T => Unit]
-    val emitFunction = new EmitFunction[T](emit)
+    val emitFunction = mock[T => Unit]
+    val emit = new Emit[T](emitFunction)
 
     "emit input value when processing input value" in {
       val input = mock[T]
 
-      emitFunction.process(input) shouldBe List.empty[Unit]
+      emit.process(input) shouldBe List.empty[Unit]
 
-      verify(emit).apply(input)
+      verify(emitFunction).apply(input)
     }
 
     "return empty description" in {
-      emitFunction.description shouldBe ""
+      emit.description shouldBe ""
     }
 
     "return None on finish" in {
-      emitFunction.finish() shouldBe List.empty[Unit]
+      emit.finish() shouldBe List.empty[Unit]
     }
 
-    "do nothing on clearState" in {
-      emitFunction.clearState()
-      verifyZeroInteractions(emit)
-    }
+    "do nothing on setup" in {
+      emit.setup()
 
-    "throw exception on andThen" in {
-      val other = mock[SingleInputFunction[Unit, Any]]
-      intercept[UnsupportedOperationException] {
-        emitFunction.andThen(other)
-      }
+      verifyZeroInteractions(emitFunction)
     }
-  }
-
-  "andThen" should {
-    "chain multiple single input function" in {
-      val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
 
-      val filter = new FlatMapFunction[String, String](word =>
-        if (word.isEmpty) None else Some(word), "filter")
+    "do nothing on teardown" in {
+      emit.teardown()
 
-      val map = new FlatMapFunction[String, Int](word => Some(1), "map")
-
-      val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
-
-      val all = split.andThen(filter).andThen(map).andThen(sum)
-
-      assert(all.description == "split.filter.map.sum")
-
-      val data =
-        """
-      five  four three  two    one
-      five  four three  two
-      five  four three
-      five  four
-      five
-        """
-      // force eager evaluation
-      all.process(data).toList
-      val result = all.finish().toList
-      assert(result.nonEmpty)
-      assert(result.last == 15)
+      verifyZeroInteractions(emitFunction)
     }
   }
 
@@ -261,7 +262,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
 
       // Source with transformer
       val anotherTaskContext = MockUtil.mockTaskContext
-      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+      val double = new FlatMapper[String, String](FlatMapFunction(
+        word => List(word, word)), "double")
       val another = new DataSourceTask(anotherTaskContext,
         conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
       another.onStart(Instant.EPOCH)
@@ -279,9 +281,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
 
       val data = "1 2  2  3 3  3"
 
-      val concat = new ReduceFunction[String]({ (left, right) =>
-        left + right
-      }, "concat")
+      val concat = new Reducer[String](ReduceFunction({ (left, right) =>
+        left + right}), "concat")
 
       implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
       val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
@@ -315,7 +316,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
       // Source with transformer
       val taskContext = MockUtil.mockTaskContext
       val conf = UserConfig.empty
-      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+      val double = new FlatMapper[String, String](FlatMapFunction(
+        word => List(word, word)), "double")
       val task = new TransformTask[String, String](Some(double), taskContext, conf)
       task.onStart(Instant.EPOCH)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
new file mode 100644
index 0000000..5b90a3e
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.dsl.scalaapi
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+  implicit var system: ActorSystem = _
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "be able to generate multiple new streams" in {
+    val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
+
+    val dsl = StreamApp("dsl", context)
+    dsl.source(List("A"), 2, "A") shouldBe a [scalaapi.Stream[_]]
+    dsl.source(List("B"), 3, "B") shouldBe a [scalaapi.Stream[_]]
+
+    val application = dsl.plan()
+    application shouldBe a [StreamApplication]
+    application.name shouldBe "dsl"
+    val dag = application.userConfig
+      .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
+    dag.vertices.size shouldBe 2
+    dag.vertices.foreach { processor =>
+      processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
+      if (processor.description == "A") {
+        processor.parallelism shouldBe 2
+      } else if (processor.description == "B") {
+        processor.parallelism shouldBe 3
+      } else {
+        fail(s"undefined source ${processor.description}")
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
new file mode 100644
index 0000000..62a3bcb
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import akka.actor._
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.apache.gearpump.util.Graph._
+import org.mockito.Mockito.when
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+import scala.util.{Either, Left, Right}
+
+class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
+
+  implicit var system: ActorSystem = _
+
+  override def beforeAll(): Unit = {
+    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+  }
+
+  override def afterAll(): Unit = {
+    system.terminate()
+    Await.result(system.whenTerminated, Duration.Inf)
+  }
+
+  it should "translate the DSL to a DAG" in {
+    val context: ClientContext = mock[ClientContext]
+    when(context.system).thenReturn(system)
+
+    val dsl = StreamApp("dsl", context)
+
+    val data =
+      """
+        five  four three  two    one
+        five  four three  two
+        five  four three
+        five  four
+        five
+      """
+    val stream = dsl.source(data.lines.toList, 1, "").
+      flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
+      map(word => (word, 1)).
+      groupBy(_._1, parallelism = 2).
+      reduce((left, right) => (left._1, left._2 + right._2)).
+      map[Either[(String, Int), String]]({t: (String, Int) => Left(t)})
+
+    val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](
+      {s: String => Right(s)})
+    stream.merge(query).process[(String, Int)](classOf[Join], 1)
+
+    val app: StreamApplication = dsl.plan()
+    val dag = app.userConfig
+      .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
+
+    val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
+      edge.partitionerFactory.partitioner.getClass.getName
+    }
+    val expectedDagTopology = getExpectedDagTopology
+
+    dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet
+    dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet
+  }
+
+  private def getExpectedDagTopology: Graph[String, String] = {
+    val source = classOf[DataSourceTask[_, _]].getName
+    val group = classOf[CountTriggerTask[_, _]].getName
+    val merge = classOf[TransformTask[_, _]].getName
+    val join = classOf[Join].getName
+
+    val hash = classOf[HashPartitioner].getName
+    val groupBy = classOf[GroupByPartitioner[_, _]].getName
+    val colocation = classOf[CoLocationPartitioner].getName
+
+    val expectedDagTopology = Graph(
+      source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
+      source ~ hash ~> merge
+    )
+    expectedDagTopology
+  }
+}
+
+object StreamSpec {
+
+  class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
+
+    var query: String = _
+
+    override def onNext(msg: Message): Unit = {
+      msg.msg match {
+        case Left(wordCount: (String @unchecked, Int @unchecked)) =>
+          if (query != null && wordCount._1 == query) {
+            taskContext.output(new Message(wordCount))
+          }
+
+        case Right(query: String) =>
+          this.query = query
+      }
+    }
+  }
+}

Reply via email to