Repository: spark
Updated Branches:
  refs/heads/master 99d2e4e00 -> 72ecfd095


[SPARK-25149][GRAPHX] Update Parallel Personalized Page Rank to test with large vertexIds

## What changes were proposed in this pull request?

runParallelPersonalizedPageRank in GraphX checks that all `sources` are <=
Int.MaxValue.toLong, but this restriction is not actually required. The check
seems to have been added because the implementation uses sparse vectors, and
sparse vectors cannot be indexed by values greater than Int.MaxValue. However,
we never index the sparse vectors by the source vertexIds, only by each
source's position in the `sources` array, so the check is unnecessary. I've
added a test with large vertexIds to confirm the implementation works as
expected; see the sketch below.
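
For illustration, here is a minimal, standalone sketch (not the GraphX
implementation itself) of why large vertexIds are safe: each source's
activation vector is indexed by the source's *position* in the `sources`
array, which is always a small Int, while the vertexId itself is only used as
a map key and may be any Long.

```scala
import org.apache.spark.ml.linalg.Vectors

// Hypothetical sources with vertexIds beyond Int.MaxValue.
val sources: Array[Long] = Array(Int.MaxValue.toLong + 1, Int.MaxValue.toLong + 5)

// One one-hot vector per source; the sparse index is the position (0, 1, ...),
// never the vertexId, so it always fits in an Int.
val sourcesInitMap = sources.zipWithIndex.toMap.mapValues { i =>
  Vectors.sparse(sources.length, Array(i), Array(1.0))
}

// The large vertexId is only used as a lookup key.
println(sourcesInitMap(Int.MaxValue.toLong + 1))  // (2,[0],[1.0])
```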

## How was this patch tested?

Unit tests.


Closes #22139 from MrBago/remove-veretexId-check-pppr.

Authored-by: Bago Amirbekian <b...@databricks.com>
Signed-off-by: Joseph K. Bradley <jos...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72ecfd09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72ecfd09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72ecfd09

Branch: refs/heads/master
Commit: 72ecfd095062ad61c073f9b97bf3c47644575d60
Parents: 99d2e4e
Author: Bago Amirbekian <b...@databricks.com>
Authored: Tue Aug 21 15:21:55 2018 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Tue Aug 21 15:21:55 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/graphx/lib/PageRank.scala  | 28 ++++++-----------
 .../apache/spark/graphx/lib/PageRankSuite.scala | 32 +++++++++++++++-----
 2 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72ecfd09/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index ebd65e8..96b635f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -184,9 +184,11 @@ object PageRank extends Logging {
    *         indexed by the position of nodes in the sources list) and
    *         edge attributes the normalized edge weight
    */
-  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
-    numIter: Int, resetProb: Double = 0.15,
-    sources: Array[VertexId]): Graph[Vector, Double] = {
+  def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](
+      graph: Graph[VD, ED],
+      numIter: Int,
+      resetProb: Double = 0.15,
+      sources: Array[VertexId]): Graph[Vector, Double] = {
     require(numIter > 0, s"Number of iterations must be greater than 0," +
       s" but got ${numIter}")
     require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" +
@@ -194,15 +196,11 @@ object PageRank extends Logging {
     require(sources.nonEmpty, s"The list of sources must be non-empty," +
       s" but got ${sources.mkString("[", ",", "]")}")
 
-    // TODO if one sources vertex id is outside of the int range
-    // we won't be able to store its activations in a sparse vector
-    require(sources.max <= Int.MaxValue.toLong,
-      s"This implementation currently only works for source vertex ids at most ${Int.MaxValue}")
     val zero = Vectors.sparse(sources.size, List()).asBreeze
-    val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
-      val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
-      (vid, v)
-    }.toMap
+    // map of vid -> vector where for each vid, the _position of vid in source_ is set to 1.0
+    val sourcesInitMap = sources.zipWithIndex.toMap.mapValues { i =>
+      Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
+    }
     val sc = graph.vertices.sparkContext
     val sourcesInitMapBC = sc.broadcast(sourcesInitMap)
     // Initialize the PageRank graph with each edge attribute having
@@ -212,13 +210,7 @@ object PageRank extends Logging {
       .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
       // Set the weight on the edges based on the degree
       .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src)
-      .mapVertices { (vid, attr) =>
-        if (sourcesInitMapBC.value contains vid) {
-          sourcesInitMapBC.value(vid)
-        } else {
-          zero
-        }
-      }
+      .mapVertices((vid, _) => sourcesInitMapBC.value.getOrElse(vid, zero))
 
     var i = 0
     while (i < numIter) {

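For context, here is a hypothetical usage sketch of the updated method with
source vertexIds beyond Int.MaxValue (not part of the patch; it assumes an
existing SparkContext `sc` and builds a small chain graph):

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.graphx.lib.PageRank

val offset = Int.MaxValue.toLong + 1
// Chain graph offset -> offset+1 -> ... -> offset+9, with large vertex ids.
val edges = sc.parallelize((0L until 9L).map(i => Edge(offset + i, offset + i + 1, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 1)

// Previously rejected by the removed require(); now accepted.
val ranks = PageRank.runParallelPersonalizedPageRank(
  graph, numIter = 10, resetProb = 0.15, sources = Array(offset + 4))
```
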
http://git-wip-us.apache.org/repos/asf/spark/blob/72ecfd09/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 9779553..1e4c6c7 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -203,24 +203,42 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Chain PersonalizedPageRank") {
     withSpark { sc =>
-      val chain1 = (0 until 9).map(x => (x, x + 1) )
+      // Check that implementation can handle large vertexIds, SPARK-25149
+      val vertexIdOffset = Int.MaxValue.toLong + 1
+      val sourceOffest = 4
+      val source = vertexIdOffset + sourceOffest
+      val numIter = 10
+      val vertices = vertexIdOffset until vertexIdOffset + numIter
+      val chain1 = vertices.zip(vertices.tail)
       val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) }
       val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache()
       val resetProb = 0.15
       val tol = 0.0001
-      val numIter = 10
       val errorTol = 1.0e-1
 
-      val staticRanks = chain.staticPersonalizedPageRank(4, numIter, resetProb).vertices
-      val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices
+      val a = resetProb / (1 - Math.pow(1 - resetProb, numIter - sourceOffest))
+      // We expect the rank to decay as (1 - resetProb) ^ distance
+      val expectedRanks = sc.parallelize(vertices).map { vid =>
+        val rank = if (vid < source) {
+          0.0
+        } else {
+          a * Math.pow(1 - resetProb, vid - source)
+        }
+        vid -> rank
+      }
+      val expected = VertexRDD(expectedRanks)
+
+      val staticRanks = chain.staticPersonalizedPageRank(source, numIter, resetProb).vertices
+      assert(compareRanks(staticRanks, expected) < errorTol)
 
-      assert(compareRanks(staticRanks, dynamicRanks) < errorTol)
+      val dynamicRanks = chain.personalizedPageRank(source, tol, resetProb).vertices
+      assert(compareRanks(dynamicRanks, expected) < errorTol)
 
       val parallelStaticRanks = chain
-        .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices {
+        .staticParallelPersonalizedPageRank(Array(source), numIter, resetProb).mapVertices {
           case (vertexId, vector) => vector(0)
         }.vertices.cache()
-      assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol)
+      assert(compareRanks(parallelStaticRanks, expected) < errorTol)
     }
   }
 

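A note on the expected-rank check added to the test above: on a directed chain
starting at the source, personalized PageRank mass at distance d from the
source decays as (1 - resetProb)^d, and the factor `a` normalizes the
truncated geometric series so the ranks sum to 1 over the reachable hops. A
rough, Spark-free sketch of that computation (variable names mirror the test;
the offset is renamed here for clarity):

```scala
// Plain-Scala sketch of the expected personalized PageRank values on the chain.
val resetProb = 0.15
val numIter = 10
val sourceOffset = 4  // the test calls this `sourceOffest`

// Normalizer for the truncated geometric series (1 - resetProb)^0 .. ^(k-1),
// where k = numIter - sourceOffset vertices are reachable from the source.
val a = resetProb / (1 - math.pow(1 - resetProb, numIter - sourceOffset))

// Vertices before the source get no mass; a vertex at distance d from the
// source gets a * (1 - resetProb)^d.
val expected = (0 until numIter).map { pos =>
  if (pos < sourceOffset) 0.0
  else a * math.pow(1 - resetProb, pos - sourceOffset)
}

println(expected.sum)  // ~1.0 by construction of the normalizer
```
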
