I’ve found two almost identical PRs that replace mapReduceTriplets with aggregateMessages: https://github.com/apache/spark/pull/3782 and https://github.com/apache/spark/pull/3883. The first was closed at Dave’s suggestion; the second is stale. There is also a PR for the new Pregel API, which was closed as well.
Do you know why this improvement was not merged? CC’ing Dave.

From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Monday, July 27, 2015 9:11 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Two joins in GraphX Pregel implementation

Quite possibly. There is a JIRA open for replacing mapReduceTriplets with aggregateMessages (I don’t recall the number off the top of my head).

Robin

On 27 Jul 2015, at 17:01, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Thank you, your explanation does make sense to me. Do you think that one join would work if `mapReduceTriplets` were replaced by the new `aggregateMessages`? The latter does not return the vertices that did not receive a message.

From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Monday, July 27, 2015 8:56 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Two joins in GraphX Pregel implementation

What happens to this line of code?

    messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()

Part of the Pregel ‘contract’ is that vertices that don’t receive messages in the previous superstep don’t get to send messages in this superstep. I’m not sure there is a test for that, but there ought to be.
Robin

On 27 Jul 2015, at 16:42, Ulanov, Alexander <alexander.ula...@hp.com> wrote:

Dear Spark developers,

Below is the GraphX Pregel code snippet from https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api (it does not contain the caching step):

    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages: -----------------------------------------------------------------------
      // (1st join) Run the vertex program on all vertices that receive messages
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      // (2nd join) Merge the new vertex values back into the graph
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
      // Send Messages: ------------------------------------------------------------------------------
      // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
      // get to send messages. More precisely the map phase of mapReduceTriplets is only invoked
      // on edges in the activeDir of vertices in newVerts
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }

It seems that the two joins above can be rewritten as a single outer join:

    g = g.outerJoinVertices(messages) { (vid, old, mess) =>
      mess match {
        case Some(msg) => vprog(vid, old, msg) // vertex received a message: run the vertex program
        case None => old                       // no message: keep the old value
      }
    }

This code passes PregelSuite (after removing newVerts). Could you explain why two joins are used instead of one, and why the intermediate variable `newVerts` is needed? Are there performance considerations?

Best regards,
Alexander
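A note on the question above. `innerJoin` keeps only the vertex ids that appear in `messages`, and every message is addressed to an endpoint of some edge, so `newVerts` has exactly the same key set as `messages`. The second join therefore adds no new vertex values; what `newVerts` really contributes is the active set for the next `mapReduceTriplets` call. That suggests one outer join is enough, provided the previous superstep's `messages` are passed as the active set rather than dropped. Dropping the active set altogether lets every vertex send on every superstep, which breaks the contract Robin describes; a plausible (unverified) reason PregelSuite still passes is that if its `sendMsg` functions only emit when a value would change, the extra calls produce no extra messages. The following sketch models one superstep with plain Scala collections, not GraphX: `PregelJoinModel`, `outerJoin`, `send`, `run` and the hop-count shortest-path `vprog`/`sendMsg`/`mergeMsg` are hypothetical stand-ins, and the model assumes `activeDir` covers out-edges only. It runs the current two-join step, the one-join step with `messages` as the active set, and the one-join step with no active set.

```scala
// Local model of one GraphX Pregel superstep using plain Scala collections.
// Every name here is hypothetical; this is NOT the GraphX API.
object PregelJoinModel {
  type VertexId = Long
  type Verts = Map[VertexId, Double] // vertex id -> current distance
  type Msgs = Map[VertexId, Double]  // vertex id -> merged incoming message
  type Edges = Seq[(VertexId, VertexId)]
  type Step = (Verts, Msgs, Edges) => (Verts, Msgs)

  // Hypothetical single-source shortest paths (hop count); +Infinity = unreached.
  def vprog(id: VertexId, dist: Double, msg: Double): Double = math.min(dist, msg)
  def sendMsg(srcDist: Double, dstDist: Double): Option[Double] =
    if (srcDist + 1 < dstDist) Some(srcDist + 1) else None
  def mergeMsg(a: Double, b: Double): Double = math.min(a, b)

  // Stand-in for outerJoinVertices: every vertex is kept, the joined side is an Option.
  def outerJoin[U](verts: Verts, other: Map[VertexId, U])(
      f: (VertexId, Double, Option[U]) => Double): Verts =
    verts.map { case (id, d) => id -> f(id, d, other.get(id)) }

  // Stand-in for mapReduceTriplets with activeDir = Out: when an active set is
  // given, sendMsg is only invoked on edges whose source vertex is in it.
  def send(verts: Verts, edges: Edges, active: Option[Set[VertexId]]): Msgs =
    edges
      .filter { case (src, _) => active.forall(_.contains(src)) }
      .flatMap { case (src, dst) => sendMsg(verts(src), verts(dst)).map(m => dst -> m) }
      .groupBy(_._1)
      .map { case (dst, ms) => dst -> ms.map(_._2).reduce(mergeMsg(_, _)) }

  // Current loop body: innerJoin, then outerJoinVertices; newVerts is the active set.
  val twoJoins: Step = (verts, msgs, edges) => {
    val newVerts = verts.collect { case (id, d) if msgs.contains(id) => id -> vprog(id, d, msgs(id)) }
    val g = outerJoin(verts, newVerts)((_, old, newOpt) => newOpt.getOrElse(old))
    (g, send(g, edges, Some(newVerts.keySet)))
  }

  // Single outer join; the previous messages' keys (same as newVerts' keys) are the active set.
  val oneJoin: Step = (verts, msgs, edges) => {
    val g = outerJoin(verts, msgs) { (id, old, m) =>
      m match {
        case Some(msg) => vprog(id, old, msg)
        case None      => old
      }
    }
    (g, send(g, edges, Some(msgs.keySet)))
  }

  // Single outer join with newVerts removed and no active set: every edge is scanned.
  val oneJoinNoActiveSet: Step = (verts, msgs, edges) => {
    val g = outerJoin(verts, msgs)((id, old, m) => m.fold(old)(vprog(id, old, _)))
    (g, send(g, edges, None))
  }

  // Hypothetical driver mirroring the loop in the thread, with initialMsg = +Infinity.
  def run(step: Step, init: Verts, edges: Edges, maxIterations: Int = 100): Verts = {
    var verts = init.map { case (id, d) => id -> vprog(id, d, Double.PositiveInfinity) }
    var msgs = send(verts, edges, None) // the first round has no active set
    var i = 0
    while (msgs.nonEmpty && i < maxIterations) {
      val (v, m) = step(verts, msgs, edges)
      verts = v
      msgs = m
      i += 1
    }
    verts
  }

  def main(args: Array[String]): Unit = {
    val edges: Edges = Seq(1L -> 2L, 1L -> 3L, 2L -> 4L, 3L -> 4L, 4L -> 5L)
    val init: Verts = Map(1L -> 0.0) ++ (2L to 5L).map(_ -> Double.PositiveInfinity)
    val variants = Seq("twoJoins" -> twoJoins, "oneJoin" -> oneJoin, "oneJoinNoActiveSet" -> oneJoinNoActiveSet)
    for ((name, step) <- variants)
      println(s"$name: ${run(step, init, edges).toSeq.sortBy(_._1)}")
  }
}
```

On this small graph all three variants end with the same distances (0, 1, 1, 2 and 3 for vertices 1 to 5), because this `sendMsg` only fires when it improves a distance; the no-active-set variant just calls `sendMsg` on every edge in every superstep. Separately, the public `aggregateMessages` method takes only `sendMsg`, `mergeMsg` and `tripletFields`, with no active-set argument, so a one-join loop built on it would need some other way to skip vertices that received nothing.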