I’ve found two PRs (almost identical) for replacing mapReduceTriplets with 
aggregateMessages:
https://github.com/apache/spark/pull/3782
https://github.com/apache/spark/pull/3883
The first was closed at Dave’s suggestion; the second is stale.
There is also a PR for the new Pregel API, which is closed as well.

Do you know why this improvement has not been merged?

CC’ing Dave

From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Monday, July 27, 2015 9:11 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Two joins in GraphX Pregel implementation

Quite possibly - there is a JIRA open for replacing mapReduceTriplets with 
aggregateMessages (don’t recall the number off the top of my head)

Robin

On 27 Jul 2015, at 17:01, Ulanov, Alexander 
<alexander.ula...@hp.com<mailto:alexander.ula...@hp.com>> wrote:

Thank you, your explanation does make sense to me. Do you think that one join 
will work if `mapReduceTriplets` is replaced by the new `aggregateMessages`? 
The latter does not return the vertices that did not receive a message.

From: Robin East [mailto:robin.e...@xense.co.uk]
Sent: Monday, July 27, 2015 8:56 AM
To: Ulanov, Alexander
Cc: dev@spark.apache.org<mailto:dev@spark.apache.org>
Subject: Re: Two joins in GraphX Pregel implementation

What happens to this line of code:

messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()

Part of the Pregel ‘contract’ is that vertices that don’t receive messages from 
the previous superstep don’t get to send messages this superstep. Not sure if 
there is a test for that but there ought to be.

Robin


On 27 Jul 2015, at 16:42, Ulanov, Alexander 
<alexander.ula...@hp.com<mailto:alexander.ula...@hp.com>> wrote:

Dear Spark developers,

Below is the GraphX Pregel code snippet from 
https://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api
(it does not contain the caching step):

while (activeMessages > 0 && i < maxIterations) {
  // Receive the messages: -----------------------------------------------------------------------
  // (1st join) Run the vertex program on all vertices that receive messages
  val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
  // (2nd join) Merge the new vertex values back into the graph
  g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
  // Send Messages: ------------------------------------------------------------------------------
  // Vertices that didn't receive a message above don't appear in newVerts and therefore don't
  // get to send messages.  More precisely the map phase of mapReduceTriplets is only invoked
  // on edges in the activeDir of vertices in newVerts
  messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
  activeMessages = messages.count()
  i += 1
}

It seems that the mentioned two joins can be rewritten as one outer join as 
follows:
g = g.outerJoinVertices(messages) { (vid, old, mess) => mess match {
  case Some(mess) => vprog(vid, old, mess)
  case None => old }
}
This code passes PregelSuite (after removing newVerts). Could you elaborate on 
why two joins are used instead of one, and why the intermediate variable 
`newVerts` is needed? Are there performance considerations?
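
As a rough check of the claim that the resulting vertex values are the same, here is a minimal sketch that models both variants with plain Scala Maps rather than GraphX RDDs. The object name, the toy `vprog` and both helper functions are hypothetical and only illustrate the join logic; they say nothing about performance or caching in Spark.

```scala
object JoinEquivalenceSketch {
  type VertexId = Long

  // Hypothetical vertex program: add the incoming message to the old value.
  def vprog(id: VertexId, attr: Int, msg: Int): Int = attr + msg

  // Two-join variant: inner join with the messages, then merge back into all vertices.
  def twoJoins(vertices: Map[VertexId, Int],
               messages: Map[VertexId, Int]): Map[VertexId, Int] = {
    val newVerts = messages.collect {
      case (id, msg) if vertices.contains(id) => id -> vprog(id, vertices(id), msg)
    }
    vertices.map { case (id, old) => id -> newVerts.getOrElse(id, old) }
  }

  // One-join variant: a single outer join that applies vprog where a message exists.
  def oneJoin(vertices: Map[VertexId, Int],
              messages: Map[VertexId, Int]): Map[VertexId, Int] =
    vertices.map { case (id, old) =>
      id -> (messages.get(id) match {
        case Some(msg) => vprog(id, old, msg)
        case None      => old
      })
    }

  def main(args: Array[String]): Unit = {
    val vertices = Map(1L -> 1, 2L -> 2, 3L -> 3)
    val messages = Map(2L -> 10)           // only vertex 2 receives a message

    // Both variants yield the same vertex values.
    assert(twoJoins(vertices, messages) == oneJoin(vertices, messages))
    assert(oneJoin(vertices, messages) == Map(1L -> 1, 2L -> 12, 3L -> 3))
    println(oneJoin(vertices, messages))
  }
}
```

The values agree, but the one-join form no longer produces the set of vertices that actually received a message, which the original loop passes on as `newVerts`.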

Best regards, Alexander
