Repository: spark Updated Branches: refs/heads/master 99d2e4e00 -> 72ecfd095
[SPARK-25149][GRAPHX] Update Parallel Personalized Page Rank to test with large vertexIds ## What changes were proposed in this pull request? runParallelPersonalizedPageRank in graphx checks that every vertexId in `sources` is <= Int.MaxValue.toLong, but this restriction is not actually required. This check seems to have been added because the implementation uses sparse vectors, and sparse vectors cannot be indexed by values > Int.MaxValue. However, the sparse vectors are only ever indexed by a source's position in the `sources` array (an Int), never by the source's vertexId, so large vertexIds are not an issue. I've added a test with large vertexIds to confirm this works as expected. ## How was this patch tested? Unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #22139 from MrBago/remove-veretexId-check-pppr. Authored-by: Bago Amirbekian <b...@databricks.com> Signed-off-by: Joseph K. Bradley <jos...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72ecfd09 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72ecfd09 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72ecfd09 Branch: refs/heads/master Commit: 72ecfd095062ad61c073f9b97bf3c47644575d60 Parents: 99d2e4e Author: Bago Amirbekian <b...@databricks.com> Authored: Tue Aug 21 15:21:55 2018 -0700 Committer: Joseph K. 
Bradley <jos...@databricks.com> Committed: Tue Aug 21 15:21:55 2018 -0700 ---------------------------------------------------------------------- .../org/apache/spark/graphx/lib/PageRank.scala | 28 ++++++----------- .../apache/spark/graphx/lib/PageRankSuite.scala | 32 +++++++++++++++----- 2 files changed, 35 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/72ecfd09/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index ebd65e8..96b635f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -184,9 +184,11 @@ object PageRank extends Logging { * indexed by the position of nodes in the sources list) and * edge attributes the normalized edge weight */ - def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], - numIter: Int, resetProb: Double = 0.15, - sources: Array[VertexId]): Graph[Vector, Double] = { + def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED], + numIter: Int, + resetProb: Double = 0.15, + sources: Array[VertexId]): Graph[Vector, Double] = { require(numIter > 0, s"Number of iterations must be greater than 0," + s" but got ${numIter}") require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" + @@ -194,15 +196,11 @@ object PageRank extends Logging { require(sources.nonEmpty, s"The list of sources must be non-empty," + s" but got ${sources.mkString("[", ",", "]")}") - // TODO if one sources vertex id is outside of the int range - // we won't be able to store its activations in a sparse vector - require(sources.max <= Int.MaxValue.toLong, - s"This 
implementation currently only works for source vertex ids at most ${Int.MaxValue}") val zero = Vectors.sparse(sources.size, List()).asBreeze - val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) => - val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze - (vid, v) - }.toMap + // map of vid -> vector where for each vid, the _position of vid in source_ is set to 1.0 + val sourcesInitMap = sources.zipWithIndex.toMap.mapValues { i => + Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze + } val sc = graph.vertices.sparkContext val sourcesInitMapBC = sc.broadcast(sourcesInitMap) // Initialize the PageRank graph with each edge attribute having @@ -212,13 +210,7 @@ object PageRank extends Logging { .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } // Set the weight on the edges based on the degree .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src) - .mapVertices { (vid, attr) => - if (sourcesInitMapBC.value contains vid) { - sourcesInitMapBC.value(vid) - } else { - zero - } - } + .mapVertices((vid, _) => sourcesInitMapBC.value.getOrElse(vid, zero)) var i = 0 while (i < numIter) { http://git-wip-us.apache.org/repos/asf/spark/blob/72ecfd09/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala index 9779553..1e4c6c7 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala @@ -203,24 +203,42 @@ class PageRankSuite extends SparkFunSuite with LocalSparkContext { test("Chain PersonalizedPageRank") { withSpark { sc => - val chain1 = (0 until 9).map(x => (x, x + 1) ) + // Check that implementation can handle large vertexIds, SPARK-25149 + val vertexIdOffset = Int.MaxValue.toLong + 
1 + val sourceOffest = 4 + val source = vertexIdOffset + sourceOffest + val numIter = 10 + val vertices = vertexIdOffset until vertexIdOffset + numIter + val chain1 = vertices.zip(vertices.tail) val rawEdges = sc.parallelize(chain1, 1).map { case (s, d) => (s.toLong, d.toLong) } val chain = Graph.fromEdgeTuples(rawEdges, 1.0).cache() val resetProb = 0.15 val tol = 0.0001 - val numIter = 10 val errorTol = 1.0e-1 - val staticRanks = chain.staticPersonalizedPageRank(4, numIter, resetProb).vertices - val dynamicRanks = chain.personalizedPageRank(4, tol, resetProb).vertices + val a = resetProb / (1 - Math.pow(1 - resetProb, numIter - sourceOffest)) + // We expect the rank to decay as (1 - resetProb) ^ distance + val expectedRanks = sc.parallelize(vertices).map { vid => + val rank = if (vid < source) { + 0.0 + } else { + a * Math.pow(1 - resetProb, vid - source) + } + vid -> rank + } + val expected = VertexRDD(expectedRanks) + + val staticRanks = chain.staticPersonalizedPageRank(source, numIter, resetProb).vertices + assert(compareRanks(staticRanks, expected) < errorTol) - assert(compareRanks(staticRanks, dynamicRanks) < errorTol) + val dynamicRanks = chain.personalizedPageRank(source, tol, resetProb).vertices + assert(compareRanks(dynamicRanks, expected) < errorTol) val parallelStaticRanks = chain - .staticParallelPersonalizedPageRank(Array(4), numIter, resetProb).mapVertices { + .staticParallelPersonalizedPageRank(Array(source), numIter, resetProb).mapVertices { case (vertexId, vector) => vector(0) }.vertices.cache() - assert(compareRanks(staticRanks, parallelStaticRanks) < errorTol) + assert(compareRanks(parallelStaticRanks, expected) < errorTol) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org