Dear Spark developers,

Below is the GraphX Pregel code snippet from https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api (the version shown in the guide does not include the caching steps):
while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages: ----------------------------------------------------------------------
  // (1st join) Run the vertex program on all vertices that receive messages
  val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
  // (2nd join) Merge the new vertex values back into the graph
  g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()

  // Send Messages: -----------------------------------------------------------------------------
  // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
  // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
  // on edges in the activeDir of vertices in newVerts
  messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
  activeMessages = messages.count()
  i += 1
}

It seems that the mentioned two joins can be rewritten as a single outer join, as follows:

g = g.outerJoinVertices(messages) {
  (vid, old, mess) => mess match {
    case Some(mess) => vprog(vid, old, mess)
    case None => old
  }
}

This code passes PregelSuite (after removing `newVerts`). Could you elaborate on why two joins are used instead of one, and why the intermediate variable `newVerts` is needed? Are there performance considerations?

Best regards,
Alexander
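P.S. For reference, here is a minimal standalone sketch (not the actual PregelSuite test) of the kind of check I have in mind: it applies both update strategies with a toy vertex program on a small graph and compares the resulting vertex values. The object name JoinComparison, the toy vprog, and the sample graph are illustrative choices of mine, and the sketch only compares vertex values; it does not reproduce the active-set argument passed to mapReduceTriplets.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object JoinComparison {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("join-comparison"))

    // Toy vertex program: keep the maximum of the old value and the incoming message.
    def vprog(vid: VertexId, old: Int, msg: Int): Int = math.max(old, msg)

    // A 4-vertex chain with initial values, and messages addressed to a subset of vertices.
    val vertices: RDD[(VertexId, Int)] =
      sc.parallelize(Seq((1L, 1), (2L, 2), (3L, 3), (4L, 4)))
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(3L, 4L, 0)))
    val g: Graph[Int, Int] = Graph(vertices, edges).cache()
    val messages: VertexRDD[Int] = VertexRDD(sc.parallelize(Seq((2L, 10), (3L, 10))))

    // Two-join update, as in Pregel.scala: innerJoin runs vprog only on the vertices
    // that received a message, then outerJoinVertices merges the new values back in.
    val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
    val twoJoin = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }

    // Proposed single-join update: run vprog inside one outerJoinVertices.
    val oneJoin = g.outerJoinVertices(messages) { (vid, old, msgOpt) =>
      msgOpt match {
        case Some(msg) => vprog(vid, old, msg)
        case None      => old
      }
    }

    // The resulting vertex values should be identical for both strategies.
    assert(twoJoin.vertices.collect().toSet == oneJoin.vertices.collect().toSet)
    println("Both join strategies produce the same vertex values.")
    sc.stop()
  }
}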