I changed the

```
curGraph = curGraph.outerJoinVertices(curMessages)(
  (vid, vertex, message) =>
    vertex.process(message.getOrElse(List[Message]()), ti)
).cache()
```

to

```
curGraph = curGraph.outerJoinVertices(curMessages)(
  (vid, vertex, message) =>
    (vertex, message.getOrElse(List[Message]()))
).mapVertices( (x, y) => y._1.process(y._2, ti) ).cache()
```

So the call to the 'process' method was moved out of outerJoinVertices and
into a separate mapVertices call, and the problem went away. Now, 'process'
is only called once, during the correct cycle.
It would appear that outerJoinVertices caches the closure, which gets
re-evaluated whenever the data is recomputed, while mapVertices actually
caches the derived vertex values.
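To illustrate what I mean with plain Scala (no Spark required): a lazy view replays its mapping function on every traversal, the way an uncached closure would be replayed on recomputation, while a materialized collection runs the function once and stores the results. This is only an analogy for the behavior I'm seeing, not a claim about GraphX internals:

```scala
// Analogy only: a lazy view replays its function on each traversal
// (like a re-evaluated closure); a materialized collection runs it
// once per element (like cached derived values).
object LazyVsMaterialized {
  def run(): (Int, Int) = {
    var calls = 0
    def process(x: Int): Int = { calls += 1; x * 2 }

    // Lazy: `process` runs again on every traversal of the view.
    val lazyView = (1 to 3).view.map(process)
    lazyView.foreach(_ => ())
    lazyView.foreach(_ => ())
    val lazyCalls = calls          // 3 elements x 2 traversals = 6

    calls = 0
    // Materialized: `process` runs once per element; results are stored.
    val materialized = (1 to 3).map(process)
    materialized.foreach(_ => ())
    materialized.foreach(_ => ())
    val materializedCalls = calls  // 3

    (lazyCalls, materializedCalls)
  }

  def main(args: Array[String]): Unit = {
    val (l, m) = run()
    println(s"lazy view called process $l times; materialized called it $m times")
  }
}
```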

Is this a bug or a feature?

Kyle



On Sat, Feb 7, 2015 at 11:44 PM, Kyle Ellrott <kellr...@soe.ucsc.edu> wrote:

> I'm trying to setup a simple iterative message/update problem in GraphX
> (spark 1.2.0), but I'm running into issues with the caching and
> re-calculation of data. I'm trying to follow the example found in the
> Pregel implementation of materializing and caching messages and graphs and
> then unpersisting them after the next cycle has been done.
> It doesn't seem to be working, because every cycle gets progressively
> slower and it seems as if more and more of the values are being
> re-calculated despite my attempts to cache them.
>
> The code:
> ```
>       var oldMessages : VertexRDD[List[Message]] = null
>       var oldGraph : Graph[MyVertex, MyEdge ] = null
>       curGraph = curGraph.mapVertices((x, y) => y.init())
>       for (i <- 0 to cycle_count) {
>         val curMessages = curGraph.aggregateMessages[List[Message]](x => {
>           //send messages
>           .....
>         },
>         (x, y) => {
>            //collect messages into lists
>             val out = x ++ y
>             out
>           }
>         ).cache()
>         curMessages.count()
>         val ti = i
>         oldGraph = curGraph
>         curGraph = curGraph.outerJoinVertices(curMessages)(
>           (vid, vertex, message) =>
> vertex.process(message.getOrElse(List[Message]()), ti)
>         ).cache()
>         curGraph.vertices.count()
>         oldGraph.unpersistVertices(blocking = false)
>         oldGraph.edges.unpersist(blocking = false)
>         oldGraph = curGraph
>         if (oldMessages != null ) {
>           oldMessages.unpersist(blocking=false)
>         }
>         oldMessages = curMessages
>       }
> ```
>
> The MyVertex.process method takes the list of incoming messages, averages
> them and returns a new MyVertex object. I've also set it up to append the
> cycle number (the second argument) into a log file named after the vertex.
> What ends up getting dumped into the log file for every vertex (in the
> exact same pattern) is
> ```
> Cycle: 0
> Cycle: 1
> Cycle: 0
> Cycle: 2
> Cycle: 0
> Cycle: 0
> Cycle: 1
> Cycle: 3
> Cycle: 0
> Cycle: 0
> Cycle: 1
> Cycle: 0
> Cycle: 0
> Cycle: 1
> Cycle: 2
> Cycle: 4
> Cycle: 0
> Cycle: 0
> Cycle: 1
> Cycle: 0
> Cycle: 0
> Cycle: 1
> Cycle: 2
> Cycle: 0
> Cycle: 0
> Cycle: 1
> Cycle: 0
> Cycle: 0
> Cycle: 1
> Cycle: 2
> Cycle: 3
> Cycle: 5
> ```
>
> Any ideas about what I might be doing wrong with the caching, and how I
> can avoid re-calculating so many of the values?
>
>
> Kyle
>
>
>
