Repository: incubator-gearpump
Updated Branches:
  refs/heads/master d5343681e -> c176e4485


fix #2002, add akka stream examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ef11f16f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ef11f16f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ef11f16f

Branch: refs/heads/master
Commit: ef11f16fb61c6bf81e7dad20fc798093be8fd8a7
Parents: d534368
Author: manuzhang <[email protected]>
Authored: Fri Feb 19 19:14:45 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Apr 26 14:21:55 2016 +0800

----------------------------------------------------------------------
 .../src/main/resources/geardefault.conf         |   5 +
 .../stream/gearpump/example/WikipediaApp.scala  | 144 +++++++++++++++++++
 project/Build.scala                             |   3 +
 3 files changed, 152 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ef11f16f/experiments/akkastream/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/resources/geardefault.conf b/experiments/akkastream/src/main/resources/geardefault.conf
new file mode 100644
index 0000000..626a1dc
--- /dev/null
+++ b/experiments/akkastream/src/main/resources/geardefault.conf
@@ -0,0 +1,5 @@
+# Serializer registrations used by the akka-stream Wikipedia example: the example's
+# element type plus the one- and two-entry immutable Map implementations.
+# NOTE(review): an empty value presumably selects Gearpump's default serializer for the
+# class -- confirm against the Gearpump serialization documentation.
+# NOTE(review): only Map1/Map2 are listed; presumably sufficient for at most two
+# languages per element -- verify if more languages are passed on the command line.
+gearpump.serializers {
+  "akka.stream.gearpump.example.WikipediaApp$WikidataElement" = ""
+  "scala.collection.immutable.Map$Map1" = ""
+  "scala.collection.immutable.Map$Map2" = ""
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ef11f16f/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
new file mode 100644
index 0000000..915624f
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/akka/stream/gearpump/example/WikipediaApp.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package akka.stream.gearpump.example
+
+import java.io.{File, FileInputStream}
+import java.util.zip.GZIPInputStream
+
+import akka.actor.ActorSystem
+import akka.stream.gearpump.{GearAttributes, GearpumpMaterializer}
+import akka.stream.gearpump.graph.GraphCutter
+import akka.stream.io.{Framing, InputStreamSource}
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import io.gearpump.cluster.main.{CLIOption, ArgumentsParser}
+import io.gearpump.util.AkkaApp
+import org.json4s.JsonAST.JString
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * This example is ported from
+ * http://engineering.intenthq.com/2015/06/wikidata-akka-streams/
+ * and showcases running the Akka Streams DSL across JVMs on Gearpump.
+ *
+ * Usage: output/target/pack/bin/gear app -jar
+ *            experiments/akkastream/target/scala_2.11/akkastream-${VERSION}-SNAPSHOT-assembly.jar
+ *            -input wikidata-${DATE}-all.json.gz -languages en,de
+ *
+ * (Note: Wikidata dumps can be downloaded from
+ * https://dumps.wikimedia.org/wikidatawiki/entities/)
+ *
+ */
+object WikipediaApp extends ArgumentsParser with AkkaApp {
+
+  /**
+   * One Wikidata item reduced to the fields this example needs.
+   *
+   * @param id    the item's "id" field from the JSON dump
+   * @param sites page titles keyed by language code, as collected by `parseItem`
+   */
+  final case class WikidataElement(id: String, sites: Map[String, String])
+
+  /**
+   * Command-line options understood by this application; both are mandatory.
+   *
+   *  - `input`: path of the Wikidata JSON dump to read
+   *  - `languages`: the languages to take into account (split on "," in `main`)
+   */
+  override val options: Array[(String, CLIOption[Any])] = Array(
+    "input" -> CLIOption[String]("<Wikidata JSON dump>", required = true),
+    "languages" -> CLIOption[String]("<languages to take into account>", required = true)
+  )
+
+  /**
+   * Application entry point.
+   *
+   * Parses the `input` and `languages` options, builds a graph that broadcasts every
+   * parsed [[WikidataElement]] to a progress-logging sink and to the title comparison,
+   * runs it with a [[GearpumpMaterializer]] and prints the resulting counts. The call
+   * blocks until the actor system has terminated.
+   *
+   * @param akkaConf configuration used to create the actor system
+   * @param args     raw command-line arguments, see `options`
+   */
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val parsed = parse(args)
+    val input = new File(parsed.getString("input"))
+    val langs = parsed.getString("languages").split(",")
+
+    implicit val system = ActorSystem("wikidata-poc", akkaConf)
+    implicit val materializer = new GearpumpMaterializer(system,
+      GraphCutter.TagAttributeStrategy, useLocalCluster = false)
+    import system.dispatcher
+
+    val elements = source(input).via(parseJson(langs))
+
+    // The materialized value of the closed graph is the one produced by `count`.
+    val g = FlowGraph.closed(count) { implicit b =>
+      sinkCount => {
+        import FlowGraph.Implicits._
+
+        val broadcast = b.add(Broadcast[WikidataElement](2))
+        elements ~> broadcast ~> logEveryNSink(1000)
+                    broadcast ~> checkSameTitles(langs.toSet) ~> sinkCount
+      }
+    }
+
+    g.run().onComplete { result =>
+      try {
+        result match {
+          case Success((t, f)) => printResults(t, f)
+          case Failure(tr) =>
+            // Report the cause; a bare "went wrong" message makes failures undiagnosable.
+            println(s"Something went wrong: $tr")
+            tr.printStackTrace()
+        }
+      } finally {
+        // Always shut down, otherwise awaitTermination() below would block forever
+        // if reporting the result itself throws.
+        system.shutdown()
+      }
+    }
+    system.awaitTermination()
+  }
+
+  /**
+   * Builds a source that emits the newline-delimited records of a gzip-compressed
+   * file, each decoded as a UTF-8 string.
+   *
+   * The file is opened inside the stream factory rather than when this method is
+   * called, so nothing is opened unless the source is materialized, every
+   * materialization reads from its own fresh stream, and a missing or unreadable
+   * file surfaces as a stream failure instead of an exception thrown from here.
+   *
+   * @param file the gzip-compressed input file
+   * @return a source of lines; the materialized value is the one provided by
+   *         `InputStreamSource`
+   */
+  def source(file: File): Source[String, Future[Long]] = {
+    // 65536 is the buffer size handed to GZIPInputStream.
+    def openCompressed(): GZIPInputStream =
+      new GZIPInputStream(new FileInputStream(file), 65536)
+
+    InputStreamSource(() => openCompressed())
+      .via(Framing.delimiter(ByteString("\n"), Int.MaxValue))
+      .map(bytes => bytes.decodeString("utf-8"))
+  }
+
+  /**
+   * Flow that turns raw JSON lines into [[WikidataElement]]s.
+   *
+   * Each line is handed to `parseItem` inside a `Future`, with up to 8 lines in
+   * flight and results emitted in completion order; lines for which `parseItem`
+   * yields `None` are dropped.
+   *
+   * @param langs languages whose site links should be extracted
+   * @param ec    execution context on which the parsing futures run
+   */
+  def parseJson(langs: Seq[String])(implicit ec: ExecutionContext):
+      Flow[String, WikidataElement, Unit] = {
+    val parsedLines = Flow[String].mapAsyncUnordered(8) { line =>
+      Future(parseItem(langs, line))
+    }
+    parsedLines.collect { case Some(element) => element }
+  }
+
+  /**
+   * Parses one line of the dump into a [[WikidataElement]].
+   *
+   * @param langs languages to look up; for each one the title under
+   *              `sitelinks -> "<lang>wiki" -> title` is collected when it is a string
+   * @param line  a single line of the JSON dump
+   * @return `None` when the line cannot be parsed, has no string `id`, or has no
+   *         title for any of the requested languages; otherwise the element
+   */
+  def parseItem(langs: Seq[String], line: String): Option[WikidataElement] = {
+    import org.json4s.jackson.JsonMethods
+
+    // Lines that fail to parse become None instead of failing the stream.
+    val parsedLine = Try(JsonMethods.parse(line)).toOption
+    parsedLine.flatMap { json =>
+      (json \ "id") match {
+        case JString(itemId) =>
+          val titlesByLang = for {
+            lang <- langs
+            JString(title) <- json \ "sitelinks" \ s"${lang}wiki" \ "title"
+          } yield lang -> title
+
+          Some(titlesByLang)
+            .filter(_.nonEmpty)
+            .map(found => WikidataElement(id = itemId, sites = found.toMap))
+
+        case _ => None
+      }
+    }
+  }
+
+  /**
+   * Creates a sink that counts the elements it receives and prints every element
+   * whose zero-based position is a multiple of `n`.
+   *
+   * @param n logging interval; must be non-zero, because the position is taken
+   *          modulo `n`
+   * @tparam T type of the elements flowing into the sink
+   * @return a sink whose materialized future holds the number of elements seen
+   * @throws IllegalArgumentException if `n` is zero
+   */
+  def logEveryNSink[T](n: Int): Sink[T, Future[Int]] = {
+    // Fail fast here rather than with an ArithmeticException on the first element.
+    require(n != 0, "n must be non-zero")
+    Sink.fold(0) { (seen: Int, element: T) =>
+      if (seen % n == 0) {
+        println(s"Processing element $seen: $element")
+      }
+      seen + 1
+    }
+  }
+
+  /**
+   * Flow that keeps only the elements whose site languages are exactly `langs`
+   * and maps each of them to whether all of its titles are equal.
+   *
+   * The resulting flow is tagged with `GearAttributes.remote`.
+   * NOTE(review): presumably this marks the stage for execution on the Gearpump
+   * cluster -- confirm against GearAttributes.
+   *
+   * @param langs the exact set of languages an element must have titles for
+   */
+  def checkSameTitles(langs: Set[String]): Flow[WikidataElement, Boolean, Unit] = {
+    val sameTitles = Flow[WikidataElement]
+      .filter(element => element.sites.keySet == langs)
+      // All titles are equal exactly when they collapse to at most one distinct value.
+      .map(element => element.sites.values.toSet.size <= 1)
+    sameTitles.withAttributes(GearAttributes.remote)
+  }
+
+  /**
+   * Sink that tallies the incoming booleans.
+   *
+   * @return a sink whose materialized future holds
+   *         `(number of true values, number of false values)`
+   */
+  def count: Sink[Boolean, Future[(Int, Int)]] =
+    Sink.fold[(Int, Int), Boolean]((0, 0)) { (tally, sameTitle) =>
+      val (same, different) = tally
+      if (sameTitle) (same + 1, different) else (same, different + 1)
+    }
+
+  /**
+   * Prints the two counts and their share of the total to stdout.
+   *
+   * @param t number of items whose titles are all the same
+   * @param f number of items with differing titles
+   */
+  def printResults(t: Int, f: Int) = {
+    val total = t + f
+    // With nothing counted the plain division would print NaN; report 0.0 instead.
+    def ratio(part: Int): Double = if (total == 0) 0.0 else part.toDouble / total
+
+    val message = s"""
+        | Number of items with the same title: $t
+        | Number of items with the different title: $f
+        | Ratios: ${ratio(t)} / ${ratio(f)}
+                  """.stripMargin
+    println(message)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ef11f16f/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index 3723020..eae876b 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -333,6 +333,9 @@ object Build extends sbt.Build {
     base = file("experiments/akkastream"),
     settings = commonSettings ++ noPublish ++ myAssemblySettings ++
       Seq(
+        libraryDependencies ++= Seq(
+          "org.json4s" %% "json4s-jackson" % "3.2.11"
+        ),
         mainClass in (Compile, packageBin) := 
Some("akka.stream.gearpump.example.Test")
       )
   ) dependsOn (streaming % "test->test; provided", daemon % "test->test; 
provided")

Reply via email to