Repository: spark
Updated Branches:
  refs/heads/master aa6536fa3 -> 45f4c6612


[SPARK-5922][GraphX]: Add diff(other: RDD[(VertexId, VD)]) in VertexRDD

Added an overload of 'diff' that accepts an RDD[(VertexId, VD)] instead of a
VertexRDD[VD], matching the parameter types accepted by 'innerJoin' and 'leftJoin'.
The original diff(other: VertexRDD[VD]) method is retained, so this change maintains
backward compatibility and makes the VertexRDD methods more consistent with one another.

Author: Brennon York <brennon.y...@capitalone.com>

Closes #4733 from brennonyork/SPARK-5922 and squashes the following commits:

e800f08 [Brennon York] fixed merge conflicts
b9274af [Brennon York] fixed merge conflicts
f86375c [Brennon York] fixed minor include line
398ddb4 [Brennon York] fixed merge conflicts
aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure 
that method works properly
2af0b88 [Brennon York] removed deprecation line
753c963 [Brennon York] fixed merge conflicts and set preference to use the 
diff(other: VertexRDD[VD]) method
2c678c6 [Brennon York] added mima exclude to exclude new public diff method 
from VertexRDD
93186f3 [Brennon York] added back the original diff method to sustain binary 
compatibility
f18356e [Brennon York] changed method invocation of 'diff' to match that of 
'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45f4c661
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45f4c661
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45f4c661

Branch: refs/heads/master
Commit: 45f4c66122c57011e74c694a424756812ab77d99
Parents: aa6536f
Author: Brennon York <brennon.y...@capitalone.com>
Authored: Mon Mar 16 01:06:26 2015 -0700
Committer: Ankur Dave <ankurd...@gmail.com>
Committed: Mon Mar 16 01:06:26 2015 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/graphx/VertexRDD.scala |  9 +++++++++
 .../org/apache/spark/graphx/impl/VertexRDDImpl.scala   |  4 ++++
 .../scala/org/apache/spark/graphx/VertexRDDSuite.scala | 13 +++++++++++++
 project/MimaExcludes.scala                             |  3 +++
 4 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 40ecff7..ad4bfe0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -126,6 +126,15 @@ abstract class VertexRDD[VD](
    * differing values; for values that are different, keeps the values from 
`other`. This is
    * only guaranteed to work if the VertexRDDs share a common ancestor.
    *
+   * @param other the other RDD[(VertexId, VD)] with which to diff against.
+   */
+  def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD]
+
+  /**
+   * For each vertex present in both `this` and `other`, `diff` returns only 
those vertices with
+   * differing values; for values that are different, keeps the values from 
`other`. This is
+   * only guaranteed to work if the VertexRDDs share a common ancestor.
+   *
    * @param other the other VertexRDD with which to diff against.
    */
   def diff(other: VertexRDD[VD]): VertexRDD[VD]

http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
----------------------------------------------------------------------
diff --git 
a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala 
b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
index 904be21..125692d 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala
@@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] (
   override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): 
VertexRDD[VD2] =
     this.mapVertexPartitions(_.map(f))
 
+  override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = {
+    diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a))
+  }
+
   override def diff(other: VertexRDD[VD]): VertexRDD[VD] = {
     val otherPartition = other match {
       case other: VertexRDD[_] if this.partitioner == other.partitioner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
index 97533dd..4f7a442 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.graphx
 import org.scalatest.FunSuite
 
 import org.apache.spark.{HashPartitioner, SparkContext}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
 class VertexRDDSuite extends FunSuite with LocalSparkContext {
@@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext 
{
     }
   }
 
+  test("diff with RDD[(VertexId, VD)]") {
+    withSpark { sc =>
+      val n = 100
+      val verts = vertices(sc, n).cache()
+      val flipEvens: RDD[(VertexId, Int)] =
+        sc.parallelize(0L to 100L)
+          .map(id => if (id % 2 == 0) (id, -id.toInt) else (id, 
id.toInt)).cache()
+      // diff should keep only the changed vertices
+      assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 
2).map(-_).toSet)
+    }
+  }
+
   test("diff vertices with the non-equal number of partitions") {
     withSpark { sc =>
       val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => 
(i.toLong, 0)))

http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 627b2ce..a6b07fa 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -181,6 +181,9 @@ object MimaExcludes {
             
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
             
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),
             
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock")
+          ) ++ Seq(
+            // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, 
VD)]) to VertexRDD
+            
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff")
           )
 
         case v if v.startsWith("1.2") =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to