http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
new file mode 100644
index 0000000..e065c90
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -0,0 +1,600 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.materializer
+
+import akka.actor.ActorSystem
+import akka.stream.impl.StreamLayout.Module
+import akka.stream.impl.Timers.{Completion, DelayInitial, Idle, IdleInject, 
IdleTimeoutBidi, Initial}
+import akka.stream.impl.fusing.{Batch, Collect, Delay, Drop, DropWhile, 
DropWithin, Filter, FlattenMerge, Fold, GraphStageModule, GroupBy, 
GroupedWithin, Intersperse, LimitWeighted, Log, MapAsync, MapAsyncUnordered, 
PrefixAndTail, Recover, Reduce, Scan, Split, StatefulMapConcat, SubSink, 
SubSource, Take, TakeWhile, TakeWithin, Map => FMap}
+import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, 
SimpleLinearGraphStage, SingleSource, TickSource}
+import akka.stream.impl.io.IncomingConnectionStage
+import akka.stream.impl.{HeadOptionStage, Stages, Throttle, Unfold, 
UnfoldAsync}
+import akka.stream.scaladsl.{Balance, Broadcast, Concat, Interleave, Merge, 
MergePreferred, MergeSorted, ModuleExtractor, Unzip, Zip, ZipWith2}
+import akka.stream.stage.AbstractStage.PushPullGraphStageWithMaterializedValue
+import akka.stream.stage.GraphStage
+import org.apache.gearpump.akkastream.GearAttributes
+import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
+import org.apache.gearpump.akkastream.module.{GroupByModule, ProcessorModule, 
ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, 
SourceTaskModule}
+import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, 
BroadcastTask, ConcatTask, DelayInitialTask, DropWithinTask, FlattenMergeTask, 
FoldTask, GraphTask, GroupedWithinTask, InterleaveTask, MapAsyncTask, 
MergeTask, SingleSourceTask, SinkBridgeTask, SourceBridgeTask, 
StatefulMapConcatTask, TakeWithinTask, ThrottleTask, TickSourceTask, Zip2Task}
+import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, 
INTERVAL, TICK}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper
+import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, 
DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
+import org.apache.gearpump.util.Graph
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Promise
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a 
Gearpump
+ * Streaming Application.
+ *
+ * @param graph Graph
+ * @param system ActorSystem
+ */
+class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
+
+  import RemoteMaterializerImpl._
+
+  type ID = String
+  private implicit val actorSystem = system
+
+  private def uuid: String = {
+    java.util.UUID.randomUUID.toString
+  }
+
+  def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
+    val (opGraph, ids) = toOpGraph
+    val app: StreamApplication = new StreamApp("app", system, 
UserConfig.empty, opGraph)
+    val processorIds = resolveIds(app, ids)
+
+    val updatedApp = updateJunctionConfig(processorIds, app)
+    (removeIds(updatedApp), processorIds)
+  }
+
+  private def updateJunctionConfig(processorIds: Map[Module, ProcessorId],
+      app: StreamApplication): StreamApplication = {
+    val config = junctionConfig(processorIds)
+
+    val dag = app.dag.mapVertex { vertex =>
+      val processorId = vertex.id
+      val newConf = vertex.taskConf.withConfig(config(processorId))
+      vertex.copy(taskConf = newConf)
+    }
+    new StreamApplication(app.name, app.inputUserConfig, dag)
+  }
+
+  private def junctionConfig(processorIds: Map[Module, ProcessorId]):
+  Map[ProcessorId, UserConfig] = {
+    val updatedConfigs = graph.vertices.flatMap { vertex =>
+      buildShape(vertex, processorIds)
+    }.toMap
+    updatedConfigs
+  }
+
+  private def buildShape(vertex: Module, processorIds: Map[Module, 
ProcessorId]):
+  Option[(ProcessorId, UserConfig)] = {
+    def inProcessors(vertex: Module): List[ProcessorId] = {
+      vertex.shape.inlets.flatMap { inlet =>
+        graph.incomingEdgesOf(vertex).find(
+          _._2.to == inlet).map(_._1
+        ).flatMap(processorIds.get)
+      }.toList
+    }
+    def outProcessors(vertex: Module): List[ProcessorId] = {
+      vertex.shape.outlets.flatMap { outlet =>
+        graph.outgoingEdgesOf(vertex).find(
+          _._2.from == outlet).map(_._3
+        ).flatMap(processorIds.get)
+      }.toList
+    }
+    processorIds.get(vertex).map(processorId => {
+      (processorId, UserConfig.empty.
+        withValue(GraphTask.OUT_PROCESSORS, outProcessors(vertex)).
+        withValue(GraphTask.IN_PROCESSORS, inProcessors(vertex)))
+    })
+  }
+
+  private def resolveIds(app: StreamApplication, ids: Map[Module, ID]):
+  Map[Module, ProcessorId] = {
+    ids.flatMap { kv =>
+      val (module, id) = kv
+      val processorId = app.dag.vertices.find { processor =>
+        processor.taskConf.getString(id).isDefined
+      }.map(_.id)
+      processorId.map((module, _))
+    }
+  }
+
+  private def removeIds(app: StreamApplication): StreamApplication = {
+    val graph = app.dag.mapVertex { processor =>
+      val conf = removeId(processor.taskConf)
+      processor.copy(taskConf = conf)
+    }
+    new StreamApplication(app.name, app.inputUserConfig, graph)
+  }
+
+  private def removeId(conf: UserConfig): UserConfig = {
+    conf.filter { kv =>
+      kv._2 != RemoteMaterializerImpl.TRACKABLE
+    }
+  }
+
+  private def toOpGraph: (Graph[Op, OpEdge], Map[Module, ID]) = {
+    var matValues = collection.mutable.Map.empty[Module, ID]
+    val opGraph = graph.mapVertex[Op] { module =>
+      val name = uuid
+      val conf = UserConfig.empty.withString(name, 
RemoteMaterializerImpl.TRACKABLE)
+      matValues += module -> name
+      val parallelism = GearAttributes.count(module.attributes)
+      val op = module match {
+        case source: SourceTaskModule[_] =>
+          val updatedConf = conf.withConfig(source.conf)
+          DataSourceOp(source.source, parallelism, updatedConf, "source")
+        case sink: SinkTaskModule[_] =>
+          val updatedConf = conf.withConfig(sink.conf)
+          DataSinkOp(sink.sink, parallelism, updatedConf, "sink")
+        case sourceBridge: SourceBridgeModule[_, _] =>
+          ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, 
"source")
+        case processor: ProcessorModule[_, _, _] =>
+          val updatedConf = conf.withConfig(processor.conf)
+          ProcessorOp(processor.processor, parallelism, updatedConf, "source")
+        case sinkBridge: SinkBridgeModule[_, _] =>
+          ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
+        case groupBy: GroupByModule[_, _] =>
+          GroupByOp(GroupAlsoByWindow(groupBy.groupBy, 
CountWindow.apply(1).accumulating),
+            parallelism, "groupBy", conf)
+        case reduce: ReduceModule[_] =>
+          reduceOp(reduce.f, conf)
+        case graphStage: GraphStageModule =>
+          translateGraphStageWithMaterializedValue(graphStage, parallelism, 
conf)
+        case _ =>
+          null
+      }
+      if (op == null) {
+        throw new UnsupportedOperationException(
+          module.getClass.toString + " is not supported with 
RemoteMaterializer"
+        )
+      }
+      op
+    }.mapEdge[OpEdge] { (n1, edge, n2) =>
+      n2 match {
+        case chainableOp: ChainableOp[_, _]
+          if !n1.isInstanceOf[ProcessorOp[_]] && 
!n2.isInstanceOf[ProcessorOp[_]] =>
+          Direct
+        case _ =>
+          Shuffle
+      }
+    }
+    (opGraph, matValues.toMap)
+  }
+
+  private def translateGraphStageWithMaterializedValue(module: 
GraphStageModule,
+      parallelism: Int, conf: UserConfig): Op = {
+    module.stage match {
+      case tickSource: TickSource[_] =>
+        val tick: AnyRef = tickSource.tick.asInstanceOf[AnyRef]
+        val tiConf = conf.withValue[FiniteDuration](INITIAL_DELAY, 
tickSource.initialDelay).
+          withValue[FiniteDuration](INTERVAL, tickSource.interval).
+          withValue(TICK, tick)
+        ProcessorOp(classOf[TickSourceTask[_]], parallelism, tiConf, 
"tickSource")
+      case graphStage: GraphStage[_] =>
+        translateGraphStage(module, parallelism, conf)
+      case headOptionStage: HeadOptionStage[_] =>
+        headOptionOp(headOptionStage, conf)
+      case pushPullGraphStageWithMaterializedValue:
+        PushPullGraphStageWithMaterializedValue[_, _, _, _] =>
+        translateSymbolic(pushPullGraphStageWithMaterializedValue, conf)
+    }
+  }
+
+  private def translateGraphStage(module: GraphStageModule,
+      parallelism: Int, conf: UserConfig): Op = {
+    module.stage match {
+      case balance: Balance[_] =>
+        ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
+      case batch: Batch[_, _] =>
+        val batchConf = conf.withValue[_ => Long](BatchTask.COST, 
batch.costFn).
+          withLong(BatchTask.MAX, batch.max).
+          withValue[(_, _) => _](BatchTask.AGGREGATE, batch.aggregate).
+          withValue[_ => _](BatchTask.SEED, batch.seed)
+        ProcessorOp(classOf[BatchTask[_, _]],
+          parallelism, batchConf, "batch")
+      case broadcast: Broadcast[_] =>
+        val name = 
ModuleExtractor.unapply(broadcast).map(_.attributes.nameOrDefault()).get
+        ProcessorOp(classOf[BroadcastTask], parallelism, conf, name)
+      case collect: Collect[_, _] =>
+        collectOp(collect.pf, conf)
+      case concat: Concat[_] =>
+        ProcessorOp(classOf[ConcatTask], parallelism, conf, "concat")
+      case delayInitial: DelayInitial[_] =>
+        val dIConf = conf.withValue[FiniteDuration](
+          DelayInitialTask.DELAY_INITIAL, delayInitial.delay)
+        ProcessorOp(classOf[DelayInitialTask[_]], parallelism, dIConf, 
"delayInitial")
+      case dropWhile: DropWhile[_] =>
+        dropWhileOp(dropWhile.p, conf)
+      case flattenMerge: FlattenMerge[_, _] =>
+        ProcessorOp(classOf[FlattenMergeTask], parallelism, conf, 
"flattenMerge")
+      case fold: Fold[_, _] =>
+        val foldConf = conf.withValue(FoldTask.ZERO, 
fold.zero.asInstanceOf[AnyRef]).
+          withValue(FoldTask.AGGREGATOR, fold.f)
+        ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
+      case groupBy: GroupBy[_, _] =>
+        GroupByOp(GroupAlsoByWindow(groupBy.keyFor, 
CountWindow.apply(1).accumulating),
+          groupBy.maxSubstreams, "groupBy", conf)
+      case groupedWithin: GroupedWithin[_] =>
+        val diConf = 
conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).
+          withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n)
+        ProcessorOp(classOf[GroupedWithinTask[_]], parallelism, diConf, 
"groupedWithin")
+      case idleInject: IdleInject[_, _] =>
+        // TODO
+        null
+      case idleTimeoutBidi: IdleTimeoutBidi[_, _] =>
+        // TODO
+        null
+      case incomingConnectionStage: IncomingConnectionStage =>
+        // TODO
+        null
+      case interleave: Interleave[_] =>
+        val ilConf = conf.withInt(InterleaveTask.INPUT_PORTS, 
interleave.inputPorts).
+          withInt(InterleaveTask.SEGMENT_SIZE, interleave.segmentSize)
+        ProcessorOp(classOf[InterleaveTask], parallelism, ilConf, "interleave")
+        null
+      case intersperse: Intersperse[_] =>
+        // TODO
+        null
+      case limitWeighted: LimitWeighted[_] =>
+        // TODO
+        null
+      case map: FMap[_, _] =>
+        mapOp(map.f, conf)
+      case mapAsync: MapAsync[_, _] =>
+        ProcessorOp(classOf[MapAsyncTask[_, _]],
+          mapAsync.parallelism, conf.withValue(MapAsyncTask.MAPASYNC_FUNC, 
mapAsync.f), "mapAsync")
+      case mapAsyncUnordered: MapAsyncUnordered[_, _] =>
+        ProcessorOp(classOf[MapAsyncTask[_, _]],
+          mapAsyncUnordered.parallelism,
+          conf.withValue(MapAsyncTask.MAPASYNC_FUNC, mapAsyncUnordered.f), 
"mapAsyncUnordered")
+      case materializedValueSource: MaterializedValueSource[_] =>
+        // TODO
+        null
+      case merge: Merge[_] =>
+        val mergeConf = conf.withBoolean(MergeTask.EAGER_COMPLETE, 
merge.eagerComplete).
+          withInt(MergeTask.INPUT_PORTS, merge.inputPorts)
+        ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge")
+      case mergePreferred: MergePreferred[_] =>
+        MergeOp("mergePreferred", conf)
+      case mergeSorted: MergeSorted[_] =>
+        MergeOp("mergeSorted", conf)
+      case prefixAndTail: PrefixAndTail[_] =>
+        // TODO
+        null
+      case recover: Recover[_] =>
+        // TODO
+        null
+      case scan: Scan[_, _] =>
+        scanOp(scan.zero, scan.f, conf)
+      case simpleLinearGraphStage: SimpleLinearGraphStage[_] =>
+        translateSimpleLinearGraph(simpleLinearGraphStage, parallelism, conf)
+      case singleSource: SingleSource[_] =>
+        val singleSourceConf = conf.withValue[AnyRef](SingleSourceTask.ELEMENT,
+          singleSource.elem.asInstanceOf[AnyRef])
+        ProcessorOp(classOf[SingleSourceTask[_]], parallelism, 
singleSourceConf, "singleSource")
+      case split: Split[_] =>
+        // TODO
+        null
+      case statefulMapConcat: StatefulMapConcat[_, _] =>
+        val func = statefulMapConcat.f
+        val statefulMapConf =
+          conf.withValue[() => _ => Iterable[_]](StatefulMapConcatTask.FUNC, 
func)
+        ProcessorOp(classOf[StatefulMapConcatTask[_, _]], parallelism,
+          statefulMapConf, "statefulMapConcat")
+      case subSink: SubSink[_] =>
+        // TODO
+        null
+      case subSource: SubSource[_] =>
+        // TODO
+        null
+      case unfold: Unfold[_, _] =>
+        // TODO
+        null
+      case unfoldAsync: UnfoldAsync[_, _] =>
+        // TODO
+        null
+      case unzip: Unzip[_, _] =>
+        // ProcessorOp(classOf[Unzip2Task[_, _, _]], parallelism,
+        //   conf.withValue(
+        //     Unzip2Task.UNZIP2_FUNCTION, 
Unzip2Task.UnZipFunction(unzip.unzipper)), "unzip")
+        // TODO
+        null
+      case zip: Zip[_, _] =>
+        zipWithOp(zip.zipper, conf)
+      case zipWith2: ZipWith2[_, _, _] =>
+        ProcessorOp(classOf[Zip2Task[_, _, _]],
+          parallelism,
+          conf.withValue(
+            Zip2Task.ZIP2_FUNCTION, Zip2Task.ZipFunction(zipWith2.zipper)
+          ), "zipWith2")
+    }
+  }
+
+  private def translateSimpleLinearGraph(stage: SimpleLinearGraphStage[_],
+      parallelism: Int, conf: UserConfig): Op = {
+    stage match {
+      case completion: Completion[_] =>
+        // TODO
+        null
+      case delay: Delay[_] =>
+        // TODO
+        null
+      case drop: Drop[_] =>
+        dropOp(drop.count, conf)
+      case dropWithin: DropWithin[_] =>
+        val dropWithinConf =
+          conf.withValue[FiniteDuration](DropWithinTask.TIMEOUT, 
dropWithin.timeout)
+        ProcessorOp(classOf[DropWithinTask[_]],
+          parallelism, dropWithinConf, "dropWithin")
+      case filter: Filter[_] =>
+        filterOp(filter.p, conf)
+      case idle: Idle[_] =>
+        // TODO
+        null
+      case initial: Initial[_] =>
+        // TODO
+        null
+      case log: Log[_] =>
+        logOp(log.name, log.extract, conf)
+      case reduce: Reduce[_] =>
+        reduceOp(reduce.f, conf)
+      case take: Take[_] =>
+        takeOp(take.count, conf)
+      case takeWhile: TakeWhile[_] =>
+        filterOp(takeWhile.p, conf)
+      case takeWithin: TakeWithin[_] =>
+        val takeWithinConf =
+          conf.withValue[FiniteDuration](TakeWithinTask.TIMEOUT, 
takeWithin.timeout)
+        ProcessorOp(classOf[TakeWithinTask[_]],
+          parallelism, takeWithinConf, "takeWithin")
+      case throttle: Throttle[_] =>
+        val throttleConf = conf.withInt(ThrottleTask.COST, throttle.cost).
+          withInt(ThrottleTask.MAX_BURST, throttle.maximumBurst).
+          withValue[_ => Int](ThrottleTask.COST_CALC, 
throttle.costCalculation).
+          withValue[FiniteDuration](ThrottleTask.TIME_PERIOD, throttle.per)
+        ProcessorOp(classOf[ThrottleTask[_]],
+          parallelism, throttleConf, "throttle")
+    }
+  }
+
+  private def translateSymbolic(stage: 
PushPullGraphStageWithMaterializedValue[_, _, _, _],
+      conf: UserConfig): Op = {
+    stage match {
+      case symbolicGraphStage: Stages.SymbolicGraphStage[_, _, _]
+        if symbolicGraphStage.symbolicStage.attributes.equals(
+          Stages.DefaultAttributes.buffer) => {
+            // ignore the buffering operation
+            identity("buffer", conf)
+        }
+    }
+  }
+
+}
+
+object RemoteMaterializerImpl {
+  final val NotApplied: Any => Any = _ => NotApplied
+
+  def collectOp[In, Out](collect: PartialFunction[In, Out], conf: UserConfig): 
Op = {
+    flatMapOp({ data: In =>
+      collect.applyOrElse(data, NotApplied) match {
+        case NotApplied => None
+        case result: Any => Option(result)
+      }
+    }, "collect", conf)
+  }
+
+  def filterOp[In](filter: In => Boolean, conf: UserConfig): Op = {
+    flatMapOp({ data: In =>
+      if (filter(data)) Option(data) else None
+    }, "filter", conf)
+  }
+
+  def headOptionOp[T](headOptionStage: HeadOptionStage[T], conf: UserConfig): 
Op = {
+    val promise: Promise[Option[T]] = Promise()
+    flatMapOp({ data: T =>
+      data match {
+        case None =>
+          Some(promise.future.failed)
+        case Some(d) =>
+          promise.future.value
+      }
+    }, "headOption", conf)
+  }
+
+  def reduceOp[T](reduce: (T, T) => T, conf: UserConfig): Op = {
+    var result: Option[T] = None
+    val flatMap = { elem: T =>
+      result match {
+        case None =>
+          result = Some(elem)
+        case Some(r) =>
+          result = Some(reduce(r, elem))
+      }
+      List(result)
+    }
+    flatMapOp(flatMap, "reduce", conf)
+  }
+
+  def zipWithOp[In1, In2](zipWith: (In1, In2) => (In1, In2), conf: 
UserConfig): Op = {
+    val flatMap = { elem: (In1, In2) =>
+      val (e1, e2) = elem
+      val result: (In1, In2) = zipWith(e1, e2)
+      List(result)
+    }
+    flatMapOp(flatMap, "zipWith", conf)
+  }
+
+  def zipWithOp2[In1, In2, Out](zipWith: (In1, In2) => Out, conf: UserConfig): 
Op = {
+    val flatMap = { elem: (In1, In2) =>
+      val (e1, e2) = elem
+      val result: Out = zipWith(e1, e2)
+      List(result)
+    }
+    flatMapOp(flatMap, "zipWith", conf)
+  }
+
+  def identity(description: String, conf: UserConfig): Op = {
+    flatMapOp({ data: Any =>
+      List(data)
+    }, description, conf)
+  }
+
+  def mapOp[In, Out](map: In => Out, conf: UserConfig): Op = {
+    val flatMap = (data: In) => List(map(data))
+    flatMapOp (flatMap, conf)
+  }
+
+  def flatMapOp[In, Out](flatMap: In => Iterable[Out], conf: UserConfig): Op = 
{
+    flatMapOp(flatMap, "flatmap", conf)
+  }
+
+  def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
+      conf: UserConfig): Op = {
+    ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), 
conf)
+  }
+
+  def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,
+    conf: UserConfig): Op = {
+    var agg = None: Option[Out]
+    val flatMap = {elem: In =>
+      agg = agg match {
+        case None =>
+          Some(seed(elem))
+        case Some(value) =>
+          Some(aggregate(value, elem))
+      }
+      List(agg.get)
+    }
+    flatMapOp (flatMap, "conflate", conf)
+  }
+
+  def foldOp[In, Out](zero: Out, fold: (Out, In) => Out, conf: UserConfig): Op 
= {
+    var aggregator: Out = zero
+    val map = { elem: In =>
+      aggregator = fold(aggregator, elem)
+      List(aggregator)
+    }
+    flatMapOp(map, "fold", conf)
+  }
+
+  def groupedOp(count: Int, conf: UserConfig): Op = {
+    var left = count
+    val buf = {
+      val b = Vector.newBuilder[Any]
+      b.sizeHint(count)
+      b
+    }
+
+    val flatMap: Any => Iterable[Any] = {input: Any =>
+      buf += input
+      left -= 1
+      if (left == 0) {
+        val emit = buf.result()
+        buf.clear()
+        left = count
+        Some(emit)
+      } else {
+        None
+      }
+    }
+    flatMapOp(flatMap, conf: UserConfig)
+  }
+
+  def dropOp[T](number: Long, conf: UserConfig): Op = {
+    var left = number
+    val flatMap: T => Iterable[T] = {input: T =>
+      if (left > 0) {
+        left -= 1
+        None
+      } else {
+        Some(input)
+      }
+    }
+    flatMapOp(flatMap, "drop", conf)
+  }
+
+  def dropWhileOp[In](drop: In => Boolean, conf: UserConfig): Op = {
+    flatMapOp({ data: In =>
+      if (drop(data))  None else Option(data)
+    }, "dropWhile", conf)
+  }
+
+  def logOp[T](name: String, extract: T => Any, conf: UserConfig): Op = {
+    val flatMap = {elem: T =>
+      LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
+      List(elem)
+    }
+    flatMapOp(flatMap, "log", conf)
+  }
+
+  def scanOp[In, Out](zero: Out, f: (Out, In) => Out, conf: UserConfig): Op = {
+    var aggregator = zero
+    var pushedZero = false
+
+    val flatMap = {elem: In =>
+      aggregator = f(aggregator, elem)
+
+      if (pushedZero) {
+        pushedZero = true
+        List(zero, aggregator)
+      } else {
+        List(aggregator)
+      }
+    }
+    flatMapOp(flatMap, "scan", conf)
+  }
+
+  def statefulMapOp[In, Out](f: In => Iterable[Out], conf: UserConfig): Op = {
+    flatMapOp ({ data: In =>
+      f(data)
+    }, conf)
+  }
+
+  def takeOp(count: Long, conf: UserConfig): Op = {
+    var left: Long = count
+
+    val filter: Any => Iterable[Any] = {elem: Any =>
+      left -= 1
+      if (left > 0) Some(elem)
+      else if (left == 0) Some(elem)
+      else None
+    }
+    flatMapOp(filter, "take", conf)
+  }
+
+  /**
+   * We use this attribute to track module to Processor
+   *
+   */
+  val TRACKABLE = "track how module is fused to processor"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
new file mode 100644
index 0000000..5b8c71b
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/BridgeModule.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ *
+ *
+ *   [[IN]] -> [[BridgeModule]] -> [[OUT]]
+ *                   /
+ *                  /
+ *       out of band data input or output channel [[MAT]]
+ *
+ *
+ * [[BridgeModule]] is used as a bridge between different materializers.
+ * Different [[akka.stream.Materializer]]s can use out of band channel to
+ * exchange messages.
+ *
+ * For example:
+ *
+ *                              Remote Materializer
+ *                         -----------------------------
+ *                         |                            |
+ *                         | BridgeModule -> RemoteSink |
+ *                         |  /                         |
+ *                         --/----------------------------
+ *   Local Materializer     /  out of band channel.
+ *   ----------------------/----
+ *   | Local              /    |
+ *   | Source ->  BridgeModule |
+ *   |                         |
+ *   ---------------------------
+ *
+ *
+ * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
+ * module during materialization.
+ *
+ * However, user can still declare it explicitly. In this case, it means we 
have a
+ * boundary Source or Sink module which accept out of band channel inputs or
+ * outputs.
+ *
+ * @tparam IN input
+ * @tparam OUT output
+ * @tparam MAT materializer
+ */
+abstract class BridgeModule[IN, OUT, MAT] extends AtomicModule {
+  val inPort = Inlet[IN]("BridgeModule.in")
+  val outPort = Outlet[OUT]("BridgeModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a 
FlowModule")
+  } else {
+    this
+  }
+
+  def attributes: Attributes
+  def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
+
+  protected def newInstance: BridgeModule[IN, OUT, MAT]
+  override def carbonCopy: Module = newInstance
+}
+
+
+/**
+ *
+ * Bridge module which accept out of band channel Input
+ * [[org.reactivestreams.Publisher]][IN].
+ *
+ *
+ *         [[SourceBridgeModule]] -> [[OUT]]
+ *                   /|
+ *                  /
+ *       out of band data input [[org.reactivestreams.Publisher]][IN]
+ *
+ * @see [[BridgeModule]]
+ * @param attributes Attributes
+ * @tparam IN, input data type from out of band 
[[org.reactivestreams.Publisher]]
+ * @tparam OUT out put data type to next module.
+ */
+class SourceBridgeModule[IN, OUT](val attributes: Attributes =
+    Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, 
Subscriber[IN]] {
+  override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] =
+    new SourceBridgeModule[IN, OUT](attributes)
+
+  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, 
Subscriber[IN]] = {
+    new SourceBridgeModule( attributes)
+  }
+}
+
+/**
+ *
+ * Bridge module which accept out of band channel Output
+ * [[org.reactivestreams.Subscriber]][OUT].
+ *
+ *
+ *   [[IN]] -> [[BridgeModule]]
+ *                    \
+ *                     \
+ *                      \|
+ *       out of band data output [[org.reactivestreams.Subscriber]][OUT]
+ *
+ * @see [[BridgeModule]]
+ * @param attributes Attributes
+ * @tparam IN, input data type from previous module
+ * @tparam OUT out put data type to out of band subscriber
+ */
+class SinkBridgeModule[IN, OUT](val attributes: Attributes =
+    Attributes.name("sinkBridgeModule")) extends BridgeModule[IN, OUT, 
Publisher[OUT]] {
+  override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] =
+    new SinkBridgeModule[IN, OUT](attributes)
+
+  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, 
Publisher[OUT]] = {
+    new SinkBridgeModule[IN, OUT](attributes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
new file mode 100644
index 0000000..ea76bb0
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/DummyModule.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import akka.stream.impl.{SinkModule, SourceModule}
+import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape}
+import org.reactivestreams.{Publisher, Subscriber}
+
+/**
+ * [[DummyModule]] is a set of special module to help construct a 
RunnableGraph,
+ * so that all ports are closed.
+ *
+ * In runtime, [[DummyModule]] should be ignored during materialization.
+ *
+ * For example, if you have a [[BridgeModule]] which only accept the input
+ * message from out of band channel, then you can use DummySource to fake
+ * a Message Source Like this.
+ *
+ * [[DummySource]] -> [[BridgeModule]] -> Sink
+ *                      /|
+ *                     /
+ *       out of band input message [[Publisher]]
+ *
+ *  After materialization, [[DummySource]] will be removed.
+
+ *              [[BridgeModule]] -> Sink
+ *                      /|
+ *                     /
+ *           [[akka.stream.impl.PublisherSource]]
+ *
+ *
+ */
+trait DummyModule extends AtomicModule
+
+
+/**
+ *
+ *    [[DummySource]]-> [[BridgeModule]] -> Sink
+ *                        /|
+ *                       /
+ *       out of band input message Source
+ *
+ * @param attributes Attributes
+ * @param shape SourceShape[Out]
+ * @tparam Out Output
+ */
+class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out])
+  extends SourceModule[Out, Unit](shape) with DummyModule {
+
+  override def create(context: MaterializationContext): (Publisher[Out], Unit) 
= {
+    throw new UnsupportedOperationException()
+  }
+
+  override protected def newInstance(shape: SourceShape[Out]): 
SourceModule[Out, Unit] = {
+    new DummySource[Out](attributes, shape)
+  }
+
+  override def withAttributes(attr: Attributes): Module = {
+    new DummySource(attr, amendShape(attr))
+  }
+}
+
+
+/**
+ *
+ *    Source-> [[BridgeModule]] -> [[DummySink]]
+ *                    \
+ *                     \
+ *                      \|
+ *                   out of band output message [[Subscriber]]
+ *
+ * @param attributes Attributes
+ * @param shape SinkShape[IN]
+ */
+class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN])
+  extends SinkModule[IN, Unit](shape) with DummyModule {
+  override def create(context: MaterializationContext): (Subscriber[IN], Unit) 
= {
+    throw new UnsupportedOperationException()
+  }
+
+  override protected def newInstance(shape: SinkShape[IN]): SinkModule[IN, 
Unit] = {
+    new DummySink[IN](attributes, shape)
+  }
+
+  override def withAttributes(attr: Attributes): Module = {
+    new DummySink[IN](attr, amendShape(attr))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
new file mode 100644
index 0000000..dfbbee9
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GearpumpTaskModule.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+import akka.stream._
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * [[GearpumpTaskModule]] represent modules that can be materialized as 
Gearpump Tasks.
+ *
+ * This is specially designed for Gearpump runtime. It is not supposed to be 
used
+ * for local materializer.
+ * 
+ */
+trait GearpumpTaskModule extends AtomicModule
+
+/**
+ * This is used to represent the Gearpump Data Source
+ * @param source DataSource
+ * @param conf UserConfig
+ * @param shape SourceShape[T}
+ * @param attributes Attributes
+ * @tparam T type
+ */
+final case class SourceTaskModule[T](
+    source: DataSource,
+    conf: UserConfig,
+    shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")),
+    attributes: Attributes = Attributes.name("SourceTaskModule"))
+  extends GearpumpTaskModule {
+
+  override def withAttributes(attr: Attributes): Module =
+    this.copy(shape = amendShape(attr), attributes = attr)
+  override def carbonCopy: Module =
+    this.copy(shape = SourceShape( Outlet[T]("SourceTaskModule.out")))
+
+  override def replaceShape(s: Shape): Module =
+    if (s == shape) this
+    else throw new UnsupportedOperationException("cannot replace the shape of 
SourceTaskModule")
+
+  private def amendShape(attr: Attributes): SourceShape[T] = {
+    val thisN = attributes.nameOrDefault(null)
+    val thatN = attr.nameOrDefault(null)
+
+    if ((thatN eq null) || thisN == thatN) shape
+    else shape.copy(out = Outlet(thatN + ".out"))
+  }
+}
+
+/**
+ * This is used to represent the Gearpump Data Sink
+ * @param sink DataSink
+ * @param conf UserConfig
+ * @param shape SinkShape[IN]
+ * @param attributes Attributes
+ * @tparam IN type
+ */
+final case class SinkTaskModule[IN](
+    sink: DataSink,
+    conf: UserConfig,
+    shape: SinkShape[IN] = SinkShape[IN](Inlet[IN]("SinkTaskModule.in")),
+    attributes: Attributes = Attributes.name("SinkTaskModule"))
+  extends GearpumpTaskModule {
+
+  override def withAttributes(attr: Attributes): Module =
+    this.copy(shape = amendShape(attr), attributes = attr)
+  override def carbonCopy: Module =
+    this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
+
+  override def replaceShape(s: Shape): Module =
+    if (s == shape) this
+    else throw new UnsupportedOperationException("cannot replace the shape of 
SinkTaskModule")
+
+  private def amendShape(attr: Attributes): SinkShape[IN] = {
+    val thisN = attributes.nameOrDefault(null)
+    val thatN = attr.nameOrDefault(null)
+
+    if ((thatN eq null) || thisN == thatN) shape
+    else shape.copy(in = Inlet(thatN + ".out"))
+  }
+}
+
+/**
+ * This is to represent the Gearpump Processor which has exact one input and 
one output
+ * @param processor Class[_ <: Task]
+ * @param conf UserConfig
+ * @param attributes Attributes
+ * @tparam IN type
+ * @tparam OUT type
+ * @tparam Unit void
+ */
+case class ProcessorModule[IN, OUT, Unit](
+    processor: Class[_ <: Task],
+    conf: UserConfig,
+    attributes: Attributes = Attributes.name("processorModule"))
+  extends AtomicModule with GearpumpTaskModule {
+  val inPort = Inlet[IN]("ProcessorModule.in")
+  val outPort = Outlet[IN]("ProcessorModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a 
FlowModule")
+  } else {
+    this
+  }
+  
+  override def carbonCopy: Module = newInstance
+
+  protected def newInstance: ProcessorModule[IN, OUT, Unit] =
+    new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
+
+  override def withAttributes(attributes: Attributes): ProcessorModule[IN, 
OUT, Unit] = {
+    new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
new file mode 100644
index 0000000..b06dd0e
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/GroupByModule.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+
+
+/**
+ *
+ * Group the T value groupBy function
+ *
+ * @param groupBy T => Group
+ * @param attributes Attributes
+ * @tparam T type
+ * @tparam Group type
+ */
+case class GroupByModule[T, Group](groupBy: T => Group,
+    attributes: Attributes = Attributes.name("groupByModule"))
+  extends AtomicModule {
+  val inPort = Inlet[T]("GroupByModule.in")
+  val outPort = Outlet[T]("GroupByModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a 
FlowModule")
+  } else {
+    this
+  }
+
+  override def carbonCopy: Module = newInstance
+
+  protected def newInstance: GroupByModule[T, Group] =
+    new GroupByModule[T, Group](groupBy, attributes)
+
+  override def withAttributes(attributes: Attributes): GroupByModule[T, Group] 
= {
+    new GroupByModule[T, Group](groupBy, attributes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
new file mode 100644
index 0000000..462d967
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/module/ReduceModule.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.module
+
+import akka.stream._
+import akka.stream.impl.StreamLayout.{AtomicModule, Module}
+
+
+/**
+ *
+ * Reduce Module
+ *
+ * @param f (T,T) => T
+ * @param attributes Attributes
+ * @tparam T type
+ */
+case class ReduceModule[T](f: (T, T) => T, attributes: Attributes =
+Attributes.name("reduceModule")) extends AtomicModule {
+  val inPort = Inlet[T]("GroupByModule.in")
+  val outPort = Outlet[T]("GroupByModule.out")
+  override val shape = new FlowShape(inPort, outPort)
+
+  override def replaceShape(s: Shape): Module = if (s != shape) {
+    throw new UnsupportedOperationException("cannot replace the shape of a 
FlowModule")
+  } else {
+    this
+  }
+
+  override def carbonCopy: Module = newInstance
+
+  protected def newInstance: ReduceModule[T] = new ReduceModule[T](f, 
attributes)
+
+  override def withAttributes(attributes: Attributes): ReduceModule[T] = {
+    new ReduceModule[T](f, attributes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
new file mode 100644
index 0000000..8e43c16
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/scaladsl/Api.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.scaladsl
+
+import akka.stream.Attributes
+import org.apache.gearpump.akkastream.module._
+import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.Task
+import org.reactivestreams.{Publisher, Subscriber}
+
+
+object GearSource{
+
+  /**
+   * Construct a Source which accepts out of band input messages.
+   *
+   *                   [[SourceBridgeModule]] -> Sink
+   *                          /
+   *                         /
+   *                        V
+   *                materialize to [[Subscriber]]
+   *                                   /|
+   *                                  /
+   *       upstream [[Publisher]] send out of band message
+   *
+   */
+  def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = {
+    val source = new Source(new DummySource[IN](Attributes.name("dummy"), 
Source.shape("dummy")))
+    val flow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, 
OUT]())
+    source.viaMat(flow)(Keep.right)
+  }
+
+  /**
+   * Construct a Source from Gearpump [[DataSource]].
+   *
+   *    [[SourceTaskModule]] -> downstream Sink
+   *
+   */
+  def from[OUT](source: DataSource): Source[OUT, Unit] = {
+    val taskSource = new Source[OUT, Unit](SourceTaskModule(source, 
UserConfig.empty))
+    taskSource
+  }
+
+  /**
+   * Construct a Source from Gearpump 
[[org.apache.gearpump.streaming.Processor]].
+   *
+   *    [[ProcessorModule]] -> downstream Sink
+   *
+   */
+  def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, 
Unit] = {
+    val source = new Source(new DummySource[Unit](Attributes.name("dummy"), 
Source.shape("dummy")))
+    val flow = Processor.apply[Unit, OUT](processor, conf)
+    source.viaMat(flow)(Keep.right)
+  }
+}
+
+object GearSink {
+
+  /**
+   * Construct a Sink which output messages to a out of band channel.
+   *
+   *   Souce ->   [[SinkBridgeModule]]
+   *                    \
+   *                     \|
+   *         materialize to [[Publisher]]
+   *                              \
+   *                               \
+   *                                \|
+   *       send out of band message to downstream [[Subscriber]]
+   *
+   */
+  def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = {
+    val sink = new Sink(new DummySink[OUT](Attributes.name("dummy"), 
Sink.shape("dummy")))
+    val flow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, 
OUT]())
+    flow.to(sink)
+  }
+
+  /**
+   * Construct a Sink from Gearpump [[DataSink]].
+   *
+   *    Upstream Source -> [[SinkTaskModule]]
+   *
+   */
+  def to[IN](sink: DataSink): Sink[IN, Unit] = {
+    val taskSink = new Sink[IN, Unit](new SinkTaskModule(sink, 
UserConfig.empty))
+    taskSink
+  }
+
+  /**
+   * Construct a Sink from Gearpump 
[[org.apache.gearpump.streaming.Processor]].
+   *
+   *    Upstream Source -> [[ProcessorModule]]
+   *
+   */
+  def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
+    val sink = new Sink(new DummySink[Unit](Attributes.name("dummy"), 
Sink.shape("dummy")))
+    val flow = Processor.apply[IN, Unit](processor, conf)
+    flow.to(sink)
+  }
+}
+
+/**
+ *
+ * GroupBy will divide the main input stream to a set of sub-streams.
+ * This is a work-around to bypass the limitation of official API Flow.groupBy
+ *
+ *
+ * For example, to do a word count, we can write code like this:
+ *
+ * case class KV(key: String, value: String)
+ * case class Count(key: String, count: Int)
+ *
+ * val flow: Flow[KV] = GroupBy[KV](foo).map{ kv =>
+ *   Count(kv.key, 1)
+ * }.fold(Count(null, 0)) {(base, add) =>
+ *   Count(add.key, base.count + add.count)
+ * }.log("count of current key")
+ * .flatten()
+ * .to(sink)
+ *
+ * map, fold will transform data on all sub-streams, If there are 10 groups,
+ * then there will be 10 sub-streams, and for each sub-stream, there will be
+ * a map and fold.
+ *
+ * flatten will collect all sub-stream into the main stream,
+ *
+ * sink will only operate on the main stream.
+ *
+ */
+object GroupBy{
+  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
+    new Flow[T, T, Unit](new GroupByModule(groupBy))
+  }
+}
+
+/**
+ * Aggregate on the data.
+ *
+ * val flow = Reduce({(a: Int, b: Int) => a + b})
+ *
+ *
+ */
+object Reduce{
+  def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
+    new Flow[T, T, Unit](new ReduceModule(reduce))
+  }
+}
+
+
+/**
+ * Create a Flow by providing a Gearpump Processor class and configuration
+ *
+ *
+ */
+object Processor{
+  def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, 
Out, Unit] = {
+    new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, 
conf))
+  }
+}
+
+object Implicits {
+
+  /**
+   * Help util to support reduce and groupBy
+   */
+  implicit class SourceOps[T, Mat](source: Source[T, Mat]) {
+
+    // TODO It is named as groupBy2 to avoid conflict with built-in
+    // groupBy. Eventually, we think the built-in groupBy should
+    // be replace with this implementation.
+    def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = {
+      val stage = GroupBy.apply(groupBy)
+      source.via[T, Unit](stage)
+    }
+
+
+    def reduce(reduce: (T, T) => T): Source[T, Mat] = {
+      val stage = Reduce.apply(reduce)
+      source.via[T, Unit](stage)
+    }
+
+    def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, 
Mat] = {
+      val stage = Processor.apply[T, R](processor, conf)
+      source.via(stage)
+    }
+  }
+
+  /**
+   * Help util to support reduce and groupBy
+   */
+  implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {
+    def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] = {
+      val stage = GroupBy.apply(groupBy)
+      flow.via(stage)
+    }
+
+    def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] = {
+      val stage = Reduce.apply(reduce)
+      flow.via(stage)
+    }
+
+    def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, 
Mat] = {
+      val stage = Processor.apply[OUT, R](processor, conf)
+      flow.via(stage)
+    }
+  }
+
+  /**
+   * Help util to support groupByKey and sum
+   */
+  implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {
+
+    /**
+     * if it is a KV Pair, we can group the KV pair by the key.
+     * @return
+     */
+    def groupByKey: Source[(K, V), Mat] = {
+      val stage = GroupBy.apply(getTupleKey[K, V])
+      source.via(stage)
+    }
+
+    /**
+     * do sum on values
+     *
+     * Before doing this, you need to do groupByKey to group same key together
+     * , otherwise, it will do the sum no matter what current key is.
+     *
+     * @param numeric Numeric[V]
+     * @return
+     */
+    def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
+      val stage = Reduce.apply(sumByKey[K, V](numeric))
+      source.via(stage)
+    }
+  }
+
+  /**
+   * Help util to support groupByKey and sum
+   */
+  implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
+
+    /**
+     * if it is a KV Pair, we can group the KV pair by the key.
+     * @return
+     */
+    def groupByKey: Flow[(K, V), (K, V), Mat] = {
+      val stage = GroupBy.apply(getTupleKey[K, V])
+      flow.via(stage)
+    }
+
+    /**
+     * do sum on values
+     *
+     * Before doing this, you need to do groupByKey to group same key together
+     * , otherwise, it will do the sum no matter what current key is.
+     *
+     * @param numeric Numeric[V]
+     * @return
+     */
+    def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
+      val stage = Reduce.apply(sumByKey[K, V](numeric))
+      flow.via(stage)
+    }
+  }
+
+  private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, 
V]) => Tuple2[K, V] =
+    (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
new file mode 100644
index 0000000..43f07c4
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BalanceTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class BalanceTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val sizeOfOutputs = sizeOfOutPorts
+  var index = 0
+
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfOutputs) {
+      index = 0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
new file mode 100644
index 0000000..5c2485b
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BatchTask.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+class BatchTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val max = userConf.getLong(BatchTask.MAX)
+  val costFunc = userConf.getValue[In => Long](BatchTask.COST)
+  val aggregate = userConf.getValue[(Out, In) => Out](BatchTask.AGGREGATE)
+  val seed = userConf.getValue[In => Out](BatchTask.SEED)
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    context.output(msg)
+  }
+}
+
+object BatchTask {
+  val AGGREGATE = "AGGREGATE"
+  val COST = "COST"
+  val MAX = "MAX"
+  val SEED = "SEED"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
new file mode 100644
index 0000000..292468d
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/BroadcastTask.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class BroadcastTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+  override def onNext(msg : Message) : Unit = {
+    context.output(msg)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
new file mode 100644
index 0000000..b77b9bd
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/ConcatTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class ConcatTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val sizeOfOutputs = sizeOfOutPorts
+  var index = 0
+
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfOutputs) {
+      index = 0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
new file mode 100644
index 0000000..7c335dc
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DelayInitialTask.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+
+case object DelayInitialTime
+
+class DelayInitialTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val delayInitial = 
userConf.getValue[FiniteDuration](DelayInitialTask.DELAY_INITIAL).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var delayInitialActive = true
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(delayInitial)(
+      self ! Message(DelayInitialTime, System.currentTimeMillis())
+    )
+  }
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DelayInitialTime =>
+        delayInitialActive = false
+      case _ =>
+        delayInitialActive match {
+          case true =>
+          case false =>
+            context.output(msg)
+        }
+    }
+  }
+}
+
+object DelayInitialTask {
+  val DELAY_INITIAL = "DELAY_INITIAL"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
new file mode 100644
index 0000000..0c54829
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/DropWithinTask.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+case object DropWithinTimeout
+
+class DropWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val timeout = userConf.getValue[FiniteDuration](DropWithinTask.TIMEOUT).
+    getOrElse(FiniteDuration(0, TimeUnit.MINUTES))
+  var timeoutActive = true
+
+  override def onStart(startTime: Instant): Unit = {
+    context.scheduleOnce(timeout)(
+      self ! Message(DropWithinTimeout, System.currentTimeMillis())
+    )
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    msg.msg match {
+      case DropWithinTimeout =>
+        timeoutActive = false
+      case _ =>
+
+    }
+    timeoutActive match {
+      case true =>
+      case false =>
+        context.output(msg)
+    }
+  }
+}
+
+object DropWithinTask {
+  val TIMEOUT = "TIMEOUT"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
new file mode 100644
index 0000000..14ff537
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FlattenMergeTask.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class FlattenMergeTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val sizeOfOutputs = sizeOfOutPorts
+  var index = 0
+
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfOutputs) {
+      index = 0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
new file mode 100644
index 0000000..d982ebd
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/FoldTask.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class FoldTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val zero = userConf.getValue[Out](FoldTask.ZERO)
+  val aggregator = userConf.getValue[(Out, In) => Out](FoldTask.AGGREGATOR)
+  var aggregated: Out = _
+  implicit val ec = context.system.dispatcher
+
+  override def onStart(instant: Instant): Unit = {
+    zero.foreach(value => {
+      aggregated = value
+    })
+  }
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    aggregator.foreach(func => {
+      aggregated = func(aggregated, data)
+      LOG.info(s"aggregated = $aggregated")
+      val msg = new Message(aggregated, time)
+      context.output(msg)
+    })
+  }
+}
+
+object FoldTask {
+  val ZERO = "ZERO"
+  val AGGREGATOR = "AGGREGATOR"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
new file mode 100644
index 0000000..3310ab9
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GraphTask.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.akkastream.task.GraphTask.{Index, PortId}
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.ProcessorId
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskWrapper}
+
+class GraphTask(inputTaskContext : TaskContext, userConf : UserConfig)
+  extends Task(inputTaskContext, userConf) {
+
+  private val context = inputTaskContext.asInstanceOf[TaskWrapper]
+  protected val outMapping =
+    
portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.OUT_PROCESSORS).get)
+  protected val inMapping =
+    
portsMapping(userConf.getValue[List[ProcessorId]](GraphTask.IN_PROCESSORS).get)
+
+  val sizeOfOutPorts = outMapping.keys.size
+  val sizeOfInPorts = inMapping.keys.size
+  
+  private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] 
= {
+    val portToProcessor = processors.zipWithIndex.map{kv =>
+      (kv._2, kv._1)
+    }.toMap
+
+    val processorToIndex = processors.sorted.zipWithIndex.toMap
+
+    val portToIndex = portToProcessor.map{kv =>
+      val (outlet, processorId) = kv
+      val index = processorToIndex(processorId)
+      (outlet, index)
+    }
+    portToIndex
+  }
+
+  def output(outletId: Int, msg: Message): Unit = {
+    context.output(outMapping(outletId), msg)
+  }
+
+  override def onStart(startTime : Instant) : Unit = {}
+
+  override def onStop() : Unit = {}
+}
+
+object GraphTask {
+  val OUT_PROCESSORS = "org.apache.gearpump.akkastream.task.outprocessors"
+  val IN_PROCESSORS = "org.apache.gearpump.akkastream.task.inprocessors"
+
+  type PortId = Int
+  type Index = Int
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
new file mode 100644
index 0000000..eaf2b3f
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/GroupedWithinTask.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.collection.immutable.VectorBuilder
+import scala.concurrent.duration.FiniteDuration
+
+class GroupedWithinTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  case object GroupedWithinTrigger
+  val buf: VectorBuilder[T] = new VectorBuilder
+  val timeWindow = 
userConf.getValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW)
+  val batchSize = userConf.getInt(GroupedWithinTask.BATCH_SIZE)
+
+  override def onNext(msg : Message) : Unit = {
+
+  }
+}
+
+object GroupedWithinTask {
+  val BATCH_SIZE = "BATCH_SIZE"
+  val TIME_WINDOW = "TIME_WINDOW"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
new file mode 100644
index 0000000..741ec43
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/InterleaveTask.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class InterleaveTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val sizeOfInputs = sizeOfInPorts
+  var index = 0
+
+  // TODO access upstream and pull
+  override def onNext(msg : Message) : Unit = {
+    output(index, msg)
+    index += 1
+    if (index == sizeOfInputs) {
+      index = 0
+    }
+  }
+}
+
+object InterleaveTask {
+  val INPUT_PORTS = "INPUT_PORTS"
+  val SEGMENT_SIZE = "SEGMENT_SIZE"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
new file mode 100644
index 0000000..daa1afc
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MapAsyncTask.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.Future
+
+class MapAsyncTask[In, Out](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val f = userConf.getValue[In => Future[Out]](MapAsyncTask.MAPASYNC_FUNC)
+  implicit val ec = context.system.dispatcher
+
+  override def onNext(msg : Message) : Unit = {
+    val data = msg.msg.asInstanceOf[In]
+    val time = msg.timestamp
+    f match {
+      case Some(func) =>
+        val fout = func(data)
+        fout.onComplete(value => {
+          value.foreach(out => {
+            val msg = new Message(out, time)
+            context.output(msg)
+          })
+        })
+      case None =>
+    }
+  }
+}
+
+object MapAsyncTask {
+  val MAPASYNC_FUNC = "MAPASYNC_FUNC"
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
new file mode 100644
index 0000000..ad18f72
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/MergeTask.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+class MergeTask(context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val eagerComplete = userConf.getBoolean(MergeTask.EAGER_COMPLETE)
+  val inputPorts = userConf.getInt(MergeTask.INPUT_PORTS)
+
+  override def onNext(msg : Message) : Unit = {
+    context.output(msg)
+  }
+}
+
+object MergeTask {
+  val EAGER_COMPLETE = "EAGER_COMPLETE"
+  val INPUT_PORTS = "INPUT_PORTS"
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
new file mode 100644
index 0000000..458bb4e
--- /dev/null
+++ 
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/task/SingleSourceTask.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.task
+
+import java.time.Instant
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.task.TaskContext
+
+import scala.concurrent.duration.FiniteDuration
+
+class SingleSourceTask[T](context: TaskContext, userConf : UserConfig)
+  extends GraphTask(context, userConf) {
+
+  val elem = userConf.getValue[T](SingleSourceTask.ELEMENT).get
+
+  override def onNext(msg : Message) : Unit = {
+    context.output(Message(elem, msg.timestamp))
+  }
+}
+
+object SingleSourceTask {
+  val ELEMENT = "ELEMENT"
+}

Reply via email to