I'm trying to set up a simple iterative message/update problem in GraphX
(Spark 1.2.0), but I'm running into issues with the caching and
recalculation of data. I'm trying to follow the example found in the
Pregel implementation: materialize and cache the messages and the graph in
each cycle, then unpersist the previous ones once the next cycle has been
computed.
It doesn't seem to be working: every cycle gets progressively slower, and
it looks as if more and more of the values are being recalculated despite
my attempts to cache them.

The code:
```
var oldMessages: VertexRDD[List[Message]] = null
var oldGraph: Graph[MyVertex, MyEdge] = null
curGraph = curGraph.mapVertices((x, y) => y.init())
for (i <- 0 to cycle_count) {
  val curMessages = curGraph.aggregateMessages[List[Message]](
    x => {
      // send messages
      .....
    },
    (x, y) => {
      // collect messages into lists
      val out = x ++ y
      out
    }
  ).cache()
  curMessages.count()
  val ti = i
  oldGraph = curGraph
  curGraph = curGraph.outerJoinVertices(curMessages)(
    (vid, vertex, message) =>
      vertex.process(message.getOrElse(List[Message]()), ti)
  ).cache()
  curGraph.vertices.count()
  oldGraph.unpersistVertices(blocking = false)
  oldGraph.edges.unpersist(blocking = false)
  oldGraph = curGraph
  if (oldMessages != null) {
    oldMessages.unpersist(blocking = false)
  }
  oldMessages = curMessages
}
```

The MyVertex.process method takes the list of incoming messages, averages
them, and returns a new MyVertex object. I've also set it up to append the
cycle number (the second argument) to a log file named after the vertex.
What ends up in the log file for every vertex (in exactly the same
pattern) is:
```
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 3
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 4
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 0
Cycle: 0
Cycle: 1
Cycle: 2
Cycle: 3
Cycle: 5
```
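
Grouping the log by cycle, the number of entries per cycle is 1, 1, 2, 4,
8, 16, so the amount of recomputation appears to double with every cycle.
The sketch below is only my reading of the log, not something checked
against Spark internals. The names `replayed`, `entriesForCycle` and
`fullLog` are made up for illustration. It reproduces the exact sequence
above with a recurrence in which the history replayed before a cycle's
final entry is the previous history twice, followed by the previous cycle
number.

```scala
object LogPattern {
  // Cycle numbers replayed before the final log entry of cycle k + 1.
  // Hypothetical recurrence inferred from the log: the previous history
  // appears twice, followed by cycle k - 1.
  def replayed(k: Int): List[Int] =
    if (k == 0) Nil
    else {
      val prev = replayed(k - 1)
      prev ++ prev ++ List(k - 1)
    }

  // Entries appended to a vertex's log file during cycle n.
  def entriesForCycle(n: Int): List[Int] =
    if (n == 0) List(0) else replayed(n - 1) ++ List(n)

  // The whole log after running cycles 0 to last (inclusive).
  def fullLog(last: Int): List[Int] =
    (0 to last).toList.flatMap(entriesForCycle)
}
```

`LogPattern.fullLog(5)` gives the same 32 entries as the log above, which
suggests that each new cycle walks the earlier lineage twice instead of
reading it from the cache.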

Any ideas about what I might be doing wrong with the caching, and how I
can avoid recalculating so many of the values?


Kyle