Re: [GraphX] Excessive value recalculations during aggregateMessages cycles

2015-02-15 Thread Takeshi Yamamuro
Hi,

I tried a quick and simple test, and ISTM the vertices below were
correctly cached.
Could you point out the differences between my code and yours?

import org.apache.spark.graphx._
import org.apache.spark.graphx.lib._

object Prog {
  def processInt(d: Int) = d * 2
}

val g = GraphLoader.edgeListFile(sc, "../temp/graph.txt")
.cache

val g2 = g.outerJoinVertices(g.degrees)(
  (vid, old, msg) => Prog.processInt(msg.getOrElse(0)))
.cache

g2.vertices.count

val g3 = g.outerJoinVertices(g.degrees)(
  (vid, old, msg) => msg.getOrElse(0))
.mapVertices((vid, d) => Prog.processInt(d))
.cache

g3.vertices.count

'g2.vertices.toDebugString' outputs:

(2) VertexRDDImpl[16] at RDD at VertexRDD.scala:57 []
 |  VertexRDD ZippedPartitionsRDD2[15] at zipPartitions at
VertexRDDImpl.scala:121 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at
VertexRDD.scala:319 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPartitions at VertexRDD.scala:330 []
|  GraphLoader.edgeListFile - edges (../temp/graph.txt), EdgeRDD,
EdgeRDD MapPartitionsRDD[2] at mapPartitionsWithIndex at Graph...


'g3.vertices.toDebugString' outputs:

(2) VertexRDDImpl[33] at RDD at VertexRDD.scala:57 []
 |  VertexRDD MapPartitionsRDD[32] at mapPartitions at
VertexRDDImpl.scala:96 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD ZippedPartitionsRDD2[24] at zipPartitions at
VertexRDDImpl.scala:121 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  VertexRDD, VertexRDD MapPartitionsRDD[8] at mapPartitions at
VertexRDD.scala:319 []
 |  CachedPartitions: 2; MemorySize: 3.3 KB; TachyonSize: 0.0 B;
DiskSize: 0.0 B
 |  MapPartitionsRDD[7] at mapPartitions at VertexRDD.scala:335 []
 |  ShuffledRDD[6] at partitionBy at VertexRDD.scala:335 []
 +-(2) VertexRDD.createRoutingTables - vid2pid (aggregation)
MapPartitionsRDD[5] at mapPar...
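
If it helps, one more rough way to check for recomputation is to count how
many times the join function actually fires (just a sketch; the accumulator
is a crude diagnostic and can over-count if tasks are retried):

// Count invocations of the outerJoinVertices function with an accumulator.
val calls = sc.accumulator(0L)

val g4 = g.outerJoinVertices(g.degrees) { (vid, old, msg) =>
  calls += 1L
  Prog.processInt(msg.getOrElse(0))
}.cache()

g4.vertices.count   // first materialization; 'calls' should be roughly |V|
println(calls.value)
g4.vertices.count   // if the cached vertices are reused, this should not grow much
println(calls.value)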

-- maropu

On Mon, Feb 9, 2015 at 5:47 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

 I changed the

 curGraph = curGraph.outerJoinVertices(curMessages)(
   (vid, vertex, message) =>
 vertex.process(message.getOrElse(List[Message]()), ti)
 ).cache()

 to

 curGraph = curGraph.outerJoinVertices(curMessages)(
   (vid, vertex, message) => (vertex,
 message.getOrElse(List[Message]()))
 ).mapVertices( (x, y) => y._1.process( y._2, ti ) ).cache()

 So the call to the 'process' method was moved out of the outerJoinVertices
 and into a separate mapVertices call, and the problem went away. Now,
 'process' is only called once during the correct cycle.
 So it would appear that outerJoinVertices caches the closure (to be
 re-evaluated whenever the data is needed again), while mapVertices
 actually caches the derived values.

 Is this a bug or a feature?

 Kyle



 On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:

 I'm trying to set up a simple iterative message/update problem in GraphX
 (Spark 1.2.0), but I'm running into issues with caching and
 re-calculation of data. I'm trying to follow the example in the
 Pregel implementation of materializing and caching messages and graphs
 and then unpersisting them after the next cycle has been done.
 It doesn't seem to be working, because every cycle gets progressively
 slower and it seems as if more and more of the values are being
 re-calculated despite my attempts to cache them.

 The code:
 ```
   var oldMessages : VertexRDD[List[Message]] = null
   var oldGraph : Graph[MyVertex, MyEdge] = null
   curGraph = curGraph.mapVertices((x, y) => y.init())
   for (i <- 0 to cycle_count) {
     val curMessages = curGraph.aggregateMessages[List[Message]](x => {
         //send messages
         .
       },
       (x, y) => {
         //collect messages into lists
         val out = x ++ y
         out
       }
     ).cache()
     curMessages.count()
     val ti = i
     oldGraph = curGraph
     curGraph = curGraph.outerJoinVertices(curMessages)(
       (vid, vertex, message) =>
         vertex.process(message.getOrElse(List[Message]()), ti)
     ).cache()
     curGraph.vertices.count()
     oldGraph.unpersistVertices(blocking = false)
     oldGraph.edges.unpersist(blocking = false)
     oldGraph = curGraph
     if (oldMessages != null) {
       oldMessages.unpersist(blocking = false)
     }
     oldMessages = curMessages
   }
 ```

 The MyVertex.process method takes the list of incoming messages, averages
 them and returns a new MyVertex object. I've also set it up to 

Re: [GraphX] Excessive value recalculations during aggregateMessages cycles

2015-02-08 Thread Kyle Ellrott
I changed the

curGraph = curGraph.outerJoinVertices(curMessages)(
  (vid, vertex, message) =>
vertex.process(message.getOrElse(List[Message]()), ti)
).cache()

to

curGraph = curGraph.outerJoinVertices(curMessages)(
  (vid, vertex, message) => (vertex,
message.getOrElse(List[Message]()))
).mapVertices( (x, y) => y._1.process( y._2, ti ) ).cache()

So the call to the 'process' method was moved out of the outerJoinVertices
and into a separate mapVertices call, and the problem went away. Now,
'process' is only called once during the correct cycle.
So it would appear that outerJoinVertices caches the closure (to be
re-evaluated whenever the data is needed again), while mapVertices actually
caches the derived values.
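
In other words, the workaround is: attach the raw message with
outerJoinVertices, defer the expensive work to mapVertices, and cache the
result. As a generic sketch (the helper name and signature here are only
illustrative, not part of GraphX):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx._

// Join-then-map pattern: keep the join trivial, do the real work in mapVertices.
def joinThenMap[VD: ClassTag, ED, U: ClassTag, VD2: ClassTag](
    graph: Graph[VD, ED],
    msgs: RDD[(VertexId, U)])(f: (VD, Option[U]) => VD2): Graph[VD2, ED] = {
  graph
    .outerJoinVertices(msgs)((vid, v, m) => (v, m))   // attach raw data only
    .mapVertices((vid, pair) => f(pair._1, pair._2))  // heavy computation here
    .cache()
}

With that, the update in the loop becomes roughly

curGraph = joinThenMap(curGraph, curMessages)((v, m) =>
  v.process(m.getOrElse(List[Message]()), ti))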

Is this a bug or a feature?

Kyle



On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

 I'm trying to set up a simple iterative message/update problem in GraphX
 (Spark 1.2.0), but I'm running into issues with caching and
 re-calculation of data. I'm trying to follow the example in the
 Pregel implementation of materializing and caching messages and graphs
 and then unpersisting them after the next cycle has been done.
 It doesn't seem to be working, because every cycle gets progressively
 slower and it seems as if more and more of the values are being
 re-calculated despite my attempts to cache them.

 The code:
 ```
   var oldMessages : VertexRDD[List[Message]] = null
   var oldGraph : Graph[MyVertex, MyEdge] = null
   curGraph = curGraph.mapVertices((x, y) => y.init())
   for (i <- 0 to cycle_count) {
     val curMessages = curGraph.aggregateMessages[List[Message]](x => {
         //send messages
         .
       },
       (x, y) => {
         //collect messages into lists
         val out = x ++ y
         out
       }
     ).cache()
     curMessages.count()
     val ti = i
     oldGraph = curGraph
     curGraph = curGraph.outerJoinVertices(curMessages)(
       (vid, vertex, message) =>
         vertex.process(message.getOrElse(List[Message]()), ti)
     ).cache()
     curGraph.vertices.count()
     oldGraph.unpersistVertices(blocking = false)
     oldGraph.edges.unpersist(blocking = false)
     oldGraph = curGraph
     if (oldMessages != null) {
       oldMessages.unpersist(blocking = false)
     }
     oldMessages = curMessages
   }
 ```

 The MyVertex.process method takes the list of incoming messages, averages
 them and returns a new MyVertex object. I've also set it up to append the
 cycle number (the second argument) into a log file named after the vertex.
 What ends up getting dumped into the log file for every vertex (in the
 exact same pattern) is
 ```
 Cycle: 0
 Cycle: 1
 Cycle: 0
 Cycle: 2
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 3
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 2
 Cycle: 4
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 2
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 0
 Cycle: 0
 Cycle: 1
 Cycle: 2
 Cycle: 3
 Cycle: 5
 ```

 Any ideas about what I might be doing wrong with the caching? And how can I
 avoid re-calculating so many of the values?


 Kyle





[GraphX] Excessive value recalculations during aggregateMessages cycles

2015-02-07 Thread Kyle Ellrott
I'm trying to set up a simple iterative message/update problem in GraphX
(Spark 1.2.0), but I'm running into issues with caching and
re-calculation of data. I'm trying to follow the example in the
Pregel implementation of materializing and caching messages and graphs
and then unpersisting them after the next cycle has been done.
It doesn't seem to be working, because every cycle gets progressively
slower and it seems as if more and more of the values are being
re-calculated despite my attempts to cache them.

The code:
```
  var oldMessages : VertexRDD[List[Message]] = null
  var oldGraph : Graph[MyVertex, MyEdge] = null
  curGraph = curGraph.mapVertices((x, y) => y.init())
  for (i <- 0 to cycle_count) {
    val curMessages = curGraph.aggregateMessages[List[Message]](x => {
        //send messages
        .
      },
      (x, y) => {
        //collect messages into lists
        val out = x ++ y
        out
      }
    ).cache()
    curMessages.count()   // materialize the new messages
    val ti = i
    oldGraph = curGraph
    curGraph = curGraph.outerJoinVertices(curMessages)(
      (vid, vertex, message) =>
        vertex.process(message.getOrElse(List[Message]()), ti)
    ).cache()
    curGraph.vertices.count()   // materialize the new graph before dropping the old one
    oldGraph.unpersistVertices(blocking = false)
    oldGraph.edges.unpersist(blocking = false)
    oldGraph = curGraph
    if (oldMessages != null) {
      oldMessages.unpersist(blocking = false)
    }
    oldMessages = curMessages
  }
```

The MyVertex.process method takes the list of incoming messages, averages
them, and returns a new MyVertex object (a rough sketch of it follows the
log below). I've also set it up to append the cycle number (the second
argument) to a log file named after the vertex.
What ends up getting dumped into the log file for every vertex (in the
exact same pattern) is
```
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 3
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 4
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 3
Cycle: 5
```
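
For reference, MyVertex.process is roughly this shape (a simplified sketch;
the field names, the message payload, and the log path are stand-ins for the
real code):
```
import java.io.{FileWriter, PrintWriter}

case class Message(value: Double)

class MyVertex(val id: Long, val value: Double) extends Serializable {
  def init(): MyVertex = this

  // Average the incoming message values and record which cycle actually ran.
  def process(msgs: List[Message], cycle: Int): MyVertex = {
    val log = new PrintWriter(new FileWriter(s"vertex-$id.log", true))
    try log.println(s"Cycle: $cycle") finally log.close()
    val avg = if (msgs.isEmpty) value else msgs.map(_.value).sum / msgs.size
    new MyVertex(id, avg)
  }
}
```
(The file writing is only a debugging hack, of course; it lands on whatever
executor happens to run the vertex.)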

Any ideas about what I might be doing wrong with the caching? And how can I
avoid re-calculating so many of the values?


Kyle