Repository: spark
Updated Branches:
  refs/heads/master aad003227 -> 39fb57968


[SPARK-6510][GraphX]: Add Graph#minus method to act as Set#difference

Adds a `Graph#minus` method which acts as a set difference on `VertexId`s: it
returns only those vertices of the calling `VertexRDD` whose ids do not appear in the other set.

To demonstrate a basic example with pseudocode:

```
Set((0L,0),(1L,1)).minus(Set((1L,1),(2L,2)))
> Set((0L,0))
```

Author: Brennon York <brennon.y...@capitalone.com>

Closes #5175 from brennonyork/SPARK-6510 and squashes the following commits:

248d5c8 [Brennon York] added minus(VertexRDD[VD]) method to avoid 
createUsingIndex and updated the mask operations to simplify with andNot call
3fb7cce [Brennon York] updated graphx doc to reflect the addition of minus 
method
6575d92 [Brennon York] updated mima exclude
aaa030b [Brennon York] completed graph#minus functionality
7227c0f [Brennon York] beginning work on minus functionality


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39fb5796
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39fb5796
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39fb5796

Branch: refs/heads/master
Commit: 39fb57968352549f2276ac4fcd2b92988ed6fe42
Parents: aad0032
Author: Brennon York <brennon.y...@capitalone.com>
Authored: Thu Mar 26 19:08:09 2015 -0700
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Thu Mar 26 19:08:09 2015 -0700

----------------------------------------------------------------------
 docs/graphx-programming-guide.md                |  2 ++
 .../org/apache/spark/graphx/VertexRDD.scala     | 16 ++++++++++
 .../graphx/impl/VertexPartitionBaseOps.scala    | 15 +++++++++
 .../spark/graphx/impl/VertexRDDImpl.scala       | 25 +++++++++++++++
 .../apache/spark/graphx/VertexRDDSuite.scala    | 33 ++++++++++++++++++--
 project/MimaExcludes.scala                      |  3 ++
 6 files changed, 92 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/docs/graphx-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index c601d79..3f10cb2 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -899,6 +899,8 @@ class VertexRDD[VD] extends RDD[(VertexID, VD)] {
   // Transform the values without changing the ids (preserves the internal 
index)
   def mapValues[VD2](map: VD => VD2): VertexRDD[VD2]
   def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2]
+  // Keep only the vertices of this set whose VertexId does not appear in the other set
+  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
   // Remove vertices from this set that appear in the other set
   def diff(other: VertexRDD[VD]): VertexRDD[VD]
   // Join operators that take advantage of the internal indexing to accelerate 
joins (substantially)

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index ad4bfe0..a9f04b5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -122,6 +122,22 @@ abstract class VertexRDD[VD](
   def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2]
 
   /**
+   * Acts as a set difference on VertexIds: returns only those vertices of `this` whose
+   * VertexId does not appear in `other`. Vertex values are not compared.
+   *
+   * @param other an RDD to run the set operation against
+   */
+  def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+  /**
+   * Acts as a set difference on VertexIds: returns only those vertices of `this` whose
+   * VertexId does not appear in `other`. Vertex values are not compared.
+   *
+   * @param other a VertexRDD to run the set operation against
+   */
+  def minus(other: VertexRDD[VD]): VertexRDD[VD]
+
+  /**
    * For each vertex present in both `this` and `other`, `diff` returns only 
those vertices with
    * differing values; for values that are different, keeps the values from 
`other`. This is
    * only guaranteed to work if the VertexRDDs share a common ancestor.

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
----------------------------------------------------------------------
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index 4fd2548..b90f9fa 100644
--- 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -88,6 +88,21 @@ private[graphx] abstract class VertexPartitionBaseOps
     this.withMask(newMask)
   }
 
+  /** Hides the VertexId's that are the same between `this` and `other`. */
+  def minus(other: Self[VD]): Self[VD] = {
+    if (self.index != other.index) {
+      logWarning("Minus operations on two VertexPartitions with different 
indexes is slow.")
+      minus(createUsingIndex(other.iterator))
+    } else {
+      self.withMask(self.mask.andNot(other.mask))
+    }
+  }
+
+  /** Hides the VertexId's that are the same between `this` and `other`. */
+  def minus(other: Iterator[(VertexId, VD)]): Self[VD] = {
+    minus(createUsingIndex(other))
+  }
+
   /**
    * Hides vertices that are the same between this and other. For vertices 
that are different, keeps
    * the values from `other`. The indices of `this` and `other` must be the 
same.

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
----------------------------------------------------------------------
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 125692d..349c854 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,31 @@ class VertexRDDImpl[VD] private[graphx] (
   override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): 
VertexRDD[VD2] =
     this.mapVertexPartitions(_.map(f))
 
+  override def minus(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+    minus(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
+  }
+
+  override def minus (other: VertexRDD[VD]): VertexRDD[VD] = {
+    other match {
+      case other: VertexRDD[_] if this.partitioner == other.partitioner =>
+        this.withPartitionsRDD[VD](
+          partitionsRDD.zipPartitions(
+            other.partitionsRDD, preservesPartitioning = true) {
+            (thisIter, otherIter) =>
+              val thisPart = thisIter.next()
+              val otherPart = otherIter.next()
+              Iterator(thisPart.minus(otherPart))
+          })
+      case _ =>
+        this.withPartitionsRDD[VD](
+          partitionsRDD.zipPartitions(
+            other.partitionBy(this.partitioner.get), preservesPartitioning = 
true) {
+            (partIter, msgs) => partIter.map(_.minus(msgs))
+          }
+        )
+    }
+  }
+
   override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
     diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 4f7a442..c9443d1 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -47,6 +47,35 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext 
{
     }
   }
 
+  test("minus") {
+    withSpark { sc =>
+      val vertexA = VertexRDD(sc.parallelize(0 until 75, 2).map(i => 
(i.toLong, 0))).cache()
+      val vertexB = VertexRDD(sc.parallelize(25 until 100, 2).map(i => 
(i.toLong, 1))).cache()
+      val vertexC = vertexA.minus(vertexB)
+      assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
+    }
+  }
+
+  test("minus with RDD[(VertexId, VD)]") {
+    withSpark { sc =>
+      val vertexA = VertexRDD(sc.parallelize(0 until 75, 2).map(i => 
(i.toLong, 0))).cache()
+      val vertexB: RDD[(VertexId, Int)] =
+        sc.parallelize(25 until 100, 2).map(i => (i.toLong, 1)).cache()
+      val vertexC = vertexA.minus(vertexB)
+      assert(vertexC.map(_._1).collect.toSet === (0 until 25).toSet)
+    }
+  }
+
+  test("minus with non-equal number of partitions") {
+    withSpark { sc =>
+      val vertexA = VertexRDD(sc.parallelize(0 until 75, 5).map(i => 
(i.toLong, 0)))
+      val vertexB = VertexRDD(sc.parallelize(50 until 100, 2).map(i => 
(i.toLong, 1)))
+      assert(vertexA.partitions.size != vertexB.partitions.size)
+      val vertexC = vertexA.minus(vertexB)
+      assert(vertexC.map(_._1).collect.toSet === (0 until 50).toSet)
+    }
+  }
+
   test("diff") {
     withSpark { sc =>
       val n = 100
@@ -71,7 +100,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext 
{
     }
   }
 
-  test("diff vertices with the non-equal number of partitions") {
+  test("diff vertices with non-equal number of partitions") {
     withSpark { sc =>
       val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => 
(i.toLong, 0)))
       val vertexB = VertexRDD(sc.parallelize(8 until 16, 2).map(i => 
(i.toLong, 1)))
@@ -96,7 +125,7 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext 
{
     }
   }
 
-  test("leftJoin vertices with the non-equal number of partitions") {
+  test("leftJoin vertices with non-equal number of partitions") {
     withSpark { sc =>
       val vertexA = VertexRDD(sc.parallelize(0 until 100, 2).map(i => 
(i.toLong, 1)))
       val vertexB = VertexRDD(

http://git-wip-us.apache.org/repos/asf/spark/blob/39fb5796/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 56f5dbe..b9f4004 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -51,6 +51,9 @@ object MimaExcludes {
               "org.apache.spark.broadcast.HttpBroadcastFactory.newBroadcast"),
             ProblemFilters.exclude[IncompatibleResultTypeProblem](
               
"org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast")
+          ) ++ Seq(
+          // SPARK-6510 Add a Graph#minus method acting as Set#difference
+            
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
           )
 
         case v if v.startsWith("1.3") =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to