[GEARPUMP-311] refactor state management

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/fe410304
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/fe410304
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/fe410304

Branch: refs/heads/state
Commit: fe4103046ae3bddacce2cd7a17270a994a494dbd
Parents: 50316ce
Author: vinoyang <[email protected]>
Authored: Fri Jul 21 20:32:10 2017 +0800
Committer: vinoyang <[email protected]>
Committed: Sat Jul 22 17:16:56 2017 +0800

----------------------------------------------------------------------
 .../state/refactor/ProduceProcessor.scala       |  53 +++++
 .../examples/state/refactor/SumProcessor.scala  |  92 ++++++++
 .../wordcountjava/dsl/refactor/WordCount.java   | 211 +++++++++++++++++++
 .../wordcount/dsl/refactor/WordCount.scala      | 151 +++++++++++++
 project/BuildExamples.scala                     |  36 +++-
 .../streaming/refactor/javaapi/Processor.java   | 143 +++++++++++++
 .../api/functions/MapWithStateFunction.scala    |  55 +++++
 .../refactor/dsl/javaapi/JavaStreamApp.scala    |  49 +++++
 .../functions/FlatMapWithStateFunction.scala    |  48 +++++
 .../streaming/refactor/dsl/plan/Planner.scala   |  90 ++++++++
 .../dsl/plan/functions/FunctionRunner.scala     | 135 ++++++++++++
 .../refactor/dsl/scalaapi/StreamApp.scala       | 116 ++++++++++
 .../functions/FlatMapWithStateFunction.scala    | 120 +++++++++++
 .../refactor/dsl/task/GroupByTask.scala         |  89 ++++++++
 .../streaming/refactor/dsl/task/TaskUtil.scala  |  65 ++++++
 .../refactor/dsl/task/TransformTask.scala       |  55 +++++
 .../dsl/window/impl/ReduceFnRunner.scala        |  29 +++
 .../refactor/dsl/window/impl/WindowRunner.scala | 160 ++++++++++++++
 .../refactor/sink/DataSinkProcessor.scala       |  36 ++++
 .../streaming/refactor/sink/DataSinkTask.scala  |  52 +++++
 .../refactor/source/DataSourceProcessor.scala   |  55 +++++
 .../refactor/source/DataSourceTask.scala        |  91 ++++++++
 .../refactor/state/RuntimeContext.scala         |  35 +++
 .../streaming/refactor/state/StatefulTask.scala | 174 +++++++++++++++
 24 files changed, 2136 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala
 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala
new file mode 100644
index 0000000..11f99e2
--- /dev/null
+++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/ProduceProcessor.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.refactor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, 
StatefulTask}
+import org.apache.gearpump.streaming.task.TaskContext
+
+/** For each byte-array payload holding the text of an integer n, outputs the numbers 1..n
+ *  and logs their sum; any other payload is logged as unsupported and dropped.
+ */
+class ProduceProcessor(taskContext: TaskContext, conf: UserConfig)
+  extends StatefulTask(taskContext, conf) {
+
+  override def open(runtimeContext: RuntimeContext): Unit = {}
+
+  override def invoke(message: Message): Unit = {
+    message.value match {
+      case msgBytes: Array[Byte] =>
+        val msgStr = new String(msgBytes)
+        LOG.info("got total sequence num : {}", msgStr)
+
+        val n: Int = Integer.valueOf(msgStr)
+        var sumResult: Long = 0
+        for (i <- 1 to n) {
+          taskContext.output(Message(String.valueOf(i).getBytes))
+          sumResult = sumResult + i
+        }
+
+        LOG.info(" total sum result : {}", sumResult)
+      case other => LOG.error("received unsupported message {}", other)
+    }
+  }
+
+  override def close(runtimeContext: RuntimeContext): Unit = {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala
 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala
new file mode 100644
index 0000000..438b337
--- /dev/null
+++ 
b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/refactor/SumProcessor.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.state.refactor
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.coder._
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, 
ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, 
StateNamespaces, StateTags, StatefulTask}
+import org.apache.gearpump.streaming.task.TaskContext
+
+/**
+ *  A sum processor that continuously sums the messages received from Kafka.
+ *  It is an example of using the state API and of verifying the exactly-once state guarantee.
+ */
+class SumProcessor(taskContext: TaskContext, conf: UserConfig)
+  extends StatefulTask(taskContext, conf) {
+
+  private val valueStateTag = "tag1"
+  private val counterStateTag = "tag2"
+
+  private var stateInternals: Option[StateInternals] = None
+  private var valueState: Option[ValueState[java.lang.Long]] = None
+  private var counterState: Option[ValueState[java.lang.Long]] = None
+
+  override def open(stateContext: RuntimeContext): Unit = {
+    stateInternals = Some(stateContext.getStateInternals(StringUtf8Coder.of, "partitionedKey"))
+    valueState = Some(
+      stateInternals.get.state(
+        StateNamespaces.global, StateTags.value(valueStateTag, VarLongCoder.of))
+    )
+
+    counterState = Some(
+      stateInternals.get.state(
+        StateNamespaces.global, StateTags.value(counterStateTag, VarLongCoder.of))
+    )
+
+    // init: a null read means no value was loaded, so the state starts from 0
+    if (valueState.get.read == null) {
+      LOG.info("[open] value state current is null, init it to 0")
+      valueState.get.write(0L)
+    } else {
+      LOG.info("[open] load from snapshot value state current is : {}", valueState.get.read)
+    }
+
+    if (counterState.get.read == null) {
+      LOG.info("[open] counter state current is null, init it to 0")
+      counterState.get.write(0L)
+    } else {
+      LOG.info("[open] load from snapshot counter state current is : {}", counterState.get.read)
+    }
+  }
+
+  override def invoke(message: Message): Unit = {
+    message.value match {
+      case numberByte: Array[Byte] =>
+        val number = new String(numberByte)
+        val oldVal = valueState.get.read
+        valueState.get.write(oldVal + java.lang.Long.valueOf(number))
+
+        val oldCounter = counterState.get.read
+        counterState.get.write(oldCounter + 1)
+
+        // progress report, emitted only when this message actually advanced the counter
+        if (counterState.get.read % 1000000 == 0) {
+          LOG.info("counter state is : {}", counterState.get.read)
+          LOG.info("value state is : {}", valueState.get.read)
+        }
+
+      case other => LOG.error("received unsupported message {}", other)
+    }
+  }
+
+  override def close(stateContext: RuntimeContext): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/refactor/WordCount.java
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/refactor/WordCount.java
 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/refactor/WordCount.java
new file mode 100644
index 0000000..900c35a
--- /dev/null
+++ 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/refactor/WordCount.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcountjava.dsl.refactor;
+
+import com.typesafe.config.Config;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.gearpump.DefaultMessage;
+import org.apache.gearpump.Message;
+import org.apache.gearpump.cluster.ClusterConfig;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.cluster.client.ClientContext;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory;
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation;
+import org.apache.gearpump.streaming.refactor.coder.StringUtf8Coder;
+import org.apache.gearpump.streaming.refactor.coder.VarLongCoder;
+import 
org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction;
+import org.apache.gearpump.streaming.refactor.dsl.javaapi.JavaStream;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
+import org.apache.gearpump.streaming.refactor.dsl.javaapi.JavaStreamApp;
+import 
org.apache.gearpump.streaming.refactor.dsl.javaapi.functions.FlatMapWithStateFunction;
+import org.apache.gearpump.streaming.refactor.state.RuntimeContext;
+import org.apache.gearpump.streaming.refactor.state.StateNamespaces;
+import org.apache.gearpump.streaming.refactor.state.StateTags;
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals;
+import org.apache.gearpump.streaming.refactor.state.api.ValueState;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig;
+import org.apache.gearpump.streaming.task.TaskContext;
+import org.apache.hadoop.conf.Configuration;
+import scala.Tuple2;
+
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Iterator;
+
+/** Java version of WordCount with high level DSL API */
+public class WordCount {
+
+  public static void main(String[] args) throws InterruptedException {
+    main(ClusterConfig.defaultConfig(), args);
+  }
+
+  public static void main(Config akkaConf, String[] args) throws 
InterruptedException {
+    ClientContext context = new ClientContext(akkaConf);
+
+    Configuration hadoopConfig = new Configuration();
+    HadoopCheckpointStoreFactory checkpointStoreFactory = new 
HadoopCheckpointStoreFactory(
+            "MessageConsume", hadoopConfig,
+            // Rotates on 1MB
+            new FileSizeRotation(1000000)
+    );
+    UserConfig taskConfig = UserConfig.empty()
+            .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE(), true)
+            .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS(), 
1000L)
+            .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY(),
+                    checkpointStoreFactory,
+                    context.system()
+            );
+
+    JavaStreamApp app = new JavaStreamApp("JavaDSL", context, taskConfig);
+
+    JavaStream<String> sentence = app.source(new StringSource("This is a good 
start, bingo!! bingo!!"),
+        1, UserConfig.empty(), "source");
+
+    JavaStream<String> words = sentence.flatMapWithState(new 
StatefulSplitFunction(), "flatMap");
+
+    JavaStream<Tuple2<String, Integer>> ones = words.mapWithState(new 
StatefulMapFunction(), "map");
+
+    JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new 
TupleKey(), 1, "groupBy");
+
+    JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new 
Count(), "reduce");
+
+    wordcount.log();
+
+    app.submit().waitUntilFinish();
+    context.close();
+  }
+
+  private static class StringSource implements DataSource {
+
+    private final String str;
+
+    StringSource(String str) {
+      this.str = str;
+    }
+
+    @Override
+    public void open(TaskContext context, Instant startTime) {
+    }
+
+    @Override
+    public Message read() {
+      return new DefaultMessage(str, Instant.now());
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Instant getWatermark() {
+      return Instant.now();
+    }
+  }
+
+  private static class StatefulSplitFunction extends 
FlatMapWithStateFunction<String, String> {
+
+    private static final Log logger = 
LogFactory.getLog(StatefulSplitFunction.class);
+
+    private String counterStateTag = "tag1";
+
+    private StateInternals stateInternal;
+    private ValueState<Long> counterState;
+
+    @Override
+    public void setup(RuntimeContext runtimeContext) {
+      logger.info("StatefulSplitFunction setup.");
+      stateInternal = runtimeContext.getStateInternals(StringUtf8Coder.of(), 
"partitionedKey");
+
+      counterState = stateInternal.state(StateNamespaces.global(), 
StateTags.value(counterStateTag, VarLongCoder.of()));
+
+      if (counterState.read() == null) {
+        counterState.write(0L);
+      }
+    }
+
+    @Override
+    public Iterator<String> flatMap(String s) {
+      long oldVal = counterState.read();
+      logger.info("old value in flatMap : " + oldVal);
+      counterState.write(oldVal + 1);
+
+      return Arrays.asList(s.split("\\s+")).iterator();
+    }
+
+    @Override
+    public void teardown(RuntimeContext runtimeContext) {
+      logger.info("StatefulSplitFunction teardown.");
+    }
+  }
+
+  private static class StatefulMapFunction extends 
MapWithStateFunction<String, Tuple2<String, Integer>> {
+
+    private static final Log logger = 
LogFactory.getLog(StatefulMapFunction.class);
+
+    private String counterStateTag = "tag2";
+
+    private StateInternals stateInternal;
+    private ValueState<Long> counterState;
+
+    @Override
+    public void setup(RuntimeContext runtimeContext) {
+      logger.info("StatefulMapFunction setup.");
+      stateInternal = runtimeContext.getStateInternals(StringUtf8Coder.of(), 
"partitionedKey");
+
+      counterState = stateInternal.state(StateNamespaces.global(), 
StateTags.value(counterStateTag, VarLongCoder.of()));
+
+      if (counterState.read() == null) {
+        counterState.write(0L);
+      }
+    }
+
+    @Override
+    public Tuple2<String, Integer> map(String s) {
+      long oldVal = counterState.read();
+      logger.info("old value in map method : " + oldVal);
+      counterState.write(oldVal + 1);
+
+      return new Tuple2<>(s, 1);
+    }
+
+    @Override
+    public void teardown(RuntimeContext runtimeContext) {
+      logger.info("StatefulMapFunction teardown.");
+    }
+  }
+
+  private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+
+    @Override
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, 
Tuple2<String, Integer> t2) {
+      return new Tuple2<>(t1._1(), t1._2() + t2._2());
+    }
+  }
+
+  private static class TupleKey extends GroupByFunction<Tuple2<String, 
Integer>, String> {
+
+    @Override
+    public String groupBy(Tuple2<String, Integer> tuple) {
+      return tuple._1();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala
new file mode 100644
index 0000000..a9919e2
--- /dev/null
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/refactor/WordCount.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.examples.wordcount.dsl.refactor
+
+import org.apache.commons.logging.{Log, LogFactory}
+import org.apache.gearpump.streaming.refactor.coder.{StringUtf8Coder, 
VarLongCoder}
+import 
org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp
+import 
org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.refactor.state.api.{StateInternals, 
ValueState}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, 
StateNamespaces, StateTags}
+import org.apache.hadoop.conf.Configuration
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
+import org.apache.gearpump.streaming.hadoop.HadoopCheckpointStoreFactory
+import org.apache.gearpump.streaming.hadoop.lib.rotation.FileSizeRotation
+import org.apache.gearpump.streaming.state.impl.PersistentStateConfig
+import org.apache.gearpump.streaming.{Processor, StreamApplication}
+import org.apache.gearpump.util.{AkkaApp, Graph}
+
+/**
+ * WordCount on the refactored DSL, using stateful flatMap/map functions with checkpointing on.
+ */
+object WordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+
+    val hadoopConfig = new Configuration
+    val checkpointStoreFactory = new 
HadoopCheckpointStoreFactory("MessageConsume", hadoopConfig,
+      // Rotates on 1MB
+      new FileSizeRotation(1000000))
+    val taskConfig = UserConfig.empty
+      .withBoolean(PersistentStateConfig.STATE_CHECKPOINT_ENABLE, true)
+      .withLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS, 1000L)
+      .withValue(PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY,
+        checkpointStoreFactory)(context.system)
+
+    val app = StreamApp("dsl", context, taskConfig)
+    val data = "This is a good start, bingo!! bingo!!"
+    app.source(data.lines.toList, 1, "source").
+      // word => (word, count)
+      flatMapWithState(new StatefulFlatMapFunction(), "a stateful flatmap 
udf").
+      mapWithState(new StatefulMapFunction(), "").
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupByKey().sum.log
+
+    context.submit(app).waitUntilFinish()
+    context.close()
+  }
+
+
+  private class StatefulFlatMapFunction
+    extends FlatMapWithStateFunction[String, String] {
+
+    private val logger: Log = LogFactory.getLog(getClass)
+
+    private implicit val counterStateTag = "tag1"
+
+    private var stateInternals: Option[StateInternals] = None
+    private var counterState: Option[ValueState[java.lang.Long]] = None
+
+    override def setup(runtimeContext: RuntimeContext): Unit = {
+      logger.info("StatefulFlatMapFunction setup.")
+      stateInternals = 
Some(runtimeContext.getStateInternals(StringUtf8Coder.of, "partitionedKey"))
+
+      counterState = Some(
+        stateInternals.get.state(
+          StateNamespaces.global, StateTags.value(counterStateTag, 
VarLongCoder.of))
+      )
+
+      // init
+      if (counterState.get.read == null) {
+        counterState.get.write(0L)
+      }
+    }
+
+
+    override def flatMap(t: String): TraversableOnce[String] = {
+      val oldVal = counterState.get.read
+      logger.info("old value in flatmap : " + oldVal)
+      counterState.get.write(oldVal + 1)
+
+      t.split("[\\s]+")
+    }
+
+    override def teardown(runtimeContext: RuntimeContext): Unit = {
+      logger.info("StatefulFlatMapFunction teardown.")
+    }
+
+  }
+
+  private class StatefulMapFunction
+    extends MapWithStateFunction[String, (String, Int)] {
+
+    private val logger: Log = LogFactory.getLog(getClass)
+
+    private implicit val counterStateTag = "tag2"
+
+    private var stateInternals: Option[StateInternals] = None
+    private var counterState: Option[ValueState[java.lang.Long]] = None
+
+    override def setup(runtimeContext: RuntimeContext): Unit = {
+      logger.info("StatefulMapFunction setup.")
+      stateInternals = 
Some(runtimeContext.getStateInternals(StringUtf8Coder.of, "partitionedKey"))
+
+      counterState = Some(
+        stateInternals.get.state(
+          StateNamespaces.global, StateTags.value(counterStateTag, 
VarLongCoder.of))
+      )
+
+      // init
+      if (counterState.get.read == null) {
+        counterState.get.write(0L)
+      }
+    }
+
+    override def map(t: String): (String, Int) = {
+      val oldVal = counterState.get.read
+      logger.info("old value in map : " + oldVal)
+      counterState.get.write(oldVal + 1)
+
+      (t, 1)
+    }
+
+    override def teardown(runtimeContext: RuntimeContext): Unit = {
+      logger.info("StatefulMapFunction teardown.")
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/project/BuildExamples.scala
----------------------------------------------------------------------
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index d7390b5..57210c0 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -62,14 +62,42 @@ object BuildExamples extends sbt.Build {
   lazy val wordcountJava = Project(
     id = "gearpump-examples-wordcountjava",
     base = file("examples/streaming/wordcount-java"),
-    settings = 
exampleSettings("org.apache.gearpump.streaming.examples.wordcountjava.WordCount")
-  ).dependsOn(core % "provided", streaming % "test->test; provided")
+    settings = 
exampleSettings("org.apache.gearpump.streaming.examples.wordcountjava.WordCount")
 ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "org.apache.hadoop" % "hadoop-common" % hadoopVersion
+            exclude("org.mortbay.jetty", "jetty-util")
+            exclude("org.mortbay.jetty", "jetty")
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("tomcat", "jasper-runtime")
+            exclude("commons-beanutils", "commons-beanutils-core")
+            exclude("commons-beanutils", "commons-beanutils")
+            exclude("asm", "asm")
+            exclude("org.ow2.asm", "asm"),
+          "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion
+        )
+      )
+  ).dependsOn(core % "provided", streaming % "test->test; provided", 
external_hadoopfs)
 
   lazy val wordcount = Project(
     id = "gearpump-examples-wordcount",
     base = file("examples/streaming/wordcount"),
-    settings = 
exampleSettings("org.apache.gearpump.streaming.examples.wordcount.WordCount")
-  ).dependsOn(core % "provided", streaming % "test->test; provided")
+    settings = 
exampleSettings("org.apache.gearpump.streaming.examples.wordcount.WordCount") ++
+      Seq(
+        libraryDependencies ++= Seq(
+          "org.apache.hadoop" % "hadoop-common" % hadoopVersion
+            exclude("org.mortbay.jetty", "jetty-util")
+            exclude("org.mortbay.jetty", "jetty")
+            exclude("org.fusesource.leveldbjni", "leveldbjni-all")
+            exclude("tomcat", "jasper-runtime")
+            exclude("commons-beanutils", "commons-beanutils-core")
+            exclude("commons-beanutils", "commons-beanutils")
+            exclude("asm", "asm")
+            exclude("org.ow2.asm", "asm"),
+          "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion
+        )
+      )
+  ).dependsOn(core % "provided", streaming % "test->test; provided", 
external_hadoopfs)
 
   lazy val sol = Project(
     id = "gearpump-examples-sol",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/java/org/apache/gearpump/streaming/refactor/javaapi/Processor.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/gearpump/streaming/refactor/javaapi/Processor.java
 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/javaapi/Processor.java
new file mode 100644
index 0000000..7d85b09
--- /dev/null
+++ 
b/streaming/src/main/java/org/apache/gearpump/streaming/refactor/javaapi/Processor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.javaapi;
+
+import akka.actor.ActorSystem;
+import org.apache.gearpump.cluster.UserConfig;
+import org.apache.gearpump.streaming.javaapi.Task;
+import org.apache.gearpump.streaming.sink.DataSink;
+import org.apache.gearpump.streaming.refactor.sink.DataSinkProcessor;
+import org.apache.gearpump.streaming.refactor.sink.DataSinkTask;
+import org.apache.gearpump.streaming.source.DataSource;
+import org.apache.gearpump.streaming.refactor.source.DataSourceProcessor;
+import org.apache.gearpump.streaming.refactor.source.DataSourceTask;
+
+/**
+ * Java version of Processor
+ *
+ * See {@link org.apache.gearpump.streaming.Processor}
+ */
+public class Processor<T extends org.apache.gearpump.streaming.task.Task> 
implements org.apache.gearpump.streaming.Processor<T> {
+  private Class<T> _taskClass;
+  private int _parallelism = 1;
+  private String _description = "";
+  private UserConfig _userConf = UserConfig.empty();
+
+  public Processor(Class<T> taskClass) {
+    this._taskClass = taskClass;
+  }
+
+  public Processor(Class<T> taskClass, int parallelism) {
+    this._taskClass = taskClass;
+    this._parallelism = parallelism;
+  }
+
+  /**
+   * Creates a Sink Processor
+   *
+   * @param dataSink    the data sink itself
+   * @param parallelism the parallelism of this processor
+   * @param description the description for this processor
+   * @param taskConf    the configuration for this processor
+   * @param system      actor system
+   * @return the new created sink processor
+   */
+  public static Processor<DataSinkTask> sink(DataSink dataSink, int 
parallelism, String description, UserConfig taskConf, ActorSystem system) {
+    org.apache.gearpump.streaming.Processor<DataSinkTask> p = 
DataSinkProcessor.apply(dataSink, parallelism, description, taskConf, system);
+    return new Processor(p);
+  }
+
+  /**
+   * Creates a Source Processor
+   *
+   * @param source      the data source itself
+   * @param parallelism the parallelism of this processor
+   * @param description the description of this processor
+   * @param taskConf    the configuration of this processor
+   * @param system      actor system
+   * @return the new created source processor
+   */
+  public static Processor<DataSourceTask> source(DataSource source, int 
parallelism, String description, UserConfig taskConf, ActorSystem system) {
+    org.apache.gearpump.streaming.Processor<DataSourceTask<Object, Object>> p =
+        DataSourceProcessor.apply(source, parallelism, description, taskConf, 
system);
+    return new Processor(p);
+  }
+
+  public Processor(org.apache.gearpump.streaming.Processor<T> processor) {
+    this._taskClass = (Class) (processor.taskClass());
+    this._parallelism = processor.parallelism();
+    this._description = processor.description();
+    this._userConf = processor.taskConf();
+  }
+
+  /**
+   * Creates a general processor with user specified task logic.
+   *
+   * @param taskClass   task implementation class of this processor (a subclass of {@link Task})
+   * @param parallelism how many initial tasks you want to use
+   * @param description some text to describe this processor
+   * @param taskConf    processor specific configuration
+   */
+  public Processor(Class<T> taskClass, int parallelism, String description, 
UserConfig taskConf) {
+    this._taskClass = taskClass;
+    this._parallelism = parallelism;
+    this._description = description;
+    this._userConf = taskConf;
+  }
+
+  public Processor<T> withParallelism(int parallel) {
+    return new Processor<T>(_taskClass, parallel, _description, _userConf);
+  }
+
+  public Processor<T> withDescription(String desc) {
+    return new Processor<T>(_taskClass, _parallelism, desc, _userConf);
+  }
+
+  public Processor<T> withConfig(UserConfig conf) {
+    return new Processor<T>(_taskClass, _parallelism, _description, conf);
+  }
+
+  @Override
+  public int parallelism() {
+    return _parallelism;
+  }
+
+  @Override
+  public UserConfig taskConf() {
+    return _userConf;
+  }
+
+  @Override
+  public String description() {
+    return _description;
+  }
+
+  @Override
+  public Class<? extends org.apache.gearpump.streaming.task.Task> taskClass() {
+    return _taskClass;
+  }
+
+  /**
+   * reference equal
+   */
+  @Override
+  public boolean equals(Object obj) {
+    return (this == obj);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/api/functions/MapWithStateFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/api/functions/MapWithStateFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/api/functions/MapWithStateFunction.scala
new file mode 100644
index 0000000..6724ab5
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/api/functions/MapWithStateFunction.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.api.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
+import org.apache.gearpump.streaming.refactor.state.RuntimeContext
+
+object MapWithStateFunction {
+
+  /**
+   * Wraps a plain Scala function as a MapWithStateFunction.
+   *
+   * @param fn function applied to every input value
+   * @return a MapWithStateFunction whose `map` delegates to `fn`
+   */
+  def apply[T, R](fn: T => R): MapWithStateFunction[T, R] =
+    new MapWithStateFunction[T, R] {
+      override def map(t: T): R = fn(t)
+    }
+}
+
+/**
+ * A map function that supports state: it is set up and torn down with a RuntimeContext.
+ */
+abstract class MapWithStateFunction[T, R] extends MapFunction[T, R] {
+
+  /** Lifecycle hook that receives the RuntimeContext; does nothing unless overridden. */
+  def setup(runtimeContext: RuntimeContext): Unit = ()
+
+  /** Lifecycle hook that receives the RuntimeContext; does nothing unless overridden. */
+  def teardown(runtimeContext: RuntimeContext): Unit = ()
+
+  /** Always throws; use the overload that takes a RuntimeContext instead. */
+  final override def setup(): Unit =
+    throw new UnsupportedOperationException("please call or override " +
+      "setup(runtimeContext: RuntimeContext) .")
+
+  /** Always throws; use the overload that takes a RuntimeContext instead. */
+  final override def teardown(): Unit =
+    throw new UnsupportedOperationException("please call or override " +
+      "teardown(runtimeContext: RuntimeContext) ")
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStreamApp.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStreamApp.scala
new file mode 100644
index 0000000..5a88c17
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/JavaStreamApp.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.javaapi
+
+import java.util.Collection
+
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.{ClientContext, RunningApplication}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.streaming.refactor.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.source.DataSource
+
+import scala.collection.JavaConverters._
+
+/**
+ * Java-facing wrapper around the Scala StreamApp.
+ *
+ * @param name       application name passed to the wrapped StreamApp
+ * @param context    client context used to create and submit the application
+ * @param userConfig user configuration passed to the wrapped StreamApp
+ */
+class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
+
+  private val streamApp = StreamApp(name, context, userConfig)
+
+  /** Creates a stream backed by the elements of a java.util.Collection. */
+  def source[T](collection: Collection[T], parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    val elements = collection.asScala.toSeq
+    source(new CollectionDataSource(elements), parallelism, conf, description)
+  }
+
+  /** Creates a stream reading from the given DataSource. */
+  def source[T](dataSource: DataSource, parallelism: Int,
+      conf: UserConfig, description: String): JavaStream[T] = {
+    val scalaStream = streamApp.source(dataSource, parallelism, conf, description)
+    new JavaStream[T](scalaStream)
+  }
+
+  /** Submits the wrapped application through the client context. */
+  def submit(): RunningApplication = context.submit(streamApp)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/functions/FlatMapWithStateFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/functions/FlatMapWithStateFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/functions/FlatMapWithStateFunction.scala
new file mode 100644
index 0000000..4ed48ab
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/javaapi/functions/FlatMapWithStateFunction.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.javaapi.functions
+
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.refactor.state.RuntimeContext
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Java version of FlatMapFunction returns a java.util.Iterator.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapWithStateFunction[T, R] extends FlatMapFunction[T, R] {
+
+  /** Maps one input value to an iterator over its outputs. */
+  def flatMap(t: T): java.util.Iterator[R]
+
+  /** Lifecycle hook that receives the RuntimeContext; does nothing unless overridden. */
+  def setup(runtimeContext: RuntimeContext): Unit = ()
+
+  /** Lifecycle hook that receives the RuntimeContext; does nothing unless overridden. */
+  def teardown(runtimeContext: RuntimeContext): Unit = ()
+
+  /** Always throws; use the overload that takes a RuntimeContext instead. */
+  final override def setup(): Unit =
+    throw new UnsupportedOperationException("please call or override " +
+      "setup(runtimeContext: RuntimeContext) ")
+
+  /** Always throws; use the overload that takes a RuntimeContext instead. */
+  final override def teardown(): Unit =
+    throw new UnsupportedOperationException("please call or override " +
+      "teardown(runtimeContext: RuntimeContext) ")
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/Planner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/Planner.scala
new file mode 100644
index 0000000..03cdf43
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/Planner.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.plan
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, 
GroupByPartitioner, HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.refactor.dsl.plan.functions._
+import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.util.Graph
+
+class Planner {
+
+  /**
+   * Converts a DAG of Op into a DAG of Processor connected by Partitioner edges.
+   * The DAG is optimized first; then each edge is mapped to a partitioner chosen from
+   * the edge kind and the downstream operator, and each vertex to its processor.
+   */
+  def plan(dag: Graph[Op, OpEdge])
+    (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
+
+    optimize(dag).mapEdge { (_, edge, downstream) =>
+      edge match {
+        case Direct =>
+          new CoLocationPartitioner
+        case Shuffle =>
+          downstream match {
+            case groupByOp: GroupByOp[_, _] =>
+              new GroupByPartitioner(groupByOp.groupBy)
+            case _ => new HashPartitioner
+          }
+      }
+    }.mapVertex(_.toProcessor)
+  }
+
+  /**
+   * Works on a copy of the DAG: visits the nodes in reverse topological order and
+   * tries to merge each node with the target of each of its outgoing edges.
+   */
+  private def optimize(dag: Graph[Op, OpEdge])
+    (implicit system: ActorSystem): Graph[Op, OpEdge] = {
+    val working = dag.copy
+    val reversed = working.topologicalOrderWithCirclesIterator.toList.reverse
+    reversed.foreach { node =>
+      working.outgoingEdgesOf(node).foreach { outgoing =>
+        merge(working, outgoing._1, outgoing._3)
+      }
+    }
+    working
+  }
+
+  /**
+   * Replaces node1 and node2 by a single chained operator when node1 has exactly one
+   * outgoing edge, node2 has exactly one incoming edge, neither is a ProcessorOp and
+   * the edge leaving node1 is Direct. Otherwise the graph is left untouched.
+   */
+  private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op)
+    (implicit system: ActorSystem): Unit = {
+    // For processor node, we don't allow it to merge with downstream operators
+    val candidates = graph.outDegreeOf(node1) == 1 &&
+      graph.inDegreeOf(node2) == 1 &&
+      !node1.isInstanceOf[ProcessorOp[_ <: Task]] &&
+      !node2.isInstanceOf[ProcessorOp[_ <: Task]]
+
+    if (candidates && graph.outgoingEdgesOf(node1).head._2 == Direct) {
+      val chained = node1.chain(node2)
+      graph.addVertex(chained)
+
+      // Re-attach the upstream edges of node1 and the downstream edges of node2
+      graph.incomingEdgesOf(node1).foreach { incoming =>
+        graph.addEdge(incoming._1, incoming._2, chained)
+      }
+      graph.outgoingEdgesOf(node2).foreach { outgoing =>
+        graph.addEdge(chained, outgoing._2, outgoing._3)
+      }
+
+      // Remove the old vertex
+      graph.removeVertex(node1)
+      graph.removeVertex(node2)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/functions/FunctionRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/functions/FunctionRunner.scala
new file mode 100644
index 0000000..27f4265
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/plan/functions/FunctionRunner.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.plan.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import 
org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions.FlatMapWithStateFunction
+import org.apache.gearpump.streaming.refactor.state.RuntimeContext
+
+/**
+ * Interface to invoke SerializableFunction methods
+ *
+ * @tparam IN input value type
+ * @tparam OUT output value type
+ */
+sealed trait FunctionRunner[IN, OUT] extends java.io.Serializable {
+
+  /** Description of this runner. */
+  def description: String
+
+  /** Lifecycle hook that receives the RuntimeContext; does nothing by default. */
+  def setup(runtimeContext: RuntimeContext): Unit = ()
+
+  /** Processes one input value and returns the outputs produced for it. */
+  def process(value: IN): TraversableOnce[OUT]
+
+  /** Returns any remaining outputs; none by default. */
+  def finish(): TraversableOnce[OUT] = None
+
+  /** Lifecycle hook that receives the RuntimeContext; does nothing by default. */
+  def teardown(runtimeContext: RuntimeContext): Unit = ()
+}
+
+/**
+ * Chains two runners: every output of `first` is fed into `second`.
+ */
+case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE],
+    second: FunctionRunner[MIDDLE, OUT])
+  extends FunctionRunner[IN, OUT] {
+
+  /** Sets up `first`, then `second`, with the same context. */
+  override def setup(runtimeContext: RuntimeContext): Unit = {
+    first.setup(runtimeContext)
+    second.setup(runtimeContext)
+  }
+
+  /** Runs `first` on the value and `second` on each of its outputs. */
+  override def process(value: IN): TraversableOnce[OUT] =
+    first.process(value).flatMap(second.process)
+
+  /**
+   * Pushes whatever `first` emits on finish through `second`. If that yields nothing,
+   * the result of finishing `second` is returned instead.
+   */
+  override def finish(): TraversableOnce[OUT] = {
+    val flushed = first.finish().flatMap(second.process)
+    if (flushed.nonEmpty) flushed else second.finish()
+  }
+
+  /** Tears down `first`, then `second`, with the same context. */
+  override def teardown(runtimeContext: RuntimeContext): Unit = {
+    first.teardown(runtimeContext)
+    second.teardown(runtimeContext)
+  }
+
+  /** Both descriptions joined by "."; null when either description is null. */
+  override def description: String = {
+    val joined = for {
+      head <- Option(first.description)
+      tail <- Option(second.description)
+    } yield head + "." + tail
+    joined.orNull
+  }
+}
+
+/**
+ * Runner that applies a FlatMapFunction to every input value.
+ *
+ * A FlatMapWithStateFunction is set up and torn down with the RuntimeContext; any other
+ * FlatMapFunction goes through its parameterless setup() / teardown().
+ */
+class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
+  extends FunctionRunner[IN, OUT] {
+
+  override def setup(runtimeContext: RuntimeContext): Unit = {
+    fn match {
+      case stateful: FlatMapWithStateFunction[IN, OUT] => stateful.setup(runtimeContext)
+      case _ => fn.setup()
+    }
+  }
+
+  override def process(value: IN): TraversableOnce[OUT] = fn.flatMap(value)
+
+  override def teardown(runtimeContext: RuntimeContext): Unit = {
+    fn match {
+      case stateful: FlatMapWithStateFunction[IN, OUT] => stateful.teardown(runtimeContext)
+      case _ => fn.teardown()
+    }
+  }
+}
+
+/**
+ * Runner that folds all processed values into a single accumulator, which is only
+ * returned by finish().
+ */
+class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String)
+  extends FunctionRunner[T, A] {
+
+  // None until setup() has run and again after teardown()
+  private var state: Option[A] = None
+
+  /** Sets up the fold function and seeds the accumulator with its initial value. */
+  override def setup(runtimeContext: RuntimeContext): Unit = {
+    // TODO: runtimeContext is not used here yet
+    fn.setup()
+    state = Option(fn.init)
+  }
+
+  /** Folds the value into the accumulator (if one is present); never emits output. */
+  override def process(value: T): TraversableOnce[A] = {
+    state = state.map(acc => fn.fold(acc, value))
+    None
+  }
+
+  /** Returns the current accumulator, if any. */
+  override def finish(): TraversableOnce[A] = state
+
+  /** Clears the accumulator and tears down the fold function. */
+  override def teardown(runtimeContext: RuntimeContext): Unit = {
+    // TODO: runtimeContext is not used here yet
+    state = None
+    fn.teardown()
+  }
+}
+
+/** Runner that passes every non-null value through unchanged. */
+class DummyRunner[T] extends FunctionRunner[T, T] {
+
+  override def description: String = ""
+
+  // Option(value) is None for null, so a null input produces no output
+  override def process(value: T): TraversableOnce[T] = Option(value)
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/StreamApp.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/StreamApp.scala
new file mode 100644
index 0000000..a8dd727
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/StreamApp.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.scalaapi
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.refactor.dsl.plan._
+import 
org.apache.gearpump.streaming.refactor.dsl.plan.functions.{DataSourceOp, Op, 
OpEdge}
+import org.apache.gearpump.streaming.source.{DataSource, Watermark}
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.Graph
+
+import scala.language.implicitConversions
+
+/**
+ * Example:
+ * {{{
+ * val data = "This is a good start, bingo!! bingo!!"
+ * app.fromCollection(data.lines.toList).
+ * // word => (word, count)
+ * flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ * // (word, count1), (word, count2) => (word, count1 + count2)
+ * groupBy(kv => kv._1).reduce(sum(_, _))
+ *
+ * val appId = context.submit(app)
+ * context.close()
+ * }}}
+ *
+ * @param name name of app
+ */
+class StreamApp(
+    name: String, system: ActorSystem, userConfig: UserConfig,
+    private val graph: Graph[Op, OpEdge]) {
+
+  /** Creates an app that starts from an empty operator graph. */
+  def this(name: String, system: ActorSystem, userConfig: UserConfig) =
+    this(name, system, userConfig, Graph.empty[Op, OpEdge])
+
+  /** Plans the operator graph with a Planner and wraps the result in a StreamApplication. */
+  def plan(): StreamApplication = {
+    implicit val actorSystem: ActorSystem = system
+    val processorDag = new Planner().plan(graph)
+    StreamApplication(name, processorDag, userConfig)
+  }
+}
+
+object StreamApp {
+
+  /** Creates a StreamApp on the actor system of the given client context. */
+  def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty)
+    : StreamApp =
+    new StreamApp(name, context.system, userConfig)
+
+  /** Implicitly plans a StreamApp wherever a StreamApplication is expected. */
+  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication =
+    streamApp.plan()
+
+  /** Adds the `source` entry points to StreamApp. */
+  implicit class Source(app: StreamApp) extends java.io.Serializable {
+
+    /** Adds a DataSourceOp vertex for the data source and returns a Stream rooted at it. */
+    def source[T](dataSource: DataSource, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+      implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+
+    /** Same as above, reading from a CollectionDataSource over `seq` with an empty config. */
+    def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+      val collectionSource = new CollectionDataSource[T](seq)
+      this.source(collectionSource, parallelism, UserConfig.empty, description)
+    }
+  }
+}
+
+/** A test message source which emits each element of the given sequence once, in order. */
+class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
+  private lazy val iterator: Iterator[T] = seq.iterator
+
+  /** Nothing to open. */
+  override def open(context: TaskContext, startTime: Instant): Unit = ()
+
+  /** Returns the next element stamped with the current time, or null once exhausted. */
+  override def read(): Message =
+    if (iterator.hasNext) Message(iterator.next(), Instant.now()) else null
+
+  /** Nothing to close. */
+  override def close(): Unit = ()
+
+  /** The current time while elements remain, Watermark.MAX afterwards. */
+  override def getWatermark: Instant =
+    if (iterator.hasNext) Instant.now() else Watermark.MAX
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/functions/FlatMapWithStateFunction.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/functions/FlatMapWithStateFunction.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/functions/FlatMapWithStateFunction.scala
new file mode 100644
index 0000000..cb878d6
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/scalaapi/functions/FlatMapWithStateFunction.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.scalaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import 
org.apache.gearpump.streaming.refactor.dsl.javaapi.functions.{FlatMapWithStateFunction
 => JFlatMapWithStateFunction}
+import 
org.apache.gearpump.streaming.refactor.dsl.api.functions.MapWithStateFunction
+import org.apache.gearpump.streaming.refactor.state.RuntimeContext
+
+import scala.collection.JavaConverters._
+
+object FlatMapWithStateFunction {
+
+  /** Adapts the Java FlatMapWithStateFunction, converting its java.util.Iterator result. */
+  def apply[T, R](fn: JFlatMapWithStateFunction[T, R]): FlatMapWithStateFunction[T, R] =
+    new FlatMapWithStateFunction[T, R] {
+
+      override def setup(runtimeContext: RuntimeContext): Unit = fn.setup(runtimeContext)
+
+      override def flatMap(t: T): TraversableOnce[R] = fn.flatMap(t).asScala
+
+      override def teardown(runtimeContext: RuntimeContext): Unit = fn.teardown(runtimeContext)
+    }
+
+  // Disabled overload, kept as in the original:
+  //  def apply[T, R](fn: T => TraversableOnce[R]): FlatMapWithStateFunction[T, R] = {
+  //    new FlatMapWithStateFunction[T, R] {
+  //      override def flatMap(t: T): TraversableOnce[R] = {
+  //        fn(t)
+  //      }
+  //    }
+  //  }
+
+  /** Adapts a MapWithStateFunction; a null map result produces no output. */
+  def apply[T, R](fn: MapWithStateFunction[T, R]): FlatMapWithStateFunction[T, R] =
+    new FlatMapWithStateFunction[T, R] {
+
+      override def setup(runtimeContext: RuntimeContext): Unit = fn.setup(runtimeContext)
+
+      override def flatMap(t: T): TraversableOnce[R] = Option(fn.map(t))
+
+      override def teardown(runtimeContext: RuntimeContext): Unit = fn.teardown(runtimeContext)
+    }
+
+  /** Adapts a FilterFunction: a value is emitted only when the filter accepts it. */
+  def apply[T, R](fn: FilterFunction[T]): FlatMapWithStateFunction[T, T] =
+    new FlatMapWithStateFunction[T, T] {
+
+      override def setup(runtimeContext: RuntimeContext): Unit = {
+        // TODO: runtimeContext is not passed on to the filter function
+        fn.setup()
+      }
+
+      override def flatMap(t: T): TraversableOnce[T] =
+        if (fn.filter(t)) Option(t) else None
+
+      override def teardown(runtimeContext: RuntimeContext): Unit = {
+        // TODO: runtimeContext is not passed on to the filter function
+        fn.teardown()
+      }
+    }
+}
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Scala version of FlatMapFunction returns a TraversableOnce.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapWithStateFunction[T, R] extends FlatMapFunction[T, R] {
+
+  /** Lifecycle hook that receives the RuntimeContext; does nothing unless overridden. */
+  def setup(runtimeContext: RuntimeContext): Unit = ()
+
+  /** Lifecycle hook that receives the RuntimeContext; does nothing unless overridden. */
+  def teardown(runtimeContext: RuntimeContext): Unit = ()
+
+  /** Always throws; use the overload that takes a RuntimeContext instead. */
+  final override def setup(): Unit =
+    throw new UnsupportedOperationException("please call or override " +
+      "setup(runtimeContext: RuntimeContext)")
+
+  /** Always throws; use the overload that takes a RuntimeContext instead. */
+  final override def teardown(): Unit =
+    throw new UnsupportedOperationException("please call or override " +
+      " teardown(runtimeContext: RuntimeContext)")
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/GroupByTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/GroupByTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/GroupByTask.scala
new file mode 100644
index 0000000..16f8cc9
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/GroupByTask.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.task
+
+import java.time.Instant
+import java.util.function.Consumer
+
+import com.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import 
org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, 
GEARPUMP_STREAMING_OPERATOR}
+import 
org.apache.gearpump.streaming.refactor.dsl.window.impl.{TimestampedValue, 
WindowRunner}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, 
StatefulTask}
+import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
+
+/**
+ * Processes messages in groups as defined by groupBy function.
+ */
+class GroupByTask[IN, GROUP, OUT](
+    groupBy: IN => GROUP,
+    taskContext: TaskContext,
+    userConfig: UserConfig) extends StatefulTask(taskContext, userConfig) {
+
+  // This file also imports org.apache.gearpump.streaming.task.TaskUtil, which shares its
+  // simple name with the TaskUtil of this package. Bind the one whose trigger accepts a
+  // RuntimeContext under a distinct name so the call below cannot resolve to the other.
+  import org.apache.gearpump.streaming.refactor.dsl.task.{TaskUtil => RefactorTaskUtil}
+
+  // Set in open(); null until then
+  private var runtimeContext: RuntimeContext = null
+
+  /** Reads the groupBy function from the user config. */
+  def this(context: TaskContext, conf: UserConfig) = {
+    this(
+      conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get,
+      context, conf
+    )
+  }
+
+  // One WindowRunner per group, created when the group is first seen
+  private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
+    new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
+
+  /** Keeps the runtime context for later use when triggering the runners. */
+  override def open(runtimeContext: RuntimeContext): Unit = {
+    this.runtimeContext = runtimeContext
+  }
+
+  /**
+   * Hands the message value to the WindowRunner of its group. A runner is read from the
+   * user config for each group that is not in the map yet.
+   */
+  override def invoke(message: Message): Unit = {
+    val input = message.value.asInstanceOf[IN]
+    val group = groupBy(input)
+
+    if (!groups.containsKey(group)) {
+      groups.put(group,
+        userConfig.getValue[WindowRunner[IN, OUT]](
+          GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
+    }
+
+    groups.get(group).process(TimestampedValue(input, message.timestamp))
+  }
+
+  /**
+   * Triggers every group's runner with the new watermark. When there are no groups and
+   * the watermark is Watermark.MAX, the task watermark is set to MAX directly.
+   */
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    if (groups.isEmpty && watermark == Watermark.MAX) {
+      taskContext.updateWatermark(Watermark.MAX)
+    } else {
+      groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] {
+        override def accept(runner: WindowRunner[IN, OUT]): Unit = {
+          RefactorTaskUtil.trigger(watermark, runner, taskContext, runtimeContext)
+        }
+      })
+    }
+
+    super.onWatermarkProgress(watermark)
+  }
+
+  /** Nothing to release. */
+  override def close(runtimeContext: RuntimeContext): Unit = {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/TaskUtil.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/TaskUtil.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/TaskUtil.scala
new file mode 100644
index 0000000..fa474ec
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/TaskUtil.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import 
org.apache.gearpump.streaming.refactor.dsl.window.impl.{TimestampedValue, 
WindowRunner}
+import org.apache.gearpump.streaming.refactor.state.RuntimeContext
+import org.apache.gearpump.streaming.task._
+
+object TaskUtil {
+
+  /**
+   * Resolves a classname to a Task class, using the context class loader of the
+   * current thread.
+   *
+   * @param className  the class name to resolve
+   * @return resolved class
+   */
+  def loadClass(className: String): Class[_ <: Task] =
+    Thread.currentThread().getContextClassLoader().loadClass(className)
+      .asSubclass(classOf[Task])
+
+  /**
+   * Triggers the runner with the watermark, updates the task watermark with the one the
+   * runner reports, and outputs every triggered value as a Message with its timestamp.
+   */
+  def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT],
+      context: TaskContext, runtimeContext: RuntimeContext): Unit = {
+    val fired = runner.trigger(watermark, runtimeContext)
+    context.updateWatermark(fired.watermark)
+    fired.outputs.foreach { case TimestampedValue(value, timestamp) =>
+      context.output(Message(value, timestamp))
+    }
+  }
+
+  /**
+   * @return t1 if t1 is not larger than t2 and t2 otherwise
+   */
+  def min(t1: Instant, t2: Instant): Instant =
+    if (t1.compareTo(t2) > 0) t2 else t1
+
+  /**
+   * @return t1 if t1 is larger than t2 and t2 otherwise
+   */
+  def max(t1: Instant, t2: Instant): Instant =
+    if (t1.compareTo(t2) > 0) t1 else t2
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/TransformTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/TransformTask.scala
new file mode 100644
index 0000000..b7dc895
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/task/TransformTask.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import 
org.apache.gearpump.streaming.refactor.dsl.window.impl.{TimestampedValue, 
WindowRunner}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, 
StatefulTask}
+import org.apache.gearpump.streaming.task.{TaskContext}
+
+/**
+ * Task that feeds every incoming message into a [[WindowRunner]] and, whenever the
+ * watermark advances, triggers the runner through [[TaskUtil.trigger]].
+ *
+ * @param runner  window runner that receives the inputs and is triggered on watermark progress
+ * @param taskContext  task context handed to [[TaskUtil.trigger]] and to [[StatefulTask]]
+ * @param userConf  user configuration forwarded to [[StatefulTask]]
+ */
+class TransformTask[IN, OUT](
+    runner: WindowRunner[IN, OUT],
+    taskContext: TaskContext,
+    userConf: UserConfig)
+  extends StatefulTask(taskContext, userConf) {
+
+  // assigned in open(); null until then
+  private var runtimeContext: RuntimeContext = _
+
+  /**
+   * Creates the task with the window runner stored in `conf` under
+   * GEARPUMP_STREAMING_OPERATOR.
+   */
+  def this(context: TaskContext, conf: UserConfig) = {
+    this(
+      conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+      context,
+      conf)
+  }
+
+  /** Keeps the runtime context so it can be passed to the runner on trigger. */
+  override def open(stateContext: RuntimeContext): Unit = {
+    runtimeContext = stateContext
+  }
+
+  /** Wraps the message value and timestamp and hands them to the runner. */
+  override def invoke(msg: Message): Unit = {
+    val element = TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)
+    runner.process(element)
+  }
+
+  /** Triggers the runner at the new watermark, then lets the parent class run. */
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    TaskUtil.trigger(watermark, runner, taskContext, runtimeContext)
+    // do checkpoint
+    super.onWatermarkProgress(watermark)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
new file mode 100644
index 0000000..17c93bd
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/ReduceFnRunner.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.window.impl
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.dsl.window.api.Trigger
+
+/**
+ * Contract with two hooks: one invoked with a message and one invoked with a trigger.
+ *
+ * NOTE(review): no implementation is visible next to this trait, so the exact
+ * semantics expected from each hook should be confirmed with its implementors.
+ */
+trait ReduceFnRunner {
+
+  /** Handles a single message. */
+  def process(message: Message): Unit
+
+  /** Handles a trigger. */
+  def onTrigger(trigger: Trigger): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/WindowRunner.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/WindowRunner.scala
new file mode 100644
index 0000000..43d28ef
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/dsl/window/impl/WindowRunner.scala
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.refactor.dsl.window.impl
+
+import java.time.Instant
+
+import com.gs.collections.api.block.predicate.Predicate
+import com.gs.collections.api.block.procedure.Procedure
+import com.gs.collections.impl.list.mutable.FastList
+import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
+import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
+import org.apache.gearpump.streaming.dsl.window.impl.Window
+import org.apache.gearpump.streaming.refactor.dsl.plan.functions.FunctionRunner
+import org.apache.gearpump.streaming.refactor.state.RuntimeContext
+import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.task.TaskUtil
+
+import scala.collection.mutable.ArrayBuffer
+
+/** A value paired with the timestamp it carries. */
+case class TimestampedValue[T](value: T, timestamp: Instant)
+
+/**
+ * Result of triggering a [[WindowRunner]]: the timestamped values that were emitted
+ * and the watermark reported together with them.
+ */
+case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
+    watermark: Instant)
+
+/**
+ * Serializable runner that accepts timestamped inputs and produces outputs when triggered.
+ *
+ * @tparam IN  type of the values passed to `process`
+ * @tparam OUT  type of the values returned by `trigger`
+ */
+trait WindowRunner[IN, OUT] extends java.io.Serializable {
+
+  /** Accepts one timestamped input. */
+  def process(timestampedValue: TimestampedValue[IN]): Unit
+
+  /**
+   * Triggers the runner at `time`.
+   *
+   * @param time  time to trigger at
+   * @param runtimeContext  runtime context made available to the runner
+   * @return the emitted values together with a watermark
+   */
+  def trigger(time: Instant, runtimeContext: RuntimeContext): 
TriggeredOutputs[OUT]
+
+}
+
+/**
+ * Chains two window runners. Inputs are given to `left`; on trigger, everything `left`
+ * emits is processed by `right`, which is then triggered at the watermark `left` reported.
+ */
+case class AndThen[IN, MIDDLE, OUT](
+    left: WindowRunner[IN, MIDDLE],
+    right: WindowRunner[MIDDLE, OUT])
+  extends WindowRunner[IN, OUT] {
+
+  /** Inputs only ever enter the chain through the left runner. */
+  override def process(timestampedValue: TimestampedValue[IN]): Unit =
+    left.process(timestampedValue)
+
+  /** Triggers `left`, pipes its outputs into `right` and returns what `right` emits. */
+  override def trigger(time: Instant, runtimeContext: RuntimeContext): TriggeredOutputs[OUT] = {
+    val upstream = left.trigger(time, runtimeContext)
+    for (intermediate <- upstream.outputs) {
+      right.process(intermediate)
+    }
+    right.trigger(upstream.watermark, runtimeContext)
+  }
+}
+
+/**
+ * [[WindowRunner]] that buffers inputs per window and, on trigger, runs the buffered
+ * inputs of every window that has ended through a [[FunctionRunner]].
+ *
+ * @param windows  supplies the window function and the accumulation mode
+ * @param fnRunner  function applied to the inputs of each triggered window
+ */
+class DefaultWindowRunner[IN, OUT](
+    windows: Windows,
+    fnRunner: FunctionRunner[IN, OUT])
+  extends WindowRunner[IN, OUT] {
+
+  private val windowFn = windows.windowFn
+  // buffered inputs, keyed by window in the map's sort order
+  private val windowInputs = new TreeSortedMap[Window, 
FastList[TimestampedValue[IN]]]
+  // whether fnRunner.setup has been called since the last teardown
+  private var setup = false
+  // watermark returned by the previous trigger call; never moves backwards
+  private var watermark = Watermark.MIN
+
+  /**
+   * Assigns the input to the windows returned by the window function. For a non-merging
+   * window function the input is appended to each window's buffer (created on first use);
+   * otherwise each window is merged with the buffered windows it intersects.
+   */
+  override def process(timestampedValue: TimestampedValue[IN]): Unit = {
+    val wins = windowFn(new Context[IN] {
+      override def element: IN = timestampedValue.value
+
+      override def timestamp: Instant = timestampedValue.timestamp
+    })
+    wins.foreach { win =>
+      if (windowFn.isNonMerging) {
+        if (!windowInputs.containsKey(win)) {
+          val inputs = new FastList[TimestampedValue[IN]]
+          windowInputs.put(win, inputs)
+        }
+        windowInputs.get(win).add(timestampedValue)
+      } else {
+        merge(windowInputs, win, timestampedValue)
+      }
+    }
+
+    // Removes every buffered window that intersects `win`, spans them together with `win`
+    // into one window and stores their inputs plus `tv` under that merged window.
+    def merge(
+        winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]],
+        win: Window, tv: TimestampedValue[IN]): Unit = {
+      val intersected = winIns.keySet.select(new Predicate[Window] {
+        override def accept(each: Window): Boolean = {
+          win.intersects(each)
+        }
+      })
+      var mergedWin = win
+      val mergedInputs = FastList.newListWith(tv)
+      intersected.forEach(new Procedure[Window] {
+        override def value(each: Window): Unit = {
+          mergedWin = mergedWin.span(each)
+          mergedInputs.addAll(winIns.remove(each))
+        }
+      })
+      winIns.put(mergedWin, mergedInputs)
+    }
+  }
+
+  /**
+   * Repeatedly takes the first buffered window and, while `time` is not before its end
+   * time, removes it and runs its inputs through `fnRunner`. Outputs of `process` keep
+   * the timestamp of their input; outputs of `finish` are stamped one millisecond before
+   * the window's end time.
+   *
+   * @param time  time to trigger at
+   * @param runtimeContext  passed to `fnRunner.setup` and `fnRunner.teardown`
+   * @return the collected outputs and the updated watermark of this runner
+   */
+  override def trigger(time: Instant, runtimeContext: RuntimeContext): 
TriggeredOutputs[OUT] = {
+    @annotation.tailrec
+    def onTrigger(
+        outputs: ArrayBuffer[TimestampedValue[OUT]],
+        wmk: Instant): TriggeredOutputs[OUT] = {
+      if (windowInputs.notEmpty()) {
+        val firstWin = windowInputs.firstKey
+        if (!time.isBefore(firstWin.endTime)) {
+          val inputs = windowInputs.remove(firstWin)
+          // setup is called lazily, right before the first window is processed
+          if (!setup) {
+            fnRunner.setup(runtimeContext)
+            setup = true
+          }
+          inputs.forEach(new Procedure[TimestampedValue[IN]] {
+            override def value(tv: TimestampedValue[IN]): Unit = {
+              fnRunner.process(tv.value).foreach {
+                out: OUT => outputs += TimestampedValue(out, tv.timestamp)
+              }
+            }
+          })
+          fnRunner.finish().foreach {
+            out: OUT =>
+              outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
+          }
+          val newWmk = TaskUtil.max(wmk, firstWin.endTime)
+          if (windows.accumulationMode == Discarding) {
+            fnRunner.teardown(runtimeContext)
+            // discarding, setup need to be called for each window
+            setup = false
+          }
+          onTrigger(outputs, newWmk)
+        } else {
+          // minimum of end of last triggered window and start of first
+          // un-triggered window
+          TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
+        }
+      } else {
+        // nothing buffered: report Watermark.MAX only when triggered at Watermark.MAX
+        if (time == Watermark.MAX) {
+          TriggeredOutputs(outputs, Watermark.MAX)
+        } else {
+          TriggeredOutputs(outputs, wmk)
+        }
+      }
+    }
+
+    val triggeredOutputs = onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]], 
watermark)
+    // keep the stored watermark monotonic even if onTrigger computed a smaller one
+    watermark = TaskUtil.max(watermark, triggeredOutputs.watermark)
+    TriggeredOutputs(triggeredOutputs.outputs, watermark)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
new file mode 100644
index 0000000..d0b84cb
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkProcessor.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.sink
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.sink.DataSink
+
+/** Factory for processors that run a [[DataSinkTask]]. */
+object DataSinkProcessor {
+
+  /**
+   * Builds a processor description for [[DataSinkTask]] with the sink stored in the
+   * task configuration under `DataSinkTask.DATA_SINK`.
+   *
+   * @param dataSink  sink the task writes to
+   * @param parallelism  parallelism passed to the processor
+   * @param description  description passed to the processor
+   * @param taskConf  base configuration the sink is added to
+   */
+  def apply(
+      dataSink: DataSink,
+      parallelism: Int = 1,
+      description: String = "",
+      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
+    : Processor[DataSinkTask] = {
+    val confWithSink = taskConf.withValue[DataSink](DataSinkTask.DATA_SINK, dataSink)
+    Processor[DataSinkTask](parallelism, description = description, confWithSink)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkTask.scala
new file mode 100644
index 0000000..24be828
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/sink/DataSinkTask.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.sink
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, 
StatefulTask}
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{TaskContext}
+
+object DataSinkTask {
+  // configuration key under which the DataSink is stored and looked up
+  val DATA_SINK = "data_sink"
+}
+
+/**
+ * Task that delegates to a [[org.apache.gearpump.streaming.sink.DataSink]]: the sink is
+ * opened in `open`, receives every message in `invoke` and is closed in `close`.
+ */
+class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: 
DataSink)
+  extends StatefulTask(context, conf) {
+
+  /** Creates the task with the sink stored in `conf` under `DataSinkTask.DATA_SINK`. */
+  def this(context: TaskContext, conf: UserConfig) = {
+    this(context, conf, 
conf.getValue[DataSink](DataSinkTask.DATA_SINK)(context.system).get)
+  }
+
+  /** Opens the sink with the task context; the runtime context is not used here. */
+  override def open(runtimeContext: RuntimeContext): Unit = {
+    LOG.info("opening data sink...")
+    sink.open(context)
+  }
+
+  /** Writes the message to the sink. */
+  override def invoke(message: Message): Unit = {
+    sink.write(message)
+  }
+
+  /** Closes the sink; the runtime context is not used here. */
+  override def close(runtimeContext: RuntimeContext): Unit = {
+    LOG.info("closing data sink...")
+    sink.close()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceProcessor.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceProcessor.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceProcessor.scala
new file mode 100644
index 0000000..de584d5
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceProcessor.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.source
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows}
+import org.apache.gearpump.streaming.dsl.window.impl.Window
+import org.apache.gearpump.streaming.refactor.dsl.plan.functions.DummyRunner
+import 
org.apache.gearpump.streaming.refactor.dsl.window.impl.{DefaultWindowRunner, 
WindowRunner}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.{Constants, Processor}
+
+/** Factory for processors that run a [[DataSourceTask]]. */
+object DataSourceProcessor {
+
+  /**
+   * Builds a processor description for [[DataSourceTask]]. The task configuration gets the
+   * data source and a [[DefaultWindowRunner]] that uses [[PerElementWindowFunction]]
+   * together with a [[DummyRunner]].
+   *
+   * @param dataSource  source the task reads from
+   * @param parallelism  parallelism passed to the processor
+   * @param description  description passed to the processor
+   * @param taskConf  base configuration the source and the window runner are added to
+   */
+  def apply(
+      dataSource: DataSource,
+      parallelism: Int = 1,
+      description: String = "",
+      taskConf: UserConfig = UserConfig.empty)(implicit system: ActorSystem)
+    : Processor[DataSourceTask[Any, Any]] = {
+    val confWithSource =
+      taskConf.withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource)
+    val perElementRunner = new DefaultWindowRunner[Any, Any](
+      Windows(PerElementWindowFunction, description = "perElementWindows"),
+      new DummyRunner[Any])
+    val sourceConf = confWithSource.withValue[WindowRunner[Any, Any]](
+      Constants.GEARPUMP_STREAMING_OPERATOR, perElementRunner)
+    Processor[DataSourceTask[Any, Any]](parallelism, description, sourceConf)
+  }
+
+
+  /**
+   * Non-merging window function that puts every element into its own window, starting at
+   * the element's timestamp and ending one millisecond later.
+   */
+  case object PerElementWindowFunction extends WindowFunction {
+    override def apply[T](
+        context: WindowFunction.Context[T]): Array[Window] = {
+      val start = context.timestamp
+      Array(Window(start, start.plusMillis(1)))
+    }
+
+    override def isNonMerging: Boolean = true
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceTask.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceTask.scala
new file mode 100644
index 0000000..ac6468d
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/source/DataSourceTask.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.source
+
+import java.time.Instant
+
+import org.apache.gearpump._
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.refactor.dsl.task.TaskUtil
+import 
org.apache.gearpump.streaming.refactor.dsl.window.impl.{TimestampedValue, 
WindowRunner}
+import org.apache.gearpump.streaming.refactor.state.{RuntimeContext, 
StatefulTask}
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceConfig, 
Watermark}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Default Task container for [[org.apache.gearpump.streaming.source.DataSource]] that
+ * reads from DataSource in batch
+ * See [[org.apache.gearpump.streaming.refactor.source.DataSourceProcessor]] for its usage
+ *
+ * DataSourceTask calls:
+ *  - `DataSource.open()` in `open` and pass in
+ *  [[org.apache.gearpump.streaming.task.TaskContext]]
+ * and the start time taken from the runtime context
+ *  - `DataSource.read()` up to `batchSize` times in each `invoke`, feeding every
+ *  non-null message into the window runner
+ *  - `DataSource.close()` in `close`
+ */
+class DataSourceTask[IN, OUT] private[source](
+    source: DataSource,
+    windowRunner: WindowRunner[IN, OUT],
+    context: TaskContext,
+    conf: UserConfig)
+  extends StatefulTask(context, conf) {
+
+  // assigned in open(); null until then
+  private var runtimeContext: RuntimeContext = null
+
+  /**
+   * Creates the task with the data source and the window runner stored in `conf` under
+   * GEARPUMP_STREAMING_SOURCE and GEARPUMP_STREAMING_OPERATOR.
+   */
+  def this(context: TaskContext, conf: UserConfig) = {
+    this(
+      conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
+      conf.getValue[WindowRunner[IN, 
OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+      context, conf
+    )
+  }
+
+  // number of read attempts per invoke; 1000 when SOURCE_READ_BATCH_SIZE is not configured
+  private val batchSize = 
conf.getInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE).getOrElse(1000)
+
+  /**
+   * Keeps the runtime context, opens the source at the start time it reports and sends
+   * the source's current watermark to this task itself.
+   */
+  override def open(runtimeContext: RuntimeContext): Unit = {
+    this.runtimeContext = runtimeContext
+    LOG.info(s"opening data source at 
${runtimeContext.getStartTime.toEpochMilli}")
+    source.open(context, runtimeContext.getStartTime)
+    self ! Watermark(source.getWatermark)
+    super.open(runtimeContext)
+  }
+
+  /**
+   * Reads from the source `batchSize` times, passing each non-null message to the window
+   * runner, then sends the source's current watermark to this task itself. The incoming
+   * message `m` is not inspected.
+   */
+  override def invoke(m: Message): Unit = {
+    0.until(batchSize).foreach { _ =>
+      // a null read result is skipped
+      Option(source.read()).foreach(
+        msg => windowRunner.process(
+          TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)))
+    }
+
+    self ! Watermark(source.getWatermark)
+  }
+
+  /**
+   * Triggers the window runner at the new watermark.
+   *
+   * NOTE(review): unlike TransformTask.onWatermarkProgress this does not call
+   * super.onWatermarkProgress - confirm that skipping the parent behaviour is intended.
+   */
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    TaskUtil.trigger(watermark, windowRunner, context, this.runtimeContext)
+  }
+
+
+  /** Closes the source; the passed context is not used here. */
+  override def close(stateContext: RuntimeContext): Unit = {
+    LOG.info("closing data source...")
+    source.close()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/fe410304/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
new file mode 100644
index 0000000..c387960
--- /dev/null
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/refactor/state/RuntimeContext.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.refactor.state
+
+import java.time.Instant
+
+import org.apache.gearpump.streaming.refactor.coder.Coder
+import org.apache.gearpump.streaming.refactor.state.api.StateInternals
+
+/**
+ * Context handed to the `open` and `close` hooks of a stateful task and passed on to
+ * window runners when they are triggered.
+ */
+trait RuntimeContext {
+
+  /**
+   * Returns the [[StateInternals]] for the given key.
+   *
+   * @param keyCoder  coder for the key type
+   * @param key  the key
+   * NOTE(review): presumably the returned state is scoped to `key` - confirm against
+   * the implementation.
+   */
+  def getStateInternals[KT](keyCoder: Coder[KT], key: KT): StateInternals
+
+  /** Returns the start time; DataSourceTask opens its source at this time. */
+  def getStartTime: Instant
+
+}


Reply via email to