Repository: incubator-gearpump
Updated Branches:
  refs/heads/master d5343681e -> c176e4485


fix #2002, add akka stream examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ef11f16f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ef11f16f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ef11f16f

Branch: refs/heads/master
Commit: ef11f16fb61c6bf81e7dad20fc798093be8fd8a7
Parents: d534368
Author: manuzhang <[email protected]>
Authored: Fri Feb 19 19:14:45 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Apr 26 14:21:55 2016 +0800

----------------------------------------------------------------------
 .../src/main/resources/geardefault.conf         |   5 +
 .../stream/gearpump/example/WikipediaApp.scala  | 144 +++++++++++++++++++
 project/Build.scala                             |   3 +
 3 files changed, 152 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ef11f16f/experiments/akkastream/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf
new file mode 100644
index 0000000..626a1dc
--- /dev/null
+++ b/experiments/akkastream/src/main/resources/geardefault.conf
@@ -0,0 +1,5 @@
+gearpump.serializers {
+  "akka.stream.gearpump.example.WikipediaApp$WikidataElement" = ""
+  "scala.collection.immutable.Map$Map1" = ""
+  "scala.collection.immutable.Map$Map2" = ""
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ef11f16f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
new file mode 100644
index 0000000..915624f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.stream.gearpump.example
+
+import java.io.{File, FileInputStream}
+import java.util.zip.GZIPInputStream
+
+import akka.actor.ActorSystem
+import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer}
+import akka.stream.gearpump.graph.GraphCutter
+import akka.stream.io.{Framing, InputStreamSource}
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import io.gearpump.cluster.main.{CLIOption, ArgumentsParser}
+import io.gearpump.util.AkkaApp
+import org.json4s.JsonAST.JString
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * This example is ported from http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
+ * which showcases running Akka Streams DSL across JVMs on Gearpump.
+ *
+ * It reads a gzipped Wikidata JSON dump line by line, extracts for every item the wiki page
+ * titles of the requested languages, and counts how many items have the same title in all of
+ * those languages versus how many do not.
+ *
+ * Usage: output/target/pack/bin/gear app -jar experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
+ *  -input wikidata-${DATE}-all.json.gz -languages en,de
+ *
+ * (Note: Wikipedia data can be downloaded from https://dumps.wikimedia.org/wikidatawiki/entities/)
+ *
+ */
+object WikipediaApp extends ArgumentsParser with AkkaApp {
+
+  /**
+   * One Wikidata item reduced to what this example needs.
+   *
+   * @param id    value of the item's "id" field
+   * @param sites language code -> page title, taken from the item's "sitelinks"
+   */
+  case class WikidataElement(id: String, sites: Map[String, String])
+
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true),
+    "languages" -> CLIOption[String]("<languages to take into account>", required = true)
+  )
+
+  /**
+   * Builds the stream graph, runs it, prints the outcome and shuts the actor system down.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args     command line arguments, parsed against [[options]]
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    val input = new File(parsed.getString("input"))
+    // Tolerate "en, de" and trailing commas: a code such as " de" would never match a sitelink.
+    val langs = parsed.getString("languages").split(",").map(_.trim).filter(_.nonEmpty)
+
+    implicit val system = ActorSystem("wikidata-poc", akkaConf)
+    implicit val materializer = new GearpumpMaterializer(system, GraphCutter.TagAttributeStrategy, useLocalCluster = false)
+    import system.dispatcher
+
+    val elements = source(input).via(parseJson(langs))
+
+    // Every parsed element goes both to the progress logger and to the title comparison,
+    // whose Boolean results are tallied by the `count` sink.
+    val g = FlowGraph.closed(count) { implicit b =>
+      sinkCount => {
+        import FlowGraph.Implicits._
+
+        val broadcast = b.add(Broadcast[WikidataElement](2))
+        elements ~> broadcast ~> logEveryNSink(1000)
+                    broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+      }
+    }
+
+    g.run().onComplete { result =>
+      try {
+        result match {
+          case Success((t, f)) => printResults(t, f)
+          case Failure(tr) =>
+            println("Something went wrong")
+            // Surface the cause instead of discarding it.
+            tr.printStackTrace()
+        }
+      } finally {
+        // Always release the actor system, otherwise awaitTermination() below never returns.
+        system.shutdown()
+      }
+    }
+    system.awaitTermination()
+  }
+
+  /**
+   * Source of the lines of a gzip-compressed text file, decoded as UTF-8.
+   *
+   * @param file gzip-compressed, newline-delimited input file
+   * @return a source emitting one String per line; the materialized value is the one
+   *         produced by `InputStreamSource`
+   */
+  def source(file: File): Source[String, Future[Long]] = {
+    // The stream is created inside the factory rather than captured from outside it, so no
+    // file handle is opened while the graph is merely being built and every invocation of
+    // the factory gets a fresh stream instead of a shared, possibly already consumed one.
+    InputStreamSource(() => new GZIPInputStream(new FileInputStream(file), 65536))
+      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
+      .map(_.decodeString("utf-8"))
+  }
+
+  /**
+   * Flow turning raw dump lines into [[WikidataElement]]s.
+   *
+   * Lines are parsed in Futures (up to 8 at a time via `mapAsyncUnordered`); lines for
+   * which [[parseItem]] yields `None` are dropped.
+   *
+   * @param langs language codes whose titles should be extracted
+   * @param ec    execution context the parsing Futures run on
+   */
+  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext): Flow[String, WikidataElement, Unit] =
+    Flow[String].mapAsyncUnordered(8)(line => Future(parseItem(langs, line))).collect {
+      case Some(v) => v
+    }
+
+  /**
+   * Parses one line of the dump.
+   *
+   * @param langs language codes to look up under "sitelinks" (as `<lang>wiki`)
+   * @param line  one line of the JSON dump
+   * @return the element, or `None` if the line cannot be parsed as JSON, has no string
+   *         "id", or has no title for any of the requested languages
+   */
+  def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
+    import org.json4s.jackson.JsonMethods
+    Try(JsonMethods.parse(line)).toOption.flatMap { json =>
+      json \ "id" match {
+        case JString(itemId) =>
+          val sites = for {
+            lang <- langs
+            JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
+          } yield lang -> title
+
+          if (sites.isEmpty) None
+          else Some(WikidataElement(id = itemId, sites = sites.toMap))
+
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Sink that counts the elements it receives and prints every `n`-th one, starting with
+   * the first.
+   *
+   * @param n logging interval; must be positive
+   * @return a sink whose folded value is the number of elements seen
+   */
+  def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = {
+    // Fail at construction time rather than with a division by zero on the first element.
+    require(n > 0, s"n must be positive, got $n")
+    Sink.fold(0) { (x, y: T) =>
+      if (x % n == 0)
+        println(s"Processing element $x: $y")
+      x + 1
+    }
+  }
+
+  /**
+   * Flow that keeps only elements having a title for exactly the given languages and maps
+   * each of them to whether all of its titles are equal.
+   *
+   * NOTE(review): the flow is tagged with `GearAttributes.remote`; presumably, together with
+   * `GraphCutter.TagAttributeStrategy`, this is what places it on the Gearpump cluster - confirm.
+   *
+   * @param langs the exact set of languages an element must have titles for
+   */
+  def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, Unit] = Flow[WikidataElement]
+    .filter(_.sites.keySet == langs)
+    .map { x =>
+      val titles = x.sites.values
+      // Compare against the first title once; an element without titles counts as "same".
+      titles.headOption.forall(first => titles.forall(_ == first))
+    }.withAttributes(GearAttributes.remote)
+
+  /**
+   * Sink tallying Booleans into a pair `(number of true, number of false)`.
+   */
+  def count: Sink[Boolean, Future[(Int, Int)]] = Sink.fold((0,0)) {
+    case ((t, f), true) => (t+1, f)
+    case ((t, f), false) => (t, f+1)
+  }
+
+  /**
+   * Prints the two counters and their share of the total.
+   *
+   * @param t number of items whose titles are all the same
+   * @param f number of items with differing titles
+   */
+  def printResults(t: Int, f: Int): Unit = {
+    val total = t + f
+    // Report 0.0 instead of NaN when nothing was counted.
+    def ratio(n: Int): Double = if (total == 0) 0.0 else n.toDouble / total
+    val message = s"""
+      | Number of items with the same title: $t
+      | Number of items with the different title: $f
+      | Ratios: ${ratio(t)} / ${ratio(f)}
+      """.stripMargin
+    println(message)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ef11f16f/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 3723020..eae876b 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -333,6 +333,9 @@ object Build extends sbt.Build {
     base = file("experiments/akkastream"),
     settings = commonSettings ++ noPublish ++ myAssemblySettings ++
       Seq(
+        libraryDependencies ++= Seq(
+          "org.json4s" %% "json4s-jackson" % "3.2.11"
+        ),
         mainClass in (Compile, packageBin) := Some("akka.stream.gearpump.example.Test")
       )
   ) dependsOn (streaming % "test->test; provided", daemon % "test->test; provided")
