[GEARPUMP-262] Add setup and teardown to user defined functions

Author: manuzhang <owenzhang1...@gmail.com>

Closes #131 from manuzhang/setup_teardown.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a23a40f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a23a40f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a23a40f5

Branch: refs/heads/master
Commit: a23a40f5e558a1a5f10503f80204d9c6e690e0bd
Parents: 385a612
Author: manuzhang <owenzhang1...@gmail.com>
Authored: Fri Jan 13 17:11:54 2017 +0800
Committer: manuzhang <owenzhang1...@gmail.com>
Committed: Fri Jan 13 17:12:38 2017 +0800

----------------------------------------------------------------------
 .../examples/kafka/dsl/KafkaReadWrite.scala     |   4 +-
 .../examples/wordcountjava/dsl/WordCount.java   |  47 ++-
 .../wordcount/dsl/WindowedWordCount.scala       |   2 +-
 .../examples/wordcount/dsl/WordCount.scala      |   4 +-
 .../external/hbase/dsl/HBaseDSLSink.scala       |   5 +-
 .../gearpump/streaming/kafka/dsl/KafkaDSL.scala |  12 +-
 .../javaapi/dsl/functions/FilterFunction.java   |  30 --
 .../javaapi/dsl/functions/FlatMapFunction.java  |  32 ---
 .../javaapi/dsl/functions/GroupByFunction.java  |  31 --
 .../javaapi/dsl/functions/MapFunction.java      |  31 --
 .../javaapi/dsl/functions/ReduceFunction.java   |  30 --
 .../apache/gearpump/streaming/dsl/Stream.scala  | 245 ----------------
 .../gearpump/streaming/dsl/StreamApp.scala      | 109 -------
 .../dsl/api/functions/FilterFunction.scala      |  42 +++
 .../dsl/api/functions/MapFunction.scala         |  43 +++
 .../dsl/api/functions/ReduceFunction.scala      |  42 +++
 .../streaming/dsl/javaapi/JavaStream.scala      |  18 +-
 .../streaming/dsl/javaapi/JavaStreamApp.scala   |   5 +-
 .../dsl/javaapi/functions/FlatMapFunction.scala |  32 +++
 .../dsl/javaapi/functions/GroupByFunction.scala |  28 ++
 .../apache/gearpump/streaming/dsl/plan/OP.scala |   4 +-
 .../plan/functions/SingleInputFunction.scala    |  66 +++--
 .../streaming/dsl/scalaapi/Stream.scala         | 287 +++++++++++++++++++
 .../streaming/dsl/scalaapi/StreamApp.scala      | 109 +++++++
 .../scalaapi/functions/FlatMapFunction.scala    | 103 +++++++
 .../functions/SerializableFunction.scala        |  32 +++
 .../streaming/dsl/task/TransformTask.scala      |   5 +-
 .../dsl/window/impl/WindowRunner.scala          |  16 +-
 .../gearpump/streaming/dsl/StreamAppSpec.scala  |  72 -----
 .../gearpump/streaming/dsl/StreamSpec.scala     | 128 ---------
 .../gearpump/streaming/dsl/plan/OpSpec.scala    |  13 +-
 .../streaming/dsl/plan/PlannerSpec.scala        |  15 +-
 .../functions/SingleInputFunctionSpec.scala     | 202 ++++++-------
 .../streaming/dsl/scalaapi/StreamAppSpec.scala  |  73 +++++
 .../streaming/dsl/scalaapi/StreamSpec.scala     | 129 +++++++++
 35 files changed, 1154 insertions(+), 892 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
index 49d3619..cbfe57a 100644
--- 
a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
+++ 
b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
@@ -21,8 +21,8 @@ package org.apache.gearpump.streaming.examples.kafka.dsl
 import java.util.Properties
 
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser}
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
 import org.apache.gearpump.streaming.kafka.KafkaStoreFactory
 import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL
 import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index d4866ed..2942861 100644
--- 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -25,12 +25,17 @@ import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.cluster.client.ClientContext;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
 import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
 import org.apache.gearpump.streaming.source.DataSource;
 import org.apache.gearpump.streaming.task.TaskContext;
 import scala.Tuple2;
 
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Iterator;
 
 /** Java version of WordCount with high level DSL API */
 public class WordCount {
@@ -46,15 +51,13 @@ public class WordCount {
     JavaStream<String> sentence = app.source(new StringSource("This is a good 
start, bingo!! bingo!!"),
         1, UserConfig.empty(), "source");
 
-    JavaStream<String> words = sentence.flatMap(s -> 
Arrays.asList(s.split("\\s+")).iterator(),
-        "flatMap");
+    JavaStream<String> words = sentence.flatMap(new Split(), "flatMap");
 
-    JavaStream<Tuple2<String, Integer>> ones = words.map(s -> new Tuple2<>(s, 
1), "map");
+    JavaStream<Tuple2<String, Integer>> ones = words.map(new Ones(), "map");
 
-    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(Tuple2::_1, 
1, "groupBy");
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new 
TupleKey(), 1, "groupBy");
 
-    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(
-        (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce");
+    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new 
Count(), "reduce");
 
     wordcount.log();
 
@@ -88,4 +91,36 @@ public class WordCount {
       return Instant.now();
     }
   }
+
+  private static class Split extends FlatMapFunction<String, String> {
+
+    @Override
+    public Iterator<String> apply(String s) {
+      return Arrays.asList(s.split("\\s+")).iterator();
+    }
+  }
+
+  private static class Ones extends MapFunction<String, Tuple2<String, 
Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> apply(String s) {
+      return new Tuple2<>(s, 1);
+    }
+  }
+
+  private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, 
Tuple2<String, Integer> t2) {
+      return new Tuple2<>(t1._1(), t1._2() + t2._2());
+    }
+  }
+
+  private static class TupleKey extends GroupByFunction<Tuple2<String, 
Integer>, String> {
+
+    @Override
+    public String apply(Tuple2<String, Integer> tuple) {
+      return tuple._1();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index 4f43fd4..401eac0 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -22,7 +22,7 @@ import java.time.{Duration, Instant}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
 import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, 
FixedWindow}
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.TaskContext

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 22f597c..1cbfb22 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -20,8 +20,8 @@ package org.apache.gearpump.streaming.examples.wordcount.dsl
 
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.StreamApp._
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._
 import org.apache.gearpump.util.AkkaApp
 
 /** Same WordCount with High level DSL syntax */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
----------------------------------------------------------------------
diff --git 
a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
 
b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
index 2417763..22efa89 100644
--- 
a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
+++ 
b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
@@ -18,13 +18,10 @@
 package org.apache.gearpump.external.hbase.dsl
 
 import scala.language.implicitConversions
-
 import org.apache.hadoop.conf.Configuration
-
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.external.hbase.HBaseSink
-import org.apache.gearpump.streaming.dsl.Stream
-import org.apache.gearpump.streaming.dsl.Stream.Sink
+import org.apache.gearpump.streaming.dsl.scalaapi.Stream
 
 /** Create a HBase DSL Sink */
 class HBaseDSLSink[T](stream: Stream[T]) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
index f1bb26a..996ae0b 100644
--- 
a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
+++ 
b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp}
 import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 
@@ -44,7 +44,7 @@ object KafkaDSL {
       parallelism: Int = 1,
       config: UserConfig = UserConfig.empty,
       description: String = "KafkaSource"
-      ): dsl.Stream[T] = {
+      ): Stream[T] = {
     app.source[T](new KafkaSource(topics, properties), parallelism, config, 
description)
   }
 
@@ -66,19 +66,19 @@ object KafkaDSL {
       properties: Properties,
       parallelism: Int = 1,
       config: UserConfig = UserConfig.empty,
-      description: String = "KafkaSource"): dsl.Stream[T] = {
+      description: String = "KafkaSource"): Stream[T] = {
     val source = new KafkaSource(topics, properties)
     source.setCheckpointStore(checkpointStoreFactory)
     app.source[T](source, parallelism, config, description)
   }
 
   import scala.language.implicitConversions
-  implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = {
+  implicit def streamToKafkaDSL[T](stream: Stream[T]): KafkaDSL[T] = {
     new KafkaDSL[T](stream)
   }
 }
 
-class KafkaDSL[T](stream: dsl.Stream[T]) {
+class KafkaDSL[T](stream: Stream[T]) {
 
   /**
    * Sinks data to Kafka
@@ -94,7 +94,7 @@ class KafkaDSL[T](stream: dsl.Stream[T]) {
       properties: Properties,
       parallelism: Int = 1,
       userConfig: UserConfig = UserConfig.empty,
-      description: String = "KafkaSink"): dsl.Stream[T] = {
+      description: String = "KafkaSink"): Stream[T] = {
     stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, 
description)
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
deleted file mode 100644
index f07ceff..0000000
--- 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Filter function
- *
- * @param <T> Message of type T
- */
-public interface FilterFunction<T> extends Serializable {
-  boolean apply(T t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
deleted file mode 100644
index 9788dd2..0000000
--- 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-/**
- * Function that converts a value of type T to a iterator of values of type R.
- *
- * @param <T> Input value type
- * @param <R> Return value type
- */
-public interface FlatMapFunction<T, R> extends Serializable {
-  Iterator<R> apply(T t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
deleted file mode 100644
index 6c71280..0000000
--- 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * GroupBy function which assign value of type T to groups
- *
- * @param <T> Input value type
- * @param <Group> Group Type
- */
-public interface GroupByFunction<T, Group> extends Serializable {
-  Group apply(T t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
deleted file mode 100644
index e1fc821..0000000
--- 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that map a value of type T to value of type R
- *
- * @param <T> Input value type
- * @param <R> Output value type
- */
-public interface MapFunction<T, R> extends Serializable {
-  R apply(T t);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
deleted file mode 100644
index 2bcac60..0000000
--- 
a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that applies reduce operation
- *
- * @param <T> Input value type
- */
-public interface ReduceFunction<T> extends Serializable {
-  T apply(T t1, T t2);
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
deleted file mode 100644
index 440a45e..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.plan._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, 
ReduceFunction}
-import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.window.impl._
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.Graph
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.language.implicitConversions
-
-class Stream[T](
-    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
-    private val edge: Option[OpEdge] = None) {
-
-  /**
-   * converts a value[T] to a list of value[R]
-   *
-   * @param fn FlatMap function
-   * @param description The description message for this operation
-   * @return A new stream with type [R]
-   */
-  def flatMap[R](fn: T => TraversableOnce[R], description: String = 
"flatMap"): Stream[R] = {
-    val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
-    graph.addVertex(flatMapOp)
-    graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
-    new Stream[R](graph, flatMapOp)
-  }
-
-  /**
-   * Maps message of type T message of type R
-   *
-   * @param fn Function
-   * @return A new stream with type [R]
-   */
-  def map[R](fn: T => R, description: String = "map"): Stream[R] = {
-    this.flatMap({ data =>
-      Option(fn(data))
-    }, description)
-  }
-
-  /**
-   * Keeps records when fun(T) == true
-   *
-   * @param fn  the filter
-   * @return  a new stream after filter
-   */
-  def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
-    this.flatMap({ data =>
-      if (fn(data)) Option(data) else None
-    }, description)
-  }
-
-  /**
-   * Reduces operations.
-   *
-   * @param fn  reduction function
-   * @param description description message for this operator
-   * @return a new stream after reduction
-   */
-  def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
-    val reduceOp = ChainableOp(new ReduceFunction(fn, description))
-    graph.addVertex(reduceOp)
-    graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
-    new Stream(graph, reduceOp)
-  }
-
-  /**
-   * Log to task log file
-   */
-  def log(): Unit = {
-    this.map(msg => {
-      LoggerFactory.getLogger("dsl").info(msg.toString)
-      msg
-    }, "log")
-  }
-
-  /**
-   * Merges data from two stream into one
-   *
-   * @param other the other stream
-   * @return  the merged stream
-   */
-  def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
-    val mergeOp = MergeOp(description, UserConfig.empty)
-    graph.addVertex(mergeOp)
-    graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
-    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
-    new Stream[T](graph, mergeOp)
-  }
-
-  /**
-   * Group by function (T => Group)
-   *
-   * For example, we have T type, People(name: String, gender: String, age: 
Int)
-   * groupBy[People](_.gender) will group the people by gender.
-   *
-   * You can append other combinators after groupBy
-   *
-   * For example,
-   * {{{
-   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
-   * }}}
-   *
-   * @param fn  Group by function
-   * @param parallelism  Parallelism level
-   * @param description  The description
-   * @return  the grouped stream
-   */
-  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
-      description: String = "groupBy"): Stream[T] = {
-    window(CountWindow.apply(1).accumulating)
-      .groupBy[GROUP](fn, parallelism, description)
-  }
-
-  /**
-   * Window function
-   *
-   * @param win window definition
-   * @param description window description
-   * @return [[WindowStream]] where groupBy could be applied
-   */
-  def window(win: Window, description: String = "window"): WindowStream[T] = {
-    new WindowStream[T](graph, edge, thisNode, win, description)
-  }
-
-  /**
-   * Connects with a low level Processor(TaskDescription)
-   *
-   * @param processor  a user defined processor
-   * @param parallelism  parallelism level
-   * @return  new stream after processing with type [R]
-   */
-  def process[R](
-      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = 
UserConfig.empty,
-      description: String = "process"): Stream[R] = {
-    val processorOp = ProcessorOp(processor, parallelism, conf, description)
-    graph.addVertex(processorOp)
-    graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
-    new Stream[R](graph, processorOp, Some(Shuffle))
-  }
-}
-
-class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], 
thisNode: Op,
-    window: Window, winDesc: String) {
-
-  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
-      description: String = "groupBy"): Stream[T] = {
-    val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, 
window)
-    val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
-      s"$winDesc.$description")
-    graph.addVertex(groupOp)
-    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
-    new Stream[T](graph, groupOp)
-  }
-}
-
-class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
-  /**
-   * GroupBy key
-   *
-   * Applies to Stream[Tuple2[K,V]]
-   *
-   * @param parallelism  the parallelism for this operation
-   * @return  the new KV stream
-   */
-  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
-    stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
-  }
-
-  /**
-   * Sum the value of the tuples
-   *
-   * Apply to Stream[Tuple2[K,V]], V must be of type Number
-   *
-   * For input (key, value1), (key, value2), will generate (key, value1 + 
value2)
-   * @param numeric  the numeric operations
-   * @return  the sum stream
-   */
-  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
-    stream.reduce(Stream.sumByKey[K, V](numeric), "sum")
-  }
-}
-
-object Stream {
-
-  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): 
Stream[T] = {
-    new Stream[T](graph, node, edge)
-  }
-
-  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
-
-  def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => 
Tuple2[K, V]
-  = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
-
-  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): 
KVStream[K, V] = {
-    new KVStream(stream)
-  }
-
-  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
-    def sink(dataSink: DataSink, parallelism: Int = 1,
-        conf: UserConfig = UserConfig.empty, description: String = "sink"): 
Stream[T] = {
-      implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
-      stream.graph.addVertex(sink)
-      stream.graph.addEdge(stream.thisNode, Shuffle, sink)
-      new Stream[T](stream.graph, sink)
-    }
-  }
-}
-
-class LoggerSink[T] extends DataSink {
-  var logger: Logger = _
-
-  override def open(context: TaskContext): Unit = {
-    this.logger = context.logger
-  }
-
-  override def write(message: Message): Unit = {
-    logger.info("logging message " + message.msg)
-  }
-
-  override def close(): Unit = Unit
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
deleted file mode 100644
index 8116146..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import java.time.Instant
-
-import akka.actor.ActorSystem
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.StreamApplication
-import org.apache.gearpump.streaming.dsl.plan._
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.util.Graph
-import org.apache.gearpump.Message
-
-import scala.language.implicitConversions
-
-/**
- * Example:
- * {{{
- * val data = "This is a good start, bingo!! bingo!!"
- * app.fromCollection(data.lines.toList).
- * // word => (word, count)
- * flatMap(line => line.split("[\\s]+")).map((_, 1)).
- * // (word, count1), (word, count2) => (word, count1 + count2)
- * groupBy(kv => kv._1).reduce(sum(_, _))
- *
- * val appId = context.submit(app)
- * context.close()
- * }}}
- *
- * @param name name of app
- */
-class StreamApp(
-    name: String, system: ActorSystem, userConfig: UserConfig,
-    private val graph: Graph[Op, OpEdge]) {
-
-  def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
-    this(name, system, userConfig, Graph.empty[Op, OpEdge])
-  }
-
-  def plan(): StreamApplication = {
-    implicit val actorSystem = system
-    val planner = new Planner
-    val dag = planner.plan(graph)
-    StreamApplication(name, dag, userConfig)
-  }
-}
-
-object StreamApp {
-  def apply(name: String, context: ClientContext, userConfig: UserConfig = 
UserConfig.empty)
-    : StreamApp = {
-    new StreamApp(name, context.system, userConfig)
-  }
-
-  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication 
= {
-    streamApp.plan()
-  }
-
-  implicit class Source(app: StreamApp) extends java.io.Serializable {
-
-    def source[T](dataSource: DataSource, parallelism: Int = 1,
-        conf: UserConfig = UserConfig.empty, description: String = "source"): 
Stream[T] = {
-      implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, 
description)
-      app.graph.addVertex(sourceOp)
-      new Stream[T](app.graph, sourceOp)
-    }
-
-    def source[T](seq: Seq[T], parallelism: Int, description: String): 
Stream[T] = {
-      this.source(new CollectionDataSource[T](seq), parallelism, 
UserConfig.empty, description)
-    }
-  }
-}
-
-/** A test message source which generated message sequence repeatedly. */
-class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
-  private lazy val iterator: Iterator[T] = seq.iterator
-
-  override def open(context: TaskContext, startTime: Instant): Unit = {}
-
-  override def read(): Message = {
-    if (iterator.hasNext) {
-      Message(iterator.next(), Instant.now().toEpochMilli)
-    } else {
-      null
-    }
-  }
-
-  override def close(): Unit = {}
-
-  override def getWatermark: Instant = Instant.now()
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
new file mode 100644
index 0000000..e4e7309
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+import 
org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+object FilterFunction {
+
+  def apply[T](fn: T => Boolean): FilterFunction[T] = {
+    new FilterFunction[T] {
+      override def apply(t: T): Boolean = {
+        fn(t)
+      }
+    }
+  }
+}
+
+/**
+ * Returns true to keep the input and false otherwise.
+ *
+ * @param T Input value type
+ */
+abstract class FilterFunction[T] extends SerializableFunction {
+
+  def apply(t: T): Boolean
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
new file mode 100644
index 0000000..70fe9d4
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+import 
org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+object MapFunction {
+
+  def apply[T, R](fn: T => R): MapFunction[T, R] = {
+    new MapFunction[T, R] {
+      override def apply(t: T): R = {
+        fn(t)
+      }
+    }
+  }
+}
+
+/**
+ * Transforms an input into an output of possibly different types.
+ *
+ * @param T Input value type
+ * @param R Output value type
+ */
+abstract class MapFunction[T, R] extends SerializableFunction {
+
+  def apply(t: T): R
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
new file mode 100644
index 0000000..25b12be
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+import 
org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+object ReduceFunction {
+
+  def apply[T](fn: (T, T) => T): ReduceFunction[T] = {
+    new ReduceFunction[T] {
+      override def apply(t1: T, t2: T): T = {
+        fn(t1, t2)
+      }
+    }
+  }
+}
+
+/**
+ * Combines two inputs into one output of the same type.
+ *
+ * @param T Type of both inputs and output
+ */
+abstract class ReduceFunction[T] extends SerializableFunction {
+
+  def apply(t1: T, t2: T): T
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index f2654ea..7f3c250 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gearpump.streaming.dsl.javaapi
 
-import scala.collection.JavaConverters._
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, 
MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => 
JFlatMapFunction, GroupByFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
 import org.apache.gearpump.streaming.dsl.window.api.Window
-import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
-import org.apache.gearpump.streaming.javaapi.dsl.functions._
 import org.apache.gearpump.streaming.task.Task
 
 /**
@@ -31,23 +31,23 @@ import org.apache.gearpump.streaming.task.Task
 class JavaStream[T](val stream: Stream[T]) {
 
   /** FlatMap on stream */
-  def flatMap[R](fn: FlatMapFunction[T, R], description: String): 
JavaStream[R] = {
-    new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description))
+  def flatMap[R](fn: JFlatMapFunction[T, R], description: String): 
JavaStream[R] = {
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), "flatMap"))
   }
 
   /** Map on stream */
   def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
-    new JavaStream[R](stream.map({ t: T => fn(t) }, description))
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
   }
 
   /** Only keep the messages that FilterFunction returns true.  */
   def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.filter({ t: T => fn(t) }, description))
+    new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
   }
 
   /** Does aggregation on the stream */
   def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, 
description))
+    new JavaStream[T](stream.reduce(fn, description))
   }
 
   def log(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index 82a284e..b8d1f4c 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -19,13 +19,14 @@
 package org.apache.gearpump.streaming.dsl.javaapi
 
 import java.util.Collection
-import scala.collection.JavaConverters._
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, 
StreamApp}
 import org.apache.gearpump.streaming.source.DataSource
 
+import scala.collection.JavaConverters._
+
 class JavaStreamApp(name: String, context: ClientContext, userConfig: 
UserConfig) {
 
   private val streamApp = StreamApp(name, context, userConfig)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..85d597d
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
+
+import 
org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Java version of FlatMapFunction returns a java.util.Iterator.
+ *
+ * @param T Input value type
+ * @param R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+  def apply(t: T): java.util.Iterator[R]
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
new file mode 100644
index 0000000..7656cba
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
+
+/**
+ * Assigns the input value into a group.
+ *
+ * @param T Input value type
+ * @param GROUP Group value type
+ */
+abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index f15d875..82ea7c7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -22,7 +22,7 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, 
SingleInputFunction}
 import org.apache.gearpump.streaming.{Constants, Processor}
 import org.apache.gearpump.streaming.dsl.task.TransformTask
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
@@ -134,7 +134,7 @@ case class ChainableOp[IN, OUT](
     other match {
       case op: ChainableOp[OUT, _] =>
         // TODO: preserve type info
-        ChainableOp(fn.andThen(op.fn))
+        ChainableOp(AndThen(fn, op.fn))
       case _ =>
         throw new OpChainException(this, other)
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
index 5322648..687fd2e 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -17,23 +17,35 @@
  */
 package org.apache.gearpump.streaming.dsl.plan.functions
 
-trait SingleInputFunction[IN, OUT] extends Serializable {
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+
+/**
+ * Internal function to process single input
+ *
+ * @param IN input value type
+ * @param OUT output value type
+ */
+sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
   def process(value: IN): TraversableOnce[OUT]
-  def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): 
SingleInputFunction[IN, OUTER] = {
-    AndThen(this, other)
-  }
+
   def finish(): TraversableOnce[OUT] = None
-  def clearState(): Unit = {}
+
+  def teardown(): Unit = {}
+
   def description: String
 }
 
-case class AndThen[IN, MIDDLE, OUT](
-    first: SingleInputFunction[IN, MIDDLE], second: 
SingleInputFunction[MIDDLE, OUT])
+case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE],
+    second: SingleInputFunction[MIDDLE, OUT])
   extends SingleInputFunction[IN, OUT] {
 
-  override def andThen[OUTER](
-      other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] 
= {
-    first.andThen(second.andThen(other))
+  override def setup(): Unit = {
+    first.setup()
+    second.setup()
   }
 
   override def process(value: IN): TraversableOnce[OUT] = {
@@ -49,9 +61,9 @@ case class AndThen[IN, MIDDLE, OUT](
     }
   }
 
-  override def clearState(): Unit = {
-    first.clearState()
-    second.clearState()
+  override def teardown(): Unit = {
+    first.teardown()
+    second.teardown()
   }
 
   override def description: String = {
@@ -61,22 +73,31 @@ case class AndThen[IN, MIDDLE, OUT](
   }
 }
 
-class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], 
descriptionMessage: String)
+class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: 
String)
   extends SingleInputFunction[IN, OUT] {
 
+  override def setup(): Unit = {
+    fn.setup()
+  }
+
   override def process(value: IN): TraversableOnce[OUT] = {
     fn(value)
   }
 
-  override def description: String = descriptionMessage
+  override def teardown(): Unit = {
+    fn.teardown()
+  }
 }
 
-
-class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+class Reducer[T](fn: ReduceFunction[T], val description: String)
   extends SingleInputFunction[T, T] {
 
   private var state: Option[T] = None
 
+  override def setup(): Unit = {
+    fn.setup()
+  }
+
   override def process(value: T): TraversableOnce[T] = {
     if (state.isEmpty) {
       state = Option(value)
@@ -90,23 +111,18 @@ class ReduceFunction[T](fn: (T, T) => T, 
descriptionMessage: String)
     state
   }
 
-  override def clearState(): Unit = {
+  override def teardown(): Unit = {
     state = None
+    fn.teardown()
   }
-
-  override def description: String = descriptionMessage
 }
 
-class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
 
   override def process(value: T): TraversableOnce[Unit] = {
     emit(value)
     None
   }
 
-  override def andThen[R](other: SingleInputFunction[Unit, R]): 
SingleInputFunction[T, R] = {
-    throw new UnsupportedOperationException("andThen is not supposed to be 
called on EmitFunction")
-  }
-
   override def description: String = ""
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
new file mode 100644
index 0000000..430d795
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, 
MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions._
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, 
GroupAlsoByWindow}
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+class Stream[T](
+    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+    private val edge: Option[OpEdge] = None) {
+
+  /**
+   * Returns a new stream by applying a flatMap function to each element
+   * and flatten the results.
+   *
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: T => TraversableOnce[R], description: String = 
"flatMap"): Stream[R] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a flatMap function to each element
+   * and flatten the results.
+   *
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = {
+    transform(new FlatMapper[T, R](fn, description))
+  }
+
+  /**
+   * Returns a new stream by applying a map function to each element.
+   *
+   * @param fn map function
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: T => R, description: String = "map"): Stream[R] = {
+    this.map(MapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a map function to each element.
+   *
+   * @param fn map function
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: MapFunction[T, R], description: String): Stream[R] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the filter 
function.
+   *
+   * @param fn filter function
+   * @return a new stream after filter
+   */
+  def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
+    this.filter(FilterFunction(fn), description)
+  }
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the filter 
function.
+   *
+   * @param fn filter function
+   * @return a new stream after filter
+   */
+  def filter(fn: FilterFunction[T], description: String): Stream[T] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+    reduce(ReduceFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
+    transform(new Reducer[T](fn, description))
+  }
+
+  private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = {
+    val op = ChainableOp(fn)
+    graph.addVertex(op)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), op)
+    new Stream(graph, op)
+  }
+
+  /**
+   * Log to task log file
+   */
+  def log(): Unit = {
+    this.map(msg => {
+      LoggerFactory.getLogger("dsl").info(msg.toString)
+      msg
+    }, "log")
+  }
+
+  /**
+   * Merges data from two stream into one
+   *
+   * @param other the other stream
+   * @return  the merged stream
+   */
+  def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+    val mergeOp = MergeOp(description, UserConfig.empty)
+    graph.addVertex(mergeOp)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
+    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
+    new Stream[T](graph, mergeOp)
+  }
+
+  /**
+   * Group by function (T => Group)
+   *
+   * For example, we have T type, People(name: String, gender: String, age: 
Int)
+   * groupBy[People](_.gender) will group the people by gender.
+   *
+   * You can append other combinators after groupBy
+   *
+   * For example,
+   * {{{
+   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
+   * }}}
+   *
+   * @param fn  Group by function
+   * @param parallelism  Parallelism level
+   * @param description  The description
+   * @return  the grouped stream
+   */
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    window(CountWindow.apply(1).accumulating)
+      .groupBy[GROUP](fn, parallelism, description)
+  }
+
+  /**
+   * Window function
+   *
+   * @param win window definition
+   * @param description window description
+   * @return [[WindowStream]] where groupBy could be applied
+   */
+  def window(win: Window, description: String = "window"): WindowStream[T] = {
+    new WindowStream[T](graph, edge, thisNode, win, description)
+  }
+
+  /**
+   * Connects with a low level Processor(TaskDescription)
+   *
+   * @param processor  a user defined processor
+   * @param parallelism  parallelism level
+   * @return  new stream after processing with type [R]
+   */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = 
UserConfig.empty,
+      description: String = "process"): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf, description)
+    graph.addVertex(processorOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
+    new Stream[R](graph, processorOp, Some(Shuffle))
+  }
+}
+
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], 
thisNode: Op,
+    window: Window, winDesc: String) {
+
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, 
window)
+    val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
+      s"$winDesc.$description")
+    graph.addVertex(groupOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+    new Stream[T](graph, groupOp)
+  }
+}
+
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+  /**
+   * GroupBy key
+   *
+   * Applies to Stream[Tuple2[K,V]]
+   *
+   * @param parallelism  the parallelism for this operation
+   * @return  the new KV stream
+   */
+  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+    stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
+  }
+
+  /**
+   * Sum the value of the tuples
+   *
+   * Apply to Stream[Tuple2[K,V]], V must be of type Number
+   *
+   * For input (key, value1), (key, value2), will generate (key, value1 + 
value2)
+   * @param numeric  the numeric operations
+   * @return  the sum stream
+   */
+  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+    stream.reduce(Stream.sumByKey[K, V](numeric), "sum")
+  }
+}
+
+object Stream {
+
+  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): 
Stream[T] = {
+    new Stream[T](graph, node, edge)
+  }
+
+  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => 
Tuple2[K, V]
+  = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): 
KVStream[K, V] = {
+    new KVStream(stream)
+  }
+
+  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+    def sink(dataSink: DataSink, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "sink"): 
Stream[T] = {
+      implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
+      stream.graph.addVertex(sink)
+      stream.graph.addEdge(stream.thisNode, Shuffle, sink)
+      new Stream[T](stream.graph, sink)
+    }
+  }
+}
+
+class LoggerSink[T] extends DataSink {
+  var logger: Logger = _
+
+  override def open(context: TaskContext): Unit = {
+    this.logger = context.logger
+  }
+
+  override def write(message: Message): Unit = {
+    logger.info("logging message " + message.msg)
+  }
+
+  override def close(): Unit = Unit
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
new file mode 100644
index 0000000..d6eed2e
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.Graph
+
+import scala.language.implicitConversions
+
+/**
+ * Example:
+ * {{{
+ * val data = "This is a good start, bingo!! bingo!!"
+ * app.fromCollection(data.lines.toList).
+ * // word => (word, count)
+ * flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ * // (word, count1), (word, count2) => (word, count1 + count2)
+ * groupBy(kv => kv._1).reduce(sum(_, _))
+ *
+ * val appId = context.submit(app)
+ * context.close()
+ * }}}
+ *
+ * @param name name of app
+ */
+class StreamApp(
+    name: String, system: ActorSystem, userConfig: UserConfig,
+    private val graph: Graph[Op, OpEdge]) {
+
+  def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
+    this(name, system, userConfig, Graph.empty[Op, OpEdge])
+  }
+
+  def plan(): StreamApplication = {
+    implicit val actorSystem = system
+    val planner = new Planner
+    val dag = planner.plan(graph)
+    StreamApplication(name, dag, userConfig)
+  }
+}
+
+object StreamApp {
+  def apply(name: String, context: ClientContext, userConfig: UserConfig = 
UserConfig.empty)
+    : StreamApp = {
+    new StreamApp(name, context.system, userConfig)
+  }
+
+  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication 
= {
+    streamApp.plan()
+  }
+
+  implicit class Source(app: StreamApp) extends java.io.Serializable {
+
+    def source[T](dataSource: DataSource, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "source"): 
Stream[T] = {
+      implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, 
description)
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+
+    def source[T](seq: Seq[T], parallelism: Int, description: String): 
Stream[T] = {
+      this.source(new CollectionDataSource[T](seq), parallelism, 
UserConfig.empty, description)
+    }
+  }
+}
+
+/** A test message source which generated message sequence repeatedly. */
+class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
+  private lazy val iterator: Iterator[T] = seq.iterator
+
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+  override def read(): Message = {
+    if (iterator.hasNext) {
+      Message(iterator.next(), Instant.now().toEpochMilli)
+    } else {
+      null
+    }
+  }
+
+  override def close(): Unit = {}
+
+  override def getWatermark: Instant = Instant.now()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..f10a3db
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, 
MapFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => 
JFlatMapFunction}
+
+import scala.collection.JavaConverters._
+
+object FlatMapFunction {
+
+  def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[R] = {
+        fn.apply(t).asScala
+      }
+
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+
+  def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+      override def apply(t: T): TraversableOnce[R] = {
+        fn(t)
+      }
+    }
+  }
+
+  def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[R] = {
+        Option(fn(t))
+      }
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+
+  def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] = {
+    new FlatMapFunction[T, T] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[T] = {
+        if (fn(t)) {
+          Option(t)
+        } else {
+          None
+        }
+      }
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+}
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Scala version of FlatMapFunction returns a TraversableOnce.
+ *
+ * @param T Input value type
+ * @param R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+  def apply(t: T): TraversableOnce[R]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
new file mode 100644
index 0000000..ab88bf1
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+/**
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides common methods
+ * like setup and teardown. Users should not extend this class directly
+ * but subclasses like [[FlatMapFunction]].
+ */
+abstract class SerializableFunction extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
+  def teardown(): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index e35f085..c13a4fb 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -23,9 +23,8 @@ import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
-class TransformTask[IN, OUT](
-    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
-    userConf: UserConfig) extends Task(taskContext, userConf) {
+class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
+    taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, 
userConf) {
 
   def this(taskContext: TaskContext, userConf: UserConfig) = {
     this(userConf.getValue[SingleInputFunction[IN, OUT]](

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index d87a9e4..223a4af 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -28,7 +28,7 @@ import com.gs.collections.impl.map.mutable.UnifiedMap
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
 import com.gs.collections.impl.set.mutable.UnifiedSet
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, 
SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, 
SingleInputFunction}
 import org.apache.gearpump.streaming.dsl.window.api.Discarding
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.LogUtil
@@ -39,7 +39,6 @@ trait WindowRunner {
   def process(message: Message): Unit
 
   def trigger(time: Instant): Unit
-
 }
 
 object DefaultWindowRunner {
@@ -59,7 +58,6 @@ class DefaultWindowRunner[IN, GROUP, OUT](
   private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
   private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
 
-
   override def process(message: Message): Unit = {
     val (group, buckets) = groupBy.groupBy(message)
     buckets.foreach { bucket =>
@@ -72,8 +70,11 @@ class DefaultWindowRunner[IN, GROUP, OUT](
       inputs.add(message.msg.asInstanceOf[IN])
       windowGroups.put(wg, inputs)
     }
-    groupFns.putIfAbsent(group,
-      userConfig.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+    if (!groupFns.containsKey(group)) {
+      val fn = userConfig.getValue[SingleInputFunction[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR).get
+      fn.setup()
+      groupFns.put(group, fn)
+    }
   }
 
   override def trigger(time: Instant): Unit = {
@@ -88,8 +89,7 @@ class DefaultWindowRunner[IN, GROUP, OUT](
           wgs.forEach(new Procedure[WindowGroup[GROUP]] {
             override def value(each: WindowGroup[GROUP]): Unit = {
               val inputs = windowGroups.remove(each)
-              val reduceFn = groupFns.get(each.group)
-                .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+              val reduceFn = AndThen(groupFns.get(each.group), new 
Emit[OUT](emitResult(_, time)))
               inputs.forEach(new Procedure[IN] {
                 override def value(t: IN): Unit = {
                   // .toList forces eager evaluation
@@ -99,7 +99,7 @@ class DefaultWindowRunner[IN, GROUP, OUT](
               // .toList forces eager evaluation
               reduceFn.finish().toList
               if (groupBy.window.accumulationMode == Discarding) {
-                reduceFn.clearState()
+                reduceFn.teardown()
               }
             }
           })

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
deleted file mode 100644
index db4db93..0000000
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import akka.actor.ActorSystem
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.util.Graph
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with 
MockitoSugar {
-
-  implicit var system: ActorSystem = _
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "be able to generate multiple new streams" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val dsl = StreamApp("dsl", context)
-    dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
-    dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
-
-    val application = dsl.plan()
-    application shouldBe a [StreamApplication]
-    application.name shouldBe "dsl"
-    val dag = application.userConfig
-      .getValue[Graph[ProcessorDescription, 
PartitionerDescription]](StreamApplication.DAG).get
-    dag.vertices.size shouldBe 2
-    dag.vertices.foreach { processor =>
-      processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
-      if (processor.description == "A") {
-        processor.parallelism shouldBe 2
-      } else if (processor.description == "B") {
-        processor.parallelism shouldBe 3
-      } else {
-        fail(s"undefined source ${processor.description}")
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a23a40f5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
deleted file mode 100644
index 8def61e..0000000
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import akka.actor._
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, 
HashPartitioner, PartitionerDescription}
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.dsl.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.Graph
-import org.apache.gearpump.util.Graph._
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.{Either, Left, Right}
-
-class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with 
MockitoSugar {
-
-  implicit var system: ActorSystem = _
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "translate the DSL to a DAG" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val dsl = StreamApp("dsl", context)
-
-    val data =
-      """
-        five  four three  two    one
-        five  four three  two
-        five  four three
-        five  four
-        five
-      """
-    val stream = dsl.source(data.lines.toList, 1, "").
-      flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
-      map(word => (word, 1)).
-      groupBy(_._1, parallelism = 2).
-      reduce((left, right) => (left._1, left._2 + right._2)).
-      map[Either[(String, Int), String]](Left(_))
-
-    val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), 
String]](Right(_))
-    stream.merge(query).process[(String, Int)](classOf[Join], 1)
-
-    val app: StreamApplication = dsl.plan()
-    val dag = app.userConfig
-      .getValue[Graph[ProcessorDescription, 
PartitionerDescription]](StreamApplication.DAG).get
-
-    val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, 
node2) =>
-      edge.partitionerFactory.partitioner.getClass.getName
-    }
-    val expectedDagTopology = getExpectedDagTopology
-
-    dagTopology.vertices.toSet should contain theSameElementsAs 
expectedDagTopology.vertices.toSet
-    dagTopology.edges.toSet should contain theSameElementsAs 
expectedDagTopology.edges.toSet
-  }
-
-  private def getExpectedDagTopology: Graph[String, String] = {
-    val source = classOf[DataSourceTask[_, _]].getName
-    val group = classOf[CountTriggerTask[_, _]].getName
-    val merge = classOf[TransformTask[_, _]].getName
-    val join = classOf[Join].getName
-
-    val hash = classOf[HashPartitioner].getName
-    val groupBy = classOf[GroupByPartitioner[_, _]].getName
-    val colocation = classOf[CoLocationPartitioner].getName
-
-    val expectedDagTopology = Graph(
-      source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
-      source ~ hash ~> merge
-    )
-    expectedDagTopology
-  }
-}
-
-object StreamSpec {
-
-  class Join(taskContext: TaskContext, userConf: UserConfig) extends 
Task(taskContext, userConf) {
-
-    var query: String = _
-
-    override def onNext(msg: Message): Unit = {
-      msg.msg match {
-        case Left(wordCount: (String @unchecked, Int @unchecked)) =>
-          if (query != null && wordCount._1 == query) {
-            taskContext.output(new Message(wordCount))
-          }
-
-        case Right(query: String) =>
-          this.query = query
-      }
-    }
-  }
-}


Reply via email to