Re: [GraphX] Excessive value recalculations during aggregateMessages cycles
Hi, I tried some quick and simple tests, and it seems to me the vertices below were correctly cached. Could you point out the differences between my code and yours?

```
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

object Prog {
  def processInt(d: Int) = d * 2
}

val g = GraphLoader.edgeListFile(sc, "../temp/graph.txt")
  .cache

val g2 = g.outerJoinVertices(g.degrees)(
    (vid, old, msg) => Prog.processInt(msg.getOrElse(0)))
  .cache

g2.vertices.count

val g3 = g.outerJoinVertices(g.degrees)(
    (vid, old, msg) => msg.getOrElse(0))
  .mapVertices((vid, d) => Prog.processInt(d))
  .cache

g3.vertices.count
```

'g2.vertices.toDebugString' outputs:

```
(2) VertexRDDImpl[16] at RDD at VertexRDD.scala:57 []
 |  VertexRDD ZippedPartitionsRDD2[15] at zipPartitions at VertexRDDImpl.scala:121 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:319 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:330 []
    |  GraphLoader.edgeListFile - edges (../temp/graph.txt), EdgeRDD, EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at Graph...
```

'g3.vertices.toDebugString' outputs:

```
(2) VertexRDDImpl[33] at RDD at VertexRDD.scala:57 []
 |  VertexRDD MapPartitionsRDD[32] at mapPartitions at VertexRDDImpl.scala:96 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  VertexRDD ZippedPartitionsRDD2[24] at zipPartitions at VertexRDDImpl.scala:121 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at VertexRDD.scala:319 []
 |      CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B; DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation) MapPartitionsRDD[5] at mapPar...
```

-- maropu

On Mon, Feb 9, 2015 at 5:47 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
(quoted message trimmed; it appears in full below)
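For anyone repeating this check in their own spark-shell session, here is a minimal sketch of one way to confirm that the joined vertices are actually persisted. It assumes the `g2` from the test above; `getStorageLevel` is a standard RDD method:

```
import org.apache.spark.storage.StorageLevel

// StorageLevel.NONE would mean the vertices are not persisted;
// after .cache they should report a memory-backed level.
assert(g2.vertices.getStorageLevel != StorageLevel.NONE)

// The lineage dump also prints "CachedPartitions" next to every
// RDD that was materialized into the cache.
println(g2.vertices.toDebugString)
```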
Re: [GraphX] Excessive value recalculations during aggregateMessages cycles
I changed

```
curGraph = curGraph.outerJoinVertices(curMessages)(
  (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti)
).cache()
```

to

```
curGraph = curGraph.outerJoinVertices(curMessages)(
  (vid, vertex, message) => (vertex, message.getOrElse(List[Message]()))
).mapVertices( (x, y) => y._1.process(y._2, ti) ).cache()
```

So the call to the 'process' method was moved out of outerJoinVertices and into a separate mapVertices call, and the problem went away. Now 'process' is only called once, during the correct cycle. So it would appear that outerJoinVertices caches the closure, to be recalculated if needed again, while mapVertices actually caches the derived values. Is this a bug or a feature?

Kyle

On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
(quoted original message trimmed; it appears in full below)
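To make the refactoring easier to see in isolation, here is a minimal, self-contained sketch of the same before/after change with plain Int attributes standing in for Kyle's MyVertex and Message types; all names here are illustrative, not from the thread:

```scala
import org.apache.spark.graphx._

object JoinRefactorSketch {
  // Stand-in for MyVertex.process: any per-vertex computation whose
  // result we want cached rather than recomputed from a closure.
  def process(attr: Int, msg: Int): Int = attr + msg

  // Before: the expensive call sits inside the outerJoinVertices closure.
  def joinThenProcess(g: Graph[Int, Int], msgs: VertexRDD[Int]): Graph[Int, Int] =
    g.outerJoinVertices(msgs)(
      (vid, attr, msg) => process(attr, msg.getOrElse(0))
    ).cache()

  // After: the join only pairs each vertex with its message; the expensive
  // call moves into mapVertices, so the derived values are what get cached.
  def joinThenMap(g: Graph[Int, Int], msgs: VertexRDD[Int]): Graph[Int, Int] =
    g.outerJoinVertices(msgs)(
      (vid, attr, msg) => (attr, msg.getOrElse(0))
    ).mapVertices((vid, pair) => process(pair._1, pair._2))
     .cache()
}
```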
[GraphX] Excessive value recalculations during aggregateMessages cycles
I'm trying to set up a simple iterative message/update problem in GraphX (Spark 1.2.0), but I'm running into issues with the caching and re-calculation of data. I'm trying to follow the example found in the Pregel implementation: materializing and caching messages and graphs, then unpersisting them after the next cycle has been done.

It doesn't seem to be working: every cycle gets progressively slower, and it appears that more and more of the values are being re-calculated despite my attempts to cache them.

The code:

```
var oldMessages: VertexRDD[List[Message]] = null
var oldGraph: Graph[MyVertex, MyEdge] = null

curGraph = curGraph.mapVertices((x, y) => y.init())

for (i <- 0 to cycle_count) {
  val curMessages = curGraph.aggregateMessages[List[Message]](
    x => {
      // send messages
      ...
    },
    (x, y) => {
      // collect messages into lists
      val out = x ++ y
      out
    }
  ).cache()
  curMessages.count()
  val ti = i
  oldGraph = curGraph
  curGraph = curGraph.outerJoinVertices(curMessages)(
    (vid, vertex, message) => vertex.process(message.getOrElse(List[Message]()), ti)
  ).cache()
  curGraph.vertices.count()
  oldGraph.unpersistVertices(blocking = false)
  oldGraph.edges.unpersist(blocking = false)
  oldGraph = curGraph
  if (oldMessages != null) {
    oldMessages.unpersist(blocking = false)
  }
  oldMessages = curMessages
}
```

The MyVertex.process method takes the list of incoming messages, averages them, and returns a new MyVertex object. I've also set it up to append the cycle number (the second argument) to a log file named after the vertex. What ends up getting dumped into the log file for every vertex (in the exact same pattern) is:

```
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 3
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 4
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 3
Cycle: 5
```

Any ideas about what I might be doing wrong with the caching, and how I can avoid re-calculating so many of the values?

Kyle
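The send side of the aggregateMessages call above was elided in the original post. For readers who want a runnable version of the same list-merging pattern, here is a hypothetical stand-in; the `Message` case class and the send-to-destination logic are assumptions for illustration, not Kyle's actual code:

```scala
import org.apache.spark.graphx._

// Hypothetical stand-in for the thread's elided message type.
case class Message(value: Double)

def gatherMessages(g: Graph[Double, Int]): VertexRDD[List[Message]] =
  g.aggregateMessages[List[Message]](
    // Send messages: here each edge forwards its source vertex's
    // attribute to the destination vertex (an assumed send rule).
    ctx => ctx.sendToDst(List(Message(ctx.srcAttr))),
    // Collect messages into lists, exactly as in the loop above.
    (x, y) => x ++ y
  )
```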