spark git commit: [SPARK-18845][GRAPHX] PageRank has incorrect initialization value that leads to slow convergence
Repository: spark
Updated Branches: refs/heads/master 172a52f5d -> 78062b852

[SPARK-18845][GRAPHX] PageRank has incorrect initialization value that leads to slow convergence

## What changes were proposed in this pull request?

Change the initial value in all PageRank implementations to be `1.0` instead of `resetProb` (default `0.15`) and use `outerJoinVertices` instead of `joinVertices` so that source vertices get updated in each iteration. This seems to have been introduced a long time ago in https://github.com/apache/spark/commit/15a564598fe63003652b1e24527c432080b5976c#diff-b2bf3f97dcd2f19d61c921836159cda9L90

With the exception of graphs with sinks (which currently give incorrect results; see SPARK-18847) this gives faster convergence, as the sum of ranks is already correct (the sum of ranks should be the number of vertices).

Convergence comparison benchmark for small graph: http://imgur.com/a/HkkZf
Code for benchmark: https://gist.github.com/aray/a7de1f3801a810f8b1fa00c271a1fefd

## How was this patch tested?

(corrected) existing unit tests and an additional test that verifies against the results of igraph and NetworkX on a loop with a source.

Author: Andrew Ray

Closes #16271 from aray/pagerank-initial-value.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78062b85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78062b85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78062b85

Branch: refs/heads/master
Commit: 78062b8521bb02900baeec31992d697fa677f122
Parents: 172a52f
Author: Andrew Ray
Authored: Thu Dec 15 23:32:10 2016 -0800
Committer: Ankur Dave
Committed: Thu Dec 15 23:32:10 2016 -0800

--
.../org/apache/spark/graphx/lib/PageRank.scala | 24 +++---
.../apache/spark/graphx/lib/PageRankSuite.scala | 34 +---
2 files changed, 42 insertions(+), 16 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/78062b85/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
index feb3f47..37b6e45 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala
@@ -115,9 +115,9 @@ object PageRank extends Logging {
 val src: VertexId = srcId.getOrElse(-1L)

 // Initialize the PageRank graph with each edge attribute having
-// weight 1/outDegree and each vertex with attribute resetProb.
+// weight 1/outDegree and each vertex with attribute 1.0.
 // When running personalized pagerank, only the source vertex
-// has an attribute resetProb. All others are set to 0.
+// has an attribute 1.0. All others are set to 0.
 var rankGraph: Graph[Double, Double] = graph
 // Associate the degree with each vertex
 .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
@@ -125,7 +125,7 @@ object PageRank extends Logging {
 .mapTriplets( e => 1.0 / e.srcAttr, TripletFields.Src )
 // Set the vertex attributes to the initial pagerank values
 .mapVertices { (id, attr) =>
-if (!(id != src && personalized)) resetProb else 0.0
+if (!(id != src && personalized)) 1.0 else 0.0
 }

 def delta(u: VertexId, v: VertexId): Double = { if (u == v) 1.0 else 0.0 }
@@ -150,8 +150,8 @@ object PageRank extends Logging {
 (src: VertexId, id: VertexId) => resetProb
 }

- rankGraph = rankGraph.joinVertices(rankUpdates) {
-(id, oldRank, msgSum) => rPrb(src, id) + (1.0 - resetProb) * msgSum
+ rankGraph = rankGraph.outerJoinVertices(rankUpdates) {
+(id, oldRank, msgSumOpt) => rPrb(src, id) + (1.0 - resetProb) * msgSumOpt.getOrElse(0.0)
 }.cache()

 rankGraph.edges.foreachPartition(x => {}) // also materializes rankGraph.vertices
@@ -196,7 +196,7 @@ object PageRank extends Logging {
 // we won't be able to store its activations in a sparse vector
 val zero = Vectors.sparse(sources.size, List()).asBreeze
 val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) =>
- val v = Vectors.sparse(sources.size, Array(i), Array(resetProb)).asBreeze
+ val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze
 (vid, v)
 }.toMap
 val sc = graph.vertices.sparkContext
@@ -225,11 +225,11 @@ object PageRank extends Logging {
 ctx => ctx.sendToDst(ctx.srcAttr :* ctx.attr),
 (a : BV[Double], b : BV[Double]) => a :+ b, TripletFields.Src)
- rankGraph = rankGraph.joinVertices(rankUpdates) {
-(vid,
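For illustration only (not part of the commit), a minimal sketch of the invariant the fix exploits, assuming a Spark shell with `sc` in scope:

```scala
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Build a small random graph (sinks, the SPARK-18847 caveat above, may occur).
val graph = GraphGenerators.logNormalGraph(sc, numVertices = 100)
val n = graph.numVertices

// At the PageRank fixed point the ranks sum to the number of vertices, so
// initializing every vertex to 1.0 starts with the correct total mass...
val goodStart = graph.mapVertices((_, _) => 1.0)
println(goodStart.vertices.values.sum())   // ~n

// ...whereas initializing to resetProb = 0.15 starts with only 0.15 * n
// total mass, and early iterations are spent inflating it toward n.
val slowStart = graph.mapVertices((_, _) => 0.15)
println(slowStart.vertices.values.sum())   // ~0.15 * n
```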
spark git commit: [SPARK-9436] [GRAPHX] Pregel simplification patch
Repository: spark
Updated Branches: refs/heads/master 5340dfaf9 -> b715933fc

[SPARK-9436] [GRAPHX] Pregel simplification patch

Pregel code contains two consecutive joins:
```
g.vertices.innerJoin(messages)(vprog)
...
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
```
This can be simplified with one join. ankurdave proposed a patch based on our discussion in the mailing list: https://www.mail-archive.com/devspark.apache.org/msg10316.html

Author: Alexander Ulanov na...@yandex.ru

Closes #7749 from avulanov/SPARK-9436-pregel and squashes the following commits:

8568e06 [Alexander Ulanov] Pregel simplification patch

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b715933f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b715933f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b715933f

Branch: refs/heads/master
Commit: b715933fc69a49653abdb2fba0818dfc4f35d358
Parents: 5340dfa
Author: Alexander Ulanov na...@yandex.ru
Authored: Wed Jul 29 13:59:00 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Jul 29 13:59:00 2015 -0700

--
.../scala/org/apache/spark/graphx/Pregel.scala | 23 +---
1 file changed, 10 insertions(+), 13 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/b715933f/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index cfcf724..2ca60d5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -127,28 +127,25 @@ object Pregel extends Logging {
 var prevG: Graph[VD, ED] = null
 var i = 0
 while (activeMessages > 0 && i < maxIterations) {
- // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
- val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
- // Update the graph with the new vertices.
+ // Receive the messages and update the vertices.
 prevG = g
- g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
- g.cache()
+ g = g.joinVertices(messages)(vprog).cache()

 val oldMessages = messages
- // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
- // get to send messages. We must cache messages so it can be materialized on the next line,
- // allowing us to uncache the previous iteration.
- messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
- // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
- // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
- // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
+ // Send new messages, skipping edges where neither side received a message. We must cache
+ // messages so it can be materialized on the next line, allowing us to uncache the previous
+ // iteration.
+ messages = g.mapReduceTriplets(
+sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+ // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
+ // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
+ // and the vertices of g).
 activeMessages = messages.count()

 logInfo("Pregel finished iteration " + i)

 // Unpersist the RDDs hidden by newly-materialized RDDs
 oldMessages.unpersist(blocking = false)
- newVerts.unpersist(blocking = false)
 prevG.unpersistVertices(blocking = false)
 prevG.edges.unpersist(blocking = false)
 // count the iteration
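The shape of the change in isolation, as a hedged runnable sketch (the graph, `messages`, and `vprog` below are hypothetical stand-ins for the real Pregel state; assumes a Spark shell with `sc`):

```scala
import org.apache.spark.graphx._

// Minimal concrete stand-ins for the Pregel loop state.
var g: Graph[Int, Int] =
  Graph.fromEdges(sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0))), 0)
val messages = g.vertices.mapValues(_ + 1)  // pretend these are Pregel messages
val vprog = (vid: VertexId, attr: Int, msg: Int) => math.max(attr, msg)

// Before: two joins per iteration. innerJoin drops vertices that received
// no message, so a second outerJoinVertices merges the unchanged ones back.
val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }

// After: one join. joinVertices applies vprog only to vertices that received
// a message and keeps every other vertex attribute unchanged.
g = g.joinVertices(messages)(vprog).cache()
```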
spark git commit: [SPARK-9109] [GRAPHX] Keep the cached edge in the graph
Repository: spark
Updated Branches: refs/heads/master eba6a1af4 -> 587c315b2

[SPARK-9109] [GRAPHX] Keep the cached edge in the graph

The change here is to keep the cached RDDs in the graph object so that when graph.unpersist() is called these RDDs are correctly unpersisted.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")

// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges

graph.unpersist()

sc.getPersistentRDDs.foreach(r => println(r._2.toString))
```

Author: tien-dungle tien-dung...@realimpactanalytics.com

Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the following commits:

8d87997 [tien-dungle] Keep the cached edge in the graph

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/587c315b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/587c315b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/587c315b

Branch: refs/heads/master
Commit: 587c315b204f1439f696620543c38166d95f8a3d
Parents: eba6a1a
Author: tien-dungle tien-dung...@realimpactanalytics.com
Authored: Fri Jul 17 12:11:32 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Jul 17 12:11:32 2015 -0700

--
.../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9 +++--
1 file changed, 7 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/587c315b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d2..da95314 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -332,9 +332,9 @@ object GraphImpl {
 edgeStorageLevel: StorageLevel,
 vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
 val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
- .withTargetStorageLevel(edgeStorageLevel).cache()
+ .withTargetStorageLevel(edgeStorageLevel)
 val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
- .withTargetStorageLevel(vertexStorageLevel).cache()
+ .withTargetStorageLevel(vertexStorageLevel)
 GraphImpl(vertexRDD, edgeRDD)
 }

@@ -346,9 +346,14 @@ object GraphImpl {
 def apply[VD: ClassTag, ED: ClassTag](
 vertices: VertexRDD[VD],
 edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+
+vertices.cache()
+
 // Convert the vertex partitions in edges to the correct type
 val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
 .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+ .cache()
+
 GraphImpl.fromExistingRDDs(vertices, newEdges)
 }
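A quick way to see the effect, continuing the example above in a Spark shell (a hedged sketch, not part of the commit):

```scala
// After building the graph as above:
graph.cache().numEdges    // materializes the cached vertex and edge RDDs
graph.unpersist()

// With this patch the intermediate RDDs created inside the Graph builder are
// tracked by the graph object and released, so none of this graph's RDDs
// should be listed here; before the patch, orphaned cached RDDs survived
// the unpersist() call.
sc.getPersistentRDDs.foreach { case (_, rdd) => println(rdd.toString) }
```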
spark git commit: [SPARK-9109] [GRAPHX] Keep the cached edge in the graph
Repository: spark
Updated Branches: refs/heads/branch-1.4 bb1401507 -> f34f3d71f

[SPARK-9109] [GRAPHX] Keep the cached edge in the graph

The change here is to keep the cached RDDs in the graph object so that when graph.unpersist() is called these RDDs are correctly unpersisted.

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.slf4j.LoggerFactory
import org.apache.spark.graphx.util.GraphGenerators

// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))

// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))

// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")

// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
graph.cache().numEdges

graph.unpersist()

sc.getPersistentRDDs.foreach(r => println(r._2.toString))
```

Author: tien-dungle tien-dung...@realimpactanalytics.com

Closes #7469 from tien-dungle/SPARK-9109_Graphx-unpersist and squashes the following commits:

8d87997 [tien-dungle] Keep the cached edge in the graph

(cherry picked from commit 587c315b204f1439f696620543c38166d95f8a3d)
Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f34f3d71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f34f3d71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f34f3d71

Branch: refs/heads/branch-1.4
Commit: f34f3d71f6551da5e96b0de99c0f61fa981967f6
Parents: bb14015
Author: tien-dungle tien-dung...@realimpactanalytics.com
Authored: Fri Jul 17 12:11:32 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Jul 17 12:15:20 2015 -0700

--
.../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala | 9 +++--
1 file changed, 7 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/f34f3d71/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 90a74d2..da95314 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -332,9 +332,9 @@ object GraphImpl {
 edgeStorageLevel: StorageLevel,
 vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
 val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
- .withTargetStorageLevel(edgeStorageLevel).cache()
+ .withTargetStorageLevel(edgeStorageLevel)
 val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
- .withTargetStorageLevel(vertexStorageLevel).cache()
+ .withTargetStorageLevel(vertexStorageLevel)
 GraphImpl(vertexRDD, edgeRDD)
 }

@@ -346,9 +346,14 @@ object GraphImpl {
 def apply[VD: ClassTag, ED: ClassTag](
 vertices: VertexRDD[VD],
 edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
+
+vertices.cache()
+
 // Convert the vertex partitions in edges to the correct type
 val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
 .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
+ .cache()
+
 GraphImpl.fromExistingRDDs(vertices, newEdges)
 }
spark git commit: [SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions
Repository: spark
Updated Branches: refs/heads/master d267c2834 -> 0a4071eab

[SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square number of partitions

See https://github.com/aray/e2d/blob/master/EdgePartition2D.ipynb

Author: Andrew Ray ray.and...@gmail.com

Closes #7104 from aray/edge-partition-2d-improvement and squashes the following commits:

3729f84 [Andrew Ray] correct bounds and remove unneeded comments
97f8464 [Andrew Ray] change less
5141ab4 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
925fd2c [Andrew Ray] use new interface for partitioning
001bfd0 [Andrew Ray] Refactor PartitionStrategy so that we can return a partition function for a given number of parts. To keep compatibility we define default methods that translate between the two implementation options. Made EdgePartition2D use old strategy when we have a perfect square and implement new interface.
5d42105 [Andrew Ray] % -> /
3560084 [Andrew Ray] Merge branch 'master' into edge-partition-2d-improvement
f006364 [Andrew Ray] remove unneeded comments
cfa2c5e [Andrew Ray] Modifications to EdgePartition2D so that it works for non perfect squares.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a4071ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a4071ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a4071ea

Branch: refs/heads/master
Commit: 0a4071eab30db1db80f61ed2cb2e7243291183ce
Parents: d267c28
Author: Andrew Ray ray.and...@gmail.com
Authored: Tue Jul 14 13:14:47 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Jul 14 13:14:47 2015 -0700

--
.../apache/spark/graphx/PartitionStrategy.scala | 32 +---
1 file changed, 21 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/0a4071ea/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 7372dfb..70a7592 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -32,7 +32,7 @@ trait PartitionStrategy extends Serializable {
 object PartitionStrategy {
 /**
 * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix,
- * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication.
+ * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication.
 *
 * Suppose we have a graph with 12 vertices that we want to partition
 * over 9 machines. We can use the following sparse matrix representation:
@@ -61,26 +61,36 @@ object PartitionStrategy {
 * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, P6)` or the last
 * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be
- * replicated to at most `2 * sqrt(numParts) - 1` machines.
+ * replicated to at most `2 * sqrt(numParts)` machines.
 *
 * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work
 * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
 * vertex locations.
 *
- * One of the limitations of this approach is that the number of machines must either be a
- * perfect square.
- * We partially address this limitation by computing the machine assignment to the next
- * largest perfect square and then mapping back down to the actual number of machines.
- * Unfortunately, this can also lead to work imbalance and so it is suggested that a perfect
- * square is used.
+ * When the number of partitions requested is not a perfect square we use a slightly different
+ * method where the last column can have a different number of rows than the others while still
+ * maintaining the same size per block.
 */
 case object EdgePartition2D extends PartitionStrategy {
 override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
 val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
 val mixingPrime: VertexId = 1125899906842597L
- val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
- val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
- (col * ceilSqrtNumParts + row) % numParts
+ if (numParts == ceilSqrtNumParts * ceilSqrtNumParts) {
+// Use old method for perfect squared to ensure we get same results
+val col: PartitionID = (math.abs(src *
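For reference, the perfect-square path kept by this patch, as a standalone hedged sketch (a hypothetical free function; the real code lives in `PartitionStrategy.EdgePartition2D`):

```scala
// 2D partitioning for a perfect-square numParts: an edge (src, dst) is hashed
// to a (col, row) cell of a sqrt(numParts) x sqrt(numParts) grid, so each
// vertex's edges touch at most one column plus one row of partitions, i.e.
// the vertex is replicated to at most 2 * sqrt(numParts) machines.
def edgePartition2D(src: Long, dst: Long, numParts: Int): Int = {
  val ceilSqrtNumParts = math.ceil(math.sqrt(numParts)).toInt
  val mixingPrime = 1125899906842597L  // large prime to spread skewed vertex ids
  val col = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
  val row = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
  (col * ceilSqrtNumParts + row) % numParts
}

edgePartition2D(3L, 7L, 9)  // some partition id in [0, 9)
```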
spark git commit: [SPARK-6736][GraphX][Doc]Example of Graph#aggregateMessages has error
Repository: spark
Updated Branches: refs/heads/master 6f0d55d76 -> ae980eb41

[SPARK-6736][GraphX][Doc] Example of Graph#aggregateMessages has error

The example for Graph#aggregateMessages has an error. Since aggregateMessages is a method of Graph, it should be written as rawGraph.aggregateMessages.

Author: Sasaki Toru sasaki...@nttdata.co.jp

Closes #5388 from sasakitoa/aggregateMessagesExample and squashes the following commits:

b1d631b [Sasaki Toru] Example of Graph#aggregateMessages has error

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ae980eb4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ae980eb4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ae980eb4

Branch: refs/heads/master
Commit: ae980eb41c00b5f1f64c650f267b884e864693f0
Parents: 6f0d55d
Author: Sasaki Toru sasaki...@nttdata.co.jp
Authored: Tue Apr 7 01:55:32 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Apr 7 01:55:32 2015 -0700

--
graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/ae980eb4/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 8494d06..36dc7b0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -409,7 +409,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
 * val inDeg: RDD[(VertexId, Int)] =
- * aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
+ * rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
 * }}}
 *
 * @note By expressing computation at the edge level we achieve
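The corrected snippet in context, as a hedged runnable sketch (a hypothetical tiny graph; assumes a Spark shell with `sc`):

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Build a small graph and compute in-degrees with aggregateMessages.
// Note the method is invoked on the graph instance, as the doc fix shows.
val edges: RDD[Edge[Int]] =
  sc.parallelize(Array(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(1L, 3L, 0)))
val rawGraph: Graph[Int, Int] = Graph.fromEdges(edges, defaultValue = 0)

val inDeg: RDD[(VertexId, Int)] =
  rawGraph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
inDeg.collect().foreach(println)  // e.g. (2,1) and (3,2)
```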
spark git commit: [SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference
Repository: spark
Updated Branches: refs/heads/master aad003227 -> 39fb57968

[SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference

Adds a `Graph#minus` method which will return only unique `VertexId`'s from the calling `VertexRDD`.

To demonstrate a basic example with pseudocode:

```
Set((0L,0),(1L,1)).minus(Set((1L,1),(2L,2)))
> Set((0L,0))
```

Author: Brennon York brennon.y...@capitalone.com

Closes #5175 from brennonyork/SPARK-6510 and squashes the following commits:

248d5c8 [Brennon York] added minus(VertexRDD[VD]) method to avoid createUsingIndex and updated the mask operations to simplify with andNot call
3fb7cce [Brennon York] updated graphx doc to reflect the addition of minus method
6575d92 [Brennon York] updated mima exclude
aaa030b [Brennon York] completed graph#minus functionality
7227c0f [Brennon York] beginning work on minus functionality

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39fb5796
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39fb5796
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39fb5796

Branch: refs/heads/master
Commit: 39fb57968352549f2276ac4fcd2b92988ed6fe42
Parents: aad0032
Author: Brennon York brennon.y...@capitalone.com
Authored: Thu Mar 26 19:08:09 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Thu Mar 26 19:08:09 2015 -0700

--
docs/graphx-programming-guide.md | 2 ++
.../org/apache/spark/graphx/VertexRDD.scala | 16 ++
.../graphx/impl/VertexPartitionBaseOps.scala | 15 +
.../spark/graphx/impl/VertexRDDImpl.scala | 25 +++
.../apache/spark/graphx/VertexRDDSuite.scala | 33 ++--
project/MimaExcludes.scala | 3 ++
6 files changed, 92 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/docs/graphx-programming-guide.md
--
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index c601d79..3f10cb2 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -899,6 +899,8 @@ class VertexRDD[VD] extends RDD[(VertexID, VD)] {
 // Transform the values without changing the ids (preserves the internal index)
 def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
 def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
+ // Show only vertices unique to this set based on their VertexId's
+ def minus(other: RDD[(VertexId, VD)])
 // Remove vertices from this set that appear in the other set
 def diff(other: VertexRDD[VD]): VertexRDD[VD]
 // Join operators that take advantage of the internal indexing to accelerate joins (substantially)

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index ad4bfe0..a9f04b5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -122,6 +122,22 @@ abstract class VertexRDD[VD](
 def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]

 /**
+ * For each VertexId present in both `this` and `other`, minus will act as a set difference
+ * operation returning only those unique VertexId's present in `this`.
+ *
+ * @param other an RDD to run the set operation against
+ */
+ def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+ /**
+ * For each VertexId present in both `this` and `other`, minus will act as a set difference
+ * operation returning only those unique VertexId's present in `this`.
+ *
+ * @param other a VertexRDD to run the set operation against
+ */
+ def minus(other: VertexRDD[VD]): VertexRDD[VD]
+
+ /**
 * For each vertex present in both `this` and `other`, `diff` returns only those vertices with
 * differing values; for values that are different, keeps the values from `other`. This is
 * only guaranteed to work if the VertexRDDs share a common ancestor.

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index 4fd2548..b90f9fa 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++
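A hedged usage sketch of the new method, mirroring the pseudocode above (assumes a Spark shell with `sc`):

```scala
import org.apache.spark.graphx._

// minus keeps only the vertices whose ids appear in `this` but not in `other`.
val setA = VertexRDD(sc.parallelize(Array((0L, 0), (1L, 1))))
val setB = VertexRDD(sc.parallelize(Array((1L, 1), (2L, 2))))
setA.minus(setB).collect()  // Array((0,0))
```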
spark git commit: [SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in VertexRDD
Repository: spark
Updated Branches: refs/heads/master aa6536fa3 -> 45f4c6612

[SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in VertexRDD

Changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]. This change maintains backwards compatibility and better unifies the VertexRDD methods to match each other.

Author: Brennon York brennon.y...@capitalone.com

Closes #4733 from brennonyork/SPARK-5922 and squashes the following commits:

e800f08 [Brennon York] fixed merge conflicts
b9274af [Brennon York] fixed merge conflicts
f86375c [Brennon York] fixed minor include line
398ddb4 [Brennon York] fixed merge conflicts
aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure that method works properly
2af0b88 [Brennon York] removed deprecation line
753c963 [Brennon York] fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method
2c678c6 [Brennon York] added mima exclude to exclude new public diff method from VertexRDD
93186f3 [Brennon York] added back the original diff method to sustain binary compatibility
f18356e [Brennon York] changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45f4c661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45f4c661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45f4c661

Branch: refs/heads/master
Commit: 45f4c66122c57011e74c694a424756812ab77d99
Parents: aa6536f
Author: Brennon York brennon.y...@capitalone.com
Authored: Mon Mar 16 01:06:26 2015 -0700
Committer: Ankur Dave ankurd...@gmail.com
Committed: Mon Mar 16 01:06:26 2015 -0700

--
.../main/scala/org/apache/spark/graphx/VertexRDD.scala | 9 +
.../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4
.../scala/org/apache/spark/graphx/VertexRDDSuite.scala | 13 +
project/MimaExcludes.scala | 3 +++
4 files changed, 29 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 40ecff7..ad4bfe0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -126,6 +126,15 @@ abstract class VertexRDD[VD](
 * differing values; for values that are different, keeps the values from `other`. This is
 * only guaranteed to work if the VertexRDDs share a common ancestor.
 *
+ * @param other the other RDD[(VertexId, VD)] with which to diff against.
+ */
+ def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+ /**
+ * For each vertex present in both `this` and `other`, `diff` returns only those vertices with
+ * differing values; for values that are different, keeps the values from `other`. This is
+ * only guaranteed to work if the VertexRDDs share a common ancestor.
+ *
+ * @param other the other VertexRDD with which to diff against.
 */
 def diff(other: VertexRDD[VD]): VertexRDD[VD]

http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 904be21..125692d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
 override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] =
 this.mapVertexPartitions(_.map(f))

+ override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
+ }
+
 override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
 val otherPartition = other match {
 case other: VertexRDD[_] if this.partitioner == other.partitioner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 97533dd..4f7a442 100644
---
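A sketch of the widened signature in use (hypothetical data; assumes a Spark shell with `sc`):

```scala
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// diff now accepts a plain RDD[(VertexId, VD)], matching innerJoin/leftJoin.
// It returns the vertices present in both sides whose values differ, keeping
// the values from `other`.
val verts = VertexRDD(sc.parallelize(Array((1L, 1), (2L, 2), (3L, 3))))
val other: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 20)))
verts.diff(other).collect()  // Array((2,20))
```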
spark git commit: [SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing
Repository: spark
Updated Branches: refs/heads/branch-1.3 eaffc6edd -> 8073767f5

[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

Fixes the issue whereby when VertexRDD's are `diff`ed, `innerJoin`ed, or `leftJoin`ed and have different partition sizes, they fail under the `zipPartitions` method. This fix tests whether the partitions are equal or not and, if not, will repartition the other to match the partition size of the calling VertexRDD.

Author: Brennon York brennon.y...@capitalone.com

Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits:

0882590 [Brennon York] updated to properly handle differently-partitioned vertexRDDs

(cherry picked from commit 9f603fce78fcc997926e9a72dec44d48cbc396fc)
Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8073767f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8073767f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8073767f

Branch: refs/heads/branch-1.3
Commit: 8073767f5144b84de4c019c3d07b33a6454a656e
Parents: eaffc6e
Author: Brennon York brennon.y...@capitalone.com
Authored: Wed Feb 25 14:11:12 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Feb 25 14:11:29 2015 -0800

--
.../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 12 +---
1 file changed, 9 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8073767f/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 6dad167..904be21 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -104,8 +104,14 @@ class VertexRDDImpl[VD] private[graphx] (
 this.mapVertexPartitions(_.map(f))

 override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+val otherPartition = other match {
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+other.partitionsRDD
+ case _ =>
+VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+}
 val newPartitionsRDD = partitionsRDD.zipPartitions(
- other.partitionsRDD, preservesPartitioning = true
+ otherPartition, preservesPartitioning = true
 ) { (thisIter, otherIter) =>
 val thisPart = thisIter.next()
 val otherPart = otherIter.next()
@@ -133,7 +139,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
 // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
 other match {
- case other: VertexRDD[_] =>
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 leftZipJoin(other)(f)
 case _ =>
 this.withPartitionsRDD[VD3](
@@ -162,7 +168,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
 // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
 other match {
- case other: VertexRDD[_] =>
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 innerZipJoin(other)(f)
 case _ =>
 this.withPartitionsRDD(
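An illustrative sketch of the failure mode this guards against (hypothetical data; assumes a Spark shell with `sc`, and that each VertexRDD hash-partitions by its input's partition count):

```scala
import org.apache.spark.graphx._

// Two VertexRDDs built with different partition counts end up with
// different partitioners, so zipPartitions cannot pair them up directly.
val a = VertexRDD(sc.parallelize(Array((1L, 1), (2L, 2)), numSlices = 2))
val b = VertexRDD(sc.parallelize(Array((1L, 1), (2L, 2)), numSlices = 4))

// Before the fix this could throw "Can't zip RDDs with unequal numbers of
// partitions"; with it, b is repartitioned to match a's partitioner first.
a.diff(b).collect()
```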
spark git commit: [SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing
Repository: spark
Updated Branches: refs/heads/master a777c65da -> 9f603fce7

[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

Fixes the issue whereby when VertexRDD's are `diff`ed, `innerJoin`ed, or `leftJoin`ed and have different partition sizes, they fail under the `zipPartitions` method. This fix tests whether the partitions are equal or not and, if not, will repartition the other to match the partition size of the calling VertexRDD.

Author: Brennon York brennon.y...@capitalone.com

Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits:

0882590 [Brennon York] updated to properly handle differently-partitioned vertexRDDs

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f603fce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f603fce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f603fce

Branch: refs/heads/master
Commit: 9f603fce78fcc997926e9a72dec44d48cbc396fc
Parents: a777c65
Author: Brennon York brennon.y...@capitalone.com
Authored: Wed Feb 25 14:11:12 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Feb 25 14:11:12 2015 -0800

--
.../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 12 +---
1 file changed, 9 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/9f603fce/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 6dad167..904be21 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -104,8 +104,14 @@ class VertexRDDImpl[VD] private[graphx] (
 this.mapVertexPartitions(_.map(f))

 override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+val otherPartition = other match {
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+other.partitionsRDD
+ case _ =>
+VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+}
 val newPartitionsRDD = partitionsRDD.zipPartitions(
- other.partitionsRDD, preservesPartitioning = true
+ otherPartition, preservesPartitioning = true
 ) { (thisIter, otherIter) =>
 val thisPart = thisIter.next()
 val otherPart = otherIter.next()
@@ -133,7 +139,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
 // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
 other match {
- case other: VertexRDD[_] =>
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 leftZipJoin(other)(f)
 case _ =>
 this.withPartitionsRDD[VD3](
@@ -162,7 +168,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
 // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
 other match {
- case other: VertexRDD[_] =>
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 innerZipJoin(other)(f)
 case _ =>
 this.withPartitionsRDD(
spark git commit: [SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing
Repository: spark
Updated Branches: refs/heads/branch-1.2 a9abcaa2c -> 00112baf9

[SPARK-1955][GraphX]: VertexRDD can incorrectly assume index sharing

Fixes the issue whereby when VertexRDD's are `diff`ed, `innerJoin`ed, or `leftJoin`ed and have different partition sizes, they fail under the `zipPartitions` method. This fix tests whether the partitions are equal or not and, if not, will repartition the other to match the partition size of the calling VertexRDD.

Author: Brennon York brennon.y...@capitalone.com

Closes #4705 from brennonyork/SPARK-1955 and squashes the following commits:

0882590 [Brennon York] updated to properly handle differently-partitioned vertexRDDs

(cherry picked from commit 9f603fce78fcc997926e9a72dec44d48cbc396fc)
Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00112baf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00112baf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00112baf

Branch: refs/heads/branch-1.2
Commit: 00112baf9e9707e9a773e8076dc4ed2957803bfd
Parents: a9abcaa
Author: Brennon York brennon.y...@capitalone.com
Authored: Wed Feb 25 14:11:12 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Wed Feb 25 14:14:22 2015 -0800

--
.../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 12 +---
1 file changed, 9 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/00112baf/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 9732c5b..d9bf9fe 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -94,8 +94,14 @@ class VertexRDDImpl[VD] private[graphx] (
 this.mapVertexPartitions(_.map(f))

 override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
+val otherPartition = other match {
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+other.partitionsRDD
+ case _ =>
+VertexRDD(other.partitionBy(this.partitioner.get)).partitionsRDD
+}
 val newPartitionsRDD = partitionsRDD.zipPartitions(
- other.partitionsRDD, preservesPartitioning = true
+ otherPartition, preservesPartitioning = true
 ) { (thisIter, otherIter) =>
 val thisPart = thisIter.next()
 val otherPart = otherIter.next()
@@ -123,7 +129,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
 // If the other set is a VertexRDD then we use the much more efficient leftZipJoin
 other match {
- case other: VertexRDD[_] =>
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 leftZipJoin(other)(f)
 case _ =>
 this.withPartitionsRDD[VD3](
@@ -152,7 +158,7 @@ class VertexRDDImpl[VD] private[graphx] (
 // Test if the other vertex is a VertexRDD to choose the optimal join strategy.
 // If the other set is a VertexRDD then we use the much more efficient innerZipJoin
 other match {
- case other: VertexRDD[_] =>
+ case other: VertexRDD[_] if this.partitioner == other.partitioner =>
 innerZipJoin(other)(f)
 case _ =>
 this.withPartitionsRDD(
spark git commit: SPARK-3290 [GRAPHX] No unpersist calls in SVDPlusPlus
Repository: spark
Updated Branches: refs/heads/branch-1.3 152147f5f -> db5747921

SPARK-3290 [GRAPHX] No unpersist calls in SVDPlusPlus

This just unpersist()s each RDD in this code that was cache()ed.

Author: Sean Owen so...@cloudera.com

Closes #4234 from srowen/SPARK-3290 and squashes the following commits:

66c1e11 [Sean Owen] unpersist() each RDD that was cache()ed

(cherry picked from commit 0ce4e430a81532dc317136f968f28742e087d840)
Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db574792
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db574792
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db574792

Branch: refs/heads/branch-1.3
Commit: db5747921a648c3f7cf1de6dba70b82584afd097
Parents: 152147f
Author: Sean Owen so...@cloudera.com
Authored: Fri Feb 13 20:12:52 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Feb 13 20:14:49 2015 -0800

--
.../apache/spark/graphx/lib/SVDPlusPlus.scala | 40 
1 file changed, 32 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/db574792/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index f58587e..112ed09 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -72,17 +72,22 @@ object SVDPlusPlus {
 // construct graph
 var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
+materialize(g)
+edges.unpersist()

 // Calculate initial bias and norm
 val t0 = g.aggregateMessages[(Long, Double)](
 ctx => { ctx.sendToSrc((1L, ctx.attr)); ctx.sendToDst((1L, ctx.attr)) },
 (g1, g2) => (g1._1 + g2._1, g1._2 + g2._2))

-g = g.outerJoinVertices(t0) {
+val gJoinT0 = g.outerJoinVertices(t0) {
 (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
 msg: Option[(Long, Double)]) =>
 (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
-}
+}.cache()
+materialize(gJoinT0)
+g.unpersist()
+g = gJoinT0

 def sendMsgTrainF(conf: Conf, u: Double)
 (ctx: EdgeContext[
@@ -114,12 +119,15 @@ object SVDPlusPlus {
 val t1 = g.aggregateMessages[DoubleMatrix](
 ctx => ctx.sendToSrc(ctx.dstAttr._2),
 (g1, g2) => g1.addColumnVector(g2))
- g = g.outerJoinVertices(t1) {
+ val gJoinT1 = g.outerJoinVertices(t1) {
 (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
 msg: Option[DoubleMatrix]) =>
 if (msg.isDefined) (vd._1, vd._1
 .addColumnVector(msg.get.mul(vd._4)), vd._3, vd._4) else vd
- }
+ }.cache()
+ materialize(gJoinT1)
+ g.unpersist()
+ g = gJoinT1

 // Phase 2, update p for user nodes and q, y for item nodes
 g.cache()
@@ -127,13 +135,16 @@ object SVDPlusPlus {
 sendMsgTrainF(conf, u),
 (g1: (DoubleMatrix, DoubleMatrix, Double),
 g2: (DoubleMatrix, DoubleMatrix, Double)) =>
 (g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))
- g = g.outerJoinVertices(t2) {
+ val gJoinT2 = g.outerJoinVertices(t2) {
 (vid: VertexId,
 vd: (DoubleMatrix, DoubleMatrix, Double, Double),
 msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
 (vd._1.addColumnVector(msg.get._1), vd._2.addColumnVector(msg.get._2),
 vd._3 + msg.get._3, vd._4)
- }
+ }.cache()
+ materialize(gJoinT2)
+ g.unpersist()
+ g = gJoinT2
 }

 // calculate error on training set
@@ -147,13 +158,26 @@ object SVDPlusPlus {
 val err = (ctx.attr - pred) * (ctx.attr
 - pred)
 ctx.sendToDst(err)
 }
+ g.cache()
 val t3 = g.aggregateMessages[Double](sendMsgTestF(conf, u), _ + _)
-g = g.outerJoinVertices(t3) {
+val gJoinT3 = g.outerJoinVertices(t3) {
 (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: Option[Double]) =>
 if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
-}
+}.cache()
+materialize(gJoinT3)
+g.unpersist()
+g = gJoinT3

 (g, u)
 }
+
+ /**
+ * Forces materialization of a Graph by count()ing its RDDs.
+ */
+ private def materialize(g: Graph[_,_]): Unit = {
+g.vertices.count()
+g.edges.count()
+ }
+
 }
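The caching discipline this patch applies at every step, as a standalone hedged sketch (the `materialize` helper is taken from the diff above; `g`, `update`, and `mergeFn` are hypothetical stand-ins):

```scala
import org.apache.spark.graphx.Graph

// Helper from the patch: force a Graph's RDDs to materialize by counting
// them, so the previous iteration's graph can be safely unpersisted.
def materialize(g: Graph[_, _]): Unit = {
  g.vertices.count()
  g.edges.count()
}

// Per-iteration pattern (schematic):
//   val gJoined = g.outerJoinVertices(update)(mergeFn).cache()
//   materialize(gJoined)   // gJoined no longer depends lazily on g
//   g.unpersist()          // now safe: old lineage is fully consumed
//   g = gJoined
```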
spark git commit: [SPARK-5343][GraphX]: ShortestPaths traverses backwards
Repository: spark
Updated Branches: refs/heads/branch-1.3 bba095399 -> 5be8902f7

[SPARK-5343][GraphX]: ShortestPaths traverses backwards

Corrected the logic with ShortestPaths so that the calculation will run forward rather than backwards. Output before looked like:

```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))), sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))
```

And new output after the changes looks like:

```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))), sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(3 -> 2)), (2,Map(3 -> 1)), (3,Map(3 -> 0)))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (2,Map()), (3,Map()))
```

Author: Brennon York brennon.y...@capitalone.com

Closes #4478 from brennonyork/SPARK-5343 and squashes the following commits:

aa57f83 [Brennon York] updated to set ShortestPaths to run 'forward' rather than 'backward'

(cherry picked from commit 5820961289eb98e45eb467efa316c7592b8d619c)
Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5be8902f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5be8902f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5be8902f

Branch: refs/heads/branch-1.3
Commit: 5be8902f7a7f6eafbe75a9b2cf9b0cc3e5bc6f2b
Parents: bba0953
Author: Brennon York brennon.y...@capitalone.com
Authored: Tue Feb 10 14:57:00 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Feb 10 14:57:18 2015 -0800

--
.../main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/5be8902f/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index 590f047..179f284 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -61,8 +61,8 @@ object ShortestPaths {
 }

 def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
- val newAttr = incrementMap(edge.srcAttr)
- if (edge.dstAttr != addMaps(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr))
+ val newAttr = incrementMap(edge.dstAttr)
+ if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
 else Iterator.empty
 }
spark git commit: [SPARK-5343][GraphX]: ShortestPaths traverses backwards
Repository: spark
Updated Branches: refs/heads/master fd2c032f9 -> 582096128

[SPARK-5343][GraphX]: ShortestPaths traverses backwards

Corrected the logic with ShortestPaths so that the calculation will run forward rather than backwards. Output before looked like:

```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))), sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map()), (3,Map(3 -> 0)), (2,Map()))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (3,Map(1 -> 2)), (2,Map(1 -> 1)))
```

And new output after the changes looks like:

```scala
import org.apache.spark.graphx._
val g = Graph(sc.makeRDD(Array((1L,""), (2L,""), (3L,""))), sc.makeRDD(Array(Edge(1L,2L,""), Edge(2L,3L,""))))
lib.ShortestPaths.run(g,Array(3)).vertices.collect
// res0: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(3 -> 2)), (2,Map(3 -> 1)), (3,Map(3 -> 0)))
lib.ShortestPaths.run(g,Array(1)).vertices.collect
// res1: Array[(org.apache.spark.graphx.VertexId, org.apache.spark.graphx.lib.ShortestPaths.SPMap)] = Array((1,Map(1 -> 0)), (2,Map()), (3,Map()))
```

Author: Brennon York brennon.y...@capitalone.com

Closes #4478 from brennonyork/SPARK-5343 and squashes the following commits:

aa57f83 [Brennon York] updated to set ShortestPaths to run 'forward' rather than 'backward'

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58209612
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58209612
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58209612

Branch: refs/heads/master
Commit: 5820961289eb98e45eb467efa316c7592b8d619c
Parents: fd2c032
Author: Brennon York brennon.y...@capitalone.com
Authored: Tue Feb 10 14:57:00 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Tue Feb 10 14:57:00 2015 -0800

--
.../main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/58209612/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index 590f047..179f284 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -61,8 +61,8 @@ object ShortestPaths {
 }

 def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
- val newAttr = incrementMap(edge.srcAttr)
- if (edge.dstAttr != addMaps(newAttr, edge.dstAttr)) Iterator((edge.dstId, newAttr))
+ val newAttr = incrementMap(edge.dstAttr)
+ if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
 else Iterator.empty
 }
spark git commit: [SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImp...
Repository: spark
Updated Branches: refs/heads/master cef1f092a -> e224dbb01

[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImp...

If the value of 'spark.default.parallelism' does not match the number of partitions in EdgePartition(EdgeRDDImpl), the following error occurs in ReplicatedVertexView.scala:72;

object GraphTest extends Logging {
 def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = {
 graph.aggregateMessages(
 ctx => {
 ctx.sendToSrc(1)
 ctx.sendToDst(2)
 },
 _ + _)
 }
}

val g = GraphLoader.edgeListFile(sc, "graph.txt")
val rdd = GraphTest.run(g)

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
 at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
 at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:204)
 at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
 at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
 at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193)
 at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191)
 ...

Author: Takeshi Yamamuro linguin@gmail.com

Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits:

0cd8942 [Ankur Dave] Use more concise getOrElse
aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions
0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e224dbb0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e224dbb0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e224dbb0

Branch: refs/heads/master
Commit: e224dbb011789297cd6c6ba095f702c042869ed6
Parents: cef1f09
Author: Takeshi Yamamuro linguin@gmail.com
Authored: Fri Jan 23 19:25:15 2015 -0800
Committer: Ankur Dave ankurd...@gmail.com
Committed: Fri Jan 23 19:26:39 2015 -0800

--
.../apache/spark/graphx/impl/EdgeRDDImpl.scala | 4 ++--
.../org/apache/spark/graphx/GraphSuite.scala | 20 
2 files changed, 22 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/e224dbb0/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
--
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
index 897c7ee..f1550ac 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala
@@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl

 import scala.reflect.{classTag, ClassTag}

-import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
+import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel

@@ -46,7 +46,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] (
 * partitioner that allows co-partitioning with `partitionsRDD`.
 */
 override val partitioner =
- partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
+partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.size)))

 override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

http://git-wip-us.apache.org/repos/asf/spark/blob/e224dbb0/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
--
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 9da0064..ed9876b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -386,4 +386,24 @@ class GraphSuite extends FunSuite with LocalSparkContext {
 }
 }

+ test("non-default number of edge partitions") {
+
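A hedged sketch of the scenario the fix covers, based on the reproduction in the commit message ("graph.txt" is the hypothetical edge-list file from that reproduction; assumes a Spark shell with `sc`):

```scala
import org.apache.spark.graphx._

// Load a graph with an explicit, non-default edge partition count. Before
// this patch the EdgeRDD's partitioner fell back to
// Partitioner.defaultPartitioner, which follows spark.default.parallelism
// and could disagree with partitionsRDD's own partition count, making the
// internal zipPartitions in aggregateMessages fail. After the patch the
// partitioner is a HashPartitioner sized to partitionsRDD itself.
val g = GraphLoader.edgeListFile(sc, "graph.txt", numEdgePartitions = 2)
val degrees = g.aggregateMessages[Int](
  ctx => { ctx.sendToSrc(1); ctx.sendToDst(2) },
  _ + _)
degrees.collect()
```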
spark git commit: [SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImp...
Repository: spark Updated Branches: refs/heads/branch-1.2 2ea782a9d -> 73cb806f7

[SPARK-5351][GraphX] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImp... If the value of 'spark.default.parallelism' does not match the number of partitions in EdgePartition(EdgeRDDImpl), the following error occurs in ReplicatedVertexView.scala:72;

object GraphTest extends Logging { def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): VertexRDD[Int] = { graph.aggregateMessages[Int]( ctx => { ctx.sendToSrc(1); ctx.sendToDst(2) }, _ + _) } } val g = GraphLoader.edgeListFile(sc, "graph.txt") val rdd = GraphTest.run(g)

java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:204) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:206) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:204) at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82) at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80) at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:193) at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:191) ...

Author: Takeshi Yamamuro linguin@gmail.com

Closes #4136 from maropu/EdgePartitionBugFix and squashes the following commits: 0cd8942 [Ankur Dave] Use more concise getOrElse aad4a2c [Ankur Dave] Add unit test for non-default number of edge partitions 0a2f32b [Takeshi Yamamuro] Do not use Partitioner.defaultPartitioner as a partitioner of EdgeRDDImpl (cherry picked from commit e224dbb011789297cd6c6ba095f702c042869ed6) Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73cb806f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73cb806f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73cb806f Branch: refs/heads/branch-1.2 Commit: 73cb806f71fbc44ce2488254db177f6500fe83c7 Parents: 2ea782a Author: Takeshi Yamamuro linguin@gmail.com Authored: Fri Jan 23 19:25:15 2015 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Fri Jan 23 19:29:42 2015 -0800

-- .../apache/spark/graphx/impl/EdgeRDDImpl.scala | 4 ++-- .../org/apache/spark/graphx/GraphSuite.scala| 20 2 files changed, 22 insertions(+), 2 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/73cb806f/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 897c7ee..f1550ac 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx.impl import scala.reflect.{classTag, ClassTag} -import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} +import org.apache.spark.{OneToOneDependency, HashPartitioner, TaskContext} import org.apache.spark.rdd.RDD 
import org.apache.spark.storage.StorageLevel @@ -46,7 +46,7 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( * partitioner that allows co-partitioning with `partitionsRDD`. */ override val partitioner = - partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) +partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitions.size))) override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() http://git-wip-us.apache.org/repos/asf/spark/blob/73cb806f/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 9da0064..ed9876b 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -386,4 +386,24 @@
spark git commit: [SPARK-5064][GraphX] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop
Repository: spark Updated Branches: refs/heads/branch-1.2 e90f6b5c6 -> 37db20c94

[SPARK-5064][GraphX] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop I looked into GraphGenerators#chooseCell, and found that chooseCell can't generate more edges than pow(2, (2 * (log2(numVertices)-1))) to make a Power-law graph. (Ex. numVertices:4 upperbound:4, numVertices:8 upperbound:16, numVertices:16 upperbound:64) If we request more edges than the upper bound, rmatGraph falls into an infinite loop. So, how about adding an argument validation?

Author: Kenji Kikushima kikushima.ke...@lab.ntt.co.jp

Closes #3950 from kj-ki/SPARK-5064 and squashes the following commits: 4ee18c7 [Ankur Dave] Reword error message and add unit test d760bc7 [Kenji Kikushima] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop. (cherry picked from commit 3ee3ab592eee831d759c940eb68231817ad6d083) Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/37db20c9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/37db20c9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/37db20c9 Branch: refs/heads/branch-1.2 Commit: 37db20c9414d26ebd423e9500825bedc037b20f5 Parents: e90f6b5 Author: Kenji Kikushima kikushima.ke...@lab.ntt.co.jp Authored: Wed Jan 21 12:34:00 2015 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Wed Jan 21 12:38:09 2015 -0800

-- .../org/apache/spark/graphx/util/GraphGenerators.scala| 6 ++ .../apache/spark/graphx/util/GraphGeneratorsSuite.scala | 10 ++ 2 files changed, 16 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/37db20c9/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 8a13c74..2d6a825 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -133,6 +133,12 @@ object GraphGenerators { // This ensures that the 4 quadrants are the same size at all recursion levels val numVertices = math.round( math.pow(2.0, math.ceil(math.log(requestedNumVertices) / math.log(2.0)))).toInt +val numEdgesUpperBound = + math.pow(2.0, 2 * ((math.log(numVertices) / math.log(2.0)) - 1)).toInt +if (numEdgesUpperBound < numEdges) { + throw new IllegalArgumentException( +s"numEdges must be <= $numEdgesUpperBound but was $numEdges") +} var edges: Set[Edge[Int]] = Set() while (edges.size < numEdges) { if (edges.size % 100 == 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/37db20c9/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala index 3abefbe..8d9c8dd 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala @@ -110,4 +110,14 @@ class GraphGeneratorsSuite extends FunSuite with LocalSparkContext { } } + test("SPARK-5064 GraphGenerators.rmatGraph numEdges upper bound") { +withSpark { sc => + val g1 = GraphGenerators.rmatGraph(sc, 4, 4) + assert(g1.edges.count() === 4) + intercept[IllegalArgumentException] { +val g2 = 
GraphGenerators.rmatGraph(sc, 4, 8) + } +} + } + }
spark git commit: [SPARK-5064][GraphX] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop
Repository: spark Updated Branches: refs/heads/master 7450a992b -> 3ee3ab592

[SPARK-5064][GraphX] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop I looked into GraphGenerators#chooseCell, and found that chooseCell can't generate more edges than pow(2, (2 * (log2(numVertices)-1))) to make a Power-law graph. (Ex. numVertices:4 upperbound:4, numVertices:8 upperbound:16, numVertices:16 upperbound:64) If we request more edges than the upper bound, rmatGraph falls into an infinite loop. So, how about adding an argument validation?

Author: Kenji Kikushima kikushima.ke...@lab.ntt.co.jp

Closes #3950 from kj-ki/SPARK-5064 and squashes the following commits: 4ee18c7 [Ankur Dave] Reword error message and add unit test d760bc7 [Kenji Kikushima] Add numEdges upperbound validation for R-MAT graph generator to prevent infinite loop.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ee3ab59 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ee3ab59 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ee3ab59 Branch: refs/heads/master Commit: 3ee3ab592eee831d759c940eb68231817ad6d083 Parents: 7450a99 Author: Kenji Kikushima kikushima.ke...@lab.ntt.co.jp Authored: Wed Jan 21 12:34:00 2015 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Wed Jan 21 12:36:03 2015 -0800

-- .../org/apache/spark/graphx/util/GraphGenerators.scala| 6 ++ .../apache/spark/graphx/util/GraphGeneratorsSuite.scala | 10 ++ 2 files changed, 16 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/3ee3ab59/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala index 8a13c74..2d6a825 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/util/GraphGenerators.scala @@ -133,6 +133,12 @@ object GraphGenerators { // This ensures that the 4 quadrants are the same size at all recursion levels val numVertices = math.round( math.pow(2.0, math.ceil(math.log(requestedNumVertices) / math.log(2.0)))).toInt +val numEdgesUpperBound = + math.pow(2.0, 2 * ((math.log(numVertices) / math.log(2.0)) - 1)).toInt +if (numEdgesUpperBound < numEdges) { + throw new IllegalArgumentException( +s"numEdges must be <= $numEdgesUpperBound but was $numEdges") +} var edges: Set[Edge[Int]] = Set() while (edges.size < numEdges) { if (edges.size % 100 == 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3ee3ab59/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala index 3abefbe..8d9c8dd 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/util/GraphGeneratorsSuite.scala @@ -110,4 +110,14 @@ class GraphGeneratorsSuite extends FunSuite with LocalSparkContext { } } + test("SPARK-5064 GraphGenerators.rmatGraph numEdges upper bound") { +withSpark { sc => + val g1 = GraphGenerators.rmatGraph(sc, 4, 4) + assert(g1.edges.count() === 4) + intercept[IllegalArgumentException] { +val g2 = GraphGenerators.rmatGraph(sc, 4, 8) + } +} + } + }
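The validation's threshold can be reproduced in a few lines. A sketch (the helper name is my own, and it uses exact integer arithmetic instead of the patch's floating-point formula) matching the examples from the commit message:

// Maximum number of distinct edges chooseCell can generate for a
// power-of-two numVertices: 2^(2 * (log2(numVertices) - 1)).
def rmatMaxEdges(numVertices: Int): Long = {
  require(numVertices >= 2 && (numVertices & (numVertices - 1)) == 0,
    "numVertices must be a power of two")
  val log2 = java.lang.Integer.numberOfTrailingZeros(numVertices)
  1L << (2 * (log2 - 1))
}

rmatMaxEdges(4)   // 4  -> rmatGraph(sc, 4, 4) succeeds, rmatGraph(sc, 4, 8) is rejected
rmatMaxEdges(8)   // 16
rmatMaxEdges(16)  // 64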
spark git commit: [SPARK-4917] Add a function to convert into a graph with canonical edges in GraphOps
Repository: spark Updated Branches: refs/heads/master 8d45834de -> f825e193f

[SPARK-4917] Add a function to convert into a graph with canonical edges in GraphOps Convert bi-directional edges into uni-directional ones instead of 'canonicalOrientation' in GraphLoader.edgeListFile. This function is useful when a graph is loaded as it is and then is transformed into one with canonical edges. It rewrites the vertex ids of edges so that srcIds are smaller than dstIds, and merges the duplicated edges.

Author: Takeshi Yamamuro linguin@gmail.com

Closes #3760 from maropu/ConvertToCanonicalEdgesSpike and squashes the following commits: 7f8b580 [Takeshi Yamamuro] Add a function to convert into a graph with canonical edges in GraphOps

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f825e193 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f825e193 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f825e193 Branch: refs/heads/master Commit: f825e193f3357e60949bf4c0174675d0d1a40988 Parents: 8d45834 Author: Takeshi Yamamuro linguin@gmail.com Authored: Thu Jan 8 09:55:12 2015 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Thu Jan 8 09:55:12 2015 -0800

-- .../org/apache/spark/graphx/GraphOps.scala | 26 .../org/apache/spark/graphx/GraphOpsSuite.scala | 15 +++ 2 files changed, 41 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/f825e193/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 116d1ea..dc8b478 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -279,6 +279,32 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali } /** + * Convert bi-directional edges into uni-directional ones. + * Some graph algorithms (e.g., TriangleCount) assume that an input graph + * has its edges in canonical direction. + * This function rewrites the vertex ids of edges so that srcIds are smaller + * than dstIds, and merges the duplicated edges. + * + * @param mergeFunc the user defined reduce function which should + * be commutative and associative and is used to combine the output + * of the map phase + * + * @return the resulting graph with canonical edges + */ + def convertToCanonicalEdges( + mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED] = { +val newEdges = + graph.edges +.map { + case e if e.srcId < e.dstId => ((e.srcId, e.dstId), e.attr) + case e => ((e.dstId, e.srcId), e.attr) +} +.reduceByKey(mergeFunc) +.map(e => new Edge(e._1._1, e._1._2, e._2)) +Graph(graph.vertices, newEdges) + } + + /** * Execute a Pregel-like iterative vertex-parallel abstraction. 
The * user-defined vertex-program `vprog` is executed in parallel on * each vertex receiving any inbound messages and computing a new

http://git-wip-us.apache.org/repos/asf/spark/blob/f825e193/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala index ea94d4a..9bc8007 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala @@ -79,6 +79,21 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext { } } + test("convertToCanonicalEdges") { +withSpark { sc => + val vertices = +sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2) + val edges = +sc.parallelize(Seq(Edge(1, 2, 1), Edge(2, 1, 1), Edge(3, 2, 2))) + val g: Graph[String, Int] = Graph(vertices, edges) + + val g1 = g.convertToCanonicalEdges() + + val e = g1.edges.collect().toSet + assert(e === Set(Edge(1, 2, 1), Edge(2, 3, 2))) +} + } + test("collectEdgesCycleDirectionOut") { withSpark { sc => val graph = getCycleGraph(sc, 100)
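A usage sketch (assuming a running SparkContext `sc`): the default mergeFunc (e1, e2) => e1 keeps the first attribute of a merged pair, while an explicit mergeFunc such as _ + _ combines them.

import org.apache.spark.graphx.{Edge, Graph, VertexId}

val vertices = sc.parallelize(
  Seq[(VertexId, String)]((1L, "one"), (2L, "two"), (3L, "three")))
val edges = sc.parallelize(
  Seq(Edge(1L, 2L, 1), Edge(2L, 1L, 1), Edge(3L, 2L, 2)))
val g: Graph[String, Int] = Graph(vertices, edges)

// Sum the attributes of merged duplicates: Edge(1,2,1) and Edge(2,1,1)
// collapse into Edge(1,2,2); Edge(3,2,2) is rewritten to Edge(2,3,2).
val canonical = g.convertToCanonicalEdges(_ + _)
canonical.edges.collect().toSet  // Set(Edge(1,2,2), Edge(2,3,2))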
spark git commit: [Minor] Fix comments for GraphX 2D partitioning strategy
Repository: spark Updated Branches: refs/heads/master a6394bc2c -> 5e3ec1110

[Minor] Fix comments for GraphX 2D partitioning strategy The number of vertices on the matrix (v0 to v11) is 12, not 11. Also, the column and row of blocks used by a vertex share one block (`P6` in the example), so the replication bound overlaps by one block. This is a minor PR, so I didn't file it in JIRA.

Author: kj-ki kikushima.ke...@lab.ntt.co.jp

Closes #3904 from kj-ki/fix-partitionstrategy-comments and squashes the following commits: 79829d9 [kj-ki] Fix comments for 2D partitioning.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e3ec111 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e3ec111 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e3ec111 Branch: refs/heads/master Commit: 5e3ec1110495899a298313c4aa9c6c151c1f54da Parents: a6394bc Author: kj-ki kikushima.ke...@lab.ntt.co.jp Authored: Tue Jan 6 09:49:37 2015 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Jan 6 09:49:37 2015 -0800

-- .../main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/5e3ec111/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 13033fe..7372dfb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -32,9 +32,9 @@ trait PartitionStrategy extends Serializable { object PartitionStrategy { /** * Assigns edges to partitions using a 2D partitioning of the sparse edge adjacency matrix, - * guaranteeing a `2 * sqrt(numParts)` bound on vertex replication. + * guaranteeing a `2 * sqrt(numParts) - 1` bound on vertex replication. * - * Suppose we have a graph with 11 vertices that we want to partition + * Suppose we have a graph with 12 vertices that we want to partition * over 9 machines. We can use the following sparse matrix representation: * * <pre> @@ -61,7 +61,7 @@ object PartitionStrategy { * that edges adjacent to `v11` can only be in the first column of blocks `(P0, P3, * P6)` or the last * row of blocks `(P6, P7, P8)`. As a consequence we can guarantee that `v11` will need to be - * replicated to at most `2 * sqrt(numParts)` machines. + * replicated to at most `2 * sqrt(numParts) - 1` machines. * * Notice that `P0` has many edges and as a consequence this partitioning would lead to poor work * balance. To improve balance we first multiply each vertex id by a large prime to shuffle the
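The corrected bound can be checked empirically. A small sketch (runs without a SparkContext; the partition count and vertex id are chosen to match the example in the comment, and the assertion relies on EdgePartition2D's column/row block layout):

import org.apache.spark.graphx.PartitionStrategy.EdgePartition2D

val numParts = 9  // a 3 x 3 grid of blocks, as in the comment's example
// Partitions that may hold edges adjacent to vertex 11, in either direction:
val asSrc = (0L until 12L).map(dst => EdgePartition2D.getPartition(11L, dst, numParts))
val asDst = (0L until 12L).map(src => EdgePartition2D.getPartition(src, 11L, numParts))
val replicas = (asSrc ++ asDst).toSet.size
// The vertex's column and row of blocks share one block, hence the "- 1":
assert(replicas <= 2 * math.sqrt(numParts).toInt - 1)  // at most 5, not 6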
spark git commit: [SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark
Repository: spark Updated Branches: refs/heads/master e895e0cbe -> 2e6b736b0

[SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark This patch just replaces a native quick sorter with Sorter(TimSort) in Spark. It could get performance gains of ~8% in my quick experiments.

Author: Takeshi Yamamuro linguin@gmail.com

Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the following commits: 8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import 3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e6b736b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e6b736b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e6b736b Branch: refs/heads/master Commit: 2e6b736b0e6e5920d0523533c87832a53211db42 Parents: e895e0c Author: Takeshi Yamamuro linguin@gmail.com Authored: Sun Dec 7 19:36:08 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Sun Dec 7 19:37:14 2014 -0800

-- .../scala/org/apache/spark/graphx/Edge.scala| 30 +++ .../graphx/impl/EdgePartitionBuilder.scala | 39 +--- 2 files changed, 64 insertions(+), 5 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/2e6b736b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala index 7e842ec..ecc37dc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala @@ -17,6 +17,8 @@ package org.apache.spark.graphx +import org.apache.spark.util.collection.SortDataFormat + /** * A single directed edge consisting of a source id, target id, * and the data associated with the edge. 
@@ -65,4 +67,32 @@ object Edge { else 1 } } + + private[graphx] def edgeArraySortDataFormat[ED] = new SortDataFormat[Edge[ED], Array[Edge[ED]]] { +override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = { + data(pos) +} + +override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = { + val tmp = data(pos0) + data(pos0) = data(pos1) + data(pos1) = tmp +} + +override def copyElement( +src: Array[Edge[ED]], srcPos: Int, +dst: Array[Edge[ED]], dstPos: Int) { + dst(dstPos) = src(srcPos) +} + +override def copyRange( +src: Array[Edge[ED]], srcPos: Int, +dst: Array[Edge[ED]], dstPos: Int, length: Int) { + System.arraycopy(src, srcPos, dst, dstPos, length) +} + +override def allocate(length: Int): Array[Edge[ED]] = { + new Array[Edge[ED]](length) +} + } } http://git-wip-us.apache.org/repos/asf/spark/blob/2e6b736b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index b0cb0fe..409cf60 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -18,12 +18,10 @@ package org.apache.spark.graphx.impl import scala.reflect.ClassTag -import scala.util.Sorting - -import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector} import org.apache.spark.graphx._ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap +import org.apache.spark.util.collection.{SortDataFormat, Sorter, PrimitiveVector} /** Constructs an EdgePartition from scratch. */ private[graphx] @@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla def toEdgePartition: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array -Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) +new Sorter(Edge.edgeArraySortDataFormat[ED]) + .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering) val localSrcIds = new Array[Int](edgeArray.size) val localDstIds = new Array[Int](edgeArray.size) val data = new Array[ED](edgeArray.size) @@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[ def toEdgePartition: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array -Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering) +new Sorter(EdgeWithLocalIds.edgeArraySortDataFormat[ED]) + .sort(edgeArray, 0, edgeArray.length,
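SortDataFormat and Sorter are private[spark] utilities, so the switch cannot be exercised directly from user code; a user-level sketch of the same idea (with my own ordering, since Edge.lexicographicOrdering is private[graphx]) is to sort with java.util.Arrays.sort, whose object sort is TimSort, instead of scala.util.Sorting.quickSort:

import org.apache.spark.graphx.Edge

val edges = Array(Edge(3L, 1L, 0), Edge(1L, 2L, 0), Edge(1L, 1L, 0))
val lexicographic: Ordering[Edge[Int]] = Ordering.by(e => (e.srcId, e.dstId))

// Before: plain quicksort (not stable, no adaptivity on sorted runs).
scala.util.Sorting.quickSort(edges)(lexicographic)
// After: TimSort (a scala Ordering is also a java.util.Comparator).
java.util.Arrays.sort(edges, lexicographic)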
spark git commit: [SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark
Repository: spark Updated Branches: refs/heads/branch-1.2 27d9f13af -> a4ae7c8b5

[SPARK-4646] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark This patch just replaces a native quick sorter with Sorter(TimSort) in Spark. It could get performance gains of ~8% in my quick experiments.

Author: Takeshi Yamamuro linguin@gmail.com

Closes #3507 from maropu/TimSortInEdgePartitionBuilderSpike and squashes the following commits: 8d4e5d2 [Takeshi Yamamuro] Remove a wildcard import 3527e00 [Takeshi Yamamuro] Replace Scala.util.Sorting.quickSort with Sorter(TimSort) in Spark (cherry picked from commit 2e6b736b0e6e5920d0523533c87832a53211db42) Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a4ae7c8b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a4ae7c8b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a4ae7c8b Branch: refs/heads/branch-1.2 Commit: a4ae7c8b533b3998484879439c0982170c3c38a7 Parents: 27d9f13 Author: Takeshi Yamamuro linguin@gmail.com Authored: Sun Dec 7 19:36:08 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Sun Dec 7 19:37:32 2014 -0800

-- .../scala/org/apache/spark/graphx/Edge.scala| 30 +++ .../graphx/impl/EdgePartitionBuilder.scala | 39 +--- 2 files changed, 64 insertions(+), 5 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/a4ae7c8b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala index 7e842ec..ecc37dc 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Edge.scala @@ -17,6 +17,8 @@ package org.apache.spark.graphx +import org.apache.spark.util.collection.SortDataFormat + /** * A single directed edge consisting of a source id, target id, * and the data associated with the edge. 
@@ -65,4 +67,32 @@ object Edge { else 1 } } + + private[graphx] def edgeArraySortDataFormat[ED] = new SortDataFormat[Edge[ED], Array[Edge[ED]]] { +override def getKey(data: Array[Edge[ED]], pos: Int): Edge[ED] = { + data(pos) +} + +override def swap(data: Array[Edge[ED]], pos0: Int, pos1: Int): Unit = { + val tmp = data(pos0) + data(pos0) = data(pos1) + data(pos1) = tmp +} + +override def copyElement( +src: Array[Edge[ED]], srcPos: Int, +dst: Array[Edge[ED]], dstPos: Int) { + dst(dstPos) = src(srcPos) +} + +override def copyRange( +src: Array[Edge[ED]], srcPos: Int, +dst: Array[Edge[ED]], dstPos: Int, length: Int) { + System.arraycopy(src, srcPos, dst, dstPos, length) +} + +override def allocate(length: Int): Array[Edge[ED]] = { + new Array[Edge[ED]](length) +} + } } http://git-wip-us.apache.org/repos/asf/spark/blob/a4ae7c8b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index b0cb0fe..409cf60 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -18,12 +18,10 @@ package org.apache.spark.graphx.impl import scala.reflect.ClassTag -import scala.util.Sorting - -import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector} import org.apache.spark.graphx._ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap +import org.apache.spark.util.collection.{SortDataFormat, Sorter, PrimitiveVector} /** Constructs an EdgePartition from scratch. */ private[graphx] @@ -38,7 +36,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla def toEdgePartition: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array -Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) +new Sorter(Edge.edgeArraySortDataFormat[ED]) + .sort(edgeArray, 0, edgeArray.length, Edge.lexicographicOrdering) val localSrcIds = new Array[Int](edgeArray.size) val localDstIds = new Array[Int](edgeArray.size) val data = new Array[ED](edgeArray.size) @@ -97,7 +96,8 @@ class ExistingEdgePartitionBuilder[ def toEdgePartition: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array -Sorting.quickSort(edgeArray)(EdgeWithLocalIds.lexicographicOrdering) +new
spark git commit: [SPARK-4620] Add unpersist in Graph and GraphImpl
Repository: spark Updated Branches: refs/heads/master 2e6b736b0 -> 8817fc7fe

[SPARK-4620] Add unpersist in Graph and GraphImpl Add an interface to uncache both vertices and edges of Graph/GraphImpl. This interface is useful when iterative graph operations build a new graph in each iteration, and the vertices and edges of previous iterations are no longer needed for following iterations.

Author: Takeshi Yamamuro linguin@gmail.com This patch had conflicts when merged, resolved by Committer: Ankur Dave ankurd...@gmail.com

Closes #3476 from maropu/UnpersistInGraphSpike and squashes the following commits: 77a006a [Takeshi Yamamuro] Add unpersist in Graph and GraphImpl

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8817fc7f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8817fc7f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8817fc7f Branch: refs/heads/master Commit: 8817fc7fe8785d7b11138ca744f22f7e70f1f0a0 Parents: 2e6b736 Author: Takeshi Yamamuro linguin@gmail.com Authored: Sun Dec 7 19:42:02 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Sun Dec 7 19:42:02 2014 -0800

-- graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 6 ++ .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala| 6 ++ 2 files changed, 12 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/8817fc7f/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 23538b7..84b72b3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -105,6 +105,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab def checkpoint(): Unit /** + * Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that + * build a new graph in each iteration. + */ + def unpersist(blocking: Boolean = true): Graph[VD, ED] + + /** * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative * algorithms that modify the vertex attributes but reuse the edges. This method can be used to * uncache the vertex attributes of previous iterations once they are no longer needed, improving

http://git-wip-us.apache.org/repos/asf/spark/blob/8817fc7f/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index a617d84..3f4a900 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -70,6 +70,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( replicatedVertexView.edges.checkpoint() } + override def unpersist(blocking: Boolean = true): Graph[VD, ED] = { +unpersistVertices(blocking) +replicatedVertexView.edges.unpersist(blocking) +this + } + override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
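A sketch of the intended usage pattern (the loop shape and names are illustrative, not from the patch): materialize the new graph before unpersisting the previous one, so the dropped partitions are never recomputed.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

def iterate(edgeRdd: RDD[Edge[Int]], numIters: Int): Graph[Int, Int] = {
  var g = Graph.fromEdges(edgeRdd, 0).cache()
  for (_ <- 1 to numIters) {
    val next = g.mapVertices((_, attr) => attr + 1).cache()
    next.edges.foreachPartition(_ => ())  // materialize next first
    g.unpersist(blocking = false)         // drop the previous iteration
    g = next
  }
  g
}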
spark git commit: [SPARK-4620] Add unpersist in Graph and GraphImpl
Repository: spark Updated Branches: refs/heads/branch-1.2 a4ae7c8b5 -> 6b9e8b081

[SPARK-4620] Add unpersist in Graph and GraphImpl Add an interface to uncache both vertices and edges of Graph/GraphImpl. This interface is useful when iterative graph operations build a new graph in each iteration, and the vertices and edges of previous iterations are no longer needed for following iterations.

Author: Takeshi Yamamuro linguin@gmail.com This patch had conflicts when merged, resolved by Committer: Ankur Dave ankurd...@gmail.com

Closes #3476 from maropu/UnpersistInGraphSpike and squashes the following commits: 77a006a [Takeshi Yamamuro] Add unpersist in Graph and GraphImpl (cherry picked from commit 8817fc7fe8785d7b11138ca744f22f7e70f1f0a0) Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b9e8b08 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b9e8b08 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b9e8b08 Branch: refs/heads/branch-1.2 Commit: 6b9e8b081655f71f7ff2c4238254f7aaa110723c Parents: a4ae7c8 Author: Takeshi Yamamuro linguin@gmail.com Authored: Sun Dec 7 19:42:02 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Sun Dec 7 19:42:29 2014 -0800

-- graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 6 ++ .../main/scala/org/apache/spark/graphx/impl/GraphImpl.scala| 6 ++ 2 files changed, 12 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/6b9e8b08/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 23538b7..84b72b3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -105,6 +105,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab def checkpoint(): Unit /** + * Uncaches both vertices and edges of this graph. This is useful in iterative algorithms that + * build a new graph in each iteration. + */ + def unpersist(blocking: Boolean = true): Graph[VD, ED] + + /** * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative * algorithms that modify the vertex attributes but reuse the edges. 
This method can be used to * uncache the vertex attributes of previous iterations once they are no longer needed, improving http://git-wip-us.apache.org/repos/asf/spark/blob/6b9e8b08/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index a617d84..3f4a900 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -70,6 +70,12 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( replicatedVertexView.edges.checkpoint() } + override def unpersist(blocking: Boolean = true): Graph[VD, ED] = { +unpersistVertices(blocking) +replicatedVertexView.edges.unpersist(blocking) +this + } + override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone
spark git commit: [SPARK-3623][GraphX] GraphX should support the checkpoint operation
Repository: spark Updated Branches: refs/heads/master 6eb1b6f62 -> e895e0cbe

[SPARK-3623][GraphX] GraphX should support the checkpoint operation

Author: GuoQiang Li wi...@qq.com

Closes #2631 from witgo/SPARK-3623 and squashes the following commits: a70c500 [GuoQiang Li] Remove java related 4d1e249 [GuoQiang Li] Add comments e682724 [GuoQiang Li] Graph should support the checkpoint operation

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e895e0cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e895e0cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e895e0cb Branch: refs/heads/master Commit: e895e0cbecbbec1b412ff21321e57826d2d0a982 Parents: 6eb1b6f Author: GuoQiang Li wi...@qq.com Authored: Sat Dec 6 00:56:51 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Sat Dec 6 00:56:51 2014 -0800

-- .../scala/org/apache/spark/graphx/Graph.scala | 8 .../apache/spark/graphx/impl/GraphImpl.scala| 5 + .../org/apache/spark/graphx/GraphSuite.scala| 21 3 files changed, 34 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 6377915..23538b7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -97,6 +97,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab def cache(): Graph[VD, ED] /** + * Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint + * directory set with SparkContext.setCheckpointDir() and all references to its parent + * RDDs will be removed. It is strongly recommended that this Graph is persisted in + * memory, otherwise saving it on a file will require recomputation. + */ + def checkpoint(): Unit + + /** * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative * algorithms that modify the vertex attributes but reuse the edges. 
This method can be used to * uncache the vertex attributes of previous iterations once they are no longer needed, improving http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 0eae2a6..a617d84 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( this } + override def checkpoint(): Unit = { +vertices.checkpoint() +replicatedVertexView.edges.checkpoint() + } + override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone

http://git-wip-us.apache.org/repos/asf/spark/blob/e895e0cb/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index a05d1dd..9da0064 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.graphx import org.scalatest.FunSuite +import com.google.common.io.Files + import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("checkpoint") { +val checkpointDir = Files.createTempDir() +checkpointDir.deleteOnExit() +withSpark { sc => + sc.setCheckpointDir(checkpointDir.getAbsolutePath) + val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)} + val rdd = sc.parallelize(ring) + val graph = Graph.fromEdges(rdd, 1.0F) + graph.checkpoint() + graph.edges.map(_.attr).count() + graph.vertices.map(_._2).count() + + val edgesDependencies = graph.edges.partitionsRDD.dependencies + val verticesDependencies =
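A usage sketch distilled from the new unit test (the checkpoint directory is hypothetical): checkpoint() must be called before the graph is materialized, and actions are needed to actually write the checkpoint files.

import org.apache.spark.graphx.{Edge, Graph}

sc.setCheckpointDir("/tmp/graphx-checkpoint")  // illustrative path
val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1) }
val graph = Graph.fromEdges(sc.parallelize(ring), 1.0F)
graph.checkpoint()
// Running actions forces the checkpoint; the lineages are then truncated.
graph.edges.map(_.attr).count()
graph.vertices.map(_._2).count()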
spark git commit: [SPARK-3623][GraphX] GraphX should support the checkpoint operation
Repository: spark Updated Branches: refs/heads/branch-1.2 11446a648 -> 27d9f13af

[SPARK-3623][GraphX] GraphX should support the checkpoint operation

Author: GuoQiang Li wi...@qq.com

Closes #2631 from witgo/SPARK-3623 and squashes the following commits: a70c500 [GuoQiang Li] Remove java related 4d1e249 [GuoQiang Li] Add comments e682724 [GuoQiang Li] Graph should support the checkpoint operation (cherry picked from commit e895e0cbecbbec1b412ff21321e57826d2d0a982) Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27d9f13a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27d9f13a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27d9f13a Branch: refs/heads/branch-1.2 Commit: 27d9f13af2df3bd7af029cf7ac48443ba6f4d6e0 Parents: 11446a6 Author: GuoQiang Li wi...@qq.com Authored: Sat Dec 6 00:56:51 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Sat Dec 6 00:57:02 2014 -0800

-- .../scala/org/apache/spark/graphx/Graph.scala | 8 .../apache/spark/graphx/impl/GraphImpl.scala| 5 + .../org/apache/spark/graphx/GraphSuite.scala| 21 3 files changed, 34 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index 6377915..23538b7 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -97,6 +97,14 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab def cache(): Graph[VD, ED] /** + * Mark this Graph for checkpointing. It will be saved to a file inside the checkpoint + * directory set with SparkContext.setCheckpointDir() and all references to its parent + * RDDs will be removed. It is strongly recommended that this Graph is persisted in + * memory, otherwise saving it on a file will require recomputation. + */ + def checkpoint(): Unit + + /** * Uncaches only the vertices of this graph, leaving the edges alone. This is useful in iterative * algorithms that modify the vertex attributes but reuse the edges. 
This method can be used to * uncache the vertex attributes of previous iterations once they are no longer needed, improving http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 0eae2a6..a617d84 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -65,6 +65,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( this } + override def checkpoint(): Unit = { +vertices.checkpoint() +replicatedVertexView.edges.checkpoint() + } + override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) // TODO: unpersist the replicated vertices in `replicatedVertexView` but leave the edges alone

http://git-wip-us.apache.org/repos/asf/spark/blob/27d9f13a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index a05d1dd..9da0064 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.graphx import org.scalatest.FunSuite +import com.google.common.io.Files + import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -365,4 +367,23 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("checkpoint") { +val checkpointDir = Files.createTempDir() +checkpointDir.deleteOnExit() +withSpark { sc => + sc.setCheckpointDir(checkpointDir.getAbsolutePath) + val ring = (0L to 100L).zip((1L to 99L) :+ 0L).map { case (a, b) => Edge(a, b, 1)} + val rdd = sc.parallelize(ring) + val graph = Graph.fromEdges(rdd, 1.0F) + graph.checkpoint() + graph.edges.map(_.attr).count() + graph.vertices.map(_._2).count() + + val
spark git commit: [SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain
Repository: spark Updated Branches: refs/heads/master 17c162f66 -> 77be8b986

[SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672 The f closure of `PartitionsRDD(ZippedPartitionsRDD2)` contains a `$outer` that references EdgeRDD/VertexRDD, which causes the task's serialization chain to become very long in iterative GraphX applications. As a result, a StackOverflow error will occur. If we set f = null in `clearDependencies()`, checkpoint() can cut off the long serialization chain. More details and explanation can be found in the JIRA.

Author: JerryLead jerryl...@163.com Author: Lijie Xu csxuli...@gmail.com

Closes #3545 from JerryLead/my_core and squashes the following commits: f7faea5 [JerryLead] checkpoint() should clear the f to avoid StackOverflow error c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark 52799e3 [Lijie Xu] Merge pull request #1 from apache/master

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77be8b98 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77be8b98 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77be8b98 Branch: refs/heads/master Commit: 77be8b986fd21b7bbe28aa8db1042cb22bc74fe7 Parents: 17c162f Author: JerryLead jerryl...@163.com Authored: Tue Dec 2 23:53:29 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Dec 2 23:53:29 2014 -0800

-- .../scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala| 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/77be8b98/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 996f2cd..95b2dd9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -77,7 +77,7 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( sc: SparkContext, -f: (Iterator[A], Iterator[B]) => Iterator[V], +var f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], preservesPartitioning: Boolean = false) @@ -92,13 +92,14 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] super.clearDependencies() rdd1 = null rdd2 = null +f = null } } private[spark] class ZippedPartitionsRDD3 [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag]( sc: SparkContext, -f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], +var f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], var rdd3: RDD[C], @@ -117,13 +118,14 @@ private[spark] class ZippedPartitionsRDD3 rdd1 = null rdd2 = null rdd3 = null +f = null } } private[spark] class ZippedPartitionsRDD4 [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag]( sc: SparkContext, -f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], +var f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], var rdd3: RDD[C], @@ -145,5 +147,6 @@ private[spark] class ZippedPartitionsRDD4 rdd2 = null rdd3 = null rdd4 = null +f = null } }
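Why nulling f matters, sketched outside Spark (class and field names are my own): a function literal defined inside a class captures the enclosing instance as $outer when it touches a field, so serializing the closure drags in everything that instance references. clearDependencies() already nulls rdd1/rdd2 after a checkpoint; nulling the now-unneeded f as well drops the captured $outer chain.

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Outer extends Serializable {
  val heavy = new Array[Byte](1 << 20)  // stands in for a long RDD lineage
  // The closure reads a field, so it captures `this` (the $outer reference).
  def makeF: (Iterator[Int], Iterator[Int]) => Iterator[Int] =
    (a, b) => a ++ b ++ Iterator(heavy.length)
}

def serializedSize(o: AnyRef): Int = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(o); oos.close(); bos.size
}

serializedSize(new Outer().makeF)  // roughly 1 MB: the closure carries Outer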
spark git commit: [SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain
Repository: spark Updated Branches: refs/heads/branch-1.2 528cce8bc -> 667f7ff44

[SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672 The f closure of `PartitionsRDD(ZippedPartitionsRDD2)` contains a `$outer` that references EdgeRDD/VertexRDD, which causes the task's serialization chain to become very long in iterative GraphX applications. As a result, a StackOverflow error will occur. If we set f = null in `clearDependencies()`, checkpoint() can cut off the long serialization chain. More details and explanation can be found in the JIRA.

Author: JerryLead jerryl...@163.com Author: Lijie Xu csxuli...@gmail.com

Closes #3545 from JerryLead/my_core and squashes the following commits: f7faea5 [JerryLead] checkpoint() should clear the f to avoid StackOverflow error c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark 52799e3 [Lijie Xu] Merge pull request #1 from apache/master (cherry picked from commit 77be8b986fd21b7bbe28aa8db1042cb22bc74fe7) Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/667f7ff4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/667f7ff4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/667f7ff4 Branch: refs/heads/branch-1.2 Commit: 667f7ff440dea9b83dbf3910f26d8dbf82d343a5 Parents: 528cce8 Author: JerryLead jerryl...@163.com Authored: Tue Dec 2 23:53:29 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Dec 2 23:53:38 2014 -0800

-- .../scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala| 9 ++--- 1 file changed, 6 insertions(+), 3 deletions(-) --

http://git-wip-us.apache.org/repos/asf/spark/blob/667f7ff4/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index 996f2cd..95b2dd9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -77,7 +77,7 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag]( private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]( sc: SparkContext, -f: (Iterator[A], Iterator[B]) => Iterator[V], +var f: (Iterator[A], Iterator[B]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], preservesPartitioning: Boolean = false) @@ -92,13 +92,14 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag] super.clearDependencies() rdd1 = null rdd2 = null +f = null } } private[spark] class ZippedPartitionsRDD3 [A: ClassTag, B: ClassTag, C: ClassTag, V: ClassTag]( sc: SparkContext, -f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], +var f: (Iterator[A], Iterator[B], Iterator[C]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], var rdd3: RDD[C], @@ -117,13 +118,14 @@ private[spark] class ZippedPartitionsRDD3 rdd1 = null rdd2 = null rdd3 = null +f = null } } private[spark] class ZippedPartitionsRDD4 [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag]( sc: SparkContext, -f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], +var f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], var rdd1: RDD[A], var rdd2: RDD[B], var rdd3: RDD[C], @@ -145,5 +147,6 @@ private[spark] class ZippedPartitionsRDD4 rdd2 = null rdd3 = null rdd4 = null +f = null } }
spark git commit: [SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage
Repository: spark Updated Branches: refs/heads/master 5da21f07d -> fc0a1475e

[SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672 Iterative GraphX applications always have long lineage, while checkpoint() on EdgeRDD and VertexRDD themselves cannot shorten the lineage. In contrast, if we perform checkpoint() on their PartitionsRDD, the long lineage can be cut off. Moreover, the existing operations such as cache() in this code are performed on the PartitionsRDD, so checkpoint() should work the same way. More details and explanation can be found in the JIRA.

Author: JerryLead jerryl...@163.com Author: Lijie Xu csxuli...@gmail.com

Closes #3549 from JerryLead/my_graphX_checkpoint and squashes the following commits: d1aa8d8 [JerryLead] Perform checkpoint() on PartitionsRDD not VertexRDD and EdgeRDD themselves ff08ed4 [JerryLead] Merge branch 'master' of https://github.com/apache/spark c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark 52799e3 [Lijie Xu] Merge pull request #1 from apache/master

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc0a1475 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc0a1475 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc0a1475 Branch: refs/heads/master Commit: fc0a1475ef7c8b33363d88adfe8e8f28def5afc7 Parents: 5da21f0 Author: JerryLead jerryl...@163.com Authored: Tue Dec 2 17:08:02 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Dec 2 17:08:02 2014 -0800

-- .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala| 4 .../main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4 2 files changed, 8 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/fc0a1475/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index a816961..504559d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -70,6 +70,10 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( this } + override def checkpoint() = { +partitionsRDD.checkpoint() + } + /** The number of edges in the RDD. */ override def count(): Long = { partitionsRDD.map(_._2.size.toLong).reduce(_ + _)

http://git-wip-us.apache.org/repos/asf/spark/blob/fc0a1475/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index d92a55a..c8898b1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -71,6 +71,10 @@ class VertexRDDImpl[VD] private[graphx] ( this } + override def checkpoint() = { +partitionsRDD.checkpoint() + } + /** The number of vertices in the RDD. */ override def count(): Long = { partitionsRDD.map(_.size).reduce(_ + _)
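A sketch of the effect (assuming a SparkContext `sc` with a checkpoint directory already set): after this patch, checkpointing a graph truncates the lineage of the underlying partition RDDs, which can be observed with toDebugString.

import org.apache.spark.graphx.{Edge, Graph}

val graph = Graph.fromEdges(
  sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1))), 0).cache()
graph.checkpoint()
graph.edges.count(); graph.vertices.count()  // actions force the checkpoint

// The lineage now bottoms out in a checkpointed RDD instead of the
// original parallelize/shuffle chain:
println(graph.edges.toDebugString)
println(graph.vertices.toDebugString)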
spark git commit: [SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage
Repository: spark Updated Branches: refs/heads/branch-1.2 5e026a3e6 -> f1859fc18

[SPARK-4672][GraphX]Perform checkpoint() on PartitionsRDD to shorten the lineage The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672 Iterative GraphX applications always have long lineage, while checkpoint() on EdgeRDD and VertexRDD themselves cannot shorten the lineage. In contrast, if we perform checkpoint() on their PartitionsRDD, the long lineage can be cut off. Moreover, the existing operations such as cache() in this code are performed on the PartitionsRDD, so checkpoint() should work the same way. More details and explanation can be found in the JIRA.

Author: JerryLead jerryl...@163.com Author: Lijie Xu csxuli...@gmail.com

Closes #3549 from JerryLead/my_graphX_checkpoint and squashes the following commits: d1aa8d8 [JerryLead] Perform checkpoint() on PartitionsRDD not VertexRDD and EdgeRDD themselves ff08ed4 [JerryLead] Merge branch 'master' of https://github.com/apache/spark c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark 52799e3 [Lijie Xu] Merge pull request #1 from apache/master (cherry picked from commit fc0a1475ef7c8b33363d88adfe8e8f28def5afc7) Signed-off-by: Ankur Dave ankurd...@gmail.com

Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f1859fc1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f1859fc1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f1859fc1 Branch: refs/heads/branch-1.2 Commit: f1859fc189d9657381fbe82795420de34cad4025 Parents: 5e026a3 Author: JerryLead jerryl...@163.com Authored: Tue Dec 2 17:08:02 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Dec 2 17:11:05 2014 -0800

-- .../main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala| 4 .../main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4 2 files changed, 8 insertions(+) --

http://git-wip-us.apache.org/repos/asf/spark/blob/f1859fc1/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index a816961..504559d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -70,6 +70,10 @@ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( this } + override def checkpoint() = { +partitionsRDD.checkpoint() + } + /** The number of edges in the RDD. */ override def count(): Long = { partitionsRDD.map(_._2.size.toLong).reduce(_ + _)

http://git-wip-us.apache.org/repos/asf/spark/blob/f1859fc1/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index d92a55a..c8898b1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -71,6 +71,10 @@ class VertexRDDImpl[VD] private[graphx] ( this } + override def checkpoint() = { +partitionsRDD.checkpoint() + } + /** The number of vertices in the RDD. */ override def count(): Long = { partitionsRDD.map(_.size).reduce(_ + _)
spark git commit: [SPARK-4672][GraphX]Non-transient PartitionsRDDs will lead to StackOverflow error
Repository: spark Updated Branches: refs/heads/master fc0a1475e -> 17c162f66 [SPARK-4672][GraphX]Non-transient PartitionsRDDs will lead to StackOverflow error The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672 In a nutshell, if `val partitionsRDD` in EdgeRDDImpl and VertexRDDImpl are non-transient, the serialization chain can become very long in iterative algorithms and finally lead to a StackOverflowError. More details and explanation can be found in the JIRA. Author: JerryLead jerryl...@163.com Author: Lijie Xu csxuli...@gmail.com Closes #3544 from JerryLead/my_graphX and squashes the following commits: 628f33c [JerryLead] set PartitionsRDD to be transient in EdgeRDDImpl and VertexRDDImpl c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark 52799e3 [Lijie Xu] Merge pull request #1 from apache/master Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17c162f6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17c162f6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17c162f6 Branch: refs/heads/master Commit: 17c162f6682520e6e2790626e37da3a074471793 Parents: fc0a147 Author: JerryLead jerryl...@163.com Authored: Tue Dec 2 17:14:11 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Dec 2 17:14:11 2014 -0800 -- .../src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala | 2 +- .../main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/17c162f6/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala index 504559d..897c7ee 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeRDDImpl.scala @@ -26,7 +26,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ class EdgeRDDImpl[ED: ClassTag, VD: ClassTag] private[graphx] ( -override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], +@transient override val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) extends EdgeRDD[ED](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { http://git-wip-us.apache.org/repos/asf/spark/blob/17c162f6/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index c8898b1..9732c5b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -27,7 +27,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ class VertexRDDImpl[VD] private[graphx] ( -val partitionsRDD: RDD[ShippableVertexPartition[VD]], +@transient val partitionsRDD: RDD[ShippableVertexPartition[VD]], val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) (implicit override protected val vdTag: ClassTag[VD]) extends VertexRDD[VD](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
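The failure mode is easiest to see outside Spark. In the plain-Scala sketch below (an analogy, not Spark code), each iteration wraps the previous value, so serializing the head walks the whole chain; marking the back-reference @transient cuts the chain at one hop:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

class Chained(val prev: Chained) extends Serializable     // keeps the whole chain
class Cut(@transient val prev: Cut) extends Serializable  // chain cut at one hop

def serializedSize(o: AnyRef): Int = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(o)
  out.close()
  bytes.size
}

var a: Chained = null
var b: Cut = null
for (_ <- 1 to 10000) { a = new Chained(a); b = new Cut(b) }
println(serializedSize(b))  // small and constant
println(serializedSize(a))  // grows with iterations; may throw StackOverflowError

Roughly speaking, the non-transient partitionsRDD played the role of `prev` here: serializing the wrapper RDD dragged the entire iteration history along with it.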
spark git commit: [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx
Repository: spark Updated Branches: refs/heads/master 23eaf0e12 -> d15c6e9dc [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx At first srcIds is not initialized and its entries are all 0, so we use edgeArray(0).srcId for currSrcId. Author: lianhuiwang lianhuiwan...@gmail.com Closes #3138 from lianhuiwang/SPARK-4249 and squashes the following commits: 3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d15c6e9d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d15c6e9d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d15c6e9d Branch: refs/heads/master Commit: d15c6e9dc2860bbe56e31ddf71218ccc6d5c841d Parents: 23eaf0e Author: lianhuiwang lianhuiwan...@gmail.com Authored: Thu Nov 6 10:46:45 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Thu Nov 6 10:46:45 2014 -0800 -- .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d15c6e9d/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 4520beb..2b6137b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -45,8 +45,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag]( // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index if (edgeArray.length > 0) { - index.update(srcIds(0), 0) - var currSrcId: VertexId = srcIds(0) + index.update(edgeArray(0).srcId, 0) + var currSrcId: VertexId = edgeArray(0).srcId var i = 0 while (i < edgeArray.size) { srcIds(i) = edgeArray(i).srcId
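A self-contained sketch of the fixed pattern (names follow the diff; the real builder uses primitive sorted arrays and a GraphX-specific index map, so this is only an illustration). The point is that srcIds is populated only inside the copy loop, so reading srcIds(0) before the loop always saw the default value 0:

import scala.collection.mutable

type VertexId = Long
case class MiniEdge(srcId: VertexId, dstId: VertexId)

val edgeArray = Array(MiniEdge(5L, 1L), MiniEdge(5L, 2L), MiniEdge(7L, 1L))
val srcIds = new Array[VertexId](edgeArray.length)
val index = mutable.Map[VertexId, Int]()

if (edgeArray.length > 0) {
  index.update(edgeArray(0).srcId, 0)       // fixed: read edgeArray, not srcIds
  var currSrcId: VertexId = edgeArray(0).srcId
  var i = 0
  while (i < edgeArray.length) {
    srcIds(i) = edgeArray(i).srcId          // srcIds is only filled in here
    if (edgeArray(i).srcId != currSrcId) {
      currSrcId = edgeArray(i).srcId
      index.update(currSrcId, i)            // start of a new source-id cluster
    }
    i += 1
  }
}
println(index)  // Map(5 -> 0, 7 -> 2); the old code also recorded a spurious key 0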
spark git commit: [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx
Repository: spark Updated Branches: refs/heads/branch-1.2 aaaeaf939 -> 9061bc4e1 [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx At first srcIds is not initialized and its entries are all 0, so we use edgeArray(0).srcId for currSrcId. Author: lianhuiwang lianhuiwan...@gmail.com Closes #3138 from lianhuiwang/SPARK-4249 and squashes the following commits: 3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx (cherry picked from commit d15c6e9dc2860bbe56e31ddf71218ccc6d5c841d) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9061bc4e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9061bc4e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9061bc4e Branch: refs/heads/branch-1.2 Commit: 9061bc4e127abb0c44e37f1b8b7706883d451bc7 Parents: aaaeaf9 Author: lianhuiwang lianhuiwan...@gmail.com Authored: Thu Nov 6 10:46:45 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Thu Nov 6 10:47:49 2014 -0800 -- .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9061bc4e/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 4520beb..2b6137b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -45,8 +45,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag]( // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index if (edgeArray.length > 0) { - index.update(srcIds(0), 0) - var currSrcId: VertexId = srcIds(0) + index.update(edgeArray(0).srcId, 0) + var currSrcId: VertexId = edgeArray(0).srcId var i = 0 while (i < edgeArray.size) { srcIds(i) = edgeArray(i).srcId
spark git commit: [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx
Repository: spark Updated Branches: refs/heads/branch-1.1 c58c1bb83 -> 0a40eac25 [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx At first srcIds is not initialized and its entries are all 0, so we use edgeArray(0).srcId for currSrcId. Author: lianhuiwang lianhuiwan...@gmail.com Closes #3138 from lianhuiwang/SPARK-4249 and squashes the following commits: 3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx (cherry picked from commit d15c6e9dc2860bbe56e31ddf71218ccc6d5c841d) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0a40eac2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0a40eac2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0a40eac2 Branch: refs/heads/branch-1.1 Commit: 0a40eac25a0202c492f58e3c97a96a35ceed6ce8 Parents: c58c1bb Author: lianhuiwang lianhuiwan...@gmail.com Authored: Thu Nov 6 10:46:45 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Thu Nov 6 10:48:32 2014 -0800 -- .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0a40eac2/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 4520beb..2b6137b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -45,8 +45,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag]( // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index if (edgeArray.length > 0) { - index.update(srcIds(0), 0) - var currSrcId: VertexId = srcIds(0) + index.update(edgeArray(0).srcId, 0) + var currSrcId: VertexId = edgeArray(0).srcId var i = 0 while (i < edgeArray.size) { srcIds(i) = edgeArray(i).srcId
spark git commit: [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx
Repository: spark Updated Branches: refs/heads/branch-1.0 49224fd0f -> 76c20cac9 [SPARK-4249][GraphX]fix a problem of EdgePartitionBuilder in Graphx At first srcIds is not initialized and its entries are all 0, so we use edgeArray(0).srcId for currSrcId. Author: lianhuiwang lianhuiwan...@gmail.com Closes #3138 from lianhuiwang/SPARK-4249 and squashes the following commits: 3f4e503 [lianhuiwang] fix a problem of EdgePartitionBuilder in Graphx (cherry picked from commit d15c6e9dc2860bbe56e31ddf71218ccc6d5c841d) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76c20cac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76c20cac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76c20cac Branch: refs/heads/branch-1.0 Commit: 76c20cac99437ac07bcc9d71e295fd49fd465f4e Parents: 49224fd Author: lianhuiwang lianhuiwan...@gmail.com Authored: Thu Nov 6 10:46:45 2014 -0800 Committer: Ankur Dave ankurd...@gmail.com Committed: Thu Nov 6 10:49:44 2014 -0800 -- .../org/apache/spark/graphx/impl/EdgePartitionBuilder.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76c20cac/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 4520beb..2b6137b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -45,8 +45,8 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag]( // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and // adding them to the index if (edgeArray.length > 0) { - index.update(srcIds(0), 0) - var currSrcId: VertexId = srcIds(0) + index.update(edgeArray(0).srcId, 0) + var currSrcId: VertexId = edgeArray(0).srcId var i = 0 while (i < edgeArray.size) { srcIds(i) = edgeArray(i).srcId
git commit: [graphX] GraphOps: random pick vertex bug
Repository: spark Updated Branches: refs/heads/master 0bbe7faef -> 51229ff7f [graphX] GraphOps: random pick vertex bug When `numVertices > 50`, the integer division `50 / numVertices` truncates the probability to 0. This would cause an infinite loop. Author: yingjieMiao ying...@42go.com Closes #2553 from yingjieMiao/graphx and squashes the following commits: 6adf3c8 [yingjieMiao] [graphX] GraphOps: random pick vertex bug Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51229ff7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51229ff7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51229ff7 Branch: refs/heads/master Commit: 51229ff7f4d3517706a1cdc1a2943ede1c605089 Parents: 0bbe7fa Author: yingjieMiao ying...@42go.com Authored: Mon Sep 29 18:01:27 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Mon Sep 29 18:01:27 2014 -0700 -- graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/51229ff7/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 02afaa9..d0dd45d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -254,7 +254,7 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * Picks a random vertex from the graph and returns its ID. */ def pickRandomVertex(): VertexId = { -val probability = 50 / graph.numVertices +val probability = 50.0 / graph.numVertices var found = false var retVal: VertexId = null.asInstanceOf[VertexId] while (!found) {
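The root cause is Scala's integral division: `graph.numVertices` is a Long, so `50 / numVertices` is computed in integer arithmetic. A short sketch of the difference:

val numVertices = 1000L
val intProb  = 50 / numVertices    // 0    -- rejection sampling never accepts
val realProb = 50.0 / numVertices  // 0.05 -- the intended probability
println(s"$intProb vs $realProb")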
git commit: [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc
Repository: spark Updated Branches: refs/heads/branch-1.1 cf15b22d4 -> 1687d6ba9 [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc VertexRDD.apply had a bug where it ignored the merge function for duplicate vertices and instead used whichever vertex attribute occurred first. This commit fixes the bug by passing the merge function through to ShippableVertexPartition.apply, which merges any duplicates using the merge function and then fills in missing vertices using the specified default vertex attribute. This commit also adds a unit test for VertexRDD.apply. Author: Larry Xiao xia...@sjtu.edu.cn Author: Blie Arkansol xia...@sjtu.edu.cn Author: Ankur Dave ankurd...@gmail.com Closes #1903 from larryxiao/2062 and squashes the following commits: 625aa9d [Blie Arkansol] Merge pull request #1 from ankurdave/SPARK-2062 476770b [Ankur Dave] ShippableVertexPartition.initFrom: Don't run mergeFunc on default values 614059f [Larry Xiao] doc update: note about the default null value vertices construction dfdb3c9 [Larry Xiao] minor fix 1c70366 [Larry Xiao] scalastyle check: wrap line, parameter list indent 4 spaces e4ca697 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc 6a35ea8 [Larry Xiao] [TEST] VertexRDD.apply mergeFunc 4fbc29c [Blie Arkansol] undo unnecessary change efae765 [Larry Xiao] fix mistakes: should be able to call with or without mergeFunc b2422f9 [Larry Xiao] Merge branch '2062' of github.com:larryxiao/spark into 2062 52dc7f7 [Larry Xiao] pass mergeFunc to VertexPartitionBase, where merge is handled 581e9ee [Larry Xiao] TODO: VertexRDDSuite 20d80a3 [Larry Xiao] [SPARK-2062][GraphX] VertexRDD.apply does not use the mergeFunc (cherry picked from commit 3bbbdd8180cf316c6f8dde0e879410b6b29f8cc3) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1687d6ba Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1687d6ba Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1687d6ba Branch: refs/heads/branch-1.1 Commit: 1687d6ba95e5335a1445e0e392170a2d462bd356 Parents: cf15b22 Author: Larry Xiao xia...@sjtu.edu.cn Authored: Thu Sep 18 23:32:32 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Thu Sep 18 23:37:22 2014 -0700 -- .../org/apache/spark/graphx/VertexRDD.scala | 4 +-- .../graphx/impl/ShippableVertexPartition.scala | 28 .../apache/spark/graphx/VertexRDDSuite.scala | 11 3 files changed, 36 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1687d6ba/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 04fbc9d..2c8b245 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -392,7 +392,7 @@ object VertexRDD { */ def apply[VD: ClassTag]( vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = { -VertexRDD(vertices, edges, defaultVal, (a, b) => b) +VertexRDD(vertices, edges, defaultVal, (a, b) => a) } /** @@ -419,7 +419,7 @@ object VertexRDD { (vertexIter, routingTableIter) => val routingTable = if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty -Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal)) +Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal, mergeFunc)) } new VertexRDD(vertexPartitions) } http://git-wip-us.apache.org/repos/asf/spark/blob/1687d6ba/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala index dca54b8..5412d72 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala @@ -36,7 +36,7 @@ private[graphx] object ShippableVertexPartition { /** Construct a `ShippableVertexPartition` from the given vertices without any routing table. */ def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] = -apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD]) +apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD], (a, b) => a) /** * Construct a `ShippableVertexPartition
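A hedged usage sketch of the fixed behavior, assuming a local SparkContext for illustration: after this patch, duplicate vertex attributes passed to VertexRDD.apply are combined with the supplied mergeFunc instead of one of them being silently kept.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("mergeFunc"))
val graph = Graph.fromEdges(sc.parallelize(Seq(Edge(1L, 2L, 0))), defaultValue = 0)
val verts = sc.parallelize(Seq((1L, 10), (1L, 32), (2L, 5)))  // vertex 1L duplicated
val merged = VertexRDD(verts, graph.edges, 0, (a: Int, b: Int) => a + b)
println(merged.collect().toMap)  // expect 1L -> 42 once duplicates are merged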
git commit: [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions
Repository: spark Updated Branches: refs/heads/branch-1.1 f41c45a75 -> 8c40ab5c0 [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions 9b225ac3072de522b40b46aba6df1f1c231f13ef has been causing GraphX tests to fail nondeterministically, which is blocking development for others. Author: Ankur Dave ankurd...@gmail.com Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits: 10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions (cherry picked from commit 00362dac976cd05b06638deb11d990d612429e0b) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8c40ab5c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8c40ab5c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8c40ab5c Branch: refs/heads/branch-1.1 Commit: 8c40ab5c06ab72e85a8a9d4272fed0e81eca1d3a Parents: f41c45a Author: Ankur Dave ankurd...@gmail.com Authored: Wed Sep 3 23:49:47 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Wed Sep 3 23:50:11 2014 -0700 -- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 4 ++-- .../scala/org/apache/spark/graphx/GraphSuite.scala | 16 2 files changed, 2 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8c40ab5c/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 0f1a101..899a3cb 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx import scala.reflect.{classTag, ClassTag} -import org.apache.spark._ +import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -47,7 +47,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( * partitioner that allows co-partitioning with `partitionsRDD`. */ override val partitioner = -partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size))) + partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) http://git-wip-us.apache.org/repos/asf/spark/blob/8c40ab5c/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index eaaa449..6506bac 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.graphx import org.scalatest.FunSuite -import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -351,19 +350,4 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } - test("non-default number of edge partitions") { -val n = 10 -val defaultParallelism = 3 -val numEdgePartitions = 4 -assert(defaultParallelism != numEdgePartitions) -val conf = new SparkConf() - .set("spark.default.parallelism", defaultParallelism.toString) -val sc = new SparkContext("local", "test", conf) -val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), - numEdgePartitions) -val graph = Graph.fromEdgeTuples(edges, 1) -val neighborAttrSums = graph.mapReduceTriplets[Int]( - et => Iterator((et.dstId, et.srcAttr)), _ + _) -assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n))) - } }
git commit: [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions
Repository: spark Updated Branches: refs/heads/master 1bed0a386 -> 00362dac9 [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions 9b225ac3072de522b40b46aba6df1f1c231f13ef has been causing GraphX tests to fail nondeterministically, which is blocking development for others. Author: Ankur Dave ankurd...@gmail.com Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits: 10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00362dac Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00362dac Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00362dac Branch: refs/heads/master Commit: 00362dac976cd05b06638deb11d990d612429e0b Parents: 1bed0a3 Author: Ankur Dave ankurd...@gmail.com Authored: Wed Sep 3 23:49:47 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Wed Sep 3 23:49:47 2014 -0700 -- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 4 ++-- .../scala/org/apache/spark/graphx/GraphSuite.scala | 16 2 files changed, 2 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/00362dac/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 35fbd47..5bcb96b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx import scala.reflect.{classTag, ClassTag} -import org.apache.spark._ +import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( * partitioner that allows co-partitioning with `partitionsRDD`. */ override val partitioner = -partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size))) + partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) http://git-wip-us.apache.org/repos/asf/spark/blob/00362dac/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index eaaa449..6506bac 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.graphx import org.scalatest.FunSuite -import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -351,19 +350,4 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } - test("non-default number of edge partitions") { -val n = 10 -val defaultParallelism = 3 -val numEdgePartitions = 4 -assert(defaultParallelism != numEdgePartitions) -val conf = new SparkConf() - .set("spark.default.parallelism", defaultParallelism.toString) -val sc = new SparkContext("local", "test", conf) -val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), - numEdgePartitions) -val graph = Graph.fromEdgeTuples(edges, 1) -val neighborAttrSums = graph.mapReduceTriplets[Int]( - et => Iterator((et.dstId, et.srcAttr)), _ + _) -assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n))) - } }
git commit: [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions
Repository: spark Updated Branches: refs/heads/branch-1.0 8dd7690e2 -> 4d3ab2925 [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions 9b225ac3072de522b40b46aba6df1f1c231f13ef has been causing GraphX tests to fail nondeterministically, which is blocking development for others. Author: Ankur Dave ankurd...@gmail.com Closes #2271 from ankurdave/SPARK-3400 and squashes the following commits: 10c2a97 [Ankur Dave] [HOTFIX] [SPARK-3400] Revert 9b225ac fix GraphX EdgeRDD zipPartitions (cherry picked from commit 00362dac976cd05b06638deb11d990d612429e0b) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d3ab292 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d3ab292 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d3ab292 Branch: refs/heads/branch-1.0 Commit: 4d3ab292576396e11d9e5c2dabb676c07b34d286 Parents: 8dd7690 Author: Ankur Dave ankurd...@gmail.com Authored: Wed Sep 3 23:49:47 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Wed Sep 3 23:50:22 2014 -0700 -- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 4 ++-- .../scala/org/apache/spark/graphx/GraphSuite.scala | 16 2 files changed, 2 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4d3ab292/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 4dd15bf..a8fc095 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx import scala.reflect.{classTag, ClassTag} -import org.apache.spark._ +import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -45,7 +45,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( * partitioner that allows co-partitioning with `partitionsRDD`. */ override val partitioner = -partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size))) + partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) http://git-wip-us.apache.org/repos/asf/spark/blob/4d3ab292/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index e1b83c2..abc25d0 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.graphx import org.scalatest.FunSuite -import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -326,19 +325,4 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } - test("non-default number of edge partitions") { -val n = 10 -val defaultParallelism = 3 -val numEdgePartitions = 4 -assert(defaultParallelism != numEdgePartitions) -val conf = new SparkConf() - .set("spark.default.parallelism", defaultParallelism.toString) -val sc = new SparkContext("local", "test", conf) -val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), - numEdgePartitions) -val graph = Graph.fromEdgeTuples(edges, 1) -val neighborAttrSums = graph.mapReduceTriplets[Int]( - et => Iterator((et.dstId, et.srcAttr)), _ + _) -assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n))) - } }
git commit: [SPARK-3263][GraphX] Fix changes made to GraphGenerator.logNormalGraph in PR #720
Repository: spark Updated Branches: refs/heads/master 6481d2742 -> e5d376801 [SPARK-3263][GraphX] Fix changes made to GraphGenerator.logNormalGraph in PR #720 PR #720 made multiple changes to GraphGenerator.logNormalGraph including: * Replacing the call to functions for generating random vertices and edges with in-line implementations with different equations. Based on reading the Pregel paper, I believe the in-line functions are incorrect. * Hard-coding of RNG seeds so that the method now generates the same graph for a given number of vertices, edges, mu, and sigma -- the user is not able to override the seed or specify that the seed should be randomly generated. * Backwards-incompatible change to logNormalGraph signature with introduction of new required parameter. * Failed to update scala docs and programming guide for API changes * Added a Synthetic Benchmark in the examples. This PR: * Removes the in-line calls and calls original vertex / edge generation functions again * Adds an optional seed parameter for deterministic behavior (when desired) * Keeps the number of partitions parameter that was added. * Keeps compatibility with the synthetic benchmark example * Maintains backwards-compatible API Author: RJ Nowling rnowl...@gmail.com Author: Ankur Dave ankurd...@gmail.com Closes #2168 from rnowling/graphgenrand and squashes the following commits: f1cd79f [Ankur Dave] Style fixes e11918e [RJ Nowling] Fix bad comparisons in unit tests 785ac70 [RJ Nowling] Fix style error c70868d [RJ Nowling] Fix logNormalGraph scala doc for seed 41fd1f8 [RJ Nowling] Fix logNormalGraph scala doc for seed 799f002 [RJ Nowling] Added test for different seeds for sampleLogNormal 43949ad [RJ Nowling] Added test for different seeds for generateRandomEdges 2faf75f [RJ Nowling] Added unit test for logNormalGraph 82f22397 [RJ Nowling] Add unit test for sampleLogNormal b99cba9 [RJ Nowling] Make sampleLogNormal private to Spark (vs private) for unit testing 6803da1 [RJ Nowling] Add GraphGeneratorsSuite with test for generateRandomEdges 1c8fc44 [RJ Nowling] Connected components part of SynthBenchmark was failing to call count on RDD before printing dfbb6dd [RJ Nowling] Fix parameter name in SynthBenchmark docs b5eeb80 [RJ Nowling] Add optional seed parameter to SynthBenchmark and set default to randomly generate a seed 1ff8d30 [RJ Nowling] Fix bug in generateRandomEdges where numVertices instead of numEdges was used to control number of edges to generate 98bb73c [RJ Nowling] Add documentation for logNormalGraph parameters d40141a [RJ Nowling] Fix style error 684804d [RJ Nowling] revert PR #720 which introduced errors in logNormalGraph and messed up seeding of RNGs. Add user-defined optional seed for deterministic behavior c183136 [RJ Nowling] Fix to deterministic GraphGenerators.logNormalGraph that allows generating graphs randomly using optional seed.
015010c [RJ Nowling] Fixed GraphGenerator logNormalGraph API to make backward-incompatible change in commit 894ecde04 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e5d37680 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e5d37680 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e5d37680 Branch: refs/heads/master Commit: e5d376801d57dffb0791980a1786a0a9b45bc491 Parents: 6481d27 Author: RJ Nowling rnowl...@gmail.com Authored: Wed Sep 3 14:15:22 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Wed Sep 3 14:16:06 2014 -0700 -- .../spark/examples/graphx/SynthBenchmark.scala | 9 +- .../spark/graphx/util/GraphGenerators.scala | 65 ++- .../graphx/util/GraphGeneratorsSuite.scala | 110 +++ 3 files changed, 152 insertions(+), 32 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e5d37680/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala index 551c339..5f35a58 100644 --- a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -38,12 +38,13 @@ object SynthBenchmark { * Options: * -app "pagerank" or "cc" for pagerank or connected components. (Default: pagerank) * -niters the number of iterations of pagerank to use (Default: 10) - * -numVertices the number of vertices in the graph (Default: 100) + * -nverts the number of vertices in the graph (Default: 100) * -numEPart the number of edge partitions in the graph (Default: number of cores) * -partStrategy the graph partitioning
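A sketch of the restored API as described above (parameter names follow the PR discussion and may not match the final signature exactly): the same size and shape parameters plus an optional seed give a reproducible graph, while omitting the seed keeps the randomized behavior.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.util.GraphGenerators

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("gen"))
val g1 = GraphGenerators.logNormalGraph(sc, numVertices = 100, numEParts = 4,
  mu = 4.0, sigma = 1.3, seed = 12345L)
val g2 = GraphGenerators.logNormalGraph(sc, numVertices = 100, numEParts = 4,
  mu = 4.0, sigma = 1.3, seed = 12345L)
assert(g1.edges.count() == g2.edges.count())  // deterministic for a fixed seed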
git commit: [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples
Repository: spark Updated Branches: refs/heads/master 644e31524 -> 7c92b49d6 [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples to support ~/spark/bin/run-example GraphXAnalytics triangles /soc-LiveJournal1.txt --numEPart=256 Author: Larry Xiao xia...@sjtu.edu.cn Closes #1766 from larryxiao/1986 and squashes the following commits: bb77cd9 [Larry Xiao] [SPARK-1986][GraphX]move lib.Analytics to org.apache.spark.examples Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c92b49d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c92b49d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c92b49d Branch: refs/heads/master Commit: 7c92b49d6b62f88fcde883aacb60c5e32ae54b30 Parents: 644e315 Author: Larry Xiao xia...@sjtu.edu.cn Authored: Tue Sep 2 18:29:08 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Sep 2 18:29:08 2014 -0700 -- .../spark/examples/graphx/Analytics.scala | 162 +++ .../examples/graphx/LiveJournalPageRank.scala | 2 +- .../org/apache/spark/graphx/lib/Analytics.scala | 161 -- 3 files changed, 163 insertions(+), 162 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7c92b49d/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala new file mode 100644 index 000..c4317a6 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/Analytics.scala @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.graphx + +import scala.collection.mutable +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.graphx._ +import org.apache.spark.graphx.lib._ +import org.apache.spark.graphx.PartitionStrategy._ +
+/** + * Driver program for running graph algorithms. + */ +object Analytics extends Logging { + + def main(args: Array[String]): Unit = { +if (args.length < 2) { + System.err.println( +"Usage: Analytics taskType file --numEPart=num_edge_partitions [other options]") + System.exit(1) +} + +val taskType = args(0) +val fname = args(1) +val optionsList = args.drop(2).map { arg => + arg.dropWhile(_ == '-').split('=') match { +case Array(opt, v) => (opt -> v) +case _ => throw new IllegalArgumentException("Invalid argument: " + arg) + } +} +val options = mutable.Map(optionsList: _*) + +def pickPartitioner(v: String): PartitionStrategy = { + // TODO: Use reflection rather than listing all the partitioning strategies here.
+ v match { +case "RandomVertexCut" => RandomVertexCut +case "EdgePartition1D" => EdgePartition1D +case "EdgePartition2D" => EdgePartition2D +case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut +case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + v) + } +} + +val conf = new SparkConf() + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") + .set("spark.locality.wait", "10") + +val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { + println("Set the number of edge partitions using --numEPart.") + sys.exit(1) +} +val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") + .map(pickPartitioner(_)) +val edgeStorageLevel = options.remove("edgeStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) +val vertexStorageLevel = options.remove("vertexStorageLevel") + .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY) + +taskType match { + case "pagerank" => +val tol =
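The option-parsing idiom used by the driver above, extracted as a self-contained sketch (the sample arguments are hypothetical): strip leading dashes, split on '=', and collect the pairs into a mutable map so handled options can be remove()d as they are consumed and anything left over can be reported.

import scala.collection.mutable

val args = Array("pagerank", "soc-LiveJournal1.txt", "--numEPart=256", "--tol=0.001")
val optionsList = args.drop(2).map { arg =>
  arg.dropWhile(_ == '-').split('=') match {
    case Array(opt, v) => (opt -> v)
    case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
  }
}
val options = mutable.Map(optionsList: _*)
val numEPart = options.remove("numEPart").map(_.toInt).getOrElse(4)
println(s"numEPart = $numEPart, leftover = $options")  // leftover = Map(tol -> 0.001)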
git commit: [SPARK-3123][GraphX]: override the setName function to set EdgeRDD's name manually just as VertexRDD does.
Repository: spark Updated Branches: refs/heads/master 7c92b49d6 -> 7c9bbf172 [SPARK-3123][GraphX]: override the setName function to set EdgeRDD's name manually just as VertexRDD does. Author: uncleGen husty...@gmail.com Closes #2033 from uncleGen/master_origin and squashes the following commits: 801994b [uncleGen] Update EdgeRDD.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c9bbf17 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c9bbf17 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c9bbf17 Branch: refs/heads/master Commit: 7c9bbf172512701c75992671bcb2f4b6d9e5034b Parents: 7c92b49 Author: uncleGen husty...@gmail.com Authored: Tue Sep 2 18:41:54 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Sep 2 18:44:58 2014 -0700 -- .../src/main/scala/org/apache/spark/graphx/EdgeRDD.scala | 10 +- 1 file changed, 9 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7c9bbf17/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 899a3cb..5bcb96b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -37,7 +37,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { - partitionsRDD.setName("EdgeRDD") + override def setName(_name: String): this.type = { +if (partitionsRDD.name != null) { + partitionsRDD.setName(partitionsRDD.name + ", " + _name) +} else { + partitionsRDD.setName(_name) +} +this + } + setName("EdgeRDD") override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
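A brief usage sketch (assuming a local SparkContext): with this change a custom name set on a cached EdgeRDD propagates to the underlying partitionsRDD, so the storage UI shows that name instead of the fixed "EdgeRDD" label.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("names"))
val graph = Graph.fromEdgeTuples(sc.parallelize(Seq((1L, 2L), (2L, 3L))), 1)
graph.edges.setName("follower-edges").cache()
graph.edges.count()  // materialize; the storage UI now lists "follower-edges"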
git commit: [SPARK-2981][GraphX] EdgePartition1D Int overflow
Repository: spark Updated Branches: refs/heads/master 7c9bbf172 -> aa7de128c [SPARK-2981][GraphX] EdgePartition1D Int overflow Minor fix; detail is here: https://issues.apache.org/jira/browse/SPARK-2981 Author: Larry Xiao xia...@sjtu.edu.cn Closes #1902 from larryxiao/2981 and squashes the following commits: 88059a2 [Larry Xiao] [SPARK-2981][GraphX] EdgePartition1D Int overflow Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa7de128 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa7de128 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa7de128 Branch: refs/heads/master Commit: aa7de128c5987fd2e134736f07ae913ad1f5eb26 Parents: 7c9bbf1 Author: Larry Xiao xia...@sjtu.edu.cn Authored: Tue Sep 2 18:50:52 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Sep 2 18:50:52 2014 -0700 -- .../src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aa7de128/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 5e7e72a..13033fe 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -91,7 +91,7 @@ object PartitionStrategy { case object EdgePartition1D extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val mixingPrime: VertexId = 1125899906842597L - (math.abs(src) * mixingPrime).toInt % numParts + (math.abs(src * mixingPrime) % numParts).toInt } }
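A self-contained illustration of the overflow (the sample id is arbitrary): converting the Long product to Int before taking the modulo can yield a negative partition id for large vertex ids, whereas taking the modulo on the Long first always lands in [0, numParts).

type VertexId = Long

val mixingPrime: VertexId = 1125899906842597L
val numParts = 16
val src: VertexId = 2147483648L  // any id large enough to overflow an Int

val before = (math.abs(src) * mixingPrime).toInt % numParts  // may be negative
val after  = (math.abs(src * mixingPrime) % numParts).toInt  // always in 0..15
println(s"before=$before after=$after")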
git commit: [SPARK-2981][GraphX] EdgePartition1D Int overflow
Repository: spark Updated Branches: refs/heads/branch-1.1 7267e402c -> 9b0cff2d4 [SPARK-2981][GraphX] EdgePartition1D Int overflow Minor fix; detail is here: https://issues.apache.org/jira/browse/SPARK-2981 Author: Larry Xiao xia...@sjtu.edu.cn Closes #1902 from larryxiao/2981 and squashes the following commits: 88059a2 [Larry Xiao] [SPARK-2981][GraphX] EdgePartition1D Int overflow (cherry picked from commit aa7de128c5987fd2e134736f07ae913ad1f5eb26) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b0cff2d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b0cff2d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b0cff2d Branch: refs/heads/branch-1.1 Commit: 9b0cff2d45027cc348f5c5dd095d137368457779 Parents: 7267e40 Author: Larry Xiao xia...@sjtu.edu.cn Authored: Tue Sep 2 18:50:52 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Sep 2 18:51:03 2014 -0700 -- .../src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b0cff2d/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 5e7e72a..13033fe 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -91,7 +91,7 @@ object PartitionStrategy { case object EdgePartition1D extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val mixingPrime: VertexId = 1125899906842597L - (math.abs(src) * mixingPrime).toInt % numParts + (math.abs(src * mixingPrime) % numParts).toInt } }
git commit: [SPARK-2981][GraphX] EdgePartition1D Int overflow
Repository: spark Updated Branches: refs/heads/branch-1.0 5481196ab -> d60f60ccc [SPARK-2981][GraphX] EdgePartition1D Int overflow Minor fix; detail is here: https://issues.apache.org/jira/browse/SPARK-2981 Author: Larry Xiao xia...@sjtu.edu.cn Closes #1902 from larryxiao/2981 and squashes the following commits: 88059a2 [Larry Xiao] [SPARK-2981][GraphX] EdgePartition1D Int overflow (cherry picked from commit aa7de128c5987fd2e134736f07ae913ad1f5eb26) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d60f60cc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d60f60cc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d60f60cc Branch: refs/heads/branch-1.0 Commit: d60f60ccc73ff717e2811a549a2a9ed5bdfd405b Parents: 5481196 Author: Larry Xiao xia...@sjtu.edu.cn Authored: Tue Sep 2 18:50:52 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Sep 2 18:51:16 2014 -0700 -- .../src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d60f60cc/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index 1526cce..a1ab199 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -91,7 +91,7 @@ object PartitionStrategy { case object EdgePartition1D extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { val mixingPrime: VertexId = 1125899906842597L - (math.abs(src) * mixingPrime).toInt % numParts + (math.abs(src * mixingPrime) % numParts).toInt } }
git commit: [SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions
Repository: spark Updated Branches: refs/heads/branch-1.1 0c8183cb3 -> ffdb2fcf8 [SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions If the users set "spark.default.parallelism" and the value is different from the EdgeRDD partition number, GraphX jobs will throw: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions Author: luluorta luluo...@gmail.com Closes #1763 from luluorta/fix-graph-zip and squashes the following commits: 8338961 [luluorta] fix GraphX EdgeRDD zipPartitions (cherry picked from commit 9b225ac3072de522b40b46aba6df1f1c231f13ef) Signed-off-by: Ankur Dave ankurd...@gmail.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffdb2fcf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffdb2fcf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffdb2fcf Branch: refs/heads/branch-1.1 Commit: ffdb2fcf8cd5880375bee52ee101e0373bf63e27 Parents: 0c8183c Author: luluorta luluo...@gmail.com Authored: Tue Sep 2 19:25:52 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Sep 2 19:28:57 2014 -0700 -- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 4 ++-- .../scala/org/apache/spark/graphx/GraphSuite.scala | 16 2 files changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ffdb2fcf/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 899a3cb..0f1a101 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx import scala.reflect.{classTag, ClassTag} -import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} +import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -47,7 +47,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( * partitioner that allows co-partitioning with `partitionsRDD`.
*/ override val partitioner = - partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) +partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) http://git-wip-us.apache.org/repos/asf/spark/blob/ffdb2fcf/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 6506bac..eaaa449 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx import org.scalatest.FunSuite +import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("non-default number of edge partitions") { +val n = 10 +val defaultParallelism = 3 +val numEdgePartitions = 4 +assert(defaultParallelism != numEdgePartitions) +val conf = new SparkConf() + .set("spark.default.parallelism", defaultParallelism.toString) +val sc = new SparkContext("local", "test", conf) +val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), + numEdgePartitions) +val graph = Graph.fromEdgeTuples(edges, 1) +val neighborAttrSums = graph.mapReduceTriplets[Int]( + et => Iterator((et.dstId, et.srcAttr)), _ + _) +assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n))) + } }
git commit: [SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions
Repository: spark Updated Branches: refs/heads/master e9bb12bea -> 9b225ac30 [SPARK-2823][GraphX]fix GraphX EdgeRDD zipPartitions If the users set "spark.default.parallelism" and the value is different from the EdgeRDD partition number, GraphX jobs will throw: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions Author: luluorta luluo...@gmail.com Closes #1763 from luluorta/fix-graph-zip and squashes the following commits: 8338961 [luluorta] fix GraphX EdgeRDD zipPartitions Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b225ac3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b225ac3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b225ac3 Branch: refs/heads/master Commit: 9b225ac3072de522b40b46aba6df1f1c231f13ef Parents: e9bb12b Author: luluorta luluo...@gmail.com Authored: Tue Sep 2 19:25:52 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Sep 2 19:26:27 2014 -0700 -- .../scala/org/apache/spark/graphx/EdgeRDD.scala | 4 ++-- .../scala/org/apache/spark/graphx/GraphSuite.scala | 16 2 files changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9b225ac3/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 5bcb96b..35fbd47 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.graphx import scala.reflect.{classTag, ClassTag} -import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} +import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel @@ -55,7 +55,7 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( * partitioner that allows co-partitioning with `partitionsRDD`.
*/ override val partitioner = - partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) +partitionsRDD.partitioner.orElse(Some(new HashPartitioner(partitionsRDD.partitions.size))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) http://git-wip-us.apache.org/repos/asf/spark/blob/9b225ac3/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala -- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 6506bac..eaaa449 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx import org.scalatest.FunSuite +import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.graphx.Graph._ import org.apache.spark.graphx.PartitionStrategy._ @@ -350,4 +351,19 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("non-default number of edge partitions") { +val n = 10 +val defaultParallelism = 3 +val numEdgePartitions = 4 +assert(defaultParallelism != numEdgePartitions) +val conf = new SparkConf() + .set("spark.default.parallelism", defaultParallelism.toString) +val sc = new SparkContext("local", "test", conf) +val edges = sc.parallelize((1 to n).map(x => (x: VertexId, 0: VertexId)), + numEdgePartitions) +val graph = Graph.fromEdgeTuples(edges, 1) +val neighborAttrSums = graph.mapReduceTriplets[Int]( + et => Iterator((et.dstId, et.srcAttr)), _ + _) +assert(neighborAttrSums.collect.toSet === Set((0: VertexId, n))) + } }
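A minimal reproduction of the failure mode, under the assumption of a local Spark context: zipPartitions requires both sides to have the same number of partitions, which broke whenever a partitioner derived from spark.default.parallelism disagreed with the EdgeRDD's actual partition count.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local").setAppName("zip")
  .set("spark.default.parallelism", "3")
val sc = new SparkContext(conf)

val a = sc.parallelize(1 to 10, 4)  // 4 partitions, like a 4-partition EdgeRDD
val b = sc.parallelize(1 to 10, 3)  // 3 partitions, from default parallelism
try {
  a.zipPartitions(b)((x, y) => x ++ y).count()
} catch {
  case e: IllegalArgumentException => println(e.getMessage)  // unequal partitions
}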
svn commit: r1607545 - in /spark: images/graphx-perf-comparison.png site/images/graphx-perf-comparison.png
Author: ankurdave Date: Thu Jul 3 07:08:46 2014 New Revision: 1607545 URL: http://svn.apache.org/r1607545 Log: Correct the GraphX performance comparison graphic Modified: spark/images/graphx-perf-comparison.png spark/site/images/graphx-perf-comparison.png Modified: spark/images/graphx-perf-comparison.png URL: http://svn.apache.org/viewvc/spark/images/graphx-perf-comparison.png?rev=1607545&r1=1607544&r2=1607545&view=diff == Binary files - no diff available. Modified: spark/site/images/graphx-perf-comparison.png URL: http://svn.apache.org/viewvc/spark/site/images/graphx-perf-comparison.png?rev=1607545&r1=1607544&r2=1607545&view=diff == Binary files - no diff available.
git commit: Minor: Fix documentation error from apache/spark#946
Repository: spark Updated Branches: refs/heads/master 11ded3f66 -> abea2d4ff Minor: Fix documentation error from apache/spark#946 Author: Ankur Dave ankurd...@gmail.com Closes #970 from ankurdave/SPARK-1991_docfix and squashes the following commits: 6d07343 [Ankur Dave] Minor: Fix documentation error from apache/spark#946 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abea2d4f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abea2d4f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abea2d4f Branch: refs/heads/master Commit: abea2d4ff099036c67fc73136d0e61d0d0e22123 Parents: 11ded3f Author: Ankur Dave ankurd...@gmail.com Authored: Wed Jun 4 16:45:53 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Wed Jun 4 16:45:53 2014 -0700 -- graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/abea2d4f/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala index 2e814e3..f4c7936 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala @@ -49,8 +49,8 @@ object GraphLoader extends Logging { * @param canonicalOrientation whether to orient edges in the positive *direction * @param minEdgePartitions the number of partitions for the edge RDD - * @param edgeStorageLevel the desired storage level for the edge partitions. To set the vertex - *storage level, call [[org.apache.spark.graphx.Graph#persistVertices]]. + * @param edgeStorageLevel the desired storage level for the edge partitions + * @param vertexStorageLevel the desired storage level for the vertex partitions */ def edgeListFile( sc: SparkContext,
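For context, a hedged usage sketch of the documented parameters (the path, partition count, and storage levels below are illustrative placeholders, not values from the commit):

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.storage.StorageLevel

// Load an edge-list file, overriding the storage level for both the edge
// and vertex partitions, as the corrected scaladoc describes.
def loadGraph(sc: SparkContext): Graph[Int, Int] =
  GraphLoader.edgeListFile(
    sc,
    "hdfs:///data/followers.txt",      // placeholder path
    canonicalOrientation = false,
    minEdgePartitions = 8,
    edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
    vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)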
git commit: Synthetic GraphX Benchmark
Repository: spark Updated Branches: refs/heads/master aa41a522d -> 894ecde04 Synthetic GraphX Benchmark This PR accomplishes two things: 1. It introduces a Synthetic Benchmark application that generates an arbitrarily large log-normal graph and executes either PageRank or connected components on the graph. This can be used to profile the GraphX system on arbitrary clusters without access to large graph datasets 2. This PR improves the implementation of the log-normal graph generator. Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com Author: Ankur Dave ankurd...@gmail.com Closes #720 from jegonzal/graphx_synth_benchmark and squashes the following commits: e40812a [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0 bad [Ankur Dave] Fix long lines 374678a [Ankur Dave] Bugfix and style changes 1bdf39a [Joseph E. Gonzalez] updating options d943972 [Joseph E. Gonzalez] moving the benchmark application into the examples folder. f4f839a [Joseph E. Gonzalez] Creating a synthetic benchmark script. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/894ecde0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/894ecde0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/894ecde0 Branch: refs/heads/master Commit: 894ecde04faa7e2054a40825a58b2e9cdaa93c70 Parents: aa41a52 Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com Authored: Tue Jun 3 14:14:48 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Jun 3 14:14:48 2014 -0700 -- .../spark/examples/graphx/SynthBenchmark.scala | 128 +++ .../apache/spark/graphx/PartitionStrategy.scala | 9 ++ .../spark/graphx/util/GraphGenerators.scala | 41 -- project/MimaExcludes.scala | 4 +- 4 files changed, 171 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/894ecde0/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala -- diff --git a/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala new file mode 100644 index 000..551c339 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/graphx/SynthBenchmark.scala @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.graphx + +import org.apache.spark.SparkContext._ +import org.apache.spark.graphx.PartitionStrategy +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.graphx.util.GraphGenerators +import java.io.{PrintWriter, FileOutputStream} + +/** + * The SynthBenchmark application can be used to run various GraphX algorithms on + * synthetic log-normal graphs.
The intent of this code is to enable users to + * profile the GraphX system without access to large graph datasets. + */ +object SynthBenchmark { + + /** + * To run this program use the following: + * + * MASTER=spark://foobar bin/run-example graphx.SynthBenchmark -app=pagerank + * + * Options: + * -app pagerank or cc for pagerank or connected components. (Default: pagerank) + * -niters the number of iterations of pagerank to use (Default: 10) + * -numVertices the number of vertices in the graph (Default: 100) + * -numEPart the number of edge partitions in the graph (Default: number of cores) + * -partStrategy the graph partitioning strategy to use + * -mu the mean parameter for the log-normal graph (Default: 4.0) + * -sigma the stdev parameter for the log-normal graph (Default: 1.3) + * -degFile the local file to save the degree information (Default: Empty) + */ + def main(args: Array[String]) { +val options = args.map { + arg => +arg.dropWhile(_ == '-').split('=') match { + case Array(opt, v) => (opt -> v) + case _ => throw new IllegalArgumentException("Invalid argument: " + arg) +} +} + +
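To see how the option parsing above behaves end to end, a small self-contained sketch (the argument values are assumed for illustration):

object SynthBenchmarkArgsSketch {
  def main(ignored: Array[String]): Unit = {
    // Assumed sample invocation: -app=pagerank -niters=10 -numVertices=100000
    val args = Array("-app=pagerank", "-niters=10", "-numVertices=100000")
    val options = args.map { arg =>
      arg.dropWhile(_ == '-').split('=') match {
        case Array(opt, v) => opt -> v
        case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
      }
    }.toMap
    // Defaults mirror the option list documented above.
    val app = options.getOrElse("app", "pagerank")
    val niters = options.getOrElse("niters", "10").toInt
    println(s"app=$app niters=$niters")
  }
}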
git commit: Enable repartitioning of graph over different number of partitions
Repository: spark Updated Branches: refs/heads/master e8d93ee52 -> 5284ca78d Enable repartitioning of graph over different number of partitions It is currently very difficult to repartition a graph over a different number of partitions. This PR adds an additional `partitionBy` function that takes the number of partitions. Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com Closes #719 from jegonzal/graph_partitioning_options and squashes the following commits: 730b405 [Joseph E. Gonzalez] adding an additional number of partitions option to partitionBy Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5284ca78 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5284ca78 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5284ca78 Branch: refs/heads/master Commit: 5284ca78d17fb4de9a7019f3bbecf86484c13763 Parents: e8d93ee Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com Authored: Tue Jun 3 20:49:14 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Tue Jun 3 20:49:14 2014 -0700 -- graphx/src/main/scala/org/apache/spark/graphx/Graph.scala | 10 ++ .../scala/org/apache/spark/graphx/PartitionStrategy.scala | 8 +--- .../scala/org/apache/spark/graphx/impl/GraphImpl.scala | 6 +- 3 files changed, 20 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index c4f9d65..14ae50e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -106,10 +106,20 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable /** * Repartitions the edges in the graph according to `partitionStrategy`. + * + * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph. */ def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] /** + * Repartitions the edges in the graph according to `partitionStrategy`. + * + * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph. + * @param numPartitions the number of edge partitions in the new graph. + */ + def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] + + /** * Transforms each vertex attribute in the graph using the map function. * * @note The new graph has the same structure.
As a consequence the underlying index structures http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala index ef412cf..5e7e72a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala @@ -114,9 +114,11 @@ object PartitionStrategy { */ case object CanonicalRandomVertexCut extends PartitionStrategy { override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = { - val lower = math.min(src, dst) - val higher = math.max(src, dst) - math.abs((lower, higher).hashCode()) % numParts + if (src < dst) { +math.abs((src, dst).hashCode()) % numParts + } else { +math.abs((dst, src).hashCode()) % numParts + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/5284ca78/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 59d9a88..15ea05c 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -74,7 +74,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { -val numPartitions = edges.partitions.size +partitionBy(partitionStrategy, edges.partitions.size) + } + + override def partitionBy( + partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = { val edTag = classTag[ED] val vdTag = classTag[VD] val newEdges =
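A hedged usage sketch of the new overload (the graph contents and partition counts below are illustrative, not from the commit):

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, PartitionStrategy}

def repartitionExample(sc: SparkContext): Unit = {
  // A small edge list spread over 4 initial partitions.
  val edges = sc.parallelize((1L to 100L).map(x => (x, x % 10)), 4)
  val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)

  // Existing behavior: repartition in place, keeping 4 edge partitions.
  val sameParts = graph.partitionBy(PartitionStrategy.EdgePartition2D)

  // New overload: spread the edges over 16 partitions instead.
  val moreParts = graph.partitionBy(PartitionStrategy.EdgePartition2D, 16)
  println(moreParts.edges.partitions.length) // 16
}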
git commit: initial version of LPA
Repository: spark Updated Branches: refs/heads/master 8f7141fbc -> b7e28fa45 initial version of LPA A straightforward implementation of LPA algorithm for detecting graph communities using the Pregel framework. Amongst the growing literature on community detection algorithms in networks, LPA is perhaps the most elementary, and despite its flaws it remains a nice and simple approach. Author: Ankur Dave ankurd...@gmail.com Author: haroldsultan haroldsul...@gmail.com Author: Harold Sultan haroldsul...@gmail.com Closes #905 from haroldsultan/master and squashes the following commits: 327aee0 [haroldsultan] Merge pull request #2 from ankurdave/label-propagation 227a4d0 [Ankur Dave] Untabify 0ac574c [haroldsultan] Merge pull request #1 from ankurdave/label-propagation 0e24303 [Ankur Dave] Add LabelPropagationSuite 84aa061 [Ankur Dave] LabelPropagation: Fix compile errors and style; rename from LPA 9830342 [Harold Sultan] initial version of LPA Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b7e28fa4 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b7e28fa4 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b7e28fa4 Branch: refs/heads/master Commit: b7e28fa451511b3b0f849c3d2919ac9c2e4231a1 Parents: 8f7141f Author: Ankur Dave ankurd...@gmail.com Authored: Thu May 29 15:39:25 2014 -0700 Committer: Ankur Dave ankurd...@gmail.com Committed: Thu May 29 15:39:25 2014 -0700 -- .../spark/graphx/lib/LabelPropagation.scala | 66 .../graphx/lib/LabelPropagationSuite.scala | 45 + 2 files changed, 111 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b7e28fa4/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala new file mode 100644 index 000..776bfb8 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.lib + +import scala.reflect.ClassTag +import org.apache.spark.graphx._ + +/** Label Propagation algorithm. */ +object LabelPropagation { + /** + * Run static Label Propagation for detecting communities in networks. + * + * Each node in the network is initially assigned to its own community. At every superstep, nodes + * send their community affiliation to all neighbors and update their state to the mode community + * affiliation of incoming messages. + * + * LPA is a standard community detection algorithm for graphs.
It is very inexpensive + * computationally, although (1) convergence is not guaranteed and (2) one can end up with + * trivial solutions (all nodes are identified into a single community). + * + * @tparam ED the edge attribute type (not used in the computation) + * + * @param graph the graph for which to compute the community affiliation + * @param maxSteps the number of supersteps of LPA to be performed. Because this is a static + * implementation, the algorithm will run for exactly this many supersteps. + * + * @return a graph with vertex attributes containing the label of community affiliation + */ + def run[ED: ClassTag](graph: Graph[_, ED], maxSteps: Int): Graph[VertexId, ED] = { +val lpaGraph = graph.mapVertices { case (vid, _) => vid } +def sendMessage(e: EdgeTriplet[VertexId, ED]) = { + Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L))) +} +def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long]) + : Map[VertexId, Long] = { + (count1.keySet ++ count2.keySet).map { i => +val count1Val = count1.getOrElse(i, 0L) +val count2Val = count2.getOrElse(i, 0L) +i -> (count1Val + count2Val) + }.toMap +} +def vertexProgram(vid: VertexId
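Although the diff above is truncated, the public entry point is `LabelPropagation.run`. A hedged driver sketch (the graph shape is chosen purely for illustration; note that, as the scaladoc warns, labels may not converge and can oscillate on some structures):

import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.lib.LabelPropagation

def lpaExample(sc: SparkContext): Unit = {
  // Two 4-vertex cliques joined by a single bridge edge.
  val clique1 = for (i <- 0L until 4L; j <- 0L until 4L if i != j) yield (i, j)
  val clique2 = for (i <- 4L until 8L; j <- 4L until 8L if i != j) yield (i, j)
  val edges = sc.parallelize(clique1 ++ clique2 ++ Seq((3L, 4L)))
  val graph = Graph.fromEdgeTuples(edges, defaultValue = 1)

  // Run five supersteps of static label propagation.
  val labels = LabelPropagation.run(graph, maxSteps = 5)
  labels.vertices.collect().sorted.foreach { case (id, label) =>
    println(s"vertex $id -> community $label")
  }
}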