[GEARPUMP-22] Merge akka-streams branch into master

Author: manuzhang <[email protected]>
Author: Kam Kasravi <[email protected]>

Closes #137 from manuzhang/akka-streams.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/2913a1fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/2913a1fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/2913a1fd

Branch: refs/heads/master
Commit: 2913a1fd89c564ec20e2cec8f54caafb01988296
Parents: 5f90b70
Author: manuzhang <[email protected]>
Authored: Fri Jan 20 20:42:42 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Jan 20 20:43:15 2017 +0800

----------------------------------------------------------------------
 experiments/akkastream/README.md                |   4 +-
 .../src/main/resources/geardefault.conf         |   2 +-
 .../scala/akka/stream/BaseMaterializer.scala    |  47 --
 .../main/scala/akka/stream/ModuleGraph.scala    | 298 ---------
 .../akka/stream/gearpump/GearAttributes.scala   |  90 ---
 .../stream/gearpump/GearpumpMaterializer.scala  |  71 ---
 .../akka/stream/gearpump/example/Test.scala     |  69 ---
 .../akka/stream/gearpump/example/Test2.scala    |  71 ---
 .../akka/stream/gearpump/example/Test3.scala    |  59 --
 .../akka/stream/gearpump/example/Test4.scala    |  50 --
 .../akka/stream/gearpump/example/Test5.scala    |  68 ---
 .../akka/stream/gearpump/example/Test6.scala    |  67 ---
 .../stream/gearpump/example/WikipediaApp.scala  | 143 -----
 .../stream/gearpump/graph/GraphCutter.scala     | 183 ------
 .../akka/stream/gearpump/graph/LocalGraph.scala |  81 ---
 .../stream/gearpump/graph/RemoteGraph.scala     | 106 ----
 .../akka/stream/gearpump/graph/SubGraph.scala   |  56 --
 .../materializer/LocalMaterializer.scala        | 152 -----
 .../materializer/LocalMaterializerImpl.scala    | 284 ---------
 .../materializer/RemoteMaterializerImpl.scala   | 453 --------------
 .../stream/gearpump/module/BridgeModule.scala   | 124 ----
 .../stream/gearpump/module/DummyModule.scala    | 103 ----
 .../gearpump/module/GearpumpTaskModule.scala    | 133 ----
 .../stream/gearpump/module/GroupByModule.scala  |  46 --
 .../stream/gearpump/module/ReduceModule.scala   |  44 --
 .../akka/stream/gearpump/scaladsl/Api.scala     | 282 ---------
 .../akka/stream/gearpump/task/BalanceTask.scala |  37 --
 .../stream/gearpump/task/BroadcastTask.scala    |  29 -
 .../akka/stream/gearpump/task/GraphTask.scala   |  70 ---
 .../stream/gearpump/task/SinkBridgeTask.scala   | 125 ----
 .../stream/gearpump/task/SourceBridgeTask.scala | 107 ----
 .../akka/stream/gearpump/task/UnZip2Task.scala  |  45 --
 .../gearpump/util/MaterializedValueOps.scala    |  38 --
 .../gearpump/akkastream/GearAttributes.scala    |  89 +++
 .../akkastream/GearpumpMaterializer.scala       | 295 +++++++++
 .../GearpumpMaterializerSession.scala           | 152 +++++
 .../gearpump/akkastream/example/Test.scala      |  62 ++
 .../gearpump/akkastream/example/Test10.scala    |  82 +++
 .../gearpump/akkastream/example/Test11.scala    |  72 +++
 .../gearpump/akkastream/example/Test12.scala    |  81 +++
 .../gearpump/akkastream/example/Test13.scala    | 177 ++++++
 .../gearpump/akkastream/example/Test14.scala    |  73 +++
 .../gearpump/akkastream/example/Test15.scala    |  72 +++
 .../gearpump/akkastream/example/Test16.scala    |  49 ++
 .../gearpump/akkastream/example/Test2.scala     |  77 +++
 .../gearpump/akkastream/example/Test3.scala     |  70 +++
 .../gearpump/akkastream/example/Test4.scala     |  50 ++
 .../gearpump/akkastream/example/Test5.scala     |  67 +++
 .../gearpump/akkastream/example/Test6.scala     |  90 +++
 .../gearpump/akkastream/example/Test7.scala     |  56 ++
 .../gearpump/akkastream/example/Test8.scala     |  66 ++
 .../gearpump/akkastream/example/Test9.scala     |  87 +++
 .../akkastream/example/WikipediaApp.scala       | 157 +++++
 .../akkastream/graph/GraphPartitioner.scala     | 205 +++++++
 .../gearpump/akkastream/graph/LocalGraph.scala  |  80 +++
 .../gearpump/akkastream/graph/RemoteGraph.scala | 113 ++++
 .../gearpump/akkastream/graph/SubGraph.scala    |  59 ++
 .../materializer/LocalMaterializerImpl.scala    | 333 ++++++++++
 .../materializer/RemoteMaterializerImpl.scala   | 600 +++++++++++++++++++
 .../akkastream/module/BridgeModule.scala        | 135 +++++
 .../akkastream/module/DummyModule.scala         | 105 ++++
 .../akkastream/module/GearpumpTaskModule.scala  | 135 +++++
 .../akkastream/module/GroupByModule.scala       |  55 ++
 .../akkastream/module/ReduceModule.scala        |  52 ++
 .../gearpump/akkastream/scaladsl/Api.scala      | 289 +++++++++
 .../gearpump/akkastream/task/BalanceTask.scala  |  38 ++
 .../gearpump/akkastream/task/BatchTask.scala    |  50 ++
 .../akkastream/task/BroadcastTask.scala         |  30 +
 .../gearpump/akkastream/task/ConcatTask.scala   |  38 ++
 .../akkastream/task/DelayInitialTask.scala      |  61 ++
 .../akkastream/task/DropWithinTask.scala        |  62 ++
 .../akkastream/task/FlattenMergeTask.scala      |  38 ++
 .../gearpump/akkastream/task/FoldTask.scala     |  56 ++
 .../gearpump/akkastream/task/GraphTask.scala    |  71 +++
 .../akkastream/task/GroupedWithinTask.scala     |  44 ++
 .../akkastream/task/InterleaveTask.scala        |  44 ++
 .../gearpump/akkastream/task/MapAsyncTask.scala |  53 ++
 .../gearpump/akkastream/task/MergeTask.scala    |  39 ++
 .../akkastream/task/SingleSourceTask.scala      |  43 ++
 .../akkastream/task/SinkBridgeTask.scala        | 131 ++++
 .../akkastream/task/SourceBridgeTask.scala      | 116 ++++
 .../akkastream/task/StatefulMapConcatTask.scala |  50 ++
 .../akkastream/task/TakeWithinTask.scala        |  62 ++
 .../gearpump/akkastream/task/ThrottleTask.scala |  53 ++
 .../akkastream/task/TickSourceTask.scala        |  56 ++
 .../gearpump/akkastream/task/Unzip2Task.scala   |  46 ++
 .../gearpump/akkastream/task/Zip2Task.scala     |  57 ++
 .../akkastream/util/MaterializedValueOps.scala  |  40 ++
 .../akka/stream/gearpump/AttributesSpec.scala   |  33 -
 .../gearpump/akkastream/AttributesSpec.scala    |  34 ++
 project/BuildDashboard.scala                    |   6 +-
 project/BuildExperiments.scala                  |   2 +-
 project/Dependencies.scala                      |   8 +-
 .../gearpump/services/AppMasterService.scala    |   2 +-
 .../gearpump/services/MasterService.scala       |   2 +-
 .../gearpump/services/SecurityService.scala     |   4 +-
 .../gearpump/services/StaticService.scala       |   8 +-
 .../gearpump/services/WorkerService.scala       |   2 +-
 .../gearpump/streaming/StreamApplication.scala  |   2 +-
 .../apache/gearpump/streaming/dsl/plan/OP.scala |   5 +-
 100 files changed, 5420 insertions(+), 3588 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/README.md
----------------------------------------------------------------------
diff --git a/experiments/akkastream/README.md b/experiments/akkastream/README.md
index 7c9a316..fe04554 100644
--- a/experiments/akkastream/README.md
+++ b/experiments/akkastream/README.md
@@ -1,4 +1,2 @@
 Akka Stream 
-=========
-
-TODO: This directory is obsolte. Working on updating it to Akka 2.4.3.
+=========
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/resources/geardefault.conf 
b/experiments/akkastream/src/main/resources/geardefault.conf
index 626a1dc..8584511 100644
--- a/experiments/akkastream/src/main/resources/geardefault.conf
+++ b/experiments/akkastream/src/main/resources/geardefault.conf
@@ -2,4 +2,4 @@ gearpump.serializers {
   "akka.stream.gearpump.example.WikipediaApp$WikidataElement" = ""
   "scala.collection.immutable.Map$Map1" = ""
   "scala.collection.immutable.Map$Map2" = ""
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala 
b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
deleted file mode 100644
index d2b328d..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream
-
-import scala.concurrent.ExecutionContextExecutor
-
-/**
- * [[BaseMaterializer]] is a extension to [[akka.stream.Materializer]].
- *
- * Compared with [[akka.stream.Materializer]], the difference is that
- * [[materialize]] accepts a [[ModuleGraph]] instead of a RunnableGraph.
- *
- * @see [[ModuleGraph]] for the difference between RunnableGraph and
- *      [[ModuleGraph]]
- *
- */
-abstract class BaseMaterializer extends akka.stream.Materializer {
-
-  override def withNamePrefix(name: String): Materializer = throw new 
UnsupportedOperationException()
-
-  override implicit def executionContext: ExecutionContextExecutor = throw new 
UnsupportedOperationException()
-
-  def materialize[Mat](graph: ModuleGraph[Mat]): Mat
-
-  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = 
{
-    val graph = ModuleGraph(runnableGraph)
-    materialize(graph)
-  }
-
-  def shutdown(): Unit
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala 
b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
deleted file mode 100644
index 48d06f7..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * This code is similar to MaterializerSession and will be deprecated in the 
next release.
- */
-
-package akka.stream
-
-import scala.collection.mutable
-
-import akka.stream.Attributes.Attribute
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.StreamLayout._
-import akka.stream.impl._
-import akka.stream.{Graph => AkkaGraph}
-
-import _root_.org.apache.gearpump.util
-import _root_.org.apache.gearpump.util.Graph
-
-/**
- *
- * ModuleGraph is a transformation on [[akka.stream.scaladsl.RunnableGraph]].
- * It carries all the information of [[akka.stream.scaladsl.RunnableGraph]], 
but
- * represents it in a different way.
- *
- * Here is the difference:
- *
- * RunnableGraph
- * ==============================
- * [[akka.stream.scaladsl.RunnableGraph]] is represented as a [[Module]] tree:
- *             TopLevelModule
- *                   |
- *          -------------------
- *          |                 |
- *      SubModule1         SubModule2
- *          |                    |
- *   ----------------           ----------
- *   |              |                    |
- * AtomicModule1 AtomicModule2  AtomicModule3
- *
- * ModuleGraph
- * ==============================
- * [[ModuleGraph]] is represented as a [[util.Graph]] of Atomic [[Module]]:
- *
- *        AtomicModule2 -> AtomicModule4
- *         /|                    \
- *       /                        \
- *     /                           \|
- * AtomicModule1           AtomicModule5
- *     \                   /|
- *      \                 /
- *       \|              /
- *         AtomicModule3
- *
- * Each vertex in the Graph is a [[Module]], each [[Edge]] in the Graph is a 
tuple
- * ([[OutPort]], [[InPort]]). [[OutPort]] is one of upstream Atomic Module
- * output ports. [[InPort]] is one of downstream Atomic Module input ports.
- *
- *
- * Why use [[ModuleGraph]] instead of [[akka.stream.scaladsl.RunnableGraph]]?
- * =========================
- * There are several good reasons:):
- * 1. [[ModuleGraph]] outlines explicitly the upstream/downstream relation.
- *   Each [[Edge]] of [[ModuleGraph]] represent a upstream/downstream pair.
- *   It is easier for user to understand the overall data flow.
- *
- * 2. It is easier for performance optimization.
- *  For the above Graph, if we want to fuse AtomicModule2 and AtomicModule3
- *  together, it can be done within [[ModuleGraph]]. We only need
- *  to substitute Pair(AtomicModule2, AtomicModule4) with a unified Module.
- *
- * 3. It avoids module duplication.
- *  In [[akka.stream.scaladsl.RunnableGraph]], composite Module can be re-used.
- *  It is possible that there can be duplicate Modules.
- *  The duplication problem causes big headache when doing materialization.
- *
- *  [[ModuleGraph]] doesn't have thjis problem. [[ModuleGraph]] does a 
transformation on the Module
- *  Tree to make sure each Atomic Module [[ModuleGraph]] is unique.
- *
- *
- * @param graph a Graph of Atomic modules.
- * @param mat is a function of:
- *             input => materialized value of each Atomic module
- *             output => final materialized value.
- * @tparam Mat
- */
-class ModuleGraph[Mat](val graph: util.Graph[Module, Edge], val mat: 
MaterializedValueNode) {
-
-  def resolve(materializedValues: Map[Module, Any]): Mat = {
-    MaterializedValueOps(mat).resolve[Mat](materializedValues)
-  }
-}
-
-object ModuleGraph {
-
-  def apply[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): ModuleGraph[Mat] 
= {
-    val topLevel = runnableGraph.module
-    val factory = new ModuleGraphFactory(topLevel)
-    val (graph, mat) = factory.create()
-    new ModuleGraph(graph, mat)
-  }
-
-  /**
-   *
-   * @param from outport of upstream module
-   * @param to inport of downstream module
-   */
-  case class Edge(from: OutPort, to: InPort)
-
-  private class ModuleGraphFactory(val topLevel: StreamLayout.Module) {
-
-    private var subscribersStack: List[mutable.Map[InPort, (InPort, Module)]] =
-      mutable.Map.empty[InPort, (InPort, Module)].withDefaultValue(null) :: Nil
-    private var publishersStack: List[mutable.Map[OutPort, (OutPort, Module)]] 
=
-      mutable.Map.empty[OutPort, (OutPort, Module)].withDefaultValue(null) :: 
Nil
-
-    /*
-     * Please note that this stack keeps track of the scoped modules wrapped 
in CopiedModule but not the CopiedModule
-     * itself. The reason is that the CopiedModule itself is only needed for 
the enterScope and exitScope methods but
-     * not elsewhere. For this reason they are just simply passed as 
parameters to those methods.
-     *
-     * The reason why the encapsulated (copied) modules are stored as mutable 
state to save subclasses of this class
-     * from passing the current scope around or even knowing about it.
-     */
-    private var moduleStack: List[Module] = topLevel :: Nil
-
-    private def subscribers: mutable.Map[InPort, (InPort, Module)] = 
subscribersStack.head
-    private def publishers: mutable.Map[OutPort, (OutPort, Module)] = 
publishersStack.head
-    private def currentLayout: Module = moduleStack.head
-
-    private val graph = Graph.empty[Module, Edge]
-
-    private def copyAtomicModule[T <: Module](module: T, parentAttributes: 
Attributes): T = {
-      val currentAttributes = mergeAttributes(parentAttributes, 
module.attributes)
-      module.withAttributes(currentAttributes).asInstanceOf[T]
-    }
-
-    private def materializeAtomic(atomic: Module, parentAttributes: 
Attributes): MaterializedValueNode = {
-      val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets)
-      val copied = copyAtomicModule(atomic, parentAttributes)
-
-      for ((in, id) <- inputs.zipWithIndex) {
-        val inPort = inPortMapping(atomic, copied)(in)
-        assignPort(in, (inPort, copied))
-      }
-
-      for ((out, id) <- outputs.zipWithIndex) {
-        val outPort = outPortMapping(atomic, copied)(out)
-        assignPort(out, (outPort, copied))
-      }
-
-      graph.addVertex(copied)
-      Atomic(copied)
-    }
-
-    def create(): (util.Graph[Module, Edge], MaterializedValueNode) = {
-      val mat = materializeModule(topLevel, Attributes.none)
-      (graph, mat)
-    }
-
-    private def outPortMapping(from: Module, to: Module): Map[OutPort, 
OutPort] = {
-      from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
-    }
-
-    private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = 
{
-      from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
-    }
-
-    private def materializeModule(module: Module, parentAttributes: 
Attributes): MaterializedValueNode = {
-
-      val materializedValues = collection.mutable.HashMap.empty[Module, 
MaterializedValueNode]
-      val currentAttributes = mergeAttributes(parentAttributes, 
module.attributes)
-
-      var materializedValueSources = List.empty[MaterializedValueSource[_]]
-
-      for (submodule <- module.subModules) {
-        submodule match {
-          case mv: MaterializedValueSource[_] =>
-            materializedValueSources :+= mv
-          case atomic if atomic.isAtomic =>
-            materializedValues.put(atomic, materializeAtomic(atomic, 
currentAttributes))
-          case copied: CopiedModule =>
-            enterScope(copied)
-            materializedValues.put(copied, materializeModule(copied, 
currentAttributes))
-            exitScope(copied)
-          case composite =>
-            materializedValues.put(composite, materializeComposite(composite, 
currentAttributes))
-        }
-      }
-
-      val mat = resolveMaterialized(module.materializedValueComputation, 
materializedValues)
-
-      materializedValueSources.foreach { module =>
-        val matAttribute = new MaterializedValueSourceAttribute(mat)
-        val copied = copyAtomicModule(module, parentAttributes and 
Attributes(matAttribute))
-        assignPort(module.shape.outlet, (copied.shape.outlet, copied))
-        graph.addVertex(copied)
-        materializedValues.put(copied, Atomic(copied))
-      }
-      mat
-    }
-
-    private def materializeComposite(composite: Module, effectiveAttributes: 
Attributes): MaterializedValueNode = {
-      materializeModule(composite, effectiveAttributes)
-    }
-
-    private def mergeAttributes(parent: Attributes, current: Attributes): 
Attributes = {
-      parent and current
-    }
-
-    private def resolveMaterialized(matNode: MaterializedValueNode, 
materializedValues: collection.Map[Module, MaterializedValueNode]): 
MaterializedValueNode = matNode match {
-      case Atomic(m) => materializedValues(m)
-      case Combine(f, d1, d2) => Combine(f, resolveMaterialized(d1, 
materializedValues), resolveMaterialized(d2, materializedValues))
-      case Transform(f, d) => Transform(f, resolveMaterialized(d, 
materializedValues))
-      case Ignore => Ignore
-    }
-
-    final protected def assignPort(in: InPort, subscriber: (InPort, Module)): 
Unit = {
-      addVertex(subscriber._2)
-      subscribers(in) = subscriber
-      // Interface (unconnected) ports of the current scope will be wired when 
exiting the scope
-      if (!currentLayout.inPorts(in)) {
-        val out = currentLayout.upstreams(in)
-        val publisher = publishers(out)
-        if (publisher ne null) addEdge(publisher, subscriber)
-      }
-    }
-
-    final protected def assignPort(out: OutPort, publisher: (OutPort, 
Module)): Unit = {
-      addVertex(publisher._2)
-      publishers(out) = publisher
-      // Interface (unconnected) ports of the current scope will be wired when 
exiting the scope
-      if (!currentLayout.outPorts(out)) {
-        val in = currentLayout.downstreams(out)
-        val subscriber = subscribers(in)
-        if (subscriber ne null) addEdge(publisher, subscriber)
-      }
-    }
-
-    // Enters a copied module and establishes a scope that prevents internals 
to leak out and interfere with copies
-    // of the same module.
-    // We don't store the enclosing CopiedModule itself as state since we 
don't use it anywhere else than exit and enter
-    private def enterScope(enclosing: CopiedModule): Unit = {
-      subscribersStack ::= mutable.Map.empty.withDefaultValue(null)
-      publishersStack ::= mutable.Map.empty.withDefaultValue(null)
-      moduleStack ::= enclosing.copyOf
-    }
-
-    // Exits the scope of the copied module and propagates 
Publishers/Subscribers to the enclosing scope assigning
-    // them to the copied ports instead of the original ones (since there 
might be multiple copies of the same module
-    // leading to port identity collisions)
-    // We don't store the enclosing CopiedModule itself as state since we 
don't use it anywhere else than exit and enter
-    private def exitScope(enclosing: CopiedModule): Unit = {
-      val scopeSubscribers = subscribers
-      val scopePublishers = publishers
-      subscribersStack = subscribersStack.tail
-      publishersStack = publishersStack.tail
-      moduleStack = moduleStack.tail
-
-      // When we exit the scope of a copied module,  pick up the 
Subscribers/Publishers belonging to exposed ports of
-      // the original module and assign them to the copy ports in the outer 
scope that we will return to
-      
enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach
 {
-        case (original, exposed) => assignPort(exposed, 
scopeSubscribers(original))
-      }
-
-      
enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach
 {
-        case (original, exposed) => assignPort(exposed, 
scopePublishers(original))
-      }
-    }
-
-    private def addEdge(publisher: (OutPort, Module), subscriber: (InPort, 
Module)): Unit = {
-      graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), 
subscriber._2)
-    }
-
-    private def addVertex(module: Module): Unit = {
-      graph.addVertex(module)
-    }
-  }
-
-  final case class MaterializedValueSourceAttribute(mat: 
MaterializedValueNode) extends Attribute
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
deleted file mode 100644
index 50c4450..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump
-
-import akka.stream.Attributes
-import akka.stream.Attributes.Attribute
-
-object GearAttributes {
-
-  /**
-   * Define how many parallel instance we want to use to run this module
-   * @param count
-   * @return
-   */
-  def count(count: Int): Attributes = Attributes(ParallismAttribute(count))
-
-  /**
-   * Define we want to render this module locally.
-   * @return
-   */
-  def local: Attributes = Attributes(LocationAttribute(Local))
-
-  /**
-   * Define we want to render this module remotely
-   * @return
-   */
-  def remote: Attributes = Attributes(LocationAttribute(Remote))
-
-  /**
-   * Get the effective location settings if child override the parent
-   * setttings.
-   *
-   * @param attrs
-   * @return
-   */
-  def location(attrs: Attributes): Location = {
-    attrs.attributeList.foldLeft(Local: Location) { (s, attr) =>
-      attr match {
-        case LocationAttribute(location) => location
-        case other => s
-      }
-    }
-  }
-
-  /**
-   * get effective parallelism settings if child override parent.
-   * @param attrs
-   * @return
-   */
-  def count(attrs: Attributes): Int = {
-    attrs.attributeList.foldLeft(1) { (s, attr) =>
-      attr match {
-        case ParallismAttribute(count) => count
-        case other => s
-      }
-    }
-  }
-
-  /**
-   * Where we want to render the module
-   */
-  sealed trait Location
-  object Local extends Location
-  object Remote extends Location
-
-  final case class LocationAttribute(tag: Location) extends Attribute
-
-  /**
-   * How many parallel instance we want to use for this module.
-   *
-   * @param parallelism
-   */
-  final case class ParallismAttribute(parallelism: Int) extends Attribute
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
deleted file mode 100644
index a11d7cb..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump
-
-import akka.actor.ActorSystem
-import akka.stream._
-import akka.stream.gearpump.graph.GraphCutter.Strategy
-import akka.stream.gearpump.graph.LocalGraph.LocalGraphMaterializer
-import akka.stream.gearpump.graph.RemoteGraph.RemoteGraphMaterializer
-import akka.stream.gearpump.graph.{GraphCutter, LocalGraph, RemoteGraph, 
SubGraphMaterializer}
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
- * streaming application. If some module cannot be rendered remotely in 
Gearpump
- * Cluster, then it uses local Actor materializer as fallback to materialize
- * the module locally.
- *
- * User can custom a [[Strategy]] to determinie which module should be rendered
- * remotely, and which module should be rendered locally.
- *
- * @see [[GraphCutter]] to find out how we cut the [[ModuleGraph]] to two 
parts,
- *      and materialize them seperately.
- *
- * @param system
- * @param strategy
- * @param useLocalCluster whether to use built-in in-process local cluster
- */
-class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = 
GraphCutter.AllRemoteStrategy,
-    useLocalCluster: Boolean = true)
-  extends BaseMaterializer {
-
-  private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
-    classOf[LocalGraph] -> new LocalGraphMaterializer(system),
-    classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, 
system)
-  )
-
-  override def materialize[Mat](graph: ModuleGraph[Mat]): Mat = {
-    val subGraphs = new GraphCutter(strategy).cut(graph)
-    val matValues = subGraphs.foldLeft(Map.empty[Module, Any]) { (map, 
subGraph) =>
-      val materializer = subMaterializers(subGraph.getClass)
-      map ++ materializer.materialize(subGraph, map)
-    }
-    graph.resolve(matValues)
-  }
-
-  override def shutdown(): Unit = {
-    subMaterializers.values.foreach(_.shutdown())
-  }
-}
-
-object GearpumpMaterializer {
-  def apply(system: ActorSystem): GearpumpMaterializer = new 
GearpumpMaterializer(system)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
deleted file mode 100644
index 7808b52..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.scaladsl.{Sink, Source}
-
-/**
- * This tests how the [[GearpumpMaterializer]] materializes different partials of Graph
- * to different runtime.
- *
- * In this test, source module and sink module are materialized locally,
- * Other transformation module are materialized remotely in Gearpump
- * streaming Application.
- *
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- *
- *
- */
-object Test {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val sink = Sink.actorRef(echo, "COMPLETE")
-    val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant",
-      "blue sky"))
-    source.filter(_.startsWith("red")).fold("Items:") { (a, b) =>
-      a + "|" + b
-    }.map("I want to order item: " + _).runWith(sink)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        // scalastyle:off println
-        println("Confirm received: " + any)
-      // scalastyle:on println
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
deleted file mode 100644
index 2426f5f..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.ActorMaterializer
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.{GearSink, GearSource}
-import akka.stream.scaladsl.{Flow, Sink, Source}
-
-/**
- *
- * This tests how different Materializers can be used together in an explicit way.
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- *
- */
-object Test2 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test2...")
-    implicit val system = ActorSystem("akka-test")
-    val materializer = new GearpumpMaterializer(system)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val source = GearSource.bridge[String, String]
-    val sink = GearSink.bridge[String, String]
-
-    val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _)
-    val (entry, exit) = flow.runWith(source, sink)(materializer)
-
-    val actorMaterializer = ActorMaterializer()
-
-    val externalSource = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
-    val externalSink = Sink.actorRef(echo, "COMPLETE")
-
-    val graph = FlowGraph.closed() { implicit b =>
-      externalSource ~> Sink(entry)
-      Source(exit) ~> externalSink
-    }
-    graph.run()(actorMaterializer)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        println("Confirm received: " + any)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
deleted file mode 100644
index 976b1e6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSource
-import akka.stream.scaladsl.Sink
-
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-
-/**
- * read from remote and write to local
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- */
-object Test3 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val sink = Sink.actorRef(echo, "COMPLETE")
-    val sourceData = new CollectionDataSource(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
-    val source = GearSource.from[String](sourceData)
-    source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        println("Confirm received: " + any)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
deleted file mode 100644
index 7b80b7b..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.ActorSystem
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSink
-import akka.stream.scaladsl.Source
-
-import org.apache.gearpump.streaming.dsl.LoggerSink
-
-/**
- * read from local and write to remote
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar
- */
-object Test4 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system)
-
-    val sink = GearSink.to(new LoggerSink[String])
-    val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
-    source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
deleted file mode 100644
index 052c018..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.scaladsl.{Sink, Source, Unzip}
-
-/**
-test fanout
-  */
-object Test5 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val sink = Sink.actorRef(echo, "COMPLETE")
-
-    val source = Source(List(("male", "24"), ("female", "23")))
-
-    val graph = FlowGraph.closed() { implicit b =>
-      val unzip = b.add(Unzip[String, String]())
-
-      val sink1 = Sink.actorRef(echo, "COMPLETE")
-      val sink2 = Sink.actorRef(echo, "COMPLETE")
-
-      source ~> unzip.in
-      unzip.out0 ~> sink1
-      unzip.out1 ~> sink1
-    }
-
-    graph.run()
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        println("Confirm received: " + any)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
deleted file mode 100644
index 0fccd30..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
-import akka.actor.{Actor, ActorSystem, Props}
-import akka.stream.gearpump.GearpumpMaterializer
-import akka.stream.gearpump.scaladsl.GearSource
-import akka.stream.scaladsl.Sink
-
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
-
-/**
- * WordCount example
- * Test GroupBy
- */
-
-import akka.stream.gearpump.scaladsl.Implicits._
-
-object Test6 {
-
-  def main(args: Array[String]): Unit = {
-
-    println("running Test...")
-
-    implicit val system = ActorSystem("akka-test")
-    implicit val materializer = new GearpumpMaterializer(system)
-
-    val echo = system.actorOf(Props(new Echo()))
-    val sink = Sink.actorRef(echo, "COMPLETE")
-    val sourceData = new CollectionDataSource(List("this is a good start", "this is a good time", "time to start", "congratulations", "green plant", "blue sky"))
-    val source = GearSource.from[String](sourceData)
-    source.mapConcat { line =>
-      line.split(" ").toList
-    }.groupBy2(x => x).map(word => (word, 1))
-      .reduce({ (a, b) =>
-        (a._1, a._2 + b._2)
-      }).log("word-count").runWith(sink)
-
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  class Echo extends Actor {
-    def receive: Receive = {
-      case any: AnyRef =>
-        println("Confirm received: " + any)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
deleted file mode 100644
index 56b89bc..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.example
-
-import java.io.{File, FileInputStream}
-import java.util.zip.GZIPInputStream
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, ExecutionContext, Future}
-import scala.util.{Failure, Success, Try}
-
-import akka.actor.ActorSystem
-import akka.stream.gearpump.graph.GraphCutter
-import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer}
-import akka.stream.scaladsl._
-import akka.util.ByteString
-import org.json4s.JsonAST.JString
-
-import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.util.AkkaApp
-
-/**
- * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
- * which showcases running Akka Streams DSL across JVMs on Gearpump
- *
- * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
- * -input wikidata-${DATE}-all.json.gz -languages en,de
- *
- * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
- *
- */
-object WikipediaApp extends ArgumentsParser with AkkaApp {
-
-  case class WikidataElement(id: String, sites: Map[String, String])
-
-  override val options: Array[(String, CLIOption[Any])] = Array(
-    "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true),
-    "languages" -> CLIOption[String]("<languages to take into account>", required = true)
-  )
-
-  override def main(akkaConf: Config, args: Array[String]): Unit = {
-    val parsed = parse(args)
-    val input = new File(parsed.getString("input"))
-    val langs = parsed.getString("languages").split(",")
-
-    implicit val system = ActorSystem("wikidata-poc", akkaConf)
-    implicit val materializer = new GearpumpMaterializer(system, GraphCutter.TagAttributeStrategy, useLocalCluster = false)
-    import system.dispatcher
-
-    val elements = source(input).via(parseJson(langs))
-
-    val g = FlowGraph.closed(count) { implicit b =>
-      sinkCount => {
-
-        val broadcast = b.add(Broadcast[WikidataElement](2))
-        elements ~> broadcast ~> logEveryNSink(1000)
-        broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
-      }
-    }
-
-    g.run().onComplete { x =>
-      x match {
-        case Success((t, f)) => printResults(t, f)
-        case Failure(tr) => println("Something went wrong")
-      }
-      system.terminate()
-    }
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  def source(file: File): Source[String, Future[Long]] = {
-    val compressed = new GZIPInputStream(new FileInputStream(file), 65536)
-    InputStreamSource(() => compressed)
-      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
-      .map(x => x.decodeString("utf-8"))
-  }
-
-  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): Flow[String, WikidataElement, Unit] =
-    Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect {
-      case Some(v) => v
-    }
-
-  def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
-    import org.json4s.jackson.JsonMethods
-    Try(JsonMethods.parse(line)).toOption.flatMap { json =>
-      json \ "id" match {
-        case JString(itemId) =>
-          val sites = for {
-            lang <- langs
-            JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
-          } yield lang -> title
-
-          if (sites.isEmpty) None
-          else Some(WikidataElement(id = itemId, sites = sites.toMap))
-
-        case _ => None
-      }
-    }
-  }
-
-  def logEveryNSink[T](n: Int) = Sink.fold(0) { (x, y: T) =>
-    if (x % n == 0)
-      println(s"Processing element $x: $y")
-    x + 1
-  }
-
-  def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, Unit] = Flow[WikidataElement]
-    .filter(_.sites.keySet == langs)
-    .map { x =>
-      val titles = x.sites.values
-      titles.forall(_ == titles.head)
-    }.withAttributes(GearAttributes.remote)
-
-  def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
-    case ((t, f), true) => (t + 1, f)
-    case ((t, f), false) => (t, f + 1)
-  }
-
-  def printResults(t: Int, f: Int) = {
-    val message =
-      s"""
-         | Number of items with the same title: $t
-         | Number of items with the different title: $f
-         | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
-                  """.stripMargin
-    println(message)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
deleted file mode 100644
index 19083f6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.graph
-
-import akka.stream.ModuleGraph
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.GearAttributes.{Local, Location, Remote}
-import akka.stream.gearpump.graph.GraphCutter.Strategy
-import akka.stream.gearpump.module.{BridgeModule, DummyModule, GearpumpTaskModule, GroupByModule, SinkBridgeModule, SourceBridgeModule}
-import akka.stream.impl.Stages.DirectProcessor
-import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module}
-import akka.stream.impl.{SinkModule, SourceModule}
-
-import org.apache.gearpump.util.Graph
-
-/**
- *
- * GraphCutter is used to decide which part is rendered locally
- * and which part should be rendered remotely.
- *
- * We will cut the graph based on the [[Strategy]] provided.
- *
- * For example, for the following graph, we can cut the graph to
- * two parts, each part will be a Sub Graph. The top SubGraph
- * can be materialized remotely. The bottom part can be materialized
- * locally.
- *
- *        AtomicModule2 -> AtomicModule4
- *           /|                 \
- *          /                    \
- * -----------cut line -------------cut line ----------
- *       /                        \
- *     /                           \|
- * AtomicModule1           AtomicModule5
- *     \                   /|
- *      \                 /
- *       \|              /
- *         AtomicModule3
- *
- *
- * @see [[ModuleGraph]] for more information of how Graph is organized.
- *
- */
-class GraphCutter(strategy: Strategy) {
-  def cut(moduleGraph: ModuleGraph[_]): List[SubGraph] = {
-    val graph = removeDummyModule(moduleGraph.graph)
-    val tags = tag(graph, strategy)
-    doCut(graph, tags, moduleGraph.mat)
-  }
-
-  private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location],
-      mat: MaterializedValueNode): List[SubGraph] = {
-    val local = Graph.empty[Module, Edge]
-    val remote = Graph.empty[Module, Edge]
-
-    graph.vertices.foreach { module =>
-      if (tags(module) == Local) {
-        local.addVertex(module)
-      } else {
-        remote.addVertex(module)
-      }
-    }
-
-    graph.edges.foreach { nodeEdgeNode =>
-      val (node1, edge, node2) = nodeEdgeNode
-      (tags(node1), tags(node2)) match {
-        case (Local, Local) =>
-          local.addEdge(nodeEdgeNode)
-        case (Remote, Remote) =>
-          remote.addEdge(nodeEdgeNode)
-        case (Local, Remote) =>
-          node2 match {
-            case bridge: BridgeModule[_, _, _] =>
-              local.addEdge(node1, edge, node2)
-            case _ =>
-              // Creates a bridge module in between
-              val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
-              val remoteEdge = Edge(bridge.outPort, edge.to)
-              remote.addEdge(bridge, remoteEdge, node2)
-              val localEdge = Edge(edge.from, bridge.inPort)
-              local.addEdge(node1, localEdge, bridge)
-          }
-        case (Remote, Local) =>
-          node1 match {
-            case bridge: BridgeModule[_, _, _] =>
-              local.addEdge(node1, edge, node2)
-            case _ =>
-              // Creates a bridge module in between
-              val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
-              val remoteEdge = Edge(edge.from, bridge.inPort)
-              remote.addEdge(node1, remoteEdge, bridge)
-              val localEdge = Edge(bridge.outPort, edge.to)
-              local.addEdge(bridge, localEdge, node2)
-          }
-      }
-    }
-
-    List(new RemoteGraph(remote), new LocalGraph(local))
-  }
-
-  private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = {
-    graph.vertices.map { vertex =>
-      vertex -> strategy.apply(vertex)
-    }.toMap
-  }
-
-  private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
-    val graph = inputGraph.copy
-    val dummies = graph.vertices.filter { module =>
-      module match {
-        case dummy: DummyModule =>
-          true
-        case _ =>
-          false
-      }
-    }
-    dummies.foreach(module => graph.removeVertex(module))
-    graph
-  }
-}
-
-object GraphCutter {
-
-  type Strategy = PartialFunction[Module, Location]
-
-  val BaseStrategy: Strategy = {
-    case source: BridgeModule[_, _, _] =>
-      Remote
-    case task: GearpumpTaskModule =>
-      Remote
-    case groupBy: GroupByModule[_, _] =>
-      // TODO: groupBy is not supported by local materializer yet
-      Remote
-    case source: SourceModule[_, _] =>
-      Local
-    case sink: SinkModule[_, _] =>
-      Local
-    case matValueSource: MaterializedValueSource[_] =>
-      Local
-    case direct: DirectProcessor =>
-      Local
-    case time: TimerTransform =>
-      // Renders to local as it requires a timer.
-      Local
-  }
-
-  val AllRemoteStrategy: Strategy = BaseStrategy orElse {
-    case _ =>
-      Remote
-  }
-
-  /**
-   * Will decide whether to render a module locally or remotely
-   * based on Attribute settings.
-   *
-   */
-  val TagAttributeStrategy: Strategy = BaseStrategy orElse {
-    case module =>
-      GearAttributes.location(module.attributes)
-  }
-
-  val AllLocalStrategy: Strategy = BaseStrategy orElse {
-    case _ =>
-      Local
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
deleted file mode 100644
index 6ef8598..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.graph
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.materializer.LocalMaterializer
-import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule}
-import akka.stream.impl.Stages.DefaultAttributes
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{PublisherSource, SubscriberSink}
-import akka.stream.{Outlet, SinkShape, SourceShape}
-import org.reactivestreams.{Publisher, Subscriber}
-
-import org.apache.gearpump.util.Graph
-
-/**
- *
- * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only
- * contain module that can be materialized in local JVM.
- *
- * @param graph
- */
-class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph
-
-object LocalGraph {
-
-  /**
-   * materialize LocalGraph in local JVM
-   * @param system
-   */
-  class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {
-
-    // Creates a local materializer
-    val materializer = LocalMaterializer()(system)
-
-    /**
-     *
-     * @param matValues Materialized Values for each module before materialization
-     * @return Materialized Values for each Module after the materialization.
-     */
-    override def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] = {
-      val newGraph: Graph[Module, Edge] = graph.graph.mapVertex { module =>
-        module match {
-          case source: SourceBridgeModule[AnyRef, AnyRef] =>
-            val subscriber = matValues(source).asInstanceOf[Subscriber[AnyRef]]
-            val shape = SinkShape(source.inPort)
-            new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape)
-          case sink: SinkBridgeModule[AnyRef, AnyRef] =>
-            val publisher = matValues(sink).asInstanceOf[Publisher[AnyRef]]
-            val shape = SourceShape(sink.outPort.asInstanceOf[Outlet[AnyRef]])
-            new PublisherSource(publisher, DefaultAttributes.publisherSource, shape)
-          case other =>
-            other
-        }
-      }
-      materializer.materialize(newGraph, matValues)
-    }
-
-    override def shutdown(): Unit = {
-      materializer.shutdown()
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
deleted file mode 100644
index 3cea78a..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.graph
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.materializer.RemoteMaterializerImpl
-import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule}
-import akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient
-import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient
-import akka.stream.impl.StreamLayout.Module
-
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.embedded.EmbeddedCluster
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.util.Graph
-
-/**
- *
- * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
- * contain modules that can be materialized in remote Gearpump cluster.
- *
- * @param graph
- */
-class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph
-
-object RemoteGraph {
-
-  /**
-   * * materialize LocalGraph in remote gearpump cluster
-   * @param useInProcessCluster
-   * @param system
-   */
-  class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: 
ActorSystem) extends SubGraphMaterializer {
-    private val local = if (useInProcessCluster) {
-      val cluster = EmbeddedCluster()
-      cluster.start()
-      Some(cluster)
-    } else {
-      None
-    }
-
-    private val context: ClientContext = local match {
-      case Some(local) => local.newClientContext
-      case None => ClientContext(system)
-    }
-
-    override def materialize(subGraph: SubGraph, inputMatValues: Map[Module, 
Any]): Map[Module, Any] = {
-      val graph = subGraph.graph
-
-      if (graph.isEmpty) {
-        inputMatValues
-      } else {
-        doMaterialize(graph: Graph[Module, Edge], inputMatValues)
-      }
-    }
-
-    private def doMaterialize(graph: Graph[Module, Edge], inputMatValues: 
Map[Module, Any]): Map[Module, Any] = {
-      val materializer = new RemoteMaterializerImpl(graph, system)
-      val (app, matValues) = materializer.materialize
-
-      val appId = context.submit(app)
-      println("sleep 5 second until the applicaiton is ready on cluster")
-      Thread.sleep(5000)
-
-      def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = {
-        matValues.toList.flatMap { kv =>
-          val (module, processorId) = kv
-          module match {
-            case source: SourceBridgeModule[AnyRef, AnyRef] =>
-              val bridge = new 
SourceBridgeTaskClient[AnyRef](system.dispatcher, context, appId, processorId)
-              Some((module, bridge))
-            case sink: SinkBridgeModule[AnyRef, AnyRef] =>
-              val bridge = new SinkBridgeTaskClient(system, context, appId, 
processorId)
-              Some((module, bridge))
-            case other =>
-              None
-          }
-        }.toMap
-      }
-
-      inputMatValues ++ resolve(matValues)
-    }
-
-    override def shutdown(): Unit = {
-      context.close()
-      local.map(_.stop())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
deleted file mode 100644
index 564b6c7..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.graph
-
-import akka.stream.ModuleGraph.Edge
-import akka.stream.impl.StreamLayout.Module
-
-import org.apache.gearpump.util.Graph
-
-/**
- * [[SubGraph]] is a partial part of [[akka.stream.ModuleGraph]]
- *
- * The idea is that by dividing [[akka.stream.ModuleGraph]] to several
- * [[SubGraph]], we can materialize each [[SubGraph]] with different
- * materializer.
- */
-
-trait SubGraph {
-
-  /**
-   * the [[Graph]] representation of this SubGraph
-   * @return
-   */
-  def graph: Graph[Module, Edge]
-}
-
-/**
- * Materializer for Sub-Graph type
- */
-trait SubGraphMaterializer {
-  /**
-   *
-   * @param matValues Materialized Values for each module before 
materialization
-   * @return Materialized Values for each Module after the materialization.
-   */
-
-  def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, 
Any]
-
-  def shutdown(): Unit
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
deleted file mode 100644
index a5c6e48..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import scala.concurrent.{Await, ExecutionContextExecutor}
-
-import akka.actor.{ActorCell, ActorRef, ActorSystem, Deploy, LocalActorRef, 
PoisonPill, Props, RepointableActorRef}
-import akka.dispatch.Dispatchers
-import akka.pattern.ask
-import akka.stream.ModuleGraph.Edge
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.StreamSupervisor
-import akka.stream.{ActorAttributes, ActorMaterializer, 
ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, 
MaterializationContext, ModuleGraph}
-
-import org.apache.gearpump.util.Graph
-
-/**
- * [[LocalMaterializer]] will use local actor to materialize the graph
- * Use LocalMaterializer.apply to construct the LocalMaterializer.
- *
- * It is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
- *
- *
- * @param system
- * @param settings
- * @param dispatchers
- * @param supervisor
- * @param haveShutDown
- * @param flowNameCounter
- * @param namePrefix
- * @param optimizations
- */
-abstract class LocalMaterializer(
-    val system: ActorSystem,
-    override val settings: ActorMaterializerSettings,
-    dispatchers: Dispatchers,
-    val supervisor: ActorRef,
-    val haveShutDown: AtomicBoolean,
-    flowNameCounter: AtomicLong,
-    namePrefix: String,
-    optimizations: Optimizations) extends ActorMaterializer {
-
-  override def effectiveSettings(opAttr: Attributes): 
ActorMaterializerSettings = {
-    import ActorAttributes._
-    import Attributes._
-    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
-      attr match {
-        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
-        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
-        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
-        case l: LogLevels => s
-        case Name(_) => s
-        case other => s
-      }
-    }
-  }
-
-  override def shutdown(): Unit =
-    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
-
-  override def isShutdown: Boolean = haveShutDown.get()
-
-  private[akka] def actorOf(props: Props, name: String, dispatcher: String): 
ActorRef = {
-    supervisor match {
-      case ref: LocalActorRef =>
-        ref.underlying.attachChild(props.withDispatcher(dispatcher), name, 
systemService = false)
-      case ref: RepointableActorRef =>
-        if (ref.isStarted) {
-          
ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher),
-            name, systemService = false)
-        } else {
-          implicit val timeout = ref.system.settings.CreationTimeout
-          val f = (supervisor ? 
StreamSupervisor.Materialize(props.withDispatcher(dispatcher),
-            name)).mapTo[ActorRef]
-          Await.result(f, timeout.duration)
-        }
-      case unknown =>
-        throw new IllegalStateException(
-          s"Stream supervisor must be a local actor, was 
[${unknown.getClass.getName}]")
-    }
-  }
-
-  override lazy val executionContext: ExecutionContextExecutor =
-    dispatchers.lookup(settings.dispatcher match {
-      case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
-      case other => other
-    })
-
-  def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, 
Any]): Map[Module, Any]
-
-  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): 
Mat = {
-    val graph = ModuleGraph(runnableGraph)
-    val matValues = materialize(graph.graph, Map.empty[Module, Any])
-    graph.resolve(matValues)
-  }
-
-  override def actorOf(context: MaterializationContext, props: Props): 
ActorRef = {
-    val dispatcher =
-      if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) {
-        effectiveSettings(context.effectiveAttributes).dispatcher
-      } else {
-        props.dispatcher
-      }
-    actorOf(props, context.stageName, dispatcher)
-  }
-}
-
-object LocalMaterializer {
-
-  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
-      namePrefix: Option[String] = None,
-      optimizations: Optimizations = Optimizations.none)(implicit system: 
ActorSystem)
-    : LocalMaterializerImpl = {
-
-    val settings = materializerSettings getOrElse 
ActorMaterializerSettings(system)
-    apply(settings, namePrefix.getOrElse("flow"), optimizations)(system)
-  }
-
-  def apply(materializerSettings: ActorMaterializerSettings,
-      namePrefix: String, optimizations: Optimizations)(implicit system: 
ActorSystem)
-    : LocalMaterializerImpl = {
-    val haveShutDown = new AtomicBoolean(false)
-
-    new LocalMaterializerImpl(
-      system,
-      materializerSettings,
-      system.dispatchers,
-      system.actorOf(StreamSupervisor.props(materializerSettings,
-        haveShutDown).withDispatcher(materializerSettings.dispatcher)),
-      haveShutDown,
-      FlowNameCounter(system).counter,
-      namePrefix,
-      optimizations)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
 
b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
deleted file mode 100644
index 1ec724e..0000000
--- 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Dispatchers
-import akka.stream.ModuleGraph.{Edge, MaterializedValueSourceAttribute}
-import akka.stream.actor.ActorSubscriber
-import 
akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule
-import akka.stream.gearpump.module.ReduceModule
-import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule}
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, 
ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor}
-import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, 
InPort, MaterializationContext, Materializer, OutPort, Shape}
-import org.reactivestreams.{Processor, Publisher, Subscriber}
-
-import org.apache.gearpump.util.Graph
-
-/**
- * This materializer is functional equivalent to 
[[akka.stream.impl.ActorMaterializerImpl]]
- *
- * @param system
- * @param settings
- * @param dispatchers
- * @param supervisor
- * @param haveShutDown
- * @param flowNameCounter
- * @param namePrefix
- * @param optimizations
- */
-class LocalMaterializerImpl (
-    system: ActorSystem,
-    settings: ActorMaterializerSettings,
-    dispatchers: Dispatchers,
-    supervisor: ActorRef,
-    haveShutDown: AtomicBoolean,
-    flowNameCounter: AtomicLong,
-    namePrefix: String,
-    optimizations: Optimizations)
-  extends LocalMaterializer(
-    system, settings, dispatchers, supervisor,
-    haveShutDown, flowNameCounter, namePrefix, optimizations) {
-
-  override def materialize(graph: Graph[Module, Edge], inputMatValues: 
Map[Module, Any]): Map[Module, Any] = {
-    val materializedGraph = graph.mapVertex { module =>
-      materializeAtomic(module)
-    }
-
-    materializedGraph.edges.foreach { nodeEdgeNode =>
-      val (node1, edge, node2) = nodeEdgeNode
-      val from = edge.from
-      val to = edge.to
-      val publisher = node1.outputs(from).asInstanceOf[Publisher[Any]]
-      val subscriber = node2.inputs(to).asInstanceOf[Subscriber[Any]]
-      publisher.subscribe(subscriber)
-    }
-
-    val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex 
=>
-      (vertex.module, vertex.matValue)
-    }.toMap
-
-    val matValueSources = 
materializedGraph.vertices.filter(_.module.isInstanceOf[MaterializedValueSource[_]])
-    publishToMaterializedValueSource(matValueSources, matValues)
-
-    matValues
-  }
-
-  private def publishToMaterializedValueSource(modules: 
List[MaterializedModule], matValues: Map[Module, Any]) = {
-    modules.foreach { module =>
-      val source = module.module.asInstanceOf[MaterializedValueSource[_]]
-      val attr = 
source.attributes.getAttribute(classOf[MaterializedValueSourceAttribute], null)
-
-      Option(attr).map { attr =>
-        val valueToPublish = 
MaterializedValueOps(attr.mat).resolve[Any](matValues)
-        module.outputs.foreach { portAndPublisher =>
-          val (port, publisher) = portAndPublisher
-          publisher match {
-            case valuePublisher: MaterializedValuePublisher =>
-              valuePublisher.setValue(valueToPublish)
-          }
-        }
-      }
-    }
-  }
-
-  private[this] def nextFlowNameCount(): Long = 
flowNameCounter.incrementAndGet()
-
-  private[this] def createFlowName(): String = 
s"$namePrefix-${nextFlowNameCount()}"
-
-  val flowName = createFlowName()
-  var nextId = 0
-
-  def stageName(attr: Attributes): String = {
-    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
-    nextId += 1
-    name
-  }
-
-  private def materializeAtomic(atomic: Module): MaterializedModule = {
-    val effectiveAttributes = atomic.attributes
-
-    def newMaterializationContext() =
-      new MaterializationContext(LocalMaterializerImpl.this, 
effectiveAttributes, stageName(effectiveAttributes))
-
-    atomic match {
-      case matValue: MaterializedValueSource[_] =>
-        val pub = new MaterializedValuePublisher
-        val outputs = Map[OutPort, Publisher[_]](matValue.shape.outlet -> pub)
-        MaterializedModule(matValue, (), outputs = outputs)
-      case sink: SinkModule[_, _] =>
-        val (sub, mat) = sink.create(newMaterializationContext())
-        val inputs = Map[InPort, Subscriber[_]](sink.shape.inlet -> sub)
-        MaterializedModule(sink, mat, inputs)
-      case source: SourceModule[_, _] =>
-        val (pub, mat) = source.create(newMaterializationContext())
-        val outputs = Map[OutPort, Publisher[_]](source.shape.outlet -> pub)
-        MaterializedModule(source, mat, outputs = outputs)
-
-      case reduce: ReduceModule[Any] =>
-        //TODO: remove this after the official akka-stream API support the 
Reduce Module
-        val stage = LocalMaterializerImpl.toFoldModule(reduce)
-        val (processor, mat) = processorFor(stage, effectiveAttributes, 
effectiveSettings(effectiveAttributes))
-        val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
-        val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
-        MaterializedModule(stage, mat, inputs, outputs)
-
-      case stage: StageModule =>
-        val (processor, mat) = processorFor(stage, effectiveAttributes, 
effectiveSettings(effectiveAttributes))
-        val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
-        val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
-        MaterializedModule(stage, mat, inputs, outputs)
-      case tls: TlsModule => // TODO solve this so TlsModule doesn't need 
special treatment here
-        val es = effectiveSettings(effectiveAttributes)
-        val props =
-          SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, 
tracing = false, tls.role, tls.closing)
-        val impl = actorOf(props, stageName(effectiveAttributes), 
es.dispatcher)
-        def factory(id: Int) = new ActorPublisher[Any](impl) {
-          override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
-        }
-        val publishers = Vector.tabulate(2)(factory)
-        impl ! FanOut.ExposedPublishers(publishers)
-
-        val inputs = Map[InPort, Subscriber[_]](
-          tls.plainIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn),
-          tls.cipherIn -> FanIn.SubInput[Any](impl, 
SslTlsCipherActor.TransportIn))
-
-        val outputs = Map[OutPort, Publisher[_]](
-          tls.plainOut -> publishers(SslTlsCipherActor.UserOut),
-          tls.cipherOut -> publishers(SslTlsCipherActor.TransportOut))
-        MaterializedModule(tls, (), inputs, outputs)
-
-      case junction: JunctionModule =>
-        materializeJunction(junction, effectiveAttributes, 
effectiveSettings(effectiveAttributes))
-    }
-  }
-
-  private def processorFor(op: StageModule,
-    effectiveAttributes: Attributes,
-    effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) 
= op match {
-    case DirectProcessor(processorFactory, _) => processorFactory()
-    case Identity(attr) => (new VirtualProcessor, ())
-    case _ =>
-      val (opprops, mat) = 
ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes)
-      ActorProcessorFactory[Any, Any](
-            actorOf(opprops, stageName(effectiveAttributes), 
effectiveSettings.dispatcher)) -> mat
-  }
-
-  private def materializeJunction(
-    op: JunctionModule,
-    effectiveAttributes: Attributes,
-    effectiveSettings: ActorMaterializerSettings): MaterializedModule = {
-    op match {
-      case fanin: FanInModule =>
-        val (props, inputs, output) = fanin match {
-
-          case MergeModule(shape, _) =>
-            (FairMerge.props(effectiveSettings, shape.inSeq.size), 
shape.inSeq, shape.out)
-
-          case f: FlexiMergeModule[_, Shape] =>
-            val flexi = f.flexi(f.shape)
-            val shape: Shape = f.shape
-            (FlexiMerge.props(effectiveSettings, f.shape, flexi), 
shape.inlets, shape.outlets.head)
-
-          case MergePreferredModule(shape, _) =>
-            (UnfairMerge.props(effectiveSettings, shape.inlets.size), 
shape.preferred +: shape.inSeq, shape.out)
-
-          case ConcatModule(shape, _) =>
-            require(shape.inSeq.size == 2, "currently only supporting 
concatenation of exactly two inputs") // TODO
-            (Concat.props(effectiveSettings), shape.inSeq, shape.out)
-
-          case zip: ZipWithModule =>
-            (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
-        }
-
-        val impl = actorOf(props, stageName(effectiveAttributes), 
effectiveSettings.dispatcher)
-        val publisher = new ActorPublisher[Any](impl)
-        // Resolve cyclic dependency with actor. This MUST be the first 
message no matter what.
-        impl ! ExposedPublisher(publisher)
-
-        val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map 
{ pair =>
-          val (in, id) = pair
-          (in, FanIn.SubInput[Any](impl, id))
-        }.toMap
-
-        val outMapping = Map(output -> publisher)
-        MaterializedModule(fanin, (), inputMapping, outMapping)
-
-      case fanout: FanOutModule =>
-        val (props, in, outs) = fanout match {
-
-          case r: FlexiRouteModule[t, Shape] =>
-            val flexi = r.flexi(r.shape)
-            val shape: Shape = r.shape
-            (FlexiRoute.props(effectiveSettings, r.shape, flexi), 
shape.inlets.head: InPort, r.shape.outlets)
-
-          case BroadcastModule(shape, eagerCancel, _) =>
-            (Broadcast.props(effectiveSettings, eagerCancel, 
shape.outArray.size), shape.in, shape.outArray.toSeq)
-
-          case BalanceModule(shape, waitForDownstreams, _) =>
-            (Balance.props(effectiveSettings, shape.outArray.size, 
waitForDownstreams), shape.in, shape.outArray.toSeq)
-
-          case unzip: UnzipWithModule =>
-            (unzip.props(effectiveSettings), unzip.inPorts.head, 
unzip.shape.outlets)
-        }
-        val impl = actorOf(props, stageName(effectiveAttributes), 
effectiveSettings.dispatcher)
-        val size = outs.size
-        def factory(id: Int) =
-          new ActorPublisher[Any](impl) {
-            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
-          }
-        val publishers =
-          if (outs.size < 8) Vector.tabulate(size)(factory)
-          else List.tabulate(size)(factory)
-
-        impl ! FanOut.ExposedPublishers(publishers)
-        val outputs: Map[OutPort, Publisher[_]] = 
publishers.iterator.zip(outs.iterator).map { case (pub, out) =>
-          (out, pub)
-        }.toMap
-
-        val inputs: Map[InPort, Subscriber[_]] = Map(in -> 
ActorSubscriber[Any](impl))
-        MaterializedModule(fanout, (), inputs, outputs)
-    }
-  }
-
-  override def withNamePrefix(name: String): Materializer = {
-    new LocalMaterializerImpl(system, settings, dispatchers, supervisor,
-      haveShutDown, flowNameCounter, namePrefix = name, optimizations)
-  }
-}
-
-object LocalMaterializerImpl {
-  case class MaterializedModule(val module: Module, val matValue: Any, inputs: 
Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]], outputs: 
Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
-
-  def toFoldModule(reduce: ReduceModule[Any]): Fold = {
-    val f = reduce.f
-    val aggregator = { (zero: Any, input: Any) =>
-      if (zero == null) {
-        input
-      } else {
-        f(zero, input)
-      }
-    }
-    new Fold(null, aggregator)
-  }
-}
\ No newline at end of file


Reply via email to