// http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
// ----------------------------------------------------------------------
// diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
// new file mode 100644
// index 0000000..75dc95a
// --- /dev/null
// +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializer.scala
// @@ -0,0 +1,290 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream

import java.util.concurrent.atomic.AtomicBoolean

import akka.NotUsed
import akka.actor.{ActorContext, ActorRef, ActorRefFactory, ActorSystem, Cancellable, ExtendedActorSystem}
import akka.event.{Logging, LoggingAdapter}
import akka.stream.Attributes.Attribute
import akka.stream._
import akka.stream.impl.Stages.SymbolicGraphStage
import akka.stream.impl.StreamLayout._
import akka.stream.impl._
import akka.stream.impl.fusing.{GraphInterpreterShell, GraphStageModule}
import akka.stream.scaladsl.ModuleExtractor
import akka.stream.stage.GraphStage
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
import org.apache.gearpump.akkastream.graph.LocalGraph.LocalGraphMaterializer
import org.apache.gearpump.akkastream.graph.RemoteGraph.RemoteGraphMaterializer
import org.apache.gearpump.akkastream.graph._
import org.apache.gearpump.akkastream.util.MaterializedValueOps

import scala.collection.mutable
import scala.concurrent.{ExecutionContextExecutor, Promise}
import scala.concurrent.duration.FiniteDuration

/**
 * Factory methods and helper types for [[GearpumpMaterializer]].
 */
object GearpumpMaterializer {

  /** Debug flag exposed by this companion object. */
  final val Debug = true

  /** An edge of the module graph: connects an upstream out-port to a downstream in-port. */
  final case class Edge(from: OutPort, to: InPort)

  /** Attribute that carries a materialized value computation node. */
  final case class MaterializedValueSourceAttribute(mat: MaterializedValueNode) extends Attribute

  /**
   * Implicit Boolean to AtomicBoolean conversion. Kept because it is part of the
   * public surface of this object; the factory below no longer relies on it.
   */
  implicit def boolToAtomic(bool: Boolean): AtomicBoolean = new AtomicBoolean(bool)

  /**
   * Creates a materializer with the given partitioning strategy, auto-fusing disabled,
   * `useLocalCluster = false` and the name prefix "flow".
   *
   * @param strategy decides which modules are rendered remotely and which locally
   * @param context an ActorSystem or ActorContext
   */
  def apply(strategy: Strategy)(implicit context: ActorRefFactory):
  ExtendedActorMaterializer = {
    val system = actorSystemOf(context)

    apply(ActorMaterializerSettings(
      system).withAutoFusing(false), strategy, useLocalCluster = false, "flow")(context)
  }

  /**
   * Creates a materializer, falling back to default settings (auto-fusing disabled)
   * and the name prefix "flow" when the optional arguments are not supplied.
   */
  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
      strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
      useLocalCluster: Boolean = true,
      namePrefix: Option[String] = None)(implicit context: ActorRefFactory):
  ExtendedActorMaterializer = {
    val system = actorSystemOf(context)

    val settings = materializerSettings getOrElse
      ActorMaterializerSettings(system).withAutoFusing(false)
    apply(settings, strategy, useLocalCluster, namePrefix.getOrElse("flow"))(context)
  }

  /**
   * Creates the stream supervisor actor under `context` and builds the materializer.
   *
   * `strategy`, `useLocalCluster` and `namePrefix` are now forwarded to the
   * materializer; previously they were dropped and the class defaults were used.
   */
  def apply(materializerSettings: ActorMaterializerSettings,
      strategy: Strategy,
      useLocalCluster: Boolean,
      namePrefix: String)(implicit context: ActorRefFactory):
  ExtendedActorMaterializer = {
    val system = actorSystemOf(context)

    val supervisor = context.actorOf(
      StreamSupervisor.props(materializerSettings, new AtomicBoolean(false))
        .withDispatcher(materializerSettings.dispatcher),
      StreamSupervisor.nextName())

    new GearpumpMaterializer(
      system,
      materializerSettings,
      supervisor,
      strategy,
      useLocalCluster,
      Some(namePrefix))
  }

  /**
   * Extracts the ActorSystem behind an ActorRefFactory.
   *
   * @throws IllegalArgumentException if `context` is null or is neither an
   *                                  ExtendedActorSystem nor an ActorContext
   */
  private def actorSystemOf(context: ActorRefFactory): ActorSystem =
    context match {
      case s: ExtendedActorSystem => s
      case c: ActorContext => c.system
      case null => throw new IllegalArgumentException("ActorRefFactory context must be defined")
      case _ =>
        throw new IllegalArgumentException(
          s"""
             | context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]
          """.stripMargin
        )
    }

}

/**
 *
 * [[GearpumpMaterializer]] allows you to render akka-stream DSL as a Gearpump
 * streaming application. If some module cannot be rendered remotely in Gearpump
 * Cluster, then it will use local Actor materializer as fallback to materialize
 * the module locally.
 *
 * User can customize a [[org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy]]
 * to determine which module should be rendered
 * remotely, and which module should be rendered locally.
 *
 * @see [[org.apache.gearpump.akkastream.graph.GraphPartitioner]]
 *     to find out how we cut the runnableGraph to two parts,
 *      and materialize them separately.
 * @param system          ActorSystem
 * @param settings        materializer settings (dispatcher, buffers, supervision)
 * @param supervisor      the stream supervisor actor created by the factory
 * @param strategy        Strategy
 * @param useLocalCluster whether to use built-in in-process local cluster
 * @param namePrefix      optional name prefix; not read anywhere in this class
 */
class GearpumpMaterializer(override val system: ActorSystem,
    override val settings: ActorMaterializerSettings,
    override val supervisor: ActorRef,
    strategy: Strategy = GraphPartitioner.AllRemoteStrategy,
    useLocalCluster: Boolean = true, namePrefix: Option[String] = None)
  extends ExtendedActorMaterializer {

  // One sub-materializer per kind of sub graph produced by the partitioner.
  private val subMaterializers: Map[Class[_], SubGraphMaterializer] = Map(
    classOf[LocalGraph] -> new LocalGraphMaterializer(system),
    classOf[RemoteGraph] -> new RemoteGraphMaterializer(useLocalCluster, system)
  )

  override def logger: LoggingAdapter = Logging.getLogger(system, this)

  // `isTerminated` is deprecated in favour of `whenTerminated.isCompleted`.
  override def isShutdown: Boolean = system.whenTerminated.isCompleted

  /** Applies the InputBuffer, Dispatcher and SupervisionStrategy attributes onto `settings`. */
  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
    import ActorAttributes._
    import Attributes._
    opAttr.attributeList.foldLeft(settings) {
      case (s, InputBuffer(initial, max)) => s.withInputBuffer(initial, max)
      case (s, Dispatcher(dispatcher)) => s.withDispatcher(dispatcher)
      case (s, SupervisionStrategy(decider)) => s.withSupervisionStrategy(decider)
      case (s, _) => s
    }
  }

  override def withNamePrefix(name: String): ExtendedActorMaterializer =
    throw new UnsupportedOperationException()

  /**
   * Execution context used by the scheduling methods below: the dispatcher named in
   * `settings`, or the system dispatcher when no dispatcher name is configured.
   *
   * This used to throw UnsupportedOperationException, which made
   * `schedulePeriodically` and `scheduleOnce` fail on every call.
   */
  override implicit def executionContext: ExecutionContextExecutor =
    if (settings.dispatcher.isEmpty) system.dispatcher
    else system.dispatchers.lookup(settings.dispatcher)

  override def schedulePeriodically(initialDelay: FiniteDuration,
      interval: FiniteDuration,
      task: Runnable): Cancellable =
    system.scheduler.schedule(initialDelay, interval, task)(executionContext)

  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext)

  /**
   * Fuses the runnable graph, rebuilds a graph of the original (un-copied) modules,
   * partitions it with `strategy` and materializes each partition with the matching
   * sub-materializer.
   *
   * @return the list of non-NotUsed values materialized by sink-shaped modules,
   *         cast to `Mat`
   */
  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat]): Mat = {
    val info = Fusing.aggressive(runnableGraph).module.info
    import _root_.org.apache.gearpump.util.{Graph => GGraph}
    val graph = GGraph.empty[Module, Edge]

    // Only copied modules are considered; each one is replaced by the module it copies.
    info.allModules.foreach {
      case copied: CopiedModule =>
        val original = copied.copyOf
        graph.addVertex(original)
        copied.shape.outlets.zip(original.shape.outlets).foreach { case (copiedOut, originalOut) =>
          val copiedIn = info.downstreams(copiedOut)
          info.inOwners(copiedIn) match {
            case downstream: CopiedModule =>
              val downstreamOriginal = downstream.copyOf
              // Translate the connected in-port of the copy to the in-port of the original.
              downstream.shape.inlets.zip(downstreamOriginal.shape.inlets).foreach {
                case (in, originalIn) if in == copiedIn =>
                  graph.addEdge(original, Edge(originalOut, originalIn), downstreamOriginal)
                case _ =>
              }
            case _ =>
          }
        }
      case _ =>
    }

    // NOTE(review): the companion's `Debug` is not automatically in scope inside the
    // class, so this identifier may bind to a `Debug` brought in by one of the wildcard
    // imports instead — confirm which flag is intended.
    if (Debug) {
      val iterator = graph.topologicalOrderIterator
      while (iterator.hasNext) {
        // scalastyle:off println
        println(describe(iterator.next()))
        // scalastyle:on println
      }
    }

    val subGraphs = GraphPartitioner(strategy).partition(graph)
    val matValues = subGraphs.foldLeft(mutable.Map.empty[Module, Any]) { (map, subGraph) =>
      val materializer = subMaterializers(subGraph.getClass)
      map ++ materializer.materialize(subGraph, map)
    }

    val sinkValues = matValues.toList.flatMap {
      case (_, _: NotUsed) => None
      case (module, value) =>
        module.shape match {
          case _: SinkShape[_] => Some(value)
          case _ => None
        }
    }

    // NOTE(review): this value is resolved but never returned; the method returns the
    // list of sink values instead. The call is kept so that its evaluation (and any
    // exception it raises) is unchanged — confirm which value callers should receive.
    val matModule = subGraphs.last.graph.topologicalOrderIterator.toList.last
    resolveMaterialized(matModule.materializedValueComputation, matValues)

    sinkValues.asInstanceOf[Mat]
  }

  override def materialize[Mat](runnableGraph: Graph[ClosedShape, Mat],
      subflowFuser: GraphInterpreterShell => ActorRef): Mat = {
    materialize(runnableGraph)
  }

  def shutdown: Unit = {
    subMaterializers.values.foreach(_.shutdown)
  }

  /** Renders a module as `ModuleClass(StageClass)` for graph stage modules, else `ModuleClass`. */
  private def describe(module: Module): String = {
    val moduleName = module.getClass.getSimpleName
    module match {
      case graphStageModule: GraphStageModule =>
        val stageName = graphStageModule.stage match {
          case symbolicGraphStage: SymbolicGraphStage[_, _, _] =>
            symbolicGraphStage.symbolicStage.getClass.getSimpleName
          case other =>
            other.getClass.getSimpleName
        }
        s"${moduleName}(${stageName})"
      case _ =>
        moduleName
    }
  }

  /** Evaluates a materialized value computation tree against the per-module values. */
  private def resolveMaterialized(mat: MaterializedValueNode,
      materializedValues: mutable.Map[Module, Any]): Any = mat match {
    case Atomic(m) =>
      materializedValues.getOrElse(m, ())
    case Combine(f, d1, d2) =>
      f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
    case Transform(f, d) =>
      f(resolveMaterialized(d, materializedValues))
    case Ignore =>
      ()
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
// ----------------------------------------------------------------------
// diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
// new file mode 100644
// index 0000000..871dcf8
// --- /dev/null
// +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearpumpMaterializerSession.scala
// @@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream

import java.{util => ju}

import _root_.org.apache.gearpump.util.{Graph => GGraph}
import akka.actor.ActorSystem
import akka.stream._
import org.apache.gearpump.akkastream.GearpumpMaterializer.{Edge, MaterializedValueSourceAttribute}
import akka.stream.impl.StreamLayout._
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages.MaterializedValueSource

/**
 * A materializer session that, instead of running anything, records the attributed
 * copies of the atomic modules it visits as vertices of `graph`.
 *
 * @param system            actor system used to obtain flow names
 * @param topLevel          the top level module handed to the parent session
 * @param initialAttributes attributes handed to the parent session
 * @param namePrefix        flow name prefix; "flow" is used when empty
 */
class GearpumpMaterializerSession(system: ActorSystem, topLevel: Module,
    initialAttributes: Attributes, namePrefix: Option[String] = None)
  extends MaterializerSession(topLevel, initialAttributes) {

  private[this] def createFlowName(): String =
    FlowNames(system).name.copy(namePrefix.getOrElse("flow")).next()

  private val flowName = createFlowName()
  private var nextId = 0

  // Builds "<flowName>-<sequence>-<attribute name>" and advances the sequence.
  private def stageName(attr: Attributes): String = {
    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
    nextId += 1
    name
  }

  /** The module graph collected by this session. */
  val graph = GGraph.empty[Module, Edge]

  /** Adds an edge from the publisher's module to the subscriber's module. */
  def addEdge(publisher: (OutPort, Module), subscriber: (InPort, Module)): Unit = {
    graph.addEdge(publisher._2, Edge(publisher._1, subscriber._1), subscriber._2)
  }

  /** Adds a module as a vertex of the collected graph. */
  def addVertex(module: Module): Unit = {
    graph.addVertex(module)
  }

  /**
   * Visits every sub module of `module` and resolves the module's materialized
   * value computation against the values recorded for the sub modules.
   */
  override def materializeModule(module: Module, parentAttributes: Attributes): Any = {

    val materializedValues: ju.Map[Module, Any] = new ju.HashMap
    val currentAttributes = mergeAttributes(parentAttributes, module.attributes)

    // Never populated in this class, so the loop over it below does not execute.
    val materializedValueSources = List.empty[MaterializedValueSource[_]]

    module.subModules.foreach {
      case atomic: AtomicModule =>
        materializeAtomic(atomic, currentAttributes, materializedValues)
      case copied: CopiedModule =>
        enterScope(copied)
        materializedValues.put(copied, materializeModule(copied, currentAttributes))
        exitScope(copied)
      // Must come before the catch-all below; in the old order this case was unreachable
      // and the empty module was materialized as a composite.
      case EmptyModule =>
      case composite =>
        materializedValues.put(composite, materializeComposite(composite, currentAttributes))
    }

    val mat = resolveMaterialized(module.materializedValueComputation, materializedValues)

    materializedValueSources.foreach { module =>
      val matAttribute =
        new MaterializedValueSourceAttribute(mat.asInstanceOf[MaterializedValueNode])
      val copied = copyAtomicModule(module.module, parentAttributes
        and Attributes(matAttribute))
      // TODO
      // assignPort(module.shape.out, (copied.shape.outlets.head, copied))
      addVertex(copied)
      materializedValues.put(copied, Atomic(copied))
    }
    mat

  }

  override protected def materializeComposite(composite: Module,
      effectiveAttributes: Attributes): Any = {
    materializeModule(composite, effectiveAttributes)
  }

  /**
   * Copies the atomic module with the merged attributes, registers the copy as a
   * vertex and records `Atomic(copy)` as the value of the original module.
   */
  protected def materializeAtomic(atomic: AtomicModule,
      parentAttributes: Attributes,
      matVal: ju.Map[Module, Any]): Unit = {

    val copied = copyAtomicModule(atomic, parentAttributes)

    // Build each port mapping once instead of once per port.
    val inPorts = inPortMapping(atomic, copied)
    val outPorts = outPortMapping(atomic, copied)

    for (in <- atomic.shape.inlets) {
      val inPort = inPorts(in)
      // assignPort(in, (inPort, copied))
    }

    for (out <- atomic.shape.outlets) {
      val outPort = outPorts(out)
      // TODO
      // assignPort(out, (outPort, copied))
    }

    addVertex(copied)
    matVal.put(atomic, Atomic(copied))
  }

  // Returns the module with its own attributes merged onto the parent's attributes.
  private def copyAtomicModule[T <: Module](module: T, parentAttributes: Attributes): T = {
    val currentAttributes = mergeAttributes(parentAttributes, module.attributes)
    module.withAttributes(currentAttributes).asInstanceOf[T]
  }

  // Pairs the out-ports of `from` with those of `to` by position.
  private def outPortMapping(from: Module, to: Module): Map[OutPort, OutPort] = {
    from.shape.outlets.iterator.zip(to.shape.outlets.iterator).toList.toMap
  }

  // Pairs the in-ports of `from` with those of `to` by position.
  private def inPortMapping(from: Module, to: Module): Map[InPort, InPort] = {
    from.shape.inlets.iterator.zip(to.shape.inlets.iterator).toList.toMap
  }

  /**
   * Evaluates a materialized value computation tree. A module without a recorded
   * value yields null (java.util.Map lookup); `Ignore` resolves to itself.
   */
  protected def resolveMaterialized(matNode: MaterializedValueNode,
      materializedValues: ju.Map[Module, Any]):
    Any =
    matNode match {
      case Atomic(m) => materializedValues.get(m)
      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues),
        resolveMaterialized(d2, materializedValues))
      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
      case Ignore => Ignore
    }
}

object GearpumpMaterializerSession {
  /** Factory mirroring the class constructor. */
  def apply(system: ActorSystem, topLevel: Module,
      initialAttributes: Attributes, namePrefix: Option[String] = None):
  GearpumpMaterializerSession = {
    new GearpumpMaterializerSession(system, topLevel, initialAttributes, namePrefix)
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
// ----------------------------------------------------------------------
// diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
// new file mode 100644
// index 0000000..2ce4e19
// --- /dev/null
// +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test.scala
// @@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.scaladsl.{Sink, Source} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + + +/** + * Source and Sink are materialized locally. + * Remaining GraphStages are materialized remotely: + * statefulMap, filter, fold, flatMap + */ +object Test extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test", akkaConf) + implicit val materializer = GearpumpMaterializer() + + val echo = system.actorOf(Props(new Echo())) + val sink = Sink.actorRef(echo, "COMPLETE") + + Source( + List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky") + ).filter(_.startsWith("red")).fold("Items:") {(a, b) => + a + "|" + b + }.map("I want to order item: " + _).runWith(sink) + + Await.result(system.whenTerminated, 60.minutes) + } + + class Echo extends Actor { + def receive: Receive = { + case any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala new file mode 100644 index 0000000..71678c3 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.NotUsed +import akka.stream.{ClosedShape, ThrottleMode} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * Stream example showing Conflate, Throttle + */ +object Test10 extends AkkaApp with ArgumentsParser { + + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + import akka.actor.ActorSystem + import akka.stream.scaladsl._ + + implicit val system = ActorSystem("Test10", akkaConfig) + implicit val materializer = GearpumpMaterializer() + implicit val ec = system.dispatcher + + // Conflate[A] - (2 inputs, 1 output) concatenates two streams + // (first consumes one, then the second one) + def stream(x: String) = Stream.continually(x) + + val sourceA = Source(stream("A")) + val sourceB = Source(stream("B")) + + val throttler: Flow[String, String, NotUsed] = + Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping) + val conflateFlow: Flow[String, String, NotUsed] = + Flow[String].conflate((x: String, y: String) => x: String) + ((acc: String, x: 
String) => s"$acc::$x") + + val printFlow: Flow[(String, String), String, NotUsed] = + Flow[(String, String)].map { + x => + println(s" lengths are : ${x._1.length} and ${x._2.length} ; ${x._1} zip ${x._2}") + x.toString + } + + val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + + val zipping = b.add(Zip[String, String]()) + + sourceA ~> throttler ~> zipping.in0 + sourceB ~> conflateFlow ~> zipping.in1 + + zipping.out ~> printFlow ~> Sink.ignore + + ClosedShape + }) + + graph.run() + + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala new file mode 100644 index 0000000..b80398c --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.NotUsed +import akka.stream.ClosedShape +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * Stream example showing Broadcast and Merge + */ +object Test11 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + import akka.actor.ActorSystem + import akka.stream.scaladsl._ + + implicit val system = ActorSystem("Test11", akkaConfig) + implicit val materializer = GearpumpMaterializer() +// implicit val materializer = +// ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false)) + implicit val ec = system.dispatcher + + val g = RunnableGraph.fromGraph(GraphDSL.create() { + implicit builder: GraphDSL.Builder[NotUsed] => + + import GraphDSL.Implicits._ + val in = Source(1 to 10) + val output: (Any) => Unit = any => { + val s = s"**** $any" + println(s) + } + val out = Sink.foreach(output) + + val broadcast = builder.add(Broadcast[Int](2)) + val merge = builder.add(Merge[Int](2)) + + val f1, f2, f3, f4 = Flow[Int].map(_ + 10) + + in ~> f1 ~> broadcast ~> f2 ~> merge ~> f3 ~> out + broadcast ~> f4 ~> merge + + ClosedShape + }) + + g.run() + + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala 
// b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
// new file mode 100644
// index 0000000..a9e8b08
// --- /dev/null
// +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
// @@ -0,0 +1,81 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.example

import akka.stream.{ClosedShape, UniformFanInShape}
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.util.AkkaApp

import scala.concurrent.{Await, Future}

/**
 * Partial source, sink example
 *
 * Builds a reusable three-input "pick the maximum" partial graph out of two ZipWith
 * stages and feeds it from three single-element sources into a head sink.
 */
object Test12 extends AkkaApp with ArgumentsParser{
  // scalastyle:off println
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    import akka.actor.ActorSystem
    import akka.stream.scaladsl._

    import scala.concurrent.duration._

    // Use the configuration handed to main, as the sibling Test example does.
    implicit val system = ActorSystem("Test12", akkaConf)
//    implicit val materializer = ActorMaterializer(
//      ActorMaterializerSettings(system).withAutoFusing(false)
//    )
    implicit val materializer = GearpumpMaterializer()
    implicit val ec = system.dispatcher

    val pickMaxOfThree = GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val zip1 = b.add(ZipWith[Int, Int, Int](math.max))
      val zip2 = b.add(ZipWith[Int, Int, Int](math.max))

      // max(max(in0, in1), in2)
      zip1.out ~> zip2.in0

      UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
    }

    val resultSink = Sink.head[Int]

    val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
      sink =>
        import GraphDSL.Implicits._

        // Importing the partial shape will return its shape (inlets & outlets)
        val pm3 = b.add(pickMaxOfThree)

        Source.single(1) ~> pm3.in(0)
        Source.single(2) ~> pm3.in(1)
        Source.single(3) ~> pm3.in(2)

        pm3.out ~> sink.in

        ClosedShape
    })

    val max: Future[Int] = g.run()
    // Only the side effect is wanted, so use foreach rather than map.
    max.foreach(x => println(s"maximum of three numbers : $x"))

    Await.result(system.whenTerminated, 60.minutes)
  }
  // scalastyle:on println
}
// http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
// ----------------------------------------------------------------------
// diff --git
// a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
// new file mode 100644
// index 0000000..984c861
// --- /dev/null
// +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
// @@ -0,0 +1,177 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gearpump.akkastream.example

import java.time._

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.akkastream.scaladsl.Implicits._
import org.apache.gearpump.cluster.main.ArgumentsParser
import org.apache.gearpump.util.AkkaApp

import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random

/**
 * GroupBy example
 *
 * Generates one event per second with a random delay of up to seven seconds, turns
 * each event into open/close/add window commands, groups the commands by window and
 * counts the events of every window.
 */

/*
// Original example
val f = Source
  .tick(0.seconds, 1.second, "")
  .map { _ =>
    val now = System.currentTimeMillis()
    val delay = random.nextInt(8)
    MyEvent(now - delay * 1000L)
  }
  .statefulMapConcat { () =>
    val generator = new CommandGenerator()
    ev => generator.forEvent(ev)
  }
  .groupBy(64, command => command.w)
  .takeWhile(!_.isInstanceOf[CloseWindow])
  .fold(AggregateEventData((0L, 0L), 0))({
    case (agg, OpenWindow(window)) => agg.copy(w = window)
    // always filtered out by takeWhile
    case (agg, CloseWindow(_)) => agg
    case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
  })
  .async
  .mergeSubstreams
  .runForeach { agg =>
    println(agg.toString)
  }
 */
object Test13 extends AkkaApp with ArgumentsParser {
  // scalastyle:off println

  override def main(akkaConf: Config, args: Array[String]): Unit = {

    // Use the configuration handed to main, as the sibling Test example does.
    implicit val system = ActorSystem("Test13", akkaConf)
    implicit val materializer = GearpumpMaterializer()

    val random = new Random()

    Source
      .tick(0.seconds, 1.second, "tick data")
      .map { _ =>
        val now = System.currentTimeMillis()
        val delay = random.nextInt(8)
        MyEvent(now - delay * 1000L)
      }
      .statefulMapConcat { () =>
        val generator = new CommandGenerator()
        ev => generator.forEvent(ev)
      }
      .groupBy2(command => command.w)
      .takeWhile(!_.isInstanceOf[CloseWindow])
      .fold(AggregateEventData((0L, 0L), 0))({
        case (agg, OpenWindow(window)) => agg.copy(w = window)
        // always filtered out by takeWhile
        case (agg, CloseWindow(_)) => agg
        case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
      })
      .runForeach(agg =>
        println(agg.toString)
      )

    Await.result(system.whenTerminated, 60.minutes)
  }

  /** An event stamped with its creation time in epoch milliseconds. */
  case class MyEvent(timestamp: Long)

  /** A window as a (start, end) pair of epoch milliseconds. */
  type Window = (Long, Long)

  object Window {
    val WindowLength = 10.seconds.toMillis
    val WindowStep = 1.second.toMillis
    val WindowsPerEvent = (WindowLength / WindowStep).toInt

    /** All sliding windows (one per step) that contain the timestamp `ts`. */
    def windowsFor(ts: Long): Set[Window] = {
      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
      (for (i <- 0 until WindowsPerEvent) yield
        (firstWindowStart + i * WindowStep,
          firstWindowStart + i * WindowStep + WindowLength)
        ).toSet
    }
  }

  sealed trait WindowCommand {
    def w: Window
  }

  case class OpenWindow(w: Window) extends WindowCommand

  case class CloseWindow(w: Window) extends WindowCommand

  case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand

  /** Stateful translator from events to window commands; tracks the open windows. */
  class CommandGenerator {
    private val MaxDelay = 5.seconds.toMillis
    private var watermark = 0L
    private val openWindows = mutable.Set[Window]()

    /**
     * Advances the watermark and returns the commands for `ev`: windows to open,
     * windows to close, then one add per window of the event. Events older than
     * the watermark are dropped and produce no commands.
     */
    def forEvent(ev: MyEvent): List[WindowCommand] = {
      watermark = math.max(watermark, ev.timestamp - MaxDelay)
      if (ev.timestamp < watermark) {
        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
        Nil
      } else {
        val eventWindows = Window.windowsFor(ev.timestamp)

        // Select the expired windows first and remove them afterwards: removing from
        // the mutable set while traversing it is not safe.
        val expiredWindows = openWindows
          .filter(ow => !eventWindows.contains(ow) && ow._2 < watermark)
          .toList
        openWindows --= expiredWindows
        val closeCommands = expiredWindows.map(ow => CloseWindow(ow))

        // eventWindows is immutable, so updating openWindows while traversing it is fine.
        val openCommands = eventWindows.flatMap { w =>
          if (!openWindows.contains(w)) {
            openWindows.add(w)
            Some(OpenWindow(w))
          } else None
        }

        val addCommands = eventWindows.map(w => AddToWindow(ev, w))

        openCommands.toList ++ closeCommands ++ addCommands.toList
      }
    }
  }

  /** Running aggregate of one window: the window itself and its event count. */
  case class AggregateEventData(w: Window, eventCount: Int) {
    override def toString: String =
      s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
  }

  /** Formats epoch milliseconds as local time in the system default zone. */
  def tsToString(ts: Long): String = OffsetDateTime
    .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
    .toLocalTime
    .toString
  // scalastyle:on println

}


// http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
// ----------------------------------------------------------------------
// diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
// new file mode 100644
// index 0000000..0542f43
// --- /dev/null
// +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
// @@ -0,0 +1,73 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
+ */ + +package org.apache.gearpump.akkastream.example + +import java.io.File + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream._ +import akka.stream.scaladsl._ +import akka.util.ByteString +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent._ +import scala.concurrent.duration._ + +/** + * Broadcasts the running factorials of 1 to 100 to two file sinks; the second branch is + * buffered (50 elements, backpressure) and throttled to one element per second. + */ +object Test14 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test14", akkaConf) + implicit val materializer = GearpumpMaterializer() + + // Prints every element prefixed with the file name and writes it, newline-terminated, + // to that file; materializes the file sink's Future[IOResult]. + def lineSink(filename: String): Sink[String, Future[IOResult]] = { + Flow[String] + .alsoTo(Sink.foreach(s => println(s"$filename: $s"))) + .map(s => ByteString(s + "\n")) + .toMat(FileIO.toPath(new File(filename).toPath))(Keep.right) + } + + val source: Source[Int, NotUsed] = Source(1 to 100) + val factorials: Source[BigInt, NotUsed] = source.scan(BigInt(1))((acc, next) => acc * next) + val sink1 = lineSink("factorial1.txt") + val sink2 = lineSink("factorial2.txt") + // NOTE(review): slowSink2 is defined but not wired into the graph below. + val slowSink2 = Flow[String].via( + Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping) + ).toMat(sink2)(Keep.right) + val bufferedSink2 = Flow[String].buffer(50, OverflowStrategy.backpressure).via( + Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping) + ).toMat(sink2)(Keep.right) + + // factorials -> broadcast -> (sink1, bufferedSink2) + val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + val bcast = b.add(Broadcast[String](2)) + factorials.map(_.toString) ~> bcast.in + bcast.out(0) ~> sink1 + bcast.out(1) ~> bufferedSink2 + ClosedShape + }) + + g.run() + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala new file mode 100644 index 0000000..c2f8d5f --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.ActorSystem +import akka.stream._ +import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + + +/** + * Runs a graph containing a feedback edge (F back into C) on either the Gearpump + * materializer or a non-fusing ActorMaterializer, chosen by the "gearpump" option. + */ +object Test15 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + // "gearpump": optional boolean flag, false by default. + override val options: Array[(String, CLIOption[Any])] = Array( + "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false)) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + implicit val system = ActorSystem("Test15", akkaConf) + implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match { + case true => + GearpumpMaterializer() + case false => + ActorMaterializer( + ActorMaterializerSettings(system).withAutoFusing(false) + ) + } + import akka.stream.scaladsl.GraphDSL.Implicits._ + RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => + val A = builder.add(Source.single(0)).out + val B = builder.add(Broadcast[Int](2)) + val C = builder.add(Merge[Int](2).named("C")) + val D = builder.add(Flow[Int].map(_ + 1).named("D")) + val E = builder.add(Balance[Int](2).named("E")) + val F = builder.add(Merge[Int](2).named("F")) + val G = builder.add(Sink.foreach(println).named("G")).in + + // F feeds back into C while C also feeds F, which closes a cycle between the two merges. + C <~ F + A ~> B ~> C ~> F + B ~> D ~> E ~> F + E ~> G + + ClosedShape + }).run() + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} + + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala ---------------------------------------------------------------------- diff --git
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala new file mode 100644 index 0000000..eb0b5c7 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource} +import akka.stream.scaladsl.Sink +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.streaming.dsl.{CollectionDataSource, LoggerSink} +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * All remote: reads strings from a CollectionDataSource through GearSource, keeps the ones + * starting with "red", prefixes them with an order message and writes them to a LoggerSink + * through GearSink. + */ +object Test16 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test16", akkaConf) + implicit val materializer = GearpumpMaterializer() + + val sink = GearSink.to(new LoggerSink[String]) + val sourceData = new CollectionDataSource( + List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) + val source = GearSource.from[String](sourceData) + source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink) + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala new file mode 100644 index 0000000..21f1b8c --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.scaladsl._ +import akka.stream.{ActorMaterializer, ClosedShape} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource} +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * + * This tests how different Materializers can be used together in an explicit way. 
+ * + */ +object Test2 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + // NOTE(review): config is parsed but not referenced anywhere below. + val config = parse(args) + implicit val system = ActorSystem("Test2", akkaConf) + val gearpumpMaterializer = GearpumpMaterializer() + + val echo = system.actorOf(Props(new Echo())) + val source = GearSource.bridge[String, String] + val sink = GearSink.bridge[String, String] + + // Run the filtering flow on the Gearpump materializer. The materialized pair is used + // below as a Subscriber (entry) and a Publisher (exit). + val flow = Flow[String].filter(_.startsWith("red")).map("I want to order item: " + _) + val (entry, exit) = flow.runWith(source, sink)(gearpumpMaterializer) + + val actorMaterializer = ActorMaterializer() + + val externalSource = Source( + List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky") + ) + val externalSink = Sink.actorRef(echo, "COMPLETE") + + // Second graph, run on the plain ActorMaterializer: pushes the list into `entry` and + // forwards whatever `exit` publishes to the echo actor. + RunnableGraph.fromGraph( + GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + externalSource ~> Sink.fromSubscriber(entry) + Source.fromPublisher(exit) ~> externalSink + ClosedShape + } + ).run()(actorMaterializer) + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + + /** Prints every AnyRef message it receives. */ + class Echo extends Actor { + def receive: Receive = { + case any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala new file mode 100644 index 0000000..0a51078 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.{ActorMaterializer, ActorMaterializerSettings} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.scaladsl.GearSource +import akka.stream.scaladsl.Sink +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.CollectionDataSource +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * read from remote and write to local: the source is a GearSource over a + * CollectionDataSource, the sink is an akka Sink.actorRef delivering to a local Echo actor. + */ +object Test3 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + // "gearpump": optional boolean flag, false by default. + override val options: Array[(String, CLIOption[Any])] = Array( + "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false)) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + implicit val system = ActorSystem("Test3", akkaConf) + // The flag selects the Gearpump materializer or a non-fusing ActorMaterializer. + implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match { + case true => + GearpumpMaterializer() + case false => + ActorMaterializer( + ActorMaterializerSettings(system).withAutoFusing(false) + ) + } + val echo = system.actorOf(Props(new Echo())) + val
sink = Sink.actorRef(echo, "COMPLETE") + val sourceData = new CollectionDataSource( + List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")) + val source = GearSource.from[String](sourceData) + source.filter(_.startsWith("red")).map("I want to order item: " + _).runWith(sink) + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + + /** Prints every AnyRef message it receives. */ + class Echo extends Actor { + def receive: Receive = { + case any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala new file mode 100644 index 0000000..3cb69ce --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.scaladsl.GearSink +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.streaming.dsl.LoggerSink +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * read from local and write to remote: the source is an in-memory akka Source over a List, + * the sink is a GearSink wrapping a LoggerSink. + */ +object Test4 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test4", akkaConf) + implicit val materializer = GearpumpMaterializer() + + // Keep the items starting with "red" and prefix them with an order message. + Source( + List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky") + ).filter(_.startsWith("red")). + map("I want to order item: " + _). + runWith(GearSink.to(new LoggerSink[String])) + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala new file mode 100644 index 0000000..72e21c7 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.ClosedShape +import akka.stream.scaladsl._ +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * test fanout: splits a source of pairs with Unzip and sends each half to its own + * actor-backed sink; both sinks deliver to the same Echo actor. + */ +object Test5 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test5", akkaConf) + implicit val materializer = GearpumpMaterializer() + + val echo = system.actorOf(Props(new Echo())) + val source = Source(List(("male", "24"), ("female", "23"))) + + RunnableGraph.fromGraph( + GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + val unzip = b.add(Unzip[String, String]()) + val sink1 = Sink.actorRef(echo, "COMPLETE") + val sink2 = Sink.actorRef(echo, "COMPLETE") + source ~> unzip.in + // One sink per Unzip output: first elements go to sink1, second elements to sink2. + unzip.out0 ~> sink1 + unzip.out1 ~> sink2 + ClosedShape + } + ).run() + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + + /** Prints every AnyRef message it receives. */ + class Echo extends Actor { + def receive: Receive = { + case any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala new file mode 100644 index 0000000..6f54933 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.scaladsl.Sink +import akka.stream.{ActorMaterializer, ActorMaterializerSettings} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.scaladsl.GearSource +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.CollectionDataSource +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + + +/** + * WordCount example + * Test GroupBy2 (groupBy which uses SubFlow is not implemented yet) + */ + +import org.apache.gearpump.akkastream.scaladsl.Implicits._ + +object Test6 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + // "gearpump": optional boolean flag, false by default. + override val options: Array[(String, CLIOption[Any])] = Array( + "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false)) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + implicit val system = ActorSystem("Test6", akkaConf) + // The flag selects the Gearpump materializer or a non-fusing ActorMaterializer. + implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match { + case true => + GearpumpMaterializer() + case false => + ActorMaterializer( + ActorMaterializerSettings(system).withAutoFusing(false) + ) + } + val echo = system.actorOf(Props(Echo())) + val sink = Sink.actorRef(echo, "COMPLETE") + val sourceData = new CollectionDataSource( + List( + "this is a good start", + "this is a good time", + "time to start", + "congratulations", + "green plant", + "blue sky") + ) + val source = GearSource.from[String](sourceData) + // Split lines into words, group by the word itself, pair each word with 1 and sum the + // counts of pairs; results are logged as "word-count" and sent to the echo actor. + source.mapConcat({line => + line.split(" ").toList + }).groupBy2(x => x) + .map(word => (word, 1)) + .reduce({(a, b) => + (a._1, a._2 + b._2) + }) + .log("word-count") + .runWith(sink) + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + + /** Prints every AnyRef message it receives. */ + case class Echo() extends Actor { + def receive: Receive = { + case
any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala new file mode 100644 index 0000000..be91610 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + + +/** + * This is a simplified API you can use to combine sources and sinks + * with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A] + * without the need for using the Graph DSL + */ + +object Test7 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test7", akkaConf) + implicit val materializer = GearpumpMaterializer() + implicit val ec = system.dispatcher + + // Two single-element sources combined through a Merge junction. + val sourceA = Source(List(1)) + val sourceB = Source(List(2)) + val mergedSource = Source.combine(sourceA, sourceB)(Merge(_)) + + // Two printing sinks combined through a Broadcast junction. + val sinkA = Sink.foreach[Int](x => println(s"In SinkA : $x")) + val sinkB = Sink.foreach[Int](x => println(s"In SinkB : $x")) + val sink = Sink.combine(sinkA, sinkB)(Broadcast[Int](_)) + mergedSource.runWith(sink) + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala new file mode 100644 index 0000000..434aa33 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer} +import akka.stream.scaladsl._ +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +/** + * Stream example to find sum of elements: folds the integers 1 to 100 into their sum and + * prints the result, or the failure if the stream fails. + */ +object Test8 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + // "gearpump": optional boolean flag, false by default. + override val options: Array[(String, CLIOption[Any])] = Array( + "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false)) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + implicit val system = ActorSystem("Test8", akkaConf) + // The flag selects the Gearpump materializer or a non-fusing ActorMaterializer. + implicit val materializer: ActorMaterializer = config.getBoolean("gearpump") match { + case true => + GearpumpMaterializer() + case false => + ActorMaterializer( + ActorMaterializerSettings(system).withAutoFusing(false) + ) + } + implicit val ec = system.dispatcher + + // Source gives 1 to 100 elements + val source: Source[Int, NotUsed] =
Source(Stream.from(1).take(100)) + val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) + + val result: Future[Int] = source.runWith(sink) + // Report both outcomes: a failed stream would otherwise go unnoticed. + result.onComplete { + case scala.util.Success(sum) => + println(s"Sum of stream elements => $sum") + case scala.util.Failure(cause) => + println(s"Stream failed: $cause") + } + + // Block the main thread until the actor system terminates (at most 60 minutes). + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala new file mode 100644 index 0000000..63f9e2d --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */

package org.apache.gearpump.akkastream.example

import akka.NotUsed
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape}
import akka.stream.scaladsl._
import org.apache.gearpump.akkastream.GearpumpMaterializer
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

/**
 * Stream example showing Broadcast: the numbers 1 to 5 are fanned out to two
 * logging flows, merged again and delivered to an actor-backed sink.
 */
object Test9 extends AkkaApp with ArgumentsParser {
  // scalastyle:off println
  override val options: Array[(String, CLIOption[Any])] = Array(
    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
  )

  /**
   * Wires and runs the broadcast/merge graph, then blocks on actor-system termination.
   *
   * @param akkaConf configuration used to create the "Test9" actor system
   * @param args     command line arguments, parsed against [[options]]
   */
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val config = parse(args)
    implicit val system = ActorSystem("Test9", akkaConf)
    implicit val materializer: ActorMaterializer =
      if (config.getBoolean("gearpump")) {
        GearpumpMaterializer()
      } else {
        val settings = ActorMaterializerSettings(system).withAutoFusing(false)
        ActorMaterializer(settings)
      }
    implicit val ec = system.dispatcher

    val sinkActor = system.actorOf(Props(new SinkActor()))
    val source = Source(1 to 5)
    // "COMPLETE" is the message the sink actor receives once the stream completes.
    val sink = Sink.actorRef(sinkActor, "COMPLETE")
    val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map { element =>
      println(s"processing broadcasted element : $element in flowA")
      element
    }
    val flowB: Flow[Int, Int, NotUsed] = Flow[Int].map { element =>
      println(s"processing broadcasted element : $element in flowB")
      element
    }

    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val fanOut = builder.add(Broadcast[Int](2))
      val fanIn = builder.add(Merge[Int](2))
      // First branch: source -> broadcast -> flowA -> merge -> sink
      source ~> fanOut ~> flowA ~> fanIn ~> sink
      // Second branch takes the remaining broadcast outlet and merge inlet.
      fanOut ~> flowB ~> fanIn
      ClosedShape
    })

    graph.run()

    Await.result(system.whenTerminated, 60.minutes)
  }

  /** Actor behind the sink: prints every reference-typed message it receives. */
  class SinkActor extends Actor {
    def receive: Receive = {
      case any: AnyRef =>
        println("Confirm received: " + any)
    }
  }
  // scalastyle:on println
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala new file mode 100644 index 0000000..7e2211d --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.gearpump.akkastream.example

import java.io.{File, FileInputStream}
import java.util.zip.GZIPInputStream

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ClosedShape, IOResult}
import akka.util.ByteString
import org.apache.gearpump.akkastream.graph.GraphPartitioner
import org.apache.gearpump.akkastream.{GearAttributes, GearpumpMaterializer}
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
import org.apache.gearpump.util.AkkaApp
import org.json4s.JsonAST.JString
import org.json4s.jackson.JsonMethods

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

/**
 * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
 * which showcases running Akka Streams DSL across JVMs on Gearpump
 *
 * Usage: output/target/pack/bin/gear app
 *  -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
 *  -input wikidata-${DATE}-all.json.gz -languages en,de
 *
 * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
 *
 */
object WikipediaApp extends ArgumentsParser with AkkaApp {

  /** One Wikidata item: its id and the page title per requested language. */
  case class WikidataElement(id: String, sites: Map[String, String])

  override val options: Array[(String, CLIOption[Any])] = Array(
    "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true),
    "languages" -> CLIOption[String]("<languages to take into account>", required = true)
  )

  /**
   * Reads the gzipped dump, parses each line, and counts how many items carry
   * the same title in every requested language.
   *
   * @param akkaConf configuration used to create the "WikipediaApp" actor system
   * @param args     command line arguments, parsed against [[options]]
   */
  override def main(akkaConf: Config, args: Array[String]): Unit = {
    val parsed = parse(args)
    val input = new File(parsed.getString("input"))
    val langs = parsed.getString("languages").split(",")

    implicit val system = ActorSystem("WikipediaApp", akkaConf)
    implicit val materializer =
      GearpumpMaterializer(GraphPartitioner.TagAttributeStrategy)
    import system.dispatcher

    val elements = source(input).via(parseJson(langs))

    val g = RunnableGraph.fromGraph(
      GraphDSL.create(count) { implicit b =>
        sinkCount => {
          import GraphDSL.Implicits._
          val broadcast = b.add(Broadcast[WikidataElement](2))
          elements ~> broadcast ~> logEveryNSink(1000)
          broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
          ClosedShape
        }
      }
    )

    g.run().onComplete {
      case Success((t, f)) => printResults(t, f)
      // scalastyle:off println
      // Include the cause: the throwable used to be discarded, which hid e.g.
      // a missing or unreadable input file.
      case Failure(tr) => println(s"Something went wrong: $tr")
      // scalastyle:on println
    }
    Await.result(system.whenTerminated, 60.minutes)
  }

  /**
   * Source of the UTF-8 decoded, newline-delimited lines of a gzipped file.
   *
   * The file is opened inside the factory passed to
   * `StreamConverters.fromInputStream`, not when this method is called. Calling
   * `source` therefore no longer opens a file handle by itself, and the factory
   * no longer hands out one shared, already-created stream instance.
   * As a consequence, failures to open the file are raised by the factory
   * rather than by this method.
   *
   * @param file gzipped input file
   */
  def source(file: File): Source[String, Future[IOResult]] = {
    StreamConverters
      .fromInputStream(() => new GZIPInputStream(new FileInputStream(file), 65536))
      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
      .map(x => x.decodeString("utf-8"))
  }

  /**
   * Flow that parses lines with [[parseItem]] (up to 8 in flight, unordered)
   * and drops the lines that yield no element.
   */
  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext):
    Flow[String, WikidataElement, NotUsed] =
    Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect({
      case Some(v) => v
    })

  /**
   * Parses one JSON line into a [[WikidataElement]].
   *
   * @return `None` when the line is not valid JSON, has no string `id`, or has
   *         no title for any of the requested languages
   */
  def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
    Try(JsonMethods.parse(line)).toOption.flatMap { json =>
      json \ "id" match {
        case JString(itemId) =>

          val sites: Seq[(String, String)] = for {
            lang <- langs
            JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
          } yield lang -> title

          if(sites.isEmpty) None
          else Some(WikidataElement(id = itemId, sites = sites.toMap))

        case _ => None
      }
    }
  }

  /**
   * Sink that counts elements and prints every element whose zero-based
   * position is a multiple of `n`; materializes the final count.
   */
  def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = Sink.fold(0) { (x, y: T) =>
    if (x % n == 0) {
      // scalastyle:off println
      println(s"Processing element $x: $y")
      // scalastyle:on println
    }
    x + 1
  }

  /**
   * Flow that keeps only elements having exactly the languages in `langs` and
   * maps each to whether all of its titles are equal. Tagged with
   * `GearAttributes.remote`.
   */
  def checkSameTitles(langs: Set[String]):
    Flow[WikidataElement, Boolean, NotUsed] = Flow[WikidataElement]
    .filter(_.sites.keySet == langs)
    .map { x =>
      val titles = x.sites.values
      titles.forall( _ == titles.head)
    }.withAttributes(GearAttributes.remote)

  /** Sink that folds booleans into a `(trueCount, falseCount)` pair. */
  def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) {
    case ((t, f), true) => (t + 1, f)
    case ((t, f), false) => (t, f + 1)
  }

  /**
   * Prints both counts and their ratios.
   *
   * @param t number of items whose titles were all equal
   * @param f number of items whose titles differed
   */
  def printResults(t: Int, f: Int): Unit = {
    val message = s"""
      | Number of items with the same title: $t
      | Number of items with the different title: $f
      | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)}
      """.stripMargin
    // scalastyle:off println
    println(message)
    // scalastyle:on println
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala new file mode 100644 index 0000000..c1e95bb --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.gearpump.akkastream.graph

import akka.stream.{Shape, SinkShape, SourceShape}
import org.apache.gearpump.akkastream.GearAttributes
import org.apache.gearpump.akkastream.GearAttributes.{Local, Location, Remote}
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy
import org.apache.gearpump.akkastream.module._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource}
import akka.stream.impl.{SinkModule, SourceModule}
import org.apache.gearpump.util.Graph

/**
 *
 * GraphPartitioner is used to decide which part will be rendered locally
 * and which part should be rendered remotely.
 *
 * We will cut the graph based on the [[Strategy]] provided.
 *
 * For example, for the following graph, we can cut the graph to
 * two parts, each part will be a Sub Graph. The top SubGraph
 * can be materialized remotely. The bottom part can be materialized
 * locally.
 *
 *            AtomicModule2 -> AtomicModule4
 *               /|                 \
 *              /                    \
 * -----------cut line -------------cut line ----------
 *            /                        \
 *           /                          \|
 * AtomicModule1                     AtomicModule5
 *          \                     /|
 *           \                   /
 *            \|                /
 *             AtomicModule3
 *
 * @see [[akka.stream.impl.MaterializerSession]] for more information of how Graph is organized.
 *
 */
class GraphPartitioner(strategy: Strategy) {

  /**
   * Splits the module graph into a remote and a local sub graph.
   *
   * @param moduleGraph graph of modules connected by port-to-port edges
   * @return the remote sub graph followed by the local sub graph
   */
  def partition(moduleGraph: Graph[Module, Edge]): List[SubGraph] = {
    val graph = removeDummyModule(moduleGraph)
    val tags = tag(graph, strategy)
    doPartition(graph, tags)
  }

  /**
   * Distributes vertices and edges over the two sub graphs according to `tags`.
   *
   * Edges whose endpoints share a location are copied unchanged. An edge that
   * crosses the cut is replaced by two edges through a newly created bridge
   * module, one edge per sub graph; if the remote endpoint already is a
   * [[BridgeModule]] the edge is added to the local graph as is.
   */
  private def doPartition(graph: Graph[Module, Edge], tags: Map[Module, Location]):
    List[SubGraph] = {
    val local = Graph.empty[Module, Edge]
    val remote = Graph.empty[Module, Edge]

    graph.vertices.foreach { module =>
      if (tags(module) == Local) {
        local.addVertex(module)
      } else {
        remote.addVertex(module)
      }
    }

    graph.edges.foreach { nodeEdgeNode =>
      val (node1, edge, node2) = nodeEdgeNode
      (tags(node1), tags(node2)) match {
        case (Local, Local) =>
          local.addEdge(nodeEdgeNode)
        case (Remote, Remote) =>
          remote.addEdge(nodeEdgeNode)
        case (Local, Remote) =>
          node2 match {
            case bridge: BridgeModule[_, _, _] =>
              local.addEdge(node1, edge, node2)
            case _ =>
              // create a bridge module in between
              val bridge = new SourceBridgeModule[AnyRef, AnyRef]()
              val remoteEdge = Edge(bridge.outPort, edge.to)
              remote.addEdge(bridge, remoteEdge, node2)
              val localEdge = Edge(edge.from, bridge.inPort)
              local.addEdge(node1, localEdge, bridge)
          }
        case (Remote, Local) =>
          node1 match {
            case bridge: BridgeModule[_, _, _] =>
              local.addEdge(node1, edge, node2)
            case _ =>
              // create a bridge module in between
              val bridge = new SinkBridgeModule[AnyRef, AnyRef]()
              val remoteEdge = Edge(edge.from, bridge.inPort)
              remote.addEdge(node1, remoteEdge, bridge)
              val localEdge = Edge(bridge.outPort, edge.to)
              local.addEdge(bridge, localEdge, node2)
          }
      }
    }

    List(new RemoteGraph(remote), new LocalGraph(local))
  }

  /** Applies the strategy to every vertex, yielding its location. */
  private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = {
    graph.vertices.map { vertex =>
      vertex -> strategy.apply(vertex)
    }.toMap
  }

  /** Returns a copy of the input graph with every [[DummyModule]] vertex removed. */
  private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
    val graph = inputGraph.copy
    val dummies = graph.vertices.filter {
      case _: DummyModule => true
      case _ => false
    }
    dummies.foreach(module => graph.removeVertex(module))
    graph
  }
}

object GraphPartitioner {

  type Strategy = PartialFunction[Module, Location]

  /** True for shapes that make a module a pure source or a pure sink. */
  private def isSourceOrSink(shape: Shape): Boolean = shape match {
    case _: SourceShape[_] | _: SinkShape[_] => true
    case _ => false
  }

  /**
   * Placement rules shared by every strategy.
   *
   * Deliberately partial: modules it does not cover fall through to the
   * strategy-specific fallback chained with `orElse`. Previously these rules
   * ended in a catch-all case, which made every `orElse` fallback below
   * unreachable and all strategies behave like [[BaseStrategy]].
   */
  private val FixedPlacement: Strategy = {
    case _: BridgeModule[_, _, _] =>
      Remote
    case _: GearpumpTaskModule =>
      Remote
    case _: GroupByModule[_, _] =>
      // TODO: groupBy is not supported by local materializer
      Remote
    case _: SourceModule[_, _] =>
      Local
    case _: SinkModule[_, _] =>
      Local
    case module: Module if isSourceOrSink(module.shape) =>
      Local
  }

  /**
   * The fixed rules, with every remaining module placed remotely. Defined for
   * every module, exactly as before.
   */
  val BaseStrategy: Strategy = FixedPlacement orElse {
    case _: Module =>
      Remote
  }

  /**
   * Everything not pinned by the fixed rules is rendered remotely, except
   * materialized-value and single-element source stages.
   */
  val AllRemoteStrategy: Strategy = FixedPlacement orElse {
    case graphStageModule: GraphStageModule =>
      graphStageModule.stage match {
        case matValueSource: MaterializedValueSource[_] =>
          Local
        case singleSource: SingleSource[_] =>
          Local
        case _ =>
          Remote
      }
    case _ =>
      Remote
  }

  /**
   * Will decide whether to render a module locally or remotely
   * based on Attribute settings.
   *
   * Modules covered by the fixed rules keep their fixed location; only the
   * remaining modules are placed by `GearAttributes.location`.
   */
  val TagAttributeStrategy: Strategy = FixedPlacement orElse {
    case module =>
      GearAttributes.location(module.attributes)
  }

  /**
   * Everything not pinned by the fixed rules is rendered locally.
   * NOTE(review): bridge, task and groupBy modules stay Remote through the
   * fixed rules - confirm that is the intended meaning of "all local".
   */
  val AllLocalStrategy: Strategy = FixedPlacement orElse {
    // TODO kasravi review
    case _ =>
      Local
  }

  def apply(strategy: Strategy): GraphPartitioner = {
    new GraphPartitioner(strategy)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala new file mode 100644 index 0000000..c03fce2 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.gearpump.akkastream.graph

import akka.actor.ActorSystem
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.{PublisherSource, SubscriberSink}
import akka.stream.{SinkShape, SourceShape}
import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge
import org.apache.gearpump.akkastream.materializer.LocalMaterializerImpl
import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule}
import org.apache.gearpump.util.Graph
import org.reactivestreams.{Publisher, Subscriber}

/**
 *
 * [[LocalGraph]] is the [[SubGraph]] of the application DSL Graph made up of
 * the modules that can be materialized in the local JVM.
 *
 * @param graph Graph[Module, Edge]
 */
class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph

object LocalGraph {

  /**
   * Materializes a LocalGraph in the local JVM.
   * @param system ActorSystem used to create the local materializer
   */
  class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer {

    // create a local materializer
    val materializer = LocalMaterializerImpl()(system)

    /**
     * Swaps a bridge module for the reactive-streams endpoint stored for it in
     * `matValues`: a source bridge becomes a sink around its Subscriber, a sink
     * bridge becomes a source around its Publisher. Other modules are returned
     * unchanged.
     */
    private def replaceBridge(
        module: Module, matValues: scala.collection.mutable.Map[Module, Any]): Module = {
      module match {
        case source: SourceBridgeModule[in, out] =>
          val sinkShape: SinkShape[in] = SinkShape(source.inPort)
          val downstream = matValues(source).asInstanceOf[Subscriber[in]]
          new SubscriberSink(downstream, DefaultAttributes.subscriberSink, sinkShape)
        case sink: SinkBridgeModule[in, out] =>
          val sourceShape: SourceShape[out] = SourceShape(sink.outPort)
          val upstream = matValues(sink).asInstanceOf[Publisher[out]]
          new PublisherSource(upstream, DefaultAttributes.publisherSource, sourceShape)
        case untouched =>
          untouched
      }
    }

    /**
     *
     * @param matValues Materialized Values for each module before materialization
     * @return Materialized Values for each Module after the materialization.
     */
    override def materialize(graph: SubGraph,
        matValues: scala.collection.mutable.Map[Module, Any]):
      scala.collection.mutable.Map[Module, Any] = {
      val rewired: Graph[Module, Edge] =
        graph.graph.mapVertex(module => replaceBridge(module, matValues))
      materializer.materialize(rewired, matValues)
    }

    /** Shuts down the underlying local materializer. */
    override def shutdown: Unit = materializer.shutdown()
  }
}

