Dear Spark developers,

Below is the GraphX Pregel code snippet from
https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api
(it does not contain the caching step):

while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages: ---------------------------------------------------
  // (1st join) Run the vertex program on all vertices that receive messages
  val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
  // (2nd join) Merge the new vertex values back into the graph
  g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) =>
    newOpt.getOrElse(old)
  }.cache()
  // Send Messages: ----------------------------------------------------------
  // Vertices that didn't receive a message above don't appear in newVerts and
  // therefore don't get to send messages. More precisely, the map phase of
  // mapReduceTriplets is only invoked on edges in the activeDir of vertices
  // in newVerts
  messages = g.mapReduceTriplets(sendMsg, mergeMsg,
    Some((newVerts, activeDir))).cache()
  activeMessages = messages.count()
  i += 1
}
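
For context, vprog, sendMsg and mergeMsg above are the user-supplied functions
passed to Pregel. A minimal usage sketch, roughly following the single-source
shortest-path example from the same guide (it assumes a spark-shell session
where sc is already defined):

import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

// Build a small random graph and use the edge attribute as a distance
val graph: Graph[Long, Double] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42L
// Initialize every vertex to infinity, except the source
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  // vprog: keep the smaller of the current and the incoming distance
  (id, dist, newDist) => math.min(dist, newDist),
  // sendMsg: propagate a shorter path along the edge, if one exists
  triplet =>
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else
      Iterator.empty,
  // mergeMsg: combine incoming distances by taking the minimum
  (a, b) => math.min(a, b)
)
println(sssp.vertices.collect.mkString("\n"))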

It seems that the two joins mentioned above can be rewritten as a single outer
join, as follows:
g = g.outerJoinVertices(messages) { (vid, old, mess) =>
  mess match {
    case Some(mess) => vprog(vid, old, mess)
    case None => old
  }
}
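
Spelled out inside the loop, the body would then look roughly as follows (a
sketch only; since newVerts is gone, the Some((newVerts, activeDir)) active-set
argument to mapReduceTriplets is simply dropped here):

while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages and run the vertex program in a single outer join
  g = g.outerJoinVertices(messages) { (vid, old, mess) =>
    mess match {
      case Some(mess) => vprog(vid, old, mess)
      case None => old
    }
  }.cache()
  // Send messages; without newVerts there is no active-set hint, so the map
  // phase of mapReduceTriplets runs over all edges
  messages = g.mapReduceTriplets(sendMsg, mergeMsg).cache()
  activeMessages = messages.count()
  i += 1
}
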
This code passes PregelSuite (after removing newVerts). Could you elaborate on
why two joins are used instead of one, and why the intermediate variable
`newVerts` is needed? Are there performance considerations behind this design?

Best regards, Alexander
