Repository: spark
Updated Branches:
  refs/heads/master 5340dfaf9 -> b715933fc


[SPARK-9436] [GRAPHX] Pregel simplification patch

The Pregel code contains two consecutive joins:
```
g.vertices.innerJoin(messages)(vprog)
...
g = g.outerJoinVertices(newVerts)
{ (vid, old, newOpt) => newOpt.getOrElse(old) }
```
This can be simplified to a single join. ankurdave proposed a patch based on our
discussion on the mailing list:
https://www.mail-archive.com/devspark.apache.org/msg10316.html
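
For illustration, the equivalence of the two forms can be sketched without
Spark. This is a minimal model, assuming plain Scala Maps stand in for the
vertex and message RDDs; the object name PregelJoinSketch and the three helper
functions are hypothetical stand-ins that only mimic the semantics of
innerJoin, outerJoinVertices, and joinVertices, and are not the GraphX
implementations:
```scala
object PregelJoinSketch {
  type VertexId = Long

  // Old step 1 (models innerJoin): only vertices that received a message
  // survive, with vprog applied to them.
  def innerJoin[VD, A](vertices: Map[VertexId, VD], messages: Map[VertexId, A])(
      vprog: (VertexId, VD, A) => VD): Map[VertexId, VD] =
    messages.collect {
      case (vid, msg) if vertices.contains(vid) => vid -> vprog(vid, vertices(vid), msg)
    }

  // Old step 2 (models outerJoinVertices with newOpt.getOrElse(old)):
  // keep every vertex, taking the updated attribute when one exists.
  def outerJoinVertices[VD](
      vertices: Map[VertexId, VD], newVerts: Map[VertexId, VD]): Map[VertexId, VD] =
    vertices.map { case (vid, old) => vid -> newVerts.getOrElse(vid, old) }

  // New single step (models joinVertices): apply vprog where a message
  // exists, otherwise keep the old attribute.
  def joinVertices[VD, A](vertices: Map[VertexId, VD], messages: Map[VertexId, A])(
      vprog: (VertexId, VD, A) => VD): Map[VertexId, VD] =
    vertices.map { case (vid, old) =>
      vid -> messages.get(vid).map(msg => vprog(vid, old, msg)).getOrElse(old)
    }

  def main(args: Array[String]): Unit = {
    // Made-up sample data: vertex 2 receives no message.
    val vertices = Map(1L -> 10, 2L -> 20, 3L -> 30)
    val messages = Map(1L -> 5, 3L -> 7)
    val vprog = (vid: VertexId, attr: Int, msg: Int) => attr + msg

    val newVerts = innerJoin(vertices, messages)(vprog)
    val twoJoins = outerJoinVertices(vertices, newVerts)
    val oneJoin = joinVertices(vertices, messages)(vprog)

    // Both forms produce the same vertex attributes.
    assert(twoJoins == oneJoin)
    // When every message targets an existing vertex, newVerts and the
    // messages cover the same vertex ids, which is why the messages can
    // replace newVerts as the active set.
    assert(newVerts.keySet == messages.keySet)
    println("two joins and one join agree")
  }
}
```
In this model the ids in newVerts match the ids in the message set, which is
consistent with the patch passing oldMessages instead of newVerts as the active
set to mapReduceTriplets.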

Author: Alexander Ulanov <[email protected]>

Closes #7749 from avulanov/SPARK-9436-pregel and squashes the following commits:

8568e06 [Alexander Ulanov] Pregel simplification patch


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b715933f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b715933f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b715933f

Branch: refs/heads/master
Commit: b715933fc69a49653abdb2fba0818dfc4f35d358
Parents: 5340dfa
Author: Alexander Ulanov <[email protected]>
Authored: Wed Jul 29 13:59:00 2015 -0700
Committer: Ankur Dave <[email protected]>
Committed: Wed Jul 29 13:59:00 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/Pregel.scala  | 23 +++++++++-----------
 1 file changed, 10 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b715933f/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
index cfcf724..2ca60d5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala
@@ -127,28 +127,25 @@ object Pregel extends Logging {
     var prevG: Graph[VD, ED] = null
     var i = 0
     while (activeMessages > 0 && i < maxIterations) {
-      // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
-      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
-      // Update the graph with the new vertices.
+      // Receive the messages and update the vertices.
       prevG = g
-      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
-      g.cache()
+      g = g.joinVertices(messages)(vprog).cache()
 
       val oldMessages = messages
-      // Send new messages. Vertices that didn't get any messages don't appear in newVerts, so don't
-      // get to send messages. We must cache messages so it can be materialized on the next line,
-      // allowing us to uncache the previous iteration.
-      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDirection))).cache()
-      // The call to count() materializes `messages`, `newVerts`, and the vertices of `g`. This
-      // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
-      // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
+      // Send new messages, skipping edges where neither side received a message. We must cache
+      // messages so it can be materialized on the next line, allowing us to uncache the previous
+      // iteration.
+      messages = g.mapReduceTriplets(
+        sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
+      // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
+      // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
+      // and the vertices of g).
       activeMessages = messages.count()
 
       logInfo("Pregel finished iteration " + i)
 
       // Unpersist the RDDs hidden by newly-materialized RDDs
       oldMessages.unpersist(blocking = false)
-      newVerts.unpersist(blocking = false)
       prevG.unpersistVertices(blocking = false)
       prevG.edges.unpersist(blocking = false)
       // count the iteration

