http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala new file mode 100644 index 0000000..826cdcf --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test10.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.{ClosedShape, ThrottleMode}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Conflate and Throttle.
+ *
+ * An endless stream of "A" is throttled to one element per second, an endless stream
+ * of "B" goes through a conflate stage, and both are zipped and printed pair by pair.
+ */
+object Test10 extends AkkaApp with ArgumentsParser {
+
+  // scalastyle:off println
+  /**
+   * Builds and runs the zip graph, then blocks until the actor system terminates
+   * (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    // Use the configuration handed to main, like the other examples in this package.
+    implicit val system = ActorSystem("Test10", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Conflate (1 input, 1 output) combines the elements it receives with the given
+    // aggregate function until the downstream asks for the next element.
+    def stream(x: String) = Stream.continually(x)
+
+    val sourceA = Source(stream("A"))
+    val sourceB = Source(stream("B"))
+
+    val throttler: Flow[String, String, NotUsed] =
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.Shaping)
+    // The aggregate function must be the argument of conflate itself; written on a
+    // line of its own it is parsed as a separate, discarded expression.
+    val conflateFlow: Flow[String, String, NotUsed] =
+      Flow[String].conflate((acc: String, x: String) => s"$acc::$x")
+
+    val printFlow: Flow[(String, String), String, NotUsed] =
+      Flow[(String, String)].map {
+        x =>
+          println(s" lengths are : ${x._1.length} and ${x._2.length} ; ${x._1} zip ${x._2}")
+          x.toString
+      }
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zipping = b.add(Zip[String, String]())
+
+      sourceA ~> throttler ~> zipping.in0
+      sourceB ~> conflateFlow ~> zipping.in1
+
+      zipping.out ~> printFlow ~> Sink.ignore
+
+      ClosedShape
+    })
+
+    graph.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala new file mode 100644 index 0000000..087c57d --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test11.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.NotUsed
+import akka.stream.ClosedShape
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Stream example showing Broadcast and Merge.
+ *
+ * The numbers 1 to 10 are incremented, broadcast over two branches, incremented on
+ * each branch, merged again, incremented once more and printed.
+ */
+object Test11 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Builds and runs the broadcast/merge graph, then blocks until the actor system
+   * terminates (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    // Use the configuration handed to main, like the other examples in this package.
+    implicit val system = ActorSystem("Test11", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+    // implicit val materializer =
+    //   ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
+    implicit val ec = system.dispatcher
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create() {
+      implicit builder: GraphDSL.Builder[NotUsed] =>
+
+        import GraphDSL.Implicits._
+        val in = Source(1 to 10)
+        // Only Int elements reach the sink, so type it as such instead of Any.
+        val out = Sink.foreach[Int](element => println(s"**** $element"))
+
+        val broadcast = builder.add(Broadcast[Int](2))
+        val merge = builder.add(Merge[Int](2))
+
+        val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
+
+        in ~> f1 ~> broadcast ~> f2 ~> merge ~> f3 ~> out
+        broadcast ~> f4 ~> merge
+
+        ClosedShape
+    })
+
+    g.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
new file mode 100644
index 0000000..b4f4bce
--- /dev/null
+++
b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test12.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.stream.{ClosedShape, UniformFanInShape}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.{Await, Future}
+import scala.util.{Failure, Success}
+
+/**
+ * Partial source, sink example.
+ *
+ * A reusable partial graph that picks the maximum of three inputs is imported into a
+ * closed graph, fed with three single-element sources and read through Sink.head.
+ */
+object Test12 extends AkkaApp with ArgumentsParser{
+  // scalastyle:off println
+  /**
+   * Runs the graph, prints the outcome of the materialized future and blocks until the
+   * actor system terminates (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    import akka.actor.ActorSystem
+    import akka.stream.scaladsl._
+
+    import scala.concurrent.duration._
+
+    // Use the configuration handed to main, like the other examples in this package.
+    implicit val system = ActorSystem("Test12", akkaConf)
+    // implicit val materializer = ActorMaterializer(
+    //   ActorMaterializerSettings(system).withAutoFusing(false)
+    // )
+    implicit val materializer = GearpumpMaterializer()
+    implicit val ec = system.dispatcher
+
+    // Two chained ZipWith(math.max) stages expose three inlets and one outlet.
+    val pickMaxOfThree = GraphDSL.create() { implicit b =>
+      import GraphDSL.Implicits._
+
+      val zip1 = b.add(ZipWith[Int, Int, Int](math.max))
+      val zip2 = b.add(ZipWith[Int, Int, Int](math.max))
+
+      zip1.out ~> zip2.in0
+
+      UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
+    }
+
+    val resultSink = Sink.head[Int]
+
+    val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
+      sink =>
+        import GraphDSL.Implicits._
+
+        // Importing the partial shape will return its shape (inlets & outlets)
+        val pm3 = b.add(pickMaxOfThree)
+
+        Source.single(1) ~> pm3.in(0)
+        Source.single(2) ~> pm3.in(1)
+        Source.single(3) ~> pm3.in(2)
+
+        pm3.out ~> sink.in
+
+        ClosedShape
+    })
+
+    val max: Future[Int] = g.run()
+    // Report both outcomes: mapping over the future only for the println would leave a
+    // failed stream completely silent.
+    max.onComplete {
+      case Success(x) => println(s"maximum of three numbers : $x")
+      case Failure(e) => println(s"failed to compute the maximum : ${e.getMessage}")
+    }
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
new file mode 100644
index 0000000..2e036cb
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test13.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.time._
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.Implicits._
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.collection.mutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.util.Random
+
+/**
+ * GroupBy example
+ */
+
+/*
+// Original example
+val f = Source
+  .tick(0.seconds, 1.second, "")
+  .map { _ =>
+    val now = System.currentTimeMillis()
+    val delay = random.nextInt(8)
+    MyEvent(now - delay * 1000L)
+  }
+  .statefulMapConcat { () =>
+    val generator = new CommandGenerator()
+    ev => generator.forEvent(ev)
+  }
+  .groupBy(64, command => command.w)
+  .takeWhile(!_.isInstanceOf[CloseWindow])
+  .fold(AggregateEventData((0L, 0L), 0))({
+    case (agg, OpenWindow(window)) => agg.copy(w = window)
+    // always filtered out by takeWhile
+    case (agg, CloseWindow(_)) => agg
+    case (agg, AddToWindow(ev, _)) => agg.copy(eventCount = agg.eventCount + 1)
+  })
+  .async
+  .mergeSubstreams
+  .runForeach { agg =>
+    println(agg.toString)
+  }
+ */
+object Test13 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+
+  /**
+   * Emits one randomly delayed event per second, turns every event into window
+   * commands, groups the commands by window and prints the event count of each window.
+   * Blocks until the actor system terminates (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+
+    // Use the configuration handed to main, like the other examples in this package.
+    implicit val system = ActorSystem("Test13", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val random = new Random()
+
+    val result = Source
+      .tick(0.seconds, 1.second, "tick data")
+      .map { _ =>
+        // Simulate late arrival: the event is up to 7 seconds older than "now".
+        val now = System.currentTimeMillis()
+        val delay = random.nextInt(8)
+        MyEvent(now - delay * 1000L)
+      }
+      .statefulMapConcat { () =>
+        val generator = new CommandGenerator()
+        ev => generator.forEvent(ev)
+      }
+      .groupBy2(command => command.w)
+      .takeWhile(!_.isInstanceOf[CloseWindow])
+      .fold(AggregateEventData((0L, 0L), 0))({
+        case (agg, OpenWindow(window)) => agg.copy(w = window)
+        // always filtered out by takeWhile
+        case (agg, CloseWindow(_)) => agg
+        case (agg, AddToWindow(_, _)) => agg.copy(eventCount = agg.eventCount + 1)
+      })
+      .runForeach(agg =>
+        println(agg.toString)
+      )
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** An event identified only by its timestamp in epoch milliseconds. */
+  case class MyEvent(timestamp: Long)
+
+  /** A window as a (start, end) pair of epoch milliseconds. */
+  type Window = (Long, Long)
+
+  object Window {
+    val WindowLength = 10.seconds.toMillis
+    val WindowStep = 1.second.toMillis
+    val WindowsPerEvent = (WindowLength / WindowStep).toInt
+
+    /**
+     * All sliding windows that contain the timestamp.
+     *
+     * @param ts event timestamp in epoch milliseconds
+     * @return WindowsPerEvent windows of WindowLength, each shifted by WindowStep
+     */
+    def windowsFor(ts: Long): Set[Window] = {
+      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
+      (for (i <- 0 until WindowsPerEvent) yield
+        (firstWindowStart + i * WindowStep,
+          firstWindowStart + i * WindowStep + WindowLength)
+        ).toSet
+    }
+  }
+
+  /** Command sent downstream for a given window. */
+  sealed trait WindowCommand {
+    def w: Window
+  }
+
+  case class OpenWindow(w: Window) extends WindowCommand
+
+  case class CloseWindow(w: Window) extends WindowCommand
+
+  case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand
+
+  /**
+   * Stateful translator from events to window commands. It tracks a watermark
+   * (the highest seen timestamp minus MaxDelay) and the set of currently open windows.
+   */
+  class CommandGenerator {
+    private val MaxDelay = 5.seconds.toMillis
+    private var watermark = 0L
+    private val openWindows = mutable.Set[Window]()
+
+    /**
+     * Produces the commands caused by one event.
+     *
+     * @param ev the incoming event
+     * @return Nil when the event is older than the watermark; otherwise the open
+     *         commands, then the close commands, then one add command per window of
+     *         the event
+     */
+    def forEvent(ev: MyEvent): List[WindowCommand] = {
+      watermark = math.max(watermark, ev.timestamp - MaxDelay)
+      if (ev.timestamp < watermark) {
+        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
+        Nil
+      } else {
+        val eventWindows = Window.windowsFor(ev.timestamp)
+
+        // Take a snapshot of the expired windows first and remove them afterwards:
+        // removing elements from a mutable set while traversing it is not safe.
+        val expired = openWindows.filter { ow =>
+          !eventWindows.contains(ow) && ow._2 < watermark
+        }.toList
+        openWindows --= expired
+        val closeCommands: List[WindowCommand] = expired.map(w => CloseWindow(w))
+
+        // Windows of this event that have not been opened yet.
+        val unopened = eventWindows.filterNot(w => openWindows.contains(w)).toList
+        openWindows ++= unopened
+        val openCommands: List[WindowCommand] = unopened.map(w => OpenWindow(w))
+
+        val addCommands: List[WindowCommand] =
+          eventWindows.toList.map(w => AddToWindow(ev, w))
+
+        openCommands ++ closeCommands ++ addCommands
+      }
+    }
+  }
+
+  /** Number of events counted for one window. */
+  case class AggregateEventData(w: Window, eventCount: Int) {
+    override def toString: String =
+      s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
+  }
+
+  /** Formats epoch milliseconds as a local time in the system default zone. */
+  def tsToString(ts: Long): String = OffsetDateTime
+    .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
+    .toLocalTime
+    .toString
+  // scalastyle:on println
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
new file mode 100644
index 0000000..c436130
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test14.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import java.io.File
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * Broadcasts the running factorials of 1 to 100 to two file sinks, the second one
+ * sitting behind a buffer and a one-element-per-second throttle.
+ */
+object Test14 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Builds and runs the broadcast graph, then blocks until the actor system terminates
+   * (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test14", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    // Sink that echoes every line to stdout and appends it to the given file.
+    def lineSink(filename: String): Sink[String, Future[IOResult]] = {
+      val echo = Sink.foreach[String](line => println(s"$filename: $line"))
+      val target = new File(filename).toPath
+      Flow[String]
+        .alsoTo(echo)
+        .map(line => ByteString(line + "\n"))
+        .toMat(FileIO.toPath(target))(Keep.right)
+    }
+
+    val numbers: Source[Int, NotUsed] = Source(1 to 100)
+    val factorials: Source[BigInt, NotUsed] = numbers.scan(BigInt(1))(_ * _)
+
+    val firstFile = lineSink("factorial1.txt")
+    val secondFile = lineSink("factorial2.txt")
+
+    // One element per second, shaping mode.
+    def oncePerSecond: Flow[String, String, NotUsed] =
+      Flow[String].throttle(1, 1.second, 1, ThrottleMode.shaping)
+
+    // Throttled variant without a buffer; built but not wired into the graph below.
+    val slowSink2 = Flow[String]
+      .via(oncePerSecond)
+      .toMat(secondFile)(Keep.right)
+    val bufferedSink2 = Flow[String]
+      .buffer(50, OverflowStrategy.backpressure)
+      .via(oncePerSecond)
+      .toMat(secondFile)(Keep.right)
+
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+      import GraphDSL.Implicits._
+      val fanOut = builder.add(Broadcast[String](2))
+      factorials.map(_.toString) ~> fanOut.in
+      fanOut.out(0) ~> firstFile
+      fanOut.out(1) ~> bufferedSink2
+      ClosedShape
+    })
+
+    graph.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala
---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala new file mode 100644 index 0000000..f4e4dbd --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test15.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream._
+import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+
+/**
+ * Runs a graph built from Broadcast, Merge, Balance and a feedback edge, either on the
+ * Gearpump materializer or on a plain ActorMaterializer with auto-fusing disabled.
+ */
+object Test15 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /** "gearpump" selects the Gearpump materializer; optional, defaults to false. */
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Parses the options, builds and runs the graph, then blocks until the actor system
+   * terminates (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments, parsed for the "gearpump" option
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test15", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+      }
+    import akka.stream.scaladsl.GraphDSL.Implicits._
+    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
+      val start = builder.add(Source.single(0)).out
+      val fanOut = builder.add(Broadcast[Int](2))
+      val mergeC = builder.add(Merge[Int](2).named("C"))
+      val increment = builder.add(Flow[Int].map(_ + 1).named("D"))
+      val balance = builder.add(Balance[Int](2).named("E"))
+      val mergeF = builder.add(Merge[Int](2).named("F"))
+      val printer = builder.add(Sink.foreach(println).named("G")).in
+
+      // The wiring order is kept as is: it determines which ports get connected.
+      mergeC <~ mergeF
+      start ~> fanOut ~> mergeC ~> mergeF
+      fanOut ~> increment ~> balance ~> mergeF
+      balance ~> printer
+
+      ClosedShape
+    })
+    graph.run()
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala
----------------------------------------------------------------------
diff --git
a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala new file mode 100644 index 0000000..9691496 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test16.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource}
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, LoggerSink}
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * All remote: both the source and the sink are Gearpump components.
+ */
+object Test16 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Streams a fixed list of items from a GearSource, keeps those starting with "red",
+   * turns them into order messages and sends them to a LoggerSink. Blocks until the
+   * actor system terminates (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test16", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val loggerSink = GearSink.to(new LoggerSink[String])
+    val items = new CollectionDataSource(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+    val itemSource = GearSource.from[String](items)
+
+    itemSource
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(loggerSink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
new file mode 100644
index 0000000..a6049cd
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test2.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.scaladsl._ +import akka.stream.{ActorMaterializer, ClosedShape} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.scaladsl.{GearSink, GearSource} +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * + * This tests how different Materializers can be used together in an explicit way. 
+ *
+ */
+object Test2 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Runs the filter/map flow on the Gearpump materializer between two bridge endpoints,
+   * and connects those endpoints to a local list source and an actor sink through a
+   * graph run on an ActorMaterializer. Blocks until the actor system terminates
+   * (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments, handed to parse
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test2", akkaConf)
+    val gearpumpMaterializer = GearpumpMaterializer()
+
+    val echoActor = system.actorOf(Props(new Echo()))
+    val bridgeSource = GearSource.bridge[String, String]
+    val bridgeSink = GearSink.bridge[String, String]
+
+    val orderFlow = Flow[String]
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+    // Materialize the flow with Gearpump; the two materialized values are the ends
+    // the local graph attaches to.
+    val (entry, exit) = orderFlow.runWith(bridgeSource, bridgeSink)(gearpumpMaterializer)
+
+    val actorMaterializer = ActorMaterializer()
+
+    val localItems = Source(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+    )
+    val localEcho = Sink.actorRef(echoActor, "COMPLETE")
+
+    val localGraph = RunnableGraph.fromGraph(
+      GraphDSL.create() { implicit builder =>
+        import GraphDSL.Implicits._
+        localItems ~> Sink.fromSubscriber(entry)
+        Source.fromPublisher(exit) ~> localEcho
+        ClosedShape
+      }
+    )
+    localGraph.run()(actorMaterializer)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every reference-typed message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case message: AnyRef =>
+        println(s"Confirm received: $message")
+    }
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
new file mode 100644
index 0000000..24faeb3
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test3.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSource
+import akka.stream.scaladsl.Sink
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Read from remote and write to local: a GearSource feeds a local actor sink.
+ */
+object Test3 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /** "gearpump" selects the Gearpump materializer; optional, defaults to false. */
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false))
+  )
+
+  /**
+   * Streams a fixed list of items from a GearSource, keeps those starting with "red",
+   * turns them into order messages and sends them to the Echo actor. Blocks until the
+   * actor system terminates (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments, parsed for the "gearpump" option
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val config = parse(args)
+    implicit val system = ActorSystem("Test3", akkaConf)
+    implicit val materializer: ActorMaterializer =
+      if (config.getBoolean("gearpump")) {
+        GearpumpMaterializer()
+      } else {
+        ActorMaterializer(
+          ActorMaterializerSettings(system).withAutoFusing(false)
+        )
+      }
+    val echoActor = system.actorOf(Props(new Echo()))
+    val echoSink = Sink.actorRef(echoActor, "COMPLETE")
+    val items = new CollectionDataSource(
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky"))
+    val itemSource = GearSource.from[String](items)
+
+    itemSource
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(echoSink)
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+
+  /** Prints every reference-typed message it receives. */
+  class Echo extends Actor {
+    def receive: Receive = {
+      case message: AnyRef =>
+        println(s"Confirm received: $message")
+    }
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
new file mode 100644
index 0000000..6a44a35
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test4.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream.example
+
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Source
+import org.apache.gearpump.akkastream.GearpumpMaterializer
+import org.apache.gearpump.akkastream.scaladsl.GearSink
+import org.apache.gearpump.cluster.main.ArgumentsParser
+import org.apache.gearpump.streaming.dsl.scalaapi.LoggerSink
+import org.apache.gearpump.util.AkkaApp
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+/**
+ * Read from local and write to remote: a local list source feeds a GearSink.
+ */
+object Test4 extends AkkaApp with ArgumentsParser {
+  // scalastyle:off println
+  /**
+   * Streams a fixed list of items, keeps those starting with "red", turns them into
+   * order messages and sends them to a LoggerSink. Blocks until the actor system
+   * terminates (at most 60 minutes).
+   *
+   * @param akkaConf configuration the actor system is created with
+   * @param args command line arguments; not read by this example
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    implicit val system = ActorSystem("Test4", akkaConf)
+    implicit val materializer = GearpumpMaterializer()
+
+    val items =
+      List("red hat", "yellow sweater", "blue jack", "red apple", "green plant", "blue sky")
+
+    Source(items)
+      .filter(item => item.startsWith("red"))
+      .map(item => "I want to order item: " + item)
+      .runWith(GearSink.to(new LoggerSink[String]))
+
+    Await.result(system.whenTerminated, 60.minutes)
+  }
+  // scalastyle:on println
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
new file mode 100644
index 0000000..ad87a97
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test5.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.ClosedShape +import akka.stream.scaladsl._ +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + +/** + * Test fan-out: a source of (String, String) pairs is split by an Unzip junction and each + * output is sent to its own actor-backed sink. Both sinks forward to the same [[Echo]] actor, + * which prints every message it receives. + */ +object Test5 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test5", akkaConf) + implicit val materializer = GearpumpMaterializer() + + val echo = system.actorOf(Props(new Echo())) + val source = Source(List(("male", "24"), ("female", "23"))) + + RunnableGraph.fromGraph( + GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + val unzip = b.add(Unzip[String, String]()) + val sink1 = Sink.actorRef(echo, "COMPLETE") + val sink2 = Sink.actorRef(echo, "COMPLETE") + source ~> unzip.in + unzip.out0 ~> sink1 + /* second output goes to its own sink; previously sink2 was created but never wired */ + unzip.out1 ~> sink2 + ClosedShape + } + ).run() + + Await.result(system.whenTerminated, 60.minutes) + } + + /** + * Prints every message delivered by the sinks, including the "COMPLETE" marker. + */ + class Echo extends Actor { + def receive: Receive = { + case any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala new file mode 100644 index 0000000..a525471 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test6.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.scaladsl.Sink +import akka.stream.{ActorMaterializer, ActorMaterializerSettings} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.akkastream.scaladsl.GearSource +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + + +/** + * WordCount example + * Test GroupBy2 (groupBy which uses SubFlow is not implemented yet) + */ + +import org.apache.gearpump.akkastream.scaladsl.Implicits._ + +object Test6 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override val options: Array[(String, CLIOption[Any])] = Array( + "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false)) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + implicit val system = ActorSystem("Test6", akkaConf) + /* the "gearpump" option picks the Gearpump materializer, otherwise a non-fusing Akka one */ + implicit val materializer: ActorMaterializer = + if (config.getBoolean("gearpump")) { + GearpumpMaterializer() + } else { + ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false)) + } + + val echo = system.actorOf(Props(Echo())) + val sink = Sink.actorRef(echo, "COMPLETE") + val lines = List( + "this is a good start", + "this is a good time", + "time to start", + "congratulations", + "green plant", + "blue sky") + val source = GearSource.from[String](new CollectionDataSource(lines)) + + val wordCounts = source + .mapConcat(line => line.split(" ").toList) + .groupBy2(word => word) + .map(word => (word, 1)) + .reduce { (left, right) => (left._1, left._2 + right._2) } + .log("word-count") + wordCounts.runWith(sink) + + Await.result(system.whenTerminated, 60.minutes) + } + + /** + * Prints every message it receives from the stream. + */ + case class Echo() extends Actor { + def receive: Receive = { 
+ case any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala new file mode 100644 index 0000000..8c837af --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test7.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.actor.ActorSystem +import akka.stream.scaladsl.{Broadcast, Merge, Sink, Source} +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.ArgumentsParser +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.Await +import scala.concurrent.duration._ + + +/** + * Demonstrates the simplified API for combining sources and sinks + * with junctions like: Broadcast[T], Balance[T], Merge[In] and Concat[A] + * without the need for using the Graph DSL. + * + * Two single-element sources are merged into one source, and every merged + * element is broadcast to two printing sinks. + */ + +object Test7 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override def main(akkaConf: Config, args: Array[String]): Unit = { + implicit val system = ActorSystem("Test7", akkaConf) + implicit val materializer = GearpumpMaterializer() + implicit val ec = system.dispatcher + + /* fan-in: merge both single-element sources */ + val mergedSource = + Source.combine(Source(List(1)), Source(List(2)))(inputs => Merge(inputs)) + + /* fan-out: broadcast each element to both printing sinks */ + val sinkA = Sink.foreach[Int](element => println(s"In SinkA : $element")) + val sinkB = Sink.foreach[Int](element => println(s"In SinkB : $element")) + val sink = Sink.combine(sinkA, sinkB)(outputs => Broadcast[Int](outputs)) + + mergedSource.runWith(sink) + + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala new file mode 100644 index 0000000..ad2ac61 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test8.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.example + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer} +import akka.stream.scaladsl._ +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +/** + * Stream example to find sum of elements: folds the integers 1 to 100 into their sum + * and prints the result, or the failure cause if the stream fails. + * + * The boolean "gearpump" option selects the materializer: [[GearpumpMaterializer]] when + * true, otherwise a plain [[ActorMaterializer]] with auto-fusing disabled. + */ +object Test8 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override val options: Array[(String, CLIOption[Any])] = Array( + "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false)) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + implicit val system = ActorSystem("Test8", akkaConf) + implicit val materializer: ActorMaterializer = + if (config.getBoolean("gearpump")) { + GearpumpMaterializer() + } else { + ActorMaterializer( + ActorMaterializerSettings(system).withAutoFusing(false) + ) + } + implicit val ec = system.dispatcher + + // Source gives 1 to 100 elements + val source: Source[Int, NotUsed] = 
Source(Stream.from(1).take(100)) + val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) + + val result: Future[Int] = source.runWith(sink) + /* report both outcomes; mapping only the success value silently dropped stream failures */ + result.foreach(sum => println(s"Sum of stream elements => $sum")) + result.failed.foreach(cause => println(s"Failed to sum stream elements: $cause")) + + Await.result(system.whenTerminated, 60.minutes) + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala new file mode 100644 index 0000000..66414e0 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/Test9.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.example + +import akka.NotUsed +import akka.actor.{Actor, ActorSystem, Props} +import akka.stream.{ActorMaterializer, ActorMaterializerSettings, ClosedShape} +import akka.stream.scaladsl._ +import org.apache.gearpump.akkastream.GearpumpMaterializer +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.util.AkkaApp + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ + +/** + * Stream example showing Broadcast: the integers 1 to 5 are broadcast to two logging + * flows whose outputs are merged again and delivered to an actor-backed sink. + */ +object Test9 extends AkkaApp with ArgumentsParser { + // scalastyle:off println + override val options: Array[(String, CLIOption[Any])] = Array( + "gearpump" -> CLIOption[Boolean]("<boolean>", required = false, defaultValue = Some(false)) + ) + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val config = parse(args) + implicit val system = ActorSystem("Test9", akkaConf) + implicit val materializer: ActorMaterializer = + if (config.getBoolean("gearpump")) { + GearpumpMaterializer() + } else { + ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false)) + } + implicit val ec = system.dispatcher + + val sinkActor = system.actorOf(Props(new SinkActor())) + val source = Source(1 to 5) + val sink = Sink.actorRef(sinkActor, "COMPLETE") + val flowA: Flow[Int, Int, NotUsed] = Flow[Int].map { element => + println(s"processing broadcasted element : $element in flowA") + element + } + val flowB: Flow[Int, Int, NotUsed] = Flow[Int].map { element => + println(s"processing broadcasted element : $element in flowB") + element + } + + val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder => + import GraphDSL.Implicits._ + val fanOut = builder.add(Broadcast[Int](2)) + val fanIn = builder.add(Merge[Int](2)) + source ~> fanOut + fanOut ~> flowA ~> fanIn + fanOut ~> flowB ~> fanIn + fanIn ~> sink + ClosedShape + }) + + graph.run() + + Await.result(system.whenTerminated, 60.minutes) + } + + /** + * Prints every message delivered by the stream. + */ + class SinkActor extends Actor { 
+ def receive: Receive = { + case any: AnyRef => + println("Confirm received: " + any) + } + } + // scalastyle:on println +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala new file mode 100644 index 0000000..2a1e7ff --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/example/WikipediaApp.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.example + +import java.io.{File, FileInputStream} +import java.util.zip.GZIPInputStream + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl._ +import akka.stream.{ClosedShape, IOResult} +import akka.util.ByteString +import org.apache.gearpump.akkastream.graph.GraphPartitioner +import org.apache.gearpump.akkastream.{GearAttributes, GearpumpMaterializer} +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.util.AkkaApp +import org.json4s.JsonAST.JString +import org.json4s.jackson.JsonMethods + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.util.{Failure, Success, Try} + +/** + * this example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/ + * which showcases running Akka Streams DSL across JVMs on Gearpump + * + * Usage: output/target/pack/bin/gear app + * -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar + * -input wikidata-${DATE}-all.json.gz -languages en,de + * + * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/) + * + */ +object WikipediaApp extends ArgumentsParser with AkkaApp { + + /** + * One parsed Wikidata item: its id and, per requested language, the title of the wiki page. + */ + case class WikidataElement(id: String, sites: Map[String, String]) + + override val options: Array[(String, CLIOption[Any])] = Array( + "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true), + "languages" -> CLIOption[String]("<languages to take into account>", required = true) + ) + + /** + * Builds and runs the graph: parsed elements are broadcast to a progress logger and to + * the same-title check, whose true/false results are counted and printed on completion. + */ + override def main(akkaConf: Config, args: Array[String]): Unit = { + val parsed = parse(args) + val input = new File(parsed.getString("input")) + val langs = parsed.getString("languages").split(",") + + implicit val system = ActorSystem("WikipediaApp", akkaConf) + implicit val materializer = + GearpumpMaterializer(GraphPartitioner.TagAttributeStrategy) + import system.dispatcher + + val 
elements = source(input).via(parseJson(langs)) + + val g = RunnableGraph.fromGraph( + GraphDSL.create(count) { implicit b => + sinkCount => { + import GraphDSL.Implicits._ + val broadcast = b.add(Broadcast[WikidataElement](2)) + elements ~> broadcast ~> logEveryNSink(1000) + broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount + ClosedShape + } + } + ) + + g.run().onComplete { + case Success((t, f)) => printResults(t, f) + // scalastyle:off println + case Failure(tr) => println(s"Something went wrong: $tr") + // scalastyle:on println + } + Await.result(system.whenTerminated, 60.minutes) + } + + /** + * Streams the lines of a gzip-compressed file, decoded as UTF-8. + * + * The input stream is created inside the factory function, so the file is only opened + * when the source is materialized, and every materialization gets a fresh stream. + * + * @param file gzip-compressed, newline-delimited input file + */ + def source(file: File): Source[String, Future[IOResult]] = { + StreamConverters.fromInputStream(() => new GZIPInputStream(new FileInputStream(file), 65536)) + .via(Framing.delimiter(ByteString("\n"), Int.MaxValue)) + .map(x => x.decodeString("utf-8")) + } + + /** + * Parses lines concurrently (8 at a time, unordered) and drops lines that yield no element. + */ + def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): + Flow[String, WikidataElement, NotUsed] = + Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect({ + case Some(v) => v + }) + + /** + * Parses one JSON line into a [[WikidataElement]]. + * + * @return None when the line is not valid JSON, has no string "id", or has no + * "sitelinks" title for any of the requested languages + */ + def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = { + Try(JsonMethods.parse(line)).toOption.flatMap { json => + json \ "id" match { + case JString(itemId) => + + val sites: Seq[(String, String)] = for { + lang <- langs + JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title" + } yield lang -> title + + if(sites.isEmpty) None + else Some(WikidataElement(id = itemId, sites = sites.toMap)) + + case _ => None + } + } + } + + /** + * Sink that counts elements and prints every n-th one (starting with the first). + */ + def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = Sink.fold(0) { (x, y: T) => + if (x % n == 0) { + // scalastyle:off println + println(s"Processing element $x: $y") + // scalastyle:on println + } + x + 1 + } + + /** + * Keeps only elements that have a title for exactly the given languages and emits whether + * all of those titles are equal. The flow is tagged with GearAttributes.remote. + */ + def checkSameTitles(langs: Set[String]): + Flow[WikidataElement, Boolean, NotUsed] = Flow[WikidataElement] + .filter(_.sites.keySet == langs) + .map { x => + val titles = x.sites.values + titles.forall( _ == 
titles.head) + }.withAttributes(GearAttributes.remote) + + /** + * Sink that counts the true and false values it receives, as a (trueCount, falseCount) pair. + */ + def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0, 0)) { + case ((t, f), true) => (t + 1, f) + case ((t, f), false) => (t, f + 1) + } + + /** + * Prints both counters and their ratios. + * + * @param t number of items whose titles are all the same + * @param f number of items with differing titles + */ + def printResults(t: Int, f: Int): Unit = { + val message = s""" + | Number of items with the same title: $t + | Number of items with the different title: $f + | Ratios: ${t.toDouble / (t + f)} / ${f.toDouble / (t + f)} + """.stripMargin + // scalastyle:off println + println(message) + // scalastyle:on println + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala new file mode 100644 index 0000000..f7919c0 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.graph + +import akka.stream.{Shape, SinkShape, SourceShape} +import org.apache.gearpump.akkastream.GearAttributes +import org.apache.gearpump.akkastream.GearAttributes.{Local, Location, Remote} +import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge +import org.apache.gearpump.akkastream.graph.GraphPartitioner.Strategy +import org.apache.gearpump.akkastream.module._ +import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.fusing.GraphStageModule +import akka.stream.impl.fusing.GraphStages.{MaterializedValueSource, SimpleLinearGraphStage, SingleSource} +import akka.stream.impl.{SinkModule, SourceModule} +import org.apache.gearpump.util.Graph + +/** + * + * GraphPartitioner is used to decide which part will be rendered locally + * and which part should be rendered remotely. + * + * We will cut the graph based on the [[Strategy]] provided. + * + * For example, for the following graph, we can cut the graph to + * two parts, each part will be a Sub Graph. The top SubGraph + * can be materialized remotely. The bottom part can be materialized + * locally. + * + * AtomicModule2 -> AtomicModule4 + * /| \ + * / \ + * -----------cut line -------------cut line ---------- + * / \ + * / \| + * AtomicModule1 AtomicModule5 + * \ /| + * \ / + * \| / + * AtomicModule3 + * + * @see [[akka.stream.impl.MaterializerSession]] for more information of how Graph is organized. 
+ * + */ +class GraphPartitioner(strategy: Strategy) { + /** + * Splits the module graph into a remote and a local sub graph. + * + * Dummy modules are removed first, every remaining module is tagged with a location by + * the strategy, and the graph is then cut along edges whose ends have different tags. + * + * @param moduleGraph graph of modules to partition; it is copied, not modified + * @return a list holding the [[RemoteGraph]] first and the [[LocalGraph]] second + */ + def partition(moduleGraph: Graph[Module, Edge]): List[SubGraph] = { + val graph = removeDummyModule(moduleGraph) + val tags = tag(graph, strategy) + doPartition(graph, tags) + } + + /** + * Distributes vertices and edges over a local and a remote graph according to the tags. + * + * Edges with both ends on the same side are copied as they are. For an edge that crosses + * sides, a new bridge module is inserted and the edge is replaced by one edge on each side + * of the bridge, unless the remote end already is a [[BridgeModule]], in which case the + * original edge is added to the local graph. + */ + private def doPartition(graph: Graph[Module, Edge], tags: Map[Module, Location]): + List[SubGraph] = { + val local = Graph.empty[Module, Edge] + val remote = Graph.empty[Module, Edge] + + graph.vertices.foreach{ module => + if (tags(module) == Local) { + local.addVertex(module) + } else { + remote.addVertex(module) + } + } + + graph.edges.foreach{ nodeEdgeNode => + val (node1, edge, node2) = nodeEdgeNode + (tags(node1), tags(node2)) match { + case (Local, Local) => + local.addEdge(nodeEdgeNode) + case (Remote, Remote) => + remote.addEdge(nodeEdgeNode) + case (Local, Remote) => + node2 match { + case bridge: BridgeModule[_, _, _] => + local.addEdge(node1, edge, node2) + case _ => + // create a bridge module in between + val bridge = new SourceBridgeModule[AnyRef, AnyRef]() + val remoteEdge = Edge(bridge.outPort, edge.to) + remote.addEdge(bridge, remoteEdge, node2) + val localEdge = Edge(edge.from, bridge.inPort) + local.addEdge(node1, localEdge, bridge) + } + case (Remote, Local) => + node1 match { + case bridge: BridgeModule[_, _, _] => + local.addEdge(node1, edge, node2) + case _ => + // create a bridge module in between + val bridge = new SinkBridgeModule[AnyRef, AnyRef]() + val remoteEdge = Edge(edge.from, bridge.inPort) + remote.addEdge(node1, remoteEdge, bridge) + val localEdge = Edge(bridge.outPort, edge.to) + local.addEdge(bridge, localEdge, node2) + } + } + } + + List(new RemoteGraph(remote), new LocalGraph(local)) + } + + /** + * Applies the strategy to every vertex and returns the resulting module-to-location map. + */ + private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = { + graph.vertices.map{vertex => + vertex -> strategy.apply(vertex) + }.toMap + } + + /** + * Returns a copy of the input graph with every [[DummyModule]] vertex removed. + */ + private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = { + val graph = inputGraph.copy + val 
dummies = graph.vertices.filter {module => + module match { + case dummy: DummyModule => + true + case _ => + false + } + } + dummies.foreach(module => graph.removeVertex(module)) + graph + } +} + +/** + * Factory for [[GraphPartitioner]] plus the predefined tagging strategies. + */ +object GraphPartitioner { + + /** + * Decides for a module whether it is rendered locally or remotely. + */ + type Strategy = PartialFunction[Module, Location] + + /** + * Rules shared by all predefined strategies: bridge, Gearpump task and groupBy modules are + * remote; source and sink modules are local; any other module is local when its shape is a + * source or sink shape and remote otherwise. + * + * NOTE(review): the last case matches every Module, so this partial function is defined for + * all modules and the fallbacks chained with orElse below appear to be unreachable. + * Confirm whether that is intended. + */ + val BaseStrategy: Strategy = { + case source: BridgeModule[_, _, _] => + Remote + case task: GearpumpTaskModule => + Remote + case groupBy: GroupByModule[_, _] => + // TODO: groupBy is not supported by local materializer + Remote + case source: SourceModule[_, _] => + Local + case sink: SinkModule[_, _] => + Local + case remaining: Module => + remaining.shape match { + case sourceShape: SourceShape[_] => + Local + case sinkShape: SinkShape[_] => + Local + case otherShapes: Shape => + Remote + } + } + + /** + * Base rules, then: graph stages wrapping a materialized-value source or a single source + * are local, everything else is remote. + */ + val AllRemoteStrategy: Strategy = BaseStrategy orElse { + case graphStageModule: GraphStageModule => + graphStageModule.stage match { + case matValueSource: MaterializedValueSource[_] => + Local + case singleSource: SingleSource[_] => + Local + case _ => + Remote + } + case _ => + Remote + } + + /** + * Will decide whether to render a module locally or remotely + * based on Attribute settings. 
+ * + */ + val TagAttributeStrategy: Strategy = BaseStrategy orElse { + case module => + GearAttributes.location(module.attributes) + } + + /** + * Base rules, then: every remaining module is local. + */ + val AllLocalStrategy: Strategy = BaseStrategy orElse { + case graphStageModule: GraphStageModule => + // TODO kasravi review + graphStageModule.stage match { + case matValueSource: MaterializedValueSource[_] => + Local + case _ => + Local + } + case _ => + Local + } + + /** + * Creates a partitioner that tags modules with the given strategy. + */ + def apply(strategy: Strategy): GraphPartitioner = { + new GraphPartitioner(strategy) + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala new file mode 100644 index 0000000..fe86951 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/LocalGraph.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.akkastream.graph + +import akka.actor.ActorSystem +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.{PublisherSource, SubscriberSink} +import akka.stream.{SinkShape, SourceShape} +import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge +import org.apache.gearpump.akkastream.materializer.LocalMaterializerImpl +import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule} +import org.apache.gearpump.util.Graph +import org.reactivestreams.{Publisher, Subscriber} + +/** + * + * [[LocalGraph]] is a [[SubGraph]] of the application DSL Graph, which only + * contains modules that can be materialized in the local JVM. + * + * @param graph Graph[Module, Edge] + */ +class LocalGraph(override val graph: Graph[Module, Edge]) extends SubGraph + +object LocalGraph { + + /** + * Materializes a LocalGraph in the local JVM. + * @param system ActorSystem used to create the local materializer + */ + class LocalGraphMaterializer(system: ActorSystem) extends SubGraphMaterializer { + + // create a local materializer + val materializer = LocalMaterializerImpl()(system) + + /** + * Replaces every bridge module by a local endpoint and materializes the resulting graph: + * a source bridge becomes a SubscriberSink around the Subscriber stored for it in + * matValues, and a sink bridge becomes a PublisherSource around the stored Publisher. + * All other modules are kept unchanged. + * + * @param graph sub graph to materialize + * @param matValues Materialized Values for each module before materialization; it must + * contain an entry for every bridge module of the graph + * @return Materialized Values for each Module after the materialization. 
+ */ + override def materialize(graph: SubGraph, + matValues: scala.collection.mutable.Map[Module, Any]): + scala.collection.mutable.Map[Module, Any] = { + val newGraph: Graph[Module, Edge] = graph.graph.mapVertex { + case source: SourceBridgeModule[in, out] => + val subscriber = matValues(source).asInstanceOf[Subscriber[in]] + val shape: SinkShape[in] = SinkShape(source.inPort) + new SubscriberSink(subscriber, DefaultAttributes.subscriberSink, shape) + case sink: SinkBridgeModule[in, out] => + val publisher = matValues(sink).asInstanceOf[Publisher[out]] + val shape: SourceShape[out] = SourceShape(sink.outPort) + new PublisherSource(publisher, DefaultAttributes.publisherSource, shape) + case other => + other + } + materializer.materialize(newGraph, matValues) + } + + /** + * Shuts down the local materializer. + */ + override def shutdown: Unit = { + materializer.shutdown() + } + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/2913a1fd/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala new file mode 100644 index 0000000..99ebe17 --- /dev/null +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.akkastream.graph + +import akka.actor.ActorSystem +import org.apache.gearpump.akkastream.GearpumpMaterializer.Edge +import org.apache.gearpump.akkastream.materializer.RemoteMaterializerImpl +import org.apache.gearpump.akkastream.module.{SinkBridgeModule, SourceBridgeModule} +import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient +import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient +import akka.stream.impl.StreamLayout.Module +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.embedded.EmbeddedCluster +import org.apache.gearpump.streaming.ProcessorId +import org.apache.gearpump.util.Graph + +/** + * + * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only + * contain modules that can be materialized in remote Gearpump cluster. 
/**
 * [[RemoteGraph]] is a [[SubGraph]] of the application DSL Graph, which only
 * contains modules that can be materialized in a remote Gearpump cluster.
 *
 * @param graph Graph
 */
class RemoteGraph(override val graph: Graph[Module, Edge]) extends SubGraph

object RemoteGraph {

  /**
   * Materializes a [[RemoteGraph]] by submitting it as an application to a
   * Gearpump cluster.
   *
   * @param useInProcessCluster when true, an [[EmbeddedCluster]] is created and
   *                            started by this materializer and used as the target
   *                            cluster; otherwise a [[ClientContext]] is created
   *                            from `system`
   * @param system ActorSystem
   */
  class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem)
    extends SubGraphMaterializer {

    // Only defined when this materializer owns an in-process cluster.
    private val local = if (useInProcessCluster) {
      val cluster = EmbeddedCluster()
      cluster.start()
      Some(cluster)
    } else {
      None
    }

    private val context: ClientContext =
      local.map(_.newClientContext).getOrElse(ClientContext(system))

    /**
     * Materializes `subGraph` remotely.
     *
     * @param subGraph       the sub graph to submit
     * @param inputMatValues materialized values known before this call
     * @return `inputMatValues` itself when the graph is empty, otherwise a new map
     *         holding `inputMatValues` plus one bridge client per bridge module
     */
    override def materialize(subGraph: SubGraph,
        inputMatValues: scala.collection.mutable.Map[Module, Any]):
        scala.collection.mutable.Map[Module, Any] = {
      val graph = subGraph.graph

      if (graph.isEmpty) {
        inputMatValues
      } else {
        doMaterialize(graph, inputMatValues)
      }
    }

    /**
     * Submits the graph as an application, waits a fixed five seconds, then builds
     * a client for every bridge module so that the other side can talk to the
     * submitted application.
     */
    private def doMaterialize(graph: Graph[Module, Edge],
        inputMatValues: scala.collection.mutable.Map[Module, Any]):
        scala.collection.mutable.Map[Module, Any] = {
      val materializer = new RemoteMaterializerImpl(graph, system)
      val (app, matValues) = materializer.materialize

      val appId = context.submit(app).appId
      // scalastyle:off println
      println("sleep 5 second until the application is ready on cluster")
      // scalastyle:on println
      // NOTE(review): fixed delay instead of a readiness check; kept as is because
      // no readiness API is visible from this file.
      Thread.sleep(5000)

      // Keeps only bridge modules, pairing each with a client bound to its processor.
      def resolve(processors: Map[Module, ProcessorId]): Map[Module, Any] = {
        processors.toList.collect {
          case (module: SourceBridgeModule[_, _], processorId) =>
            Tuple2[Module, Any](module,
              new SourceBridgeTaskClient[AnyRef](system.dispatcher,
                context, appId, processorId))
          case (module: SinkBridgeModule[_, _], processorId) =>
            Tuple2[Module, Any](module,
              new SinkBridgeTaskClient(system, context, appId, processorId))
        }.toMap
      }

      inputMatValues ++ resolve(matValues)
    }

    /**
     * Closes the client context and stops the in-process cluster, if one was
     * started. The cluster is stopped even when closing the context throws, so a
     * failure there cannot leave the embedded cluster running.
     */
    override def shutdown: Unit = {
      try {
        context.close()
      } finally {
        local.foreach(_.stop())
      }
    }
  }
}
/**
 * [[SubGraph]] is a partial DAG
 *
 * The idea is that by dividing [[Graph]] to several
 * [[SubGraph]], we can materialize each [[SubGraph]] with different
 * materializer.
 */

trait SubGraph {

  /**
   * the [[Graph]] representation of this SubGraph
   * @return the modules of this sub graph as vertices, connected by [[Edge]]s
   */
  def graph: Graph[Module, Edge]
}


/**
 * Materializer for Sub-Graph type
 */
trait SubGraphMaterializer {
  /**
   * Materializes one sub graph.
   *
   * @param graph the sub graph to materialize
   * @param matValues Materialized Values for each module before materialization
   * @return Materialized Values for each Module after the materialization.
   */

  def materialize(graph: SubGraph,
      matValues: scala.collection.mutable.Map[Module, Any]):
      scala.collection.mutable.Map[Module, Any]

  /**
   * Shuts this materializer down. What is released is up to the implementation.
   */
  def shutdown: Unit
}
/**
 * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
 *
 * Besides the Akka `materialize` overloads it offers
 * `materialize(graph: GGraph[Module, Edge], inputMatValues)`, which materializes a
 * Gearpump graph of modules vertex by vertex.
 *
 * @param system System
 * @param settings ActorMaterializerSettings
 * @param dispatchers Dispatchers
 * @param supervisor ActorRef
 * @param haveShutDown AtomicBoolean
 * @param flowNames SeqActorName
 */
case class LocalMaterializerImpl (
    override val system: ActorSystem,
    override val settings: ActorMaterializerSettings,
    dispatchers: Dispatchers,
    override val supervisor: ActorRef,
    haveShutDown: AtomicBoolean,
    flowNames: SeqActorName)
  extends ExtendedActorMaterializer {

  /** Logger obtained from the actor system, with this materializer as log source. */
  override def logger: LoggingAdapter = Logging.getLogger(system, this)

  /** Schedules `task` repeatedly on the system scheduler, run on [[executionContext]]. */
  override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration,
      task: Runnable): Cancellable =
    system.scheduler.schedule(initialDelay, interval, task)(executionContext)

  /** Schedules `task` once on the system scheduler, run on [[executionContext]]. */
  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext)

  /**
   * Derives the settings for one operator by folding its attributes over the
   * materializer-wide `settings`.
   *
   * Only `InputBuffer`, `Dispatcher` and `SupervisionStrategy` change the result;
   * every other attribute leaves the settings as they are.
   *
   * @param opAttr attributes of the operator being materialized
   * @return the settings with the operator's overrides applied
   */
  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
    import ActorAttributes._
    import Attributes._
    opAttr.attributeList.foldLeft(settings) { (s, attr) =>
      attr match {
        case InputBuffer(initial, max) => s.withInputBuffer(initial, max)
        case Dispatcher(dispatcher) => s.withDispatcher(dispatcher)
        case SupervisionStrategy(decider) => s.withSupervisionStrategy(decider)
        case l: LogLevels => s
        case Name(_) => s
        case other => s
      }
    }
  }

  /**
   * Stops the supervisor with a `PoisonPill`. The compare-and-set on
   * `haveShutDown` makes sure the message is sent at most once.
   */
  override def shutdown(): Unit =
    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill

  /** True once [[shutdown]] has flipped the `haveShutDown` flag. */
  override def isShutdown: Boolean = haveShutDown.get()

  /**
   * Dispatcher named by `settings.dispatcher`, or the default dispatcher when no
   * dispatcher was given. Looked up lazily on first use.
   */
  override lazy val executionContext: ExecutionContextExecutor =
    dispatchers.lookup(settings.dispatcher match {
      case Deploy.NoDispatcherGiven => Dispatchers.DefaultDispatcherId
      case other => other
    })


  /**
   * Materializer session that knows how to materialize the atomic modules handled
   * by this materializer.
   *
   * @param module       top level module of the session
   * @param iAttributes  initial attributes handed to [[MaterializerSession]]
   * @param subflowFuser when non-null, used by `matGraph` to register a graph
   *                     interpreter shell instead of spawning a new actor for it
   */
  case class LocalMaterializerSession(module: Module, iAttributes: Attributes,
      subflowFuser: GraphInterpreterShell => ActorRef = null)
    extends MaterializerSession(module, iAttributes) {

    /**
     * Materializes one atomic module, assigns its ports and, for sink, source and
     * processor modules, records the materialized value in `matVal`.
     *
     * The match below has no default case, so an `AtomicModule` of any other kind
     * is not handled here.
     *
     * @param atomic              module to materialize
     * @param effectiveAttributes attributes in effect for this module
     * @param matVal              receives the materialized value keyed by module
     */
    override def materializeAtomic(atomic: AtomicModule,
        effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {

      // Each call computes a new stage name, which advances the name counter.
      def newMaterializationContext() =
        new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes,
          stageName(effectiveAttributes))
      atomic match {
        case sink: SinkModule[_, _] =>
          val (sub, mat) = sink.create(newMaterializationContext())
          assignPort(sink.shape.in, sub.asInstanceOf[Subscriber[Any]])
          matVal.put(atomic, mat)
        case source: SourceModule[_, _] =>
          val (pub, mat) = source.create(newMaterializationContext())
          assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
          matVal.put(atomic, mat)
        case stage: ProcessorModule[_, _, _] =>
          // The processor is both the subscriber of inPort and the publisher of outPort.
          val (processor, mat) = stage.createProcessor()
          assignPort(stage.inPort, processor)
          assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
          matVal.put(atomic, mat)
        // FIXME
        //        case tls: TlsModule =>
        // TODO solve this so TlsModule doesn't need special treatment here
        //          val es = effectiveSettings(effectiveAttributes)
        //          val props =
        //            TLSActor.props(es, tls.sslContext, tls.sslConfig,
        //              tls.firstSession, tls.role, tls.closing, tls.hostInfo)
        //          val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
        //          def factory(id: Int) = new ActorPublisher[Any](impl) {
        //            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
        //          }
        //          val publishers = Vector.tabulate(2)(factory)
        //          impl ! FanOut.ExposedPublishers(publishers)
        //
        //          assignPort(tls.plainOut, publishers(TLSActor.UserOut))
        //          assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))
        //
        //          assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
        //          assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))
        //
        //          matVal.put(atomic, NotUsed)
        case graph: GraphModule =>
          matGraph(graph, effectiveAttributes, matVal)
        case stage: GraphStageModule =>
          // Wrap the single stage in a GraphModule so it takes the same path as fused graphs.
          val graph =
            GraphModule(GraphAssembly(stage.shape.inlets, stage.shape.outlets, stage.stage),
              stage.shape, stage.attributes, Array(stage))
          matGraph(graph, effectiveAttributes, matVal)
      }
    }

    /**
     * Materializes a graph module into a [[GraphInterpreterShell]] and assigns a
     * boundary subscriber to each inlet and a boundary publisher to each outlet.
     *
     * The shell is handed to `subflowFuser` when one is set and the attributes
     * carry no `AsyncBoundary`; otherwise a new interpreter actor is created for it.
     */
    private def matGraph(graph: GraphModule, effectiveAttributes: Attributes,
        matVal: ju.Map[Module, Any]): Unit = {
      val calculatedSettings = effectiveSettings(effectiveAttributes)
      val (handlers, logics) =
        graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)

      val shell = new GraphInterpreterShell(graph.assembly, handlers,
        logics, graph.shape, calculatedSettings, LocalMaterializerImpl.this)

      val impl =
        if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) {
          subflowFuser(shell)
        } else {
          val props = ActorGraphInterpreter.props(shell)
          actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
        }

      for ((inlet, i) <- graph.shape.inlets.iterator.zipWithIndex) {
        val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i)
        assignPort(inlet, subscriber)
      }
      for ((outlet, i) <- graph.shape.outlets.iterator.zipWithIndex) {
        val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i)
        // The interpreter is told about the publisher before the port is assigned.
        impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
        assignPort(outlet, publisher)
      }
    }
  }

  /**
   * Materializes a closed Akka graph through a [[LocalMaterializerSession]].
   *
   * NOTE(review): the session is created with `null` initial attributes and a
   * `null` subflow fuser - confirm that `MaterializerSession` accepts null
   * attributes.
   */
  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat]): Mat = {

    LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
      null, null).materialize().asInstanceOf[Mat]

  }

  /**
   * Delegates to `materialize(runnableGraph)`.
   *
   * NOTE(review): `initialAttributes` is not used - confirm this is intended.
   */
  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
      initialAttributes: Attributes): Mat = {
    materialize(runnableGraph)
  }

  /**
   * Materializes a closed Akka graph through a [[LocalMaterializerSession]].
   *
   * NOTE(review): `subflowFuser` is not passed to the session (it receives
   * `null`) - confirm this is intended.
   */
  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
      subflowFuser: GraphInterpreterShell => ActorRef): Mat = {

    LocalMaterializerSession(ModuleExtractor.unapply(runnableGraph).get,
      null, null).materialize().asInstanceOf[Mat]

  }

  /**
   * Delegates to `materialize(runnableGraph)`.
   *
   * NOTE(review): both `subflowFuser` and `initialAttributes` are unused - confirm
   * this is intended.
   */
  override def materialize[Mat](runnableGraph: AkkaGraph[ClosedShape, Mat],
      subflowFuser: (GraphInterpreterShell) => ActorRef, initialAttributes: Attributes): Mat = {
    materialize(runnableGraph)
  }

  /** Returns the materializer's own [[logger]]; `logSource` is not used. */
  override def makeLogger(logSource: Class[_]): LoggingAdapter = {
    logger
  }

  /**
   * Builds one module out of a Gearpump graph: all vertices are composed first,
   * then every edge is wired from `edge.from` to `edge.to`.
   *
   * @param graph graph of modules connected by [[Edge]]s
   * @return the composed and wired module; `EmptyModule` for a graph without vertices
   */
  def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
    var moduleInProgress: Module = EmptyModule
    graph.vertices.foreach(module => {
      moduleInProgress = moduleInProgress.compose(module)
    })
    graph.edges.foreach(value => {
      val (node1, edge, node2) = value
      moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
    })

    moduleInProgress
  }

  /**
   * Materializes a Gearpump graph of modules, one vertex at a time.
   *
   * Steps, in order: build the top level module, materialize every vertex as an
   * atomic module, connect publishers to subscribers along the edges, then publish
   * materialized values to every `MaterializedValueSource` stage found in the graph.
   *
   * @param graph          graph whose vertices are cast to `AtomicModule`
   * @param inputMatValues materialized values known so far; updated in place
   * @return `inputMatValues`, the same instance that was passed in
   */
  def materialize(graph: GGraph[Module, Edge],
      inputMatValues: scala.collection.mutable.Map[Module, Any]):
      scala.collection.mutable.Map[Module, Any] = {
    val topLevelModule = buildToplevelModule(graph)
    val session = LocalMaterializerSession(topLevelModule, null, null)
    import scala.collection.JavaConverters._
    // asJava wraps inputMatValues rather than copying it, so values put through
    // matV are visible in inputMatValues as well.
    val matV = inputMatValues.asJava
    val materializedGraph = graph.mapVertex { module =>
      session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV)
      matV.get(module)
    }
    // NOTE(review): the vertices of materializedGraph are the values read back from
    // matV, so the wiring below only happens for edges whose two ends are themselves
    // Module instances - confirm this is the intended condition.
    materializedGraph.edges.foreach { nodeEdgeNode =>
      val (node1, edge, node2) = nodeEdgeNode
      val from = edge.from
      val to = edge.to
      node1 match {
        case module1: Module =>
          node2 match {
            case module2: Module =>
              val publisher = module1.downstreams(from).asInstanceOf[Publisher[Any]]
              val subscriber = module2.upstreams(to).asInstanceOf[Subscriber[Any]]
              publisher.subscribe(subscriber)
            case _ =>
          }
        case _ =>
      }
    }
    // Pick out the stages that are MaterializedValueSource instances.
    val matValSources = graph.vertices.flatMap(module => {
      val rt: Option[MaterializedValueSource[_]] = module match {
        case graphStage: GraphStageModule =>
          graphStage.stage match {
            case materializedValueSource: MaterializedValueSource[_] =>
              Some(materializedValueSource)
            case _ =>
              None
          }
        case _ =>
          None
      }
      rt
    })
    publishToMaterializedValueSource(matValSources, inputMatValues)
    inputMatValues
  }

  /**
   * Resolves and sets the value of every source that has a non-null `computation`.
   *
   * @param modules   the materialized value sources to feed
   * @param matValues materialized values used to resolve each computation
   */
  private def publishToMaterializedValueSource(modules: List[MaterializedValueSource[_]],
      matValues: scala.collection.mutable.Map[Module, Any]): Unit = {
    modules.foreach { source =>
      // Option(...) skips sources whose computation is null.
      // NOTE(review): map is used only for its side effect; its result is discarded.
      Option(source.computation).map { attr =>
        val valueToPublish = MaterializedValueOps(attr).resolve(matValues)
        source.setValue(valueToPublish)
      }
    }
  }

  private[this] def createFlowName(): String = flowNames.next()

  // Taken once per materializer instance and used as the prefix of every stage name.
  val flowName = createFlowName()
  // Counter for stage names; a plain var that is not synchronized.
  var nextId = 0

  /**
   * Builds the next stage name from the flow name, the running counter and the
   * name carried by `attr`, then advances the counter.
   */
  def stageName(attr: Attributes): String = {
    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
    nextId += 1
    name
  }

  /** Copy of this materializer whose flow names use `name` as prefix. */
  override def withNamePrefix(name: String): LocalMaterializerImpl =
    this.copy(flowNames = flowNames.copy(name))

}
object LocalMaterializerImpl {

  /**
   * A module together with what its materialization produced.
   *
   * @param module   the module that was materialized
   * @param matValue its materialized value
   * @param inputs   subscriber per input port, empty by default
   * @param outputs  publisher per output port, empty by default
   */
  case class MaterializedModule(module: Module, matValue: Any,
      inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]],
      outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])

  /**
   * Creates a materializer, filling in what the caller left out: settings default
   * to `ActorMaterializerSettings(system)` and the name prefix to "flow".
   */
  def apply(materializerSettings: Option[ActorMaterializerSettings] = None,
      namePrefix: Option[String] = None)(implicit system: ActorSystem):
      LocalMaterializerImpl =
    apply(
      materializerSettings.getOrElse(ActorMaterializerSettings(system)),
      namePrefix.getOrElse("flow"))(system)

  /**
   * Creates a materializer with its own stream supervisor actor.
   *
   * The supervisor and the materializer share one shutdown flag, and the supervisor
   * runs on the dispatcher named in `materializerSettings`.
   */
  def apply(materializerSettings: ActorMaterializerSettings,
      namePrefix: String)(implicit system: ActorSystem): LocalMaterializerImpl = {
    val shutdownFlag = new AtomicBoolean(false)
    val systemDispatchers = system.dispatchers
    val supervisorProps = StreamSupervisor
      .props(materializerSettings, shutdownFlag)
      .withDispatcher(materializerSettings.dispatcher)
    val streamSupervisor = system.actorOf(supervisorProps)
    val names = FlowNames(system).name.copy(namePrefix)

    new LocalMaterializerImpl(system, materializerSettings, systemDispatchers,
      streamSupervisor, shutdownFlag, names)
  }

  /**
   * Expresses a reduce as a [[Fold]] that starts from `null`: the first element is
   * taken as is, and every later element is combined with the accumulator through
   * the reduce function.
   */
  def toFoldModule(reduce: ReduceModule[Any]): Fold[Any, Any] = {
    val combine = reduce.f
    // A null accumulator means "nothing seen yet".
    val step = (acc: Any, element: Any) => acc match {
      case null => element
      case seen => combine(seen, element)
    }
    Fold(null, step)
  }
}
