Repository: spark
Updated Branches:
  refs/heads/master 987760ec0 -> 235a47ce1


Rebuild routing table after Graph.reverse

GraphImpl.reverse used to reverse edges in each partition of the edge RDD but 
preserve the routing table and replicated vertex view, since reversing should 
not affect partitioning.

However, the old routing table would then have incorrect information for 
srcAttrOnly and dstAttrOnly. These RDDs should be switched.

A simple fix is for Graph.reverse to rebuild the routing table and replicated 
vertex view.

Thanks to Bogdan Ghidireac for reporting this issue on the [mailing 
list](http://apache-spark-user-list.1001560.n3.nabble.com/graph-reverse-amp-Pregel-API-td4338.html).

Author: Ankur Dave <[email protected]>

Closes #431 from ankurdave/fix-reverse-bug and squashes the following commits:

75d63cb [Ankur Dave] Rebuild routing table after Graph.reverse


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/235a47ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/235a47ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/235a47ce

Branch: refs/heads/master
Commit: 235a47ce14b3c7523e79ce671355dea7ee06f4b7
Parents: 987760e
Author: Ankur Dave <[email protected]>
Authored: Wed Apr 16 17:15:50 2014 -0700
Committer: Reynold Xin <[email protected]>
Committed: Wed Apr 16 17:15:50 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/graphx/impl/GraphImpl.scala    |  2 +-
 .../test/scala/org/apache/spark/graphx/GraphSuite.scala   | 10 ++++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/235a47ce/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index c2b510a..9eabccd 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -102,7 +102,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
 
   override def reverse: Graph[VD, ED] = {
     val newETable = edges.mapEdgePartitions((pid, part) => part.reverse)
-    new GraphImpl(vertices, newETable, routingTable, replicatedVertexView)
+    GraphImpl(vertices, newETable)
   }
 
   override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/235a47ce/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index c65e366..d9ba467 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -172,6 +172,16 @@ class GraphSuite extends FunSuite with LocalSparkContext {
     }
   }
 
+  test("reverse with join elimination") {
+    withSpark { sc =>
+      val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
+      val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
+      val graph = Graph(vertices, edges).reverse
+      val result = graph.mapReduceTriplets[Int](et => Iterator((et.dstId, et.srcAttr)), _ + _)
+      assert(result.collect.toSet === Set((1L, 2)))
+    }
+  }
+
   test("subgraph") {
     withSpark { sc =>
       // Create a star graph of 10 veritces.

Reply via email to