[GEARPUMP-22] Merge akka-streams branch into master Author: manuzhang <[email protected]> Author: Kam Kasravi <[email protected]>
Closes #137 from manuzhang/akka-streams. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/2913a1fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/2913a1fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/2913a1fd Branch: refs/heads/master Commit: 2913a1fd89c564ec20e2cec8f54caafb01988296 Parents: 5f90b70 Author: manuzhang <[email protected]> Authored: Fri Jan 20 20:42:42 2017 +0800 Committer: manuzhang <[email protected]> Committed: Fri Jan 20 20:43:15 2017 +0800 ---------------------------------------------------------------------- experiments/akkastream/README.md | 4 +- .../src/main/resources/geardefault.conf | 2 +- .../scala/akka/stream/BaseMaterializer.scala | 47 -- .../main/scala/akka/stream/ModuleGraph.scala | 298 --------- .../akka/stream/gearpump/GearAttributes.scala | 90 --- .../stream/gearpump/GearpumpMaterializer.scala | 71 --- .../akka/stream/gearpump/example/Test.scala | 69 --- .../akka/stream/gearpump/example/Test2.scala | 71 --- .../akka/stream/gearpump/example/Test3.scala | 59 -- .../akka/stream/gearpump/example/Test4.scala | 50 -- .../akka/stream/gearpump/example/Test5.scala | 68 --- .../akka/stream/gearpump/example/Test6.scala | 67 --- .../stream/gearpump/example/WikipediaApp.scala | 143 ----- .../stream/gearpump/graph/GraphCutter.scala | 183 ------ .../akka/stream/gearpump/graph/LocalGraph.scala | 81 --- .../stream/gearpump/graph/RemoteGraph.scala | 106 ---- .../akka/stream/gearpump/graph/SubGraph.scala | 56 -- .../materializer/LocalMaterializer.scala | 152 ----- .../materializer/LocalMaterializerImpl.scala | 284 --------- .../materializer/RemoteMaterializerImpl.scala | 453 -------------- .../stream/gearpump/module/BridgeModule.scala | 124 ---- .../stream/gearpump/module/DummyModule.scala | 103 ---- .../gearpump/module/GearpumpTaskModule.scala | 133 ---- .../stream/gearpump/module/GroupByModule.scala | 46 -- 
.../stream/gearpump/module/ReduceModule.scala | 44 -- .../akka/stream/gearpump/scaladsl/Api.scala | 282 --------- .../akka/stream/gearpump/task/BalanceTask.scala | 37 -- .../stream/gearpump/task/BroadcastTask.scala | 29 - .../akka/stream/gearpump/task/GraphTask.scala | 70 --- .../stream/gearpump/task/SinkBridgeTask.scala | 125 ---- .../stream/gearpump/task/SourceBridgeTask.scala | 107 ---- .../akka/stream/gearpump/task/UnZip2Task.scala | 45 -- .../gearpump/util/MaterializedValueOps.scala | 38 -- .../gearpump/akkastream/GearAttributes.scala | 89 +++ .../akkastream/GearpumpMaterializer.scala | 295 +++++++++ .../GearpumpMaterializerSession.scala | 152 +++++ .../gearpump/akkastream/example/Test.scala | 62 ++ .../gearpump/akkastream/example/Test10.scala | 82 +++ .../gearpump/akkastream/example/Test11.scala | 72 +++ .../gearpump/akkastream/example/Test12.scala | 81 +++ .../gearpump/akkastream/example/Test13.scala | 177 ++++++ .../gearpump/akkastream/example/Test14.scala | 73 +++ .../gearpump/akkastream/example/Test15.scala | 72 +++ .../gearpump/akkastream/example/Test16.scala | 49 ++ .../gearpump/akkastream/example/Test2.scala | 77 +++ .../gearpump/akkastream/example/Test3.scala | 70 +++ .../gearpump/akkastream/example/Test4.scala | 50 ++ .../gearpump/akkastream/example/Test5.scala | 67 +++ .../gearpump/akkastream/example/Test6.scala | 90 +++ .../gearpump/akkastream/example/Test7.scala | 56 ++ .../gearpump/akkastream/example/Test8.scala | 66 ++ .../gearpump/akkastream/example/Test9.scala | 87 +++ .../akkastream/example/WikipediaApp.scala | 157 +++++ .../akkastream/graph/GraphPartitioner.scala | 205 +++++++ .../gearpump/akkastream/graph/LocalGraph.scala | 80 +++ .../gearpump/akkastream/graph/RemoteGraph.scala | 113 ++++ .../gearpump/akkastream/graph/SubGraph.scala | 59 ++ .../materializer/LocalMaterializerImpl.scala | 333 ++++++++++ .../materializer/RemoteMaterializerImpl.scala | 600 +++++++++++++++++++ .../akkastream/module/BridgeModule.scala | 135 +++++ 
.../akkastream/module/DummyModule.scala | 105 ++++ .../akkastream/module/GearpumpTaskModule.scala | 135 +++++ .../akkastream/module/GroupByModule.scala | 55 ++ .../akkastream/module/ReduceModule.scala | 52 ++ .../gearpump/akkastream/scaladsl/Api.scala | 289 +++++++++ .../gearpump/akkastream/task/BalanceTask.scala | 38 ++ .../gearpump/akkastream/task/BatchTask.scala | 50 ++ .../akkastream/task/BroadcastTask.scala | 30 + .../gearpump/akkastream/task/ConcatTask.scala | 38 ++ .../akkastream/task/DelayInitialTask.scala | 61 ++ .../akkastream/task/DropWithinTask.scala | 62 ++ .../akkastream/task/FlattenMergeTask.scala | 38 ++ .../gearpump/akkastream/task/FoldTask.scala | 56 ++ .../gearpump/akkastream/task/GraphTask.scala | 71 +++ .../akkastream/task/GroupedWithinTask.scala | 44 ++ .../akkastream/task/InterleaveTask.scala | 44 ++ .../gearpump/akkastream/task/MapAsyncTask.scala | 53 ++ .../gearpump/akkastream/task/MergeTask.scala | 39 ++ .../akkastream/task/SingleSourceTask.scala | 43 ++ .../akkastream/task/SinkBridgeTask.scala | 131 ++++ .../akkastream/task/SourceBridgeTask.scala | 116 ++++ .../akkastream/task/StatefulMapConcatTask.scala | 50 ++ .../akkastream/task/TakeWithinTask.scala | 62 ++ .../gearpump/akkastream/task/ThrottleTask.scala | 53 ++ .../akkastream/task/TickSourceTask.scala | 56 ++ .../gearpump/akkastream/task/Unzip2Task.scala | 46 ++ .../gearpump/akkastream/task/Zip2Task.scala | 57 ++ .../akkastream/util/MaterializedValueOps.scala | 40 ++ .../akka/stream/gearpump/AttributesSpec.scala | 33 - .../gearpump/akkastream/AttributesSpec.scala | 34 ++ project/BuildDashboard.scala | 6 +- project/BuildExperiments.scala | 2 +- project/Dependencies.scala | 8 +- .../gearpump/services/AppMasterService.scala | 2 +- .../gearpump/services/MasterService.scala | 2 +- .../gearpump/services/SecurityService.scala | 4 +- .../gearpump/services/StaticService.scala | 8 +- .../gearpump/services/WorkerService.scala | 2 +- .../gearpump/streaming/StreamApplication.scala | 2 +- 
.../apache/gearpump/streaming/dsl/plan/OP.scala | 5 +- 100 files changed, 5420 insertions(+), 3588 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/README.md ---------------------------------------------------------------------- diff --git a/experiments/akkastream/README.md b/experiments/akkastream/README.md index 7c9a316..fe04554 100644 --- a/experiments/akkastream/README.md +++ b/experiments/akkastream/README.md @@ -1,4 +1,2 @@ Akka Stream -========= - -TODO: This directory is obsolte. Working on updating it to Akka 2.4.3. +========= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/resources/geardefault.conf ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf index 626a1dc..8584511 100644 --- a/experiments/akkastream/src/main/resources/geardefault.conf +++ b/experiments/akkastream/src/main/resources/geardefault.conf @@ -2,4 +2,4 @@ gearpump.serializers { "akka.stream.gearpump.example.WikipediaApp$WikidataElement" = "" "scala.collection.immutable.Map$Map1" = "" "scala.collection.immutable.Map$Map2" = "" -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala deleted file mode 100644 index d2b328d..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/BaseMaterializer.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream - -import scala.concurrent.ExecutionContextExecutor - -/** - * [[BaseMaterializer]] is a extension to [[akka.stream.Materializer]]. - * - * Compared with [[akka.stream.Materializer]], the difference is that - * [[materialize]] accepts a [[ModuleGraph]] instead of a RunnableGraph. 
- * - * @see [[ModuleGraph]] for the difference between RunnableGraph and - * [[ModuleGraph]] - * - */ -abstract class BaseMaterializer extends akka.stream.Materializer { - - override def withNamePrefix(name: String): Materializer = throw new UnsupportedOperationException() - - override implicit def executionContext: ExecutionContextExecutor = throw new UnsupportedOperationException() - - def materialize[Mat](graph: ModuleGraph[Mat]): Mat - - override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = { - val graph = ModuleGraph(runnableGraph) - materialize(graph) - } - - def shutdown(): Unit -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala deleted file mode 100644 index 48d06f7..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/ModuleGraph.scala +++ /dev/null @@ -1,298 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * This code is similar to MaterializerSession and will be deprecated in the next release. - */ - -package akka.stream - -import scala.collection.mutable - -import akka.stream.Attributes.Attribute -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.util.MaterializedValueOps -import akka.stream.impl.StreamLayout._ -import akka.stream.impl._ -import akka.stream.{Graph => AkkaGraph} - -import _root_.org.apache.gearpump.util -import _root_.org.apache.gearpump.util.Graph - -/** - * - * ModuleGraph is a transformation on [[akka.stream.scaladsl.RunnableGraph]]. - * It carries all the information of [[akka.stream.scaladsl.RunnableGraph]], but - * represents it in a different way. - * - * Here is the difference: - * - * RunnableGraph - * ============================== - * [[akka.stream.scaladsl.RunnableGraph]] is represented as a [[Module]] tree: - * TopLevelModule - * | - * ------------------- - * | | - * SubModule1 SubModule2 - * | | - * ---------------- ---------- - * | | | - * AtomicModule1 AtomicModule2 AtomicModule3 - * - * ModuleGraph - * ============================== - * [[ModuleGraph]] is represented as a [[util.Graph]] of Atomic [[Module]]: - * - * AtomicModule2 -> AtomicModule4 - * /| \ - * / \ - * / \| - * AtomicModule1 AtomicModule5 - * \ /| - * \ / - * \| / - * AtomicModule3 - * - * Each vertex in the Graph is a [[Module]], each [[Edge]] in the Graph is a tuple - * ([[OutPort]], [[InPort]]). [[OutPort]] is one of upstream Atomic Module - * output ports. [[InPort]] is one of downstream Atomic Module input ports. - * - * - * Why use [[ModuleGraph]] instead of [[akka.stream.scaladsl.RunnableGraph]]? - * ========================= - * There are several good reasons:): - * 1. [[ModuleGraph]] outlines explicitly the upstream/downstream relation. - * Each [[Edge]] of [[ModuleGraph]] represent a upstream/downstream pair. - * It is easier for user to understand the overall data flow. - * - * 2. It is easier for performance optimization. 
- * For the above Graph, if we want to fuse AtomicModule2 and AtomicModule3 - * together, it can be done within [[ModuleGraph]]. We only need - * to substitute Pair(AtomicModule2, AtomicModule4) with a unified Module. - * - * 3. It avoids module duplication. - * In [[akka.stream.scaladsl.RunnableGraph]], composite Module can be re-used. - * It is possible that there can be duplicate Modules. - * The duplication problem causes big headache when doing materialization. - * - * [[ModuleGraph]] doesn't have thjis problem. [[ModuleGraph]] does a transformation on the Module - * Tree to make sure each Atomic Module [[ModuleGraph]] is unique. - * - * - * @param graph a Graph of Atomic modules. - * @param mat is a function of: - * input => materialized value of each Atomic module - * output => final materialized value. - * @tparam Mat - */ -class ModuleGraph[Mat](val graph: util.Graph[Module, Edge], val mat: MaterializedValueNode) { - - def resolve(materializedValues: Map[Module, Any]): Mat = { - MaterializedValueOps(mat).resolve[Mat](materializedValues) - } -} - -object ModuleGraph { - - def apply[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): ModuleGraph[Mat] = { - val topLevel = runnableGraph.module - val factory = new ModuleGraphFactory(topLevel) - val (graph, mat) = factory.create() - new ModuleGraph(graph, mat) - } - - /** - * - * @param from outport of upstream module - * @param to inport of downstream module - */ - case class Edge(from: OutPort, to: InPort) - - private class ModuleGraphFactory(val topLevel: StreamLayout.Module) { - - private var subscribersStack: List[mutable.Map[InPort, (InPort, Module)]] = - mutable.Map.empty[InPort, (InPort, Module)].withDefaultValue(null) :: Nil - private var publishersStack: List[mutable.Map[OutPort, (OutPort, Module)]] = - mutable.Map.empty[OutPort, (OutPort, Module)].withDefaultValue(null) :: Nil - - /* - * Please note that this stack keeps track of the scoped modules wrapped in CopiedModule but not the CopiedModule - * 
itself. The reason is that the CopiedModule itself is only needed for the enterScope and exitScope methods but - * not elsewhere. For this reason they are just simply passed as parameters to those methods. - * - * The reason why the encapsulated (copied) modules are stored as mutable state to save subclasses of this class - * from passing the current scope around or even knowing about it. - */ - private var moduleStack: List[Module] = topLevel :: Nil - - private def subscribers: mutable.Map[InPort, (InPort, Module)] = subscribersStack.head - private def publishers: mutable.Map[OutPort, (OutPort, Module)] = publishersStack.head - private def currentLayout: Module = moduleStack.head - - private val graph = Graph.empty[Module, Edge] - - private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = { - val currentAttributes = mergeAttributes(parentAttributes, module.attributes) - module.withAttributes(currentAttributes).asInstanceOf[T] - } - - private def materializeAtomic(atomic: Module, parentAttributes: Attributes): MaterializedValueNode = { - val (inputs, outputs) = (atomic.shape.inlets, atomic.shape.outlets) - val copied = copyAtomicModule(atomic, parentAttributes) - - for ((in, id) <- inputs.zipWithIndex) { - val inPort = inPortMapping(atomic, copied)(in) - assignPort(in, (inPort, copied)) - } - - for ((out, id) <- outputs.zipWithIndex) { - val outPort = outPortMapping(atomic, copied)(out) - assignPort(out, (outPort, copied)) - } - - graph.addVertex(copied) - Atomic(copied) - } - - def create(): (util.Graph[Module, Edge], MaterializedValueNode) = { - val mat = materializeModule(topLevel, Attributes.none) - (graph, mat) - } - - private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = { - from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap - } - - private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = { - from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap - } - 
- private def materializeModule(module: Module, parentAttributes: Attributes): MaterializedValueNode = { - - val materializedValues = collection.mutable.HashMap.empty[Module, MaterializedValueNode] - val currentAttributes = mergeAttributes(parentAttributes, module.attributes) - - var materializedValueSources = List.empty[MaterializedValueSource[_]] - - for (submodule <- module.subModules) { - submodule match { - case mv: MaterializedValueSource[_] => - materializedValueSources :+= mv - case atomic if atomic.isAtomic => - materializedValues.put(atomic, materializeAtomic(atomic, currentAttributes)) - case copied: CopiedModule => - enterScope(copied) - materializedValues.put(copied, materializeModule(copied, currentAttributes)) - exitScope(copied) - case composite => - materializedValues.put(composite, materializeComposite(composite, currentAttributes)) - } - } - - val mat = resolveMaterialized(module.materializedValueComputation, materializedValues) - - materializedValueSources.foreach { module => - val matAttribute = new MaterializedValueSourceAttribute(mat) - val copied = copyAtomicModule(module, parentAttributes and Attributes(matAttribute)) - assignPort(module.shape.outlet, (copied.shape.outlet, copied)) - graph.addVertex(copied) - materializedValues.put(copied, Atomic(copied)) - } - mat - } - - private def materializeComposite(composite: Module, effectiveAttributes: Attributes): MaterializedValueNode = { - materializeModule(composite, effectiveAttributes) - } - - private def mergeAttributes(parent: Attributes, current: Attributes): Attributes = { - parent and current - } - - private def resolveMaterialized(matNode: MaterializedValueNode, materializedValues: collection.Map[Module, MaterializedValueNode]): MaterializedValueNode = matNode match { - case Atomic(m) => materializedValues(m) - case Combine(f, d1, d2) => Combine(f, resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues)) - case Transform(f, d) => Transform(f, 
resolveMaterialized(d, materializedValues)) - case Ignore => Ignore - } - - final protected def assignPort(in: InPort, subscriber: (InPort, Module)): Unit = { - addVertex(subscriber._2) - subscribers(in) = subscriber - // Interface (unconnected) ports of the current scope will be wired when exiting the scope - if (!currentLayout.inPorts(in)) { - val out = currentLayout.upstreams(in) - val publisher = publishers(out) - if (publisher ne null) addEdge(publisher, subscriber) - } - } - - final protected def assignPort(out: OutPort, publisher: (OutPort, Module)): Unit = { - addVertex(publisher._2) - publishers(out) = publisher - // Interface (unconnected) ports of the current scope will be wired when exiting the scope - if (!currentLayout.outPorts(out)) { - val in = currentLayout.downstreams(out) - val subscriber = subscribers(in) - if (subscriber ne null) addEdge(publisher, subscriber) - } - } - - // Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies - // of the same module. 
- // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter - private def enterScope(enclosing: CopiedModule): Unit = { - subscribersStack ::= mutable.Map.empty.withDefaultValue(null) - publishersStack ::= mutable.Map.empty.withDefaultValue(null) - moduleStack ::= enclosing.copyOf - } - - // Exits the scope of the copied module and propagates Publishers/Subscribers to the enclosing scope assigning - // them to the copied ports instead of the original ones (since there might be multiple copies of the same module - // leading to port identity collisions) - // We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter - private def exitScope(enclosing: CopiedModule): Unit = { - val scopeSubscribers = subscribers - val scopePublishers = publishers - subscribersStack = subscribersStack.tail - publishersStack = publishersStack.tail - moduleStack = moduleStack.tail - - // When we exit the scope of a copied module, pick up the Subscribers/Publishers belonging to exposed ports of - // the original module and assign them to the copy ports in the outer scope that we will return to - enclosing.copyOf.shape.inlets.iterator.zip(enclosing.shape.inlets.iterator).foreach { - case (original, exposed) => assignPort(exposed, scopeSubscribers(original)) - } - - enclosing.copyOf.shape.outlets.iterator.zip(enclosing.shape.outlets.iterator).foreach { - case (original, exposed) => assignPort(exposed, scopePublishers(original)) - } - } - - private def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = { - graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2) - } - - private def addVertex(module: Module): Unit = { - graph.addVertex(module) - } - } - - final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala deleted file mode 100644 index 50c4450..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearAttributes.scala +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump - -import akka.stream.Attributes -import akka.stream.Attributes.Attribute - -object GearAttributes { - - /** - * Define how many parallel instance we want to use to run this module - * @param count - * @return - */ - def count(count: Int): Attributes = Attributes(ParallismAttribute(count)) - - /** - * Define we want to render this module locally. 
- * @return - */ - def local: Attributes = Attributes(LocationAttribute(Local)) - - /** - * Define we want to render this module remotely - * @return - */ - def remote: Attributes = Attributes(LocationAttribute(Remote)) - - /** - * Get the effective location settings if child override the parent - * setttings. - * - * @param attrs - * @return - */ - def location(attrs: Attributes): Location = { - attrs.attributeList.foldLeft(Local: Location) { (s, attr) => - attr match { - case LocationAttribute(location) => location - case other => s - } - } - } - - /** - * get effective parallelism settings if child override parent. - * @param attrs - * @return - */ - def count(attrs: Attributes): Int = { - attrs.attributeList.foldLeft(1) { (s, attr) => - attr match { - case ParallismAttribute(count) => count - case other => s - } - } - } - - /** - * Where we want to render the module - */ - sealed trait Location - object Local extends Location - object Remote extends Location - - final case class LocationAttribute(tag: Location) extends Attribute - - /** - * How many parallel instance we want to use for this module. - * - * @param parallelism - */ - final case class ParallismAttribute(parallelism: Int) extends Attribute -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala deleted file mode 100644 index a11d7cb..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/GearpumpMaterializer.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump - -import akka.actor.ActorSystem -import akka.stream._ -import akka.stream.gearpump.graph.GraphCutter.Strategy -import akka.stream.gearpump.graph.LocalGraph.LocalGraphMaterializer -import akka.stream.gearpump.graph.RemoteGraph.RemoteGraphMaterializer -import akka.stream.gearpump.graph.{GraphCutter, LocalGraph, RemoteGraph, SubGraphMaterializer} -import akka.stream.impl.StreamLayout.Module - -/** - * - * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump - * streaming application. If some module cannot be rendered remotely in Gearpump - * Cluster, then it uses local Actor materializer as fallback to materialize - * the module locally. - * - * User can custom a [[Strategy]] to determinie which module should be rendered - * remotely, and which module should be rendered locally. - * - * @see [[GraphCutter]] to find out how we cut the [[ModuleGraph]] to two parts, - * and materialize them seperately. 
- * - * @param system - * @param strategy - * @param useLocalCluster whether to use built-in in-process local cluster - */ -class GearpumpMaterializer(system: ActorSystem, strategy: Strategy = GraphCutter.AllRemoteStrategy, - useLocalCluster: Boolean = true) - extends BaseMaterializer { - - private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map( - classOf[LocalGraph] -> new LocalGraphMaterializer(system), - classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system) - ) - - override def materialize[Mat](graph: ModuleGraph[Mat]): Mat = { - val subGraphs = new GraphCutter(strategy).cut(graph) - val matValues = subGraphs.foldLeft(Map.empty[Module, Any]) { (map, subGraph) => - val materializer = subMaterializers(subGraph.getClass) - map ++ materializer.materialize(subGraph, map) - } - graph.resolve(matValues) - } - - override def shutdown(): Unit = { - subMaterializers.values.foreach(_.shutdown()) - } -} - -object GearpumpMaterializer { - def apply(system: ActorSystem): GearpumpMaterializer = new GearpumpMaterializer(system) -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala deleted file mode 100644 index 7808b52..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.graph.GraphCutter -import akka.stream.scaladsl.{Sink, Source} - -/** - * This tests how the [[GearpumpMaterializer]] materializes different partials of Graph - * to different runtime. - * - * In this test, source module and sink module are materialized locally, - * Other transformation module are materialized remotely in Gearpump - * streaming Application. 
- * - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - * - * - */ -object Test { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy) - - val echo = system.actorOf(Props(new Echo())) - val sink = Sink.actorRef(echo, "COMPLETE") - val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", - "blue sky")) - source.filter(_.startsWith("red")).fold("Items:") { (a, b) => - a + "|" + b - }.map("I want to order item: " + _).runWith(sink) - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => - // scalastyle:off println - println("Confirm received: " + any) - // scalastyle:on println - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala deleted file mode 100644 index 2426f5f..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test2.scala +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.ActorMaterializer -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.{GearSink, GearSource} -import akka.stream.scaladsl.{Flow, Sink, Source} - -/** - * - * This tests how different Materializers can be used together in an explicit way. - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - * - */ -object Test2 { - - def main(args: Array[String]): Unit = { - - println("running Test2...") - implicit val system = ActorSystem("akka-test") - val materializer = new GearpumpMaterializer(system) - - val echo = system.actorOf(Props(new Echo())) - val source = GearSource.bridge[String, String] - val sink = GearSink.bridge[String, String] - - val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _) - val (entry, exit) = flow.runWith(source, sink)(materializer) - - val actorMaterializer = ActorMaterializer() - - val externalSource = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) - val externalSink = Sink.actorRef(echo, "COMPLETE") - - val graph = FlowGraph.closed() { implicit b => - externalSource ~> Sink(entry) - Source(exit) ~> externalSink - } - graph.run()(actorMaterializer) - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: 
Receive = { - case any: AnyRef => - println("Confirm received: " + any) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala deleted file mode 100644 index 976b1e6..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test3.scala +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.GearSource -import akka.stream.scaladsl.Sink - -import org.apache.gearpump.streaming.dsl.CollectionDataSource - -/** - * read from remote and write to local - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - */ -object Test3 { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system) - - val echo = system.actorOf(Props(new Echo())) - val sink = Sink.actorRef(echo, "COMPLETE") - val sourceData = new CollectionDataSource(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) - val source = GearSource.from[String](sourceData) - source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink) - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => - println("Confirm received: " + any) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala deleted file mode 100644 index 7b80b7b..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test4.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.ActorSystem -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.GearSink -import akka.stream.scaladsl.Source - -import org.apache.gearpump.streaming.dsl.LoggerSink - -/** - * read from local and write to remote - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala.11/akkastream-2.11.5-0.6.2-SNAPSHOT-assembly.jar - */ -object Test4 { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system) - - val sink = GearSink.to(new LoggerSink[String]) - val source = Source(List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) - source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink) - - Await.result(system.whenTerminated, Duration.Inf) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala ---------------------------------------------------------------------- diff --git 
a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala deleted file mode 100644 index 052c018..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test5.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.graph.GraphCutter -import akka.stream.scaladsl.{Sink, Source, Unzip} - -/** -test fanout - */ -object Test5 { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system, GraphCutter.AllRemoteStrategy) - - val echo = system.actorOf(Props(new Echo())) - val sink = Sink.actorRef(echo, "COMPLETE") - - val source = Source(List(("male", "24"), ("female", "23"))) - - val graph = FlowGraph.closed() { implicit b => - val unzip = b.add(Unzip[String, String]()) - - val sink1 = Sink.actorRef(echo, "COMPLETE") - val sink2 = Sink.actorRef(echo, "COMPLETE") - - source ~> unzip.in - unzip.out0 ~> sink1 - unzip.out1 ~> sink1 - } - - graph.run() - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => - println("Confirm received: " + any) - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala deleted file mode 100644 index 0fccd30..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/Test6.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.example - -import scala.concurrent.Await -import scala.concurrent.duration.Duration - -import akka.actor.{Actor, ActorSystem, Props} -import akka.stream.gearpump.GearpumpMaterializer -import akka.stream.gearpump.scaladsl.GearSource -import akka.stream.scaladsl.Sink - -import org.apache.gearpump.streaming.dsl.CollectionDataSource - -/** - * WordCount example - * Test GroupBy - */ - -import akka.stream.gearpump.scaladsl.Implicits._ - -object Test6 { - - def main(args: Array[String]): Unit = { - - println("running Test...") - - implicit val system = ActorSystem("akka-test") - implicit val materializer = new GearpumpMaterializer(system) - - val echo = system.actorOf(Props(new Echo())) - val sink = Sink.actorRef(echo, "COMPLETE") - val sourceData = new CollectionDataSource(List("this is a good start", "this is a good time", "time to start", "congratulations", "green plant", "blue sky")) - val source = GearSource.from[String](sourceData) - source.mapConcat { line => - line.split(" ").toList - }.groupBy2(x => x).map(word => (word, 1)) - .reduce({ (a, b) => - (a._1, a._2 + b._2) - }).log("word-count").runWith(sink) - - Await.result(system.whenTerminated, Duration.Inf) - } - - class Echo extends Actor { - def receive: Receive = { - case any: AnyRef => 
- println("Confirm received: " + any) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala deleted file mode 100644 index 56b89bc..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump.example - -import java.io.{File, FileInputStream} -import java.util.zip.GZIPInputStream -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, ExecutionContext, Future} -import scala.util.{Failure, Success, Try} - -import akka.actor.ActorSystem -import akka.stream.gearpump.graph.GraphCutter -import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer} -import akka.stream.scaladsl._ -import akka.util.ByteString -import org.json4s.JsonAST.JString - -import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import org.apache.gearpump.util.AkkaApp - -/** - * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/ - * which showcases running Akka Streams DSL across JVMs on Gearpump - * - * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar - * -input wikidata-${DATE}-all.json.gz -languages en,de - * - * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/) - * - */ -object WikipediaApp extends ArgumentsParser with AkkaApp { - - case class WikidataElement(id: String, sites: Map[String, String]) - - override val options: Array[(String, CLIOption[Any])] = Array( - "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true), - "languages" -> CLIOption[String]("<languages to take into account>", required = true) - ) - - override def main(akkaConf: Config, args: Array[String]): Unit = { - val parsed = parse(args) - val input = new File(parsed.getString("input")) - val langs = parsed.getString("languages").split(",") - - implicit val system = ActorSystem("wikidata-poc", akkaConf) - implicit val materializer = new GearpumpMaterializer(system, GraphCutter.TagAttributeStrategy, useLocalCluster = false) - import system.dispatcher - - val elements = source(input).via(parseJson(langs)) - - val g = FlowGraph.closed(count) { implicit b => - 
sinkCount => { - - val broadcast = b.add(Broadcast[WikidataElement](2)) - elements ~> broadcast ~> logEveryNSink(1000) - broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount - } - } - - g.run().onComplete { x => - x match { - case Success((t, f)) => printResults(t, f) - case Failure(tr) => println("Something went wrong") - } - system.terminate() - } - Await.result(system.whenTerminated, Duration.Inf) - } - - def source(file: File): Source[String, Future[Long]] = { - val compressed = new GZIPInputStream(new FileInputStream(file), 65536) - InputStreamSource(() => compressed) - .via(Framing.delimiter(ByteString("\n"), Int.MaxValue)) - .map(x => x.decodeString("utf-8")) - } - - def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): Flow[String, WikidataElement, Unit] = - Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect { - case Some(v) => v - } - - def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = { - import org.json4s.jackson.JsonMethods - Try(JsonMethods.parse(line)).toOption.flatMap { json => - json \ "id" match { - case JString(itemId) => - val sites = for { - lang <- langs - JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title" - } yield lang -> title - - if (sites.isEmpty) None - else Some(WikidataElement(id = itemId, sites = sites.toMap)) - - case _ => None - } - } - } - - def logEveryNSink[T](n: Int) = Sink.fold(0) { (x, y: T) => - if (x % n == 0) - println(s"Processing element $x: $y") - x + 1 - } - - def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, Unit] = Flow[WikidataElement] - .filter(_.sites.keySet == langs) - .map { x => - val titles = x.sites.values - titles.forall(_ == titles.head) - }.withAttributes(GearAttributes.remote) - - def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) { - case ((t, f), true) => (t + 1, f) - case ((t, f), false) => (t, f + 1) - } - - def printResults(t: Int, f: Int) = { - val message = - s""" - | Number of items 
with the same title: $t - | Number of items with the different title: $f - | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)} - """.stripMargin - println(message) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala deleted file mode 100644 index 19083f6..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/GraphCutter.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump.graph - -import akka.stream.ModuleGraph -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.GearAttributes -import akka.stream.gearpump.GearAttributes.{Local, Location, Remote} -import akka.stream.gearpump.graph.GraphCutter.Strategy -import akka.stream.gearpump.module.{BridgeModule, DummyModule, GearpumpTaskModule, GroupByModule, SinkBridgeModule, SourceBridgeModule} -import akka.stream.impl.Stages.DirectProcessor -import akka.stream.impl.StreamLayout.{MaterializedValueNode, Module} -import akka.stream.impl.{SinkModule, SourceModule} - -import org.apache.gearpump.util.Graph - -/** - * - * GraphCutter is used to decide which part is rendered locally - * and which part should be rendered remotely. - * - * We will cut the graph based on the [[Strategy]] provided. - * - * For example, for the following graph, we can cut the graph to - * two parts, each part will be a Sub Graph. The top SubGraph - * can be materialized remotely. The bottom part can be materialized - * locally. - * - * AtomicModule2 -> AtomicModule4 - * /| \ - * / \ - * -----------cut line -------------cut line ---------- - * / \ - * / \| - * AtomicModule1 AtomicModule5 - * \ /| - * \ / - * \| / - * AtomicModule3 - * - * - * @see [[ModuleGraph]] for more information of how Graph is organized. 
- * - */ -class GraphCutter(strategy: Strategy) { - def cut(moduleGraph: ModuleGraph[_]): List[SubGraph] = { - val graph = removeDummyModule(moduleGraph.graph) - val tags = tag(graph, strategy) - doCut(graph, tags, moduleGraph.mat) - } - - private def doCut(graph: Graph[Module, Edge], tags: Map[Module, Location], - mat: MaterializedValueNode): List[SubGraph] = { - val local = Graph.empty[Module, Edge] - val remote = Graph.empty[Module, Edge] - - graph.vertices.foreach { module => - if (tags(module) == Local) { - local.addVertex(module) - } else { - remote.addVertex(module) - } - } - - graph.edges.foreach { nodeEdgeNode => - val (node1, edge, node2) = nodeEdgeNode - (tags(node1), tags(node2)) match { - case (Local, Local) => - local.addEdge(nodeEdgeNode) - case (Remote, Remote) => - remote.addEdge(nodeEdgeNode) - case (Local, Remote) => - node2 match { - case bridge: BridgeModule[_, _, _] => - local.addEdge(node1, edge, node2) - case _ => - // Creates a bridge module in between - val bridge = new SourceBridgeModule[AnyRef, AnyRef]() - val remoteEdge = Edge(bridge.outPort, edge.to) - remote.addEdge(bridge, remoteEdge, node2) - val localEdge = Edge(edge.from, bridge.inPort) - local.addEdge(node1, localEdge, bridge) - } - case (Remote, Local) => - node1 match { - case bridge: BridgeModule[_, _, _] => - local.addEdge(node1, edge, node2) - case _ => - // Creates a bridge module in between - val bridge = new SinkBridgeModule[AnyRef, AnyRef]() - val remoteEdge = Edge(edge.from, bridge.inPort) - remote.addEdge(node1, remoteEdge, bridge) - val localEdge = Edge(bridge.outPort, edge.to) - local.addEdge(bridge, localEdge, node2) - } - } - } - - List(new RemoteGraph(remote), new LocalGraph(local)) - } - - private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = { - graph.vertices.map { vertex => - vertex -> strategy.apply(vertex) - }.toMap - } - - private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = { - val graph = 
inputGraph.copy - val dummies = graph.vertices.filter { module => - module match { - case dummy: DummyModule => - true - case _ => - false - } - } - dummies.foreach(module => graph.removeVertex(module)) - graph - } -} - -object GraphCutter { - - type Strategy = PartialFunction[Module, Location] - - val BaseStrategy: Strategy = { - case source: BridgeModule[_, _, _] => - Remote - case task: GearpumpTaskModule => - Remote - case groupBy: GroupByModule[_, _] => - // TODO: groupBy is not supported by local materializer yet - Remote - case source: SourceModule[_, _] => - Local - case sink: SinkModule[_, _] => - Local - case matValueSource: MaterializedValueSource[_] => - Local - case direct: DirectProcessor => - Local - case time: TimerTransform => - // Renders to local as it requires a timer. - Local - } - - val AllRemoteStrategy: Strategy = BaseStrategy orElse { - case _ => - Remote - } - - /** - * Will decide whether to render a module locally or remotely - * based on Attribute settings. - * - */ - val TagAttributeStrategy: Strategy = BaseStrategy orElse { - case module => - GearAttributes.location(module.attributes) - } - - val AllLocalStrategy: Strategy = BaseStrategy orElse { - case _ => - Local - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala deleted file mode 100644 index 6ef8598..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/LocalGraph.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.graph - -import akka.actor.ActorSystem -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.materializer.LocalMaterializer -import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule} -import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.{PublisherSource, SubscriberSink} -import akka.stream.{Outlet, SinkShape, SourceShape} -import org.reactivestreams.{Publisher, Subscriber} - -import org.apache.gearpump.util.Graph - -/** - * - * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only - * contain module that can be materialized in local JVM. - * - * @param graph - */ -class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph - -object LocalGraph { - - /** - * materialize LocalGraph in local JVM - * @param system - */ - class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer { - - // Creates a local materializer - val materializer = LocalMaterializer()(system) - - /** - * - * @param matValues Materialized Values for each module before materialization - * @return Materialized Values for each Module after the materialization. 
- */ - override def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] = { - val newGraph: Graph[Module, Edge] = graph.graph.mapVertex { module => - module match { - case source: SourceBridgeModule[AnyRef, AnyRef] => - val subscriber = matValues(source).asInstanceOf[Subscriber[AnyRef]] - val shape = SinkShape(source.inPort) - new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape) - case sink: SinkBridgeModule[AnyRef, AnyRef] => - val publisher = matValues(sink).asInstanceOf[Publisher[AnyRef]] - val shape = SourceShape(sink.outPort.asInstanceOf[Outlet[AnyRef]]) - new PublisherSource(publisher, DefaultAttributes.publisherSource, shape) - case other => - other - } - } - materializer.materialize(newGraph, matValues) - } - - override def shutdown(): Unit = { - materializer.shutdown() - } - } -} - http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala deleted file mode 100644 index 3cea78a..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/RemoteGraph.scala +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.graph - -import akka.actor.ActorSystem -import akka.stream.ModuleGraph.Edge -import akka.stream.gearpump.materializer.RemoteMaterializerImpl -import akka.stream.gearpump.module.{SinkBridgeModule, SourceBridgeModule} -import akka.stream.gearpump.task.SinkBridgeTask.SinkBridgeTaskClient -import akka.stream.gearpump.task.SourceBridgeTask.SourceBridgeTaskClient -import akka.stream.impl.StreamLayout.Module - -import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster -import org.apache.gearpump.streaming.ProcessorId -import org.apache.gearpump.util.Graph - -/** - * - * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only - * contain modules that can be materialized in remote Gearpump cluster. 
- * - * @param graph - */ -class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph - -object RemoteGraph { - - /** - * * materialize LocalGraph in remote gearpump cluster - * @param useInProcessCluster - * @param system - */ - class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer { - private val local = if (useInProcessCluster) { - val cluster = EmbeddedCluster() - cluster.start() - Some(cluster) - } else { - None - } - - private val context: ClientContext = local match { - case Some(local) => local.newClientContext - case None => ClientContext(system) - } - - override def materialize(subGraph: SubGraph, inputMatValues: Map[Module, Any]): Map[Module, Any] = { - val graph = subGraph.graph - - if (graph.isEmpty) { - inputMatValues - } else { - doMaterialize(graph: Graph[Module, Edge], inputMatValues) - } - } - - private def doMaterialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = { - val materializer = new RemoteMaterializerImpl(graph, system) - val (app, matValues) = materializer.materialize - - val appId = context.submit(app) - println("sleep 5 second until the applicaiton is ready on cluster") - Thread.sleep(5000) - - def resolve(matValues: Map[Module, ProcessorId]): Map[Module, Any] = { - matValues.toList.flatMap { kv => - val (module, processorId) = kv - module match { - case source: SourceBridgeModule[AnyRef, AnyRef] => - val bridge = new SourceBridgeTaskClient[AnyRef](system.dispatcher, context, appId, processorId) - Some((module, bridge)) - case sink: SinkBridgeModule[AnyRef, AnyRef] => - val bridge = new SinkBridgeTaskClient(system, context, appId, processorId) - Some((module, bridge)) - case other => - None - } - }.toMap - } - - inputMatValues ++ resolve(matValues) - } - - override def shutdown(): Unit = { - context.close() - local.map(_.stop()) - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala deleted file mode 100644 index 564b6c7..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/graph/SubGraph.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.graph - -import akka.stream.ModuleGraph.Edge -import akka.stream.impl.StreamLayout.Module - -import org.apache.gearpump.util.Graph - -/** - * [[SubGraph]] is a partial part of [[akka.stream.ModuleGraph]] - * - * The idea is that by dividing [[akka.stream.ModuleGraph]] to several - * [[SubGraph]], we can materialize each [[SubGraph]] with different - * materializer. 
- */ - -trait SubGraph { - - /** - * the [[Graph]] representation of this SubGraph - * @return - */ - def graph: Graph[Module, Edge] -} - -/** - * Materializer for Sub-Graph type - */ -trait SubGraphMaterializer { - /** - * - * @param matValues Materialized Values for each module before materialization - * @return Materialized Values for each Module after the materialization. - */ - - def materialize(graph: SubGraph, matValues: Map[Module, Any]): Map[Module, Any] - - def shutdown(): Unit -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala deleted file mode 100644 index a5c6e48..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializer.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package akka.stream.gearpump.materializer - -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import scala.concurrent.{Await, ExecutionContextExecutor} - -import akka.actor.{ActorCell, ActorRef, ActorSystem, Deploy, LocalActorRef, PoisonPill, Props, RepointableActorRef} -import akka.dispatch.Dispatchers -import akka.pattern.ask -import akka.stream.ModuleGraph.Edge -import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.StreamSupervisor -import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Attributes, ClosedShape, Graph => AkkaGraph, MaterializationContext, ModuleGraph} - -import org.apache.gearpump.util.Graph - -/** - * [[LocalMaterializer]] will use local actor to materialize the graph - * Use LocalMaterializer.apply to construct the LocalMaterializer. - * - * It is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]] - * - * - * @param system - * @param settings - * @param dispatchers - * @param supervisor - * @param haveShutDown - * @param flowNameCounter - * @param namePrefix - * @param optimizations - */ -abstract class LocalMaterializer( - val system: ActorSystem, - override val settings: ActorMaterializerSettings, - dispatchers: Dispatchers, - val supervisor: ActorRef, - val haveShutDown: AtomicBoolean, - flowNameCounter: AtomicLong, - namePrefix: String, - optimizations: Optimizations) extends ActorMaterializer { - - override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = { - import ActorAttributes._ - import Attributes._ - opAttr.attributeList.foldLeft(settings) { (s, attr) => - attr match { - case InputBuffer(initial, max) => s.withInputBuffer(initial, max) - case Dispatcher(dispatcher) => s.withDispatcher(dispatcher) - case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider) - case l: LogLevels => s - case Name(_) => s - case other => s - } - } - } - - override def shutdown(): Unit = - if (haveShutDown.compareAndSet(false, true)) 
supervisor ! PoisonPill - - override def isShutdown: Boolean = haveShutDown.get() - - private[akka] def actorOf(props: Props, name: String, dispatcher: String): ActorRef = { - supervisor match { - case ref: LocalActorRef => - ref.underlying.attachChild(props.withDispatcher(dispatcher), name, systemService = false) - case ref: RepointableActorRef => - if (ref.isStarted) { - ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(dispatcher), - name, systemService = false) - } else { - implicit val timeout = ref.system.settings.CreationTimeout - val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(dispatcher), - name)).mapTo[ActorRef] - Await.result(f, timeout.duration) - } - case unknown => - throw new IllegalStateException( - s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") - } - } - - override lazy val executionContext: ExecutionContextExecutor = - dispatchers.lookup(settings.dispatcher match { - case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId - case other => other - }) - - def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] - - override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = { - val graph = ModuleGraph(runnableGraph) - val matValues = materialize(graph.graph, Map.empty[Module, Any]) - graph.resolve(matValues) - } - - override def actorOf(context: MaterializationContext, props: Props): ActorRef = { - val dispatcher = - if (props.deploy.dispatcher == Deploy.NoDispatcherGiven) { - effectiveSettings(context.effectiveAttributes).dispatcher - } else { - props.dispatcher - } - actorOf(props, context.stageName, dispatcher) - } -} - -object LocalMaterializer { - - def apply(materializerSettings: Option[ActorMaterializerSettings] = None, - namePrefix: Option[String] = None, - optimizations: Optimizations = Optimizations.none)(implicit system: ActorSystem) - : LocalMaterializerImpl = { - - val settings = 
materializerSettings getOrElse ActorMaterializerSettings(system) - apply(settings, namePrefix.getOrElse("flow"), optimizations)(system) - } - - def apply(materializerSettings: ActorMaterializerSettings, - namePrefix: String, optimizations: Optimizations)(implicit system: ActorSystem) - : LocalMaterializerImpl = { - val haveShutDown = new AtomicBoolean(false) - - new LocalMaterializerImpl( - system, - materializerSettings, - system.dispatchers, - system.actorOf(StreamSupervisor.props(materializerSettings, - haveShutDown).withDispatcher(materializerSettings.dispatcher)), - haveShutDown, - FlowNameCounter(system).counter, - namePrefix, - optimizations) - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala deleted file mode 100644 index 1ec724e..0000000 --- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala +++ /dev/null @@ -1,284 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package akka.stream.gearpump.materializer - -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} - -import akka.actor.{ActorRef, ActorSystem} -import akka.dispatch.Dispatchers -import akka.stream.ModuleGraph.{Edge, MaterializedValueSourceAttribute} -import akka.stream.actor.ActorSubscriber -import akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule -import akka.stream.gearpump.module.ReduceModule -import akka.stream.gearpump.util.MaterializedValueOps -import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule} -import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor} -import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, OutPort, Shape} -import org.reactivestreams.{Processor, Publisher, Subscriber} - -import org.apache.gearpump.util.Graph - -/** - * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]] - * - * @param system - * @param settings - * @param dispatchers - * @param supervisor - * @param haveShutDown - * @param flowNameCounter - * @param namePrefix - * @param optimizations - */ -class LocalMaterializerImpl ( - system: ActorSystem, - settings: ActorMaterializerSettings, - dispatchers: Dispatchers, - supervisor: ActorRef, - haveShutDown: AtomicBoolean, - flowNameCounter: AtomicLong, - namePrefix: String, - optimizations: Optimizations) - extends LocalMaterializer( 
- system, settings, dispatchers, supervisor, - haveShutDown, flowNameCounter, namePrefix, optimizations) { - - override def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = { - val materializedGraph = graph.mapVertex { module => - materializeAtomic(module) - } - - materializedGraph.edges.foreach { nodeEdgeNode => - val (node1, edge, node2) = nodeEdgeNode - val from = edge.from - val to = edge.to - val publisher = node1.outputs(from).asInstanceOf[Publisher[Any]] - val subscriber = node2.inputs(to).asInstanceOf[Subscriber[Any]] - publisher.subscribe(subscriber) - } - - val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex => - (vertex.module, vertex.matValue) - }.toMap - - val matValueSources = materializedGraph.vertices.filter(_.module.isInstanceOf[MaterializedValueSource[_]]) - publishToMaterializedValueSource(matValueSources, matValues) - - matValues - } - - private def publishToMaterializedValueSource(modules: List[MaterializedModule], matValues: Map[Module, Any]) = { - modules.foreach { module => - val source = module.module.asInstanceOf[MaterializedValueSource[_]] - val attr = source.attributes.getAttribute(classOf[MaterializedValueSourceAttribute], null) - - Option(attr).map { attr => - val valueToPublish = MaterializedValueOps(attr.mat).resolve[Any](matValues) - module.outputs.foreach { portAndPublisher => - val (port, publisher) = portAndPublisher - publisher match { - case valuePublisher: MaterializedValuePublisher => - valuePublisher.setValue(valueToPublish) - } - } - } - } - } - - private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() - - private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" - - val flowName = createFlowName() - var nextId = 0 - - def stageName(attr: Attributes): String = { - val name = s"$flowName-$nextId-${attr.nameOrDefault()}" - nextId += 1 - name - } - - private def materializeAtomic(atomic: Module): 
MaterializedModule = { - val effectiveAttributes = atomic.attributes - - def newMaterializationContext() = - new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes)) - - atomic match { - case matValue: MaterializedValueSource[_] => - val pub = new MaterializedValuePublisher - val outputs = Map[OutPort, Publisher[_]](matValue.shape.outlet -> pub) - MaterializedModule(matValue, (), outputs = outputs) - case sink: SinkModule[_, _] => - val (sub, mat) = sink.create(newMaterializationContext()) - val inputs = Map[InPort, Subscriber[_]](sink.shape.inlet -> sub) - MaterializedModule(sink, mat, inputs) - case source: SourceModule[_, _] => - val (pub, mat) = source.create(newMaterializationContext()) - val outputs = Map[OutPort, Publisher[_]](source.shape.outlet -> pub) - MaterializedModule(source, mat, outputs = outputs) - - case reduce: ReduceModule[Any] => - //TODO: remove this after the official akka-stream API support the Reduce Module - val stage = LocalMaterializerImpl.toFoldModule(reduce) - val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes)) - val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor) - val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor) - MaterializedModule(stage, mat, inputs, outputs) - - case stage: StageModule => - val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes)) - val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor) - val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor) - MaterializedModule(stage, mat, inputs, outputs) - case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here - val es = effectiveSettings(effectiveAttributes) - val props = - SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing) - val impl = actorOf(props, stageName(effectiveAttributes), 
es.dispatcher) - def factory(id: Int) = new ActorPublisher[Any](impl) { - override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) - } - val publishers = Vector.tabulate(2)(factory) - impl ! FanOut.ExposedPublishers(publishers) - - val inputs = Map[InPort, Subscriber[_]]( - tls.plainIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn), - tls.cipherIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn)) - - val outputs = Map[OutPort, Publisher[_]]( - tls.plainOut -> publishers(SslTlsCipherActor.UserOut), - tls.cipherOut -> publishers(SslTlsCipherActor.TransportOut)) - MaterializedModule(tls, (), inputs, outputs) - - case junction: JunctionModule => - materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes)) - } - } - - private def processorFor(op: StageModule, - effectiveAttributes: Attributes, - effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match { - case DirectProcessor(processorFactory, _) => processorFactory() - case Identity(attr) => (new VirtualProcessor, ()) - case _ => - val (opprops, mat) = ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes) - ActorProcessorFactory[Any, Any]( - actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat - } - - private def materializeJunction( - op: JunctionModule, - effectiveAttributes: Attributes, - effectiveSettings: ActorMaterializerSettings): MaterializedModule = { - op match { - case fanin: FanInModule => - val (props, inputs, output) = fanin match { - - case MergeModule(shape, _) => - (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out) - - case f: FlexiMergeModule[_, Shape] => - val flexi = f.flexi(f.shape) - val shape: Shape = f.shape - (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, shape.outlets.head) - - case MergePreferredModule(shape, _) => - (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: 
shape.inSeq, shape.out) - - case ConcatModule(shape, _) => - require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO - (Concat.props(effectiveSettings), shape.inSeq, shape.out) - - case zip: ZipWithModule => - (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head) - } - - val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) - val publisher = new ActorPublisher[Any](impl) - // Resolve cyclic dependency with actor. This MUST be the first message no matter what. - impl ! ExposedPublisher(publisher) - - val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map { pair => - val (in, id) = pair - (in, FanIn.SubInput[Any](impl, id)) - }.toMap - - val outMapping = Map(output -> publisher) - MaterializedModule(fanin, (), inputMapping, outMapping) - - case fanout: FanOutModule => - val (props, in, outs) = fanout match { - - case r: FlexiRouteModule[t, Shape] => - val flexi = r.flexi(r.shape) - val shape: Shape = r.shape - (FlexiRoute.props(effectiveSettings, r.shape, flexi), shape.inlets.head: InPort, r.shape.outlets) - - case BroadcastModule(shape, eagerCancel, _) => - (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq) - - case BalanceModule(shape, waitForDownstreams, _) => - (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) - - case unzip: UnzipWithModule => - (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets) - } - val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) - val size = outs.size - def factory(id: Int) = - new ActorPublisher[Any](impl) { - override val wakeUpMsg = FanOut.SubstreamSubscribePending(id) - } - val publishers = - if (outs.size < 8) Vector.tabulate(size)(factory) - else List.tabulate(size)(factory) - - impl ! 
FanOut.ExposedPublishers(publishers) - val outputs: Map[OutPort, Publisher[_]] = publishers.iterator.zip(outs.iterator).map { case (pub, out) => - (out, pub) - }.toMap - - val inputs: Map[InPort, Subscriber[_]] = Map(in -> ActorSubscriber[Any](impl)) - MaterializedModule(fanout, (), inputs, outputs) - } - } - - override def withNamePrefix(name: String): Materializer = { - new LocalMaterializerImpl(system, settings, dispatchers, supervisor, - haveShutDown, flowNameCounter, namePrefix = name, optimizations) - } -} - -object LocalMaterializerImpl { - case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]], outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]]) - - def toFoldModule(reduce: ReduceModule[Any]): Fold = { - val f = reduce.f - val aggregator = { (zero: Any, input: Any) => - if (zero == null) { - input - } else { - f(zero, input) - } - } - new Fold(null, aggregator) - } -} \ No newline at end of file
