Repository: spark Updated Branches: refs/heads/master aa6536fa3 -> 45f4c6612
[SPARK-5922][GraphX]: Add diff(other: RDD[VertexId, VD]) in VertexRDD Changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)]. This change maintains backwards compatibility and better unifies the VertexRDD methods to match each other. Author: Brennon York <brennon.y...@capitalone.com> Closes #4733 from brennonyork/SPARK-5922 and squashes the following commits: e800f08 [Brennon York] fixed merge conflicts b9274af [Brennon York] fixed merge conflicts f86375c [Brennon York] fixed minor include line 398ddb4 [Brennon York] fixed merge conflicts aac1810 [Brennon York] updated to aggregateUsingIndex and added test to ensure that method works properly 2af0b88 [Brennon York] removed deprecation line 753c963 [Brennon York] fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method 2c678c6 [Brennon York] added mima exclude to exclude new public diff method from VertexRDD 93186f3 [Brennon York] added back the original diff method to sustain binary compatibility f18356e [Brennon York] changed method invocation of 'diff' to match that of 'innerJoin' and 'leftJoin' from VertexRDD[VD] to RDD[(VertexId, VD)] Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45f4c661 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45f4c661 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45f4c661 Branch: refs/heads/master Commit: 45f4c66122c57011e74c694a424756812ab77d99 Parents: aa6536f Author: Brennon York <brennon.y...@capitalone.com> Authored: Mon Mar 16 01:06:26 2015 -0700 Committer: Ankur Dave <ankurd...@gmail.com> Committed: Mon Mar 16 01:06:26 2015 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/graphx/VertexRDD.scala | 9 +++++++++ .../org/apache/spark/graphx/impl/VertexRDDImpl.scala | 4 ++++ .../scala/org/apache/spark/graphx/VertexRDDSuite.scala | 13 +++++++++++++ project/MimaExcludes.scala | 3 +++ 4 files changed, 29 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 40ecff7..ad4bfe0 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -126,6 +126,15 @@ abstract class VertexRDD[VD]( * differing values; for values that are different, keeps the values from `other`. This is * only guaranteed to work if the VertexRDDs share a common ancestor. * + * @param other the other RDD[(VertexId, VD)] with which to diff against. + */ + def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] + + /** + * For each vertex present in both `this` and `other`, `diff` returns only those vertices with + * differing values; for values that are different, keeps the values from `other`. This is + * only guaranteed to work if the VertexRDDs share a common ancestor. + * * @param other the other VertexRDD with which to diff against. */ def diff(other: VertexRDD[VD]): VertexRDD[VD] http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala ---------------------------------------------------------------------- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala index 904be21..125692d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexRDDImpl.scala @@ -103,6 +103,10 @@ class VertexRDDImpl[VD] private[graphx] ( override def mapValues[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexRDD[VD2] = this.mapVertexPartitions(_.map(f)) + override def diff(other: RDD[(VertexId, VD)]): VertexRDD[VD] = { + diff(this.aggregateUsingIndex(other, (a: VD, b: VD) => a)) + } + override def diff(other: VertexRDD[VD]): VertexRDD[VD] = { val otherPartition = other match { case other: VertexRDD[_] if this.partitioner == other.partitioner => http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala ---------------------------------------------------------------------- diff --git a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala index 97533dd..4f7a442 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/VertexRDDSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.graphx import org.scalatest.FunSuite import org.apache.spark.{HashPartitioner, SparkContext} +import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel class VertexRDDSuite extends FunSuite with LocalSparkContext { @@ -58,6 +59,18 @@ class VertexRDDSuite extends FunSuite with LocalSparkContext { } } + test("diff with RDD[(VertexId, VD)]") { + withSpark { sc => + val n = 100 + val verts = vertices(sc, n).cache() + val flipEvens: RDD[(VertexId, Int)] = + sc.parallelize(0L to 100L) + .map(id => if (id % 2 == 0) (id, -id.toInt) else (id, id.toInt)).cache() + // diff should keep only the changed vertices + assert(verts.diff(flipEvens).map(_._2).collect().toSet === (2 to n by 2).map(-_).toSet) + } + } + test("diff vertices with the non-equal number of partitions") { withSpark { sc => val vertexA = VertexRDD(sc.parallelize(0 until 24, 3).map(i => (i.toLong, 0))) http://git-wip-us.apache.org/repos/asf/spark/blob/45f4c661/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 627b2ce..a6b07fa 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -181,6 +181,9 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.TestClock") + ) ++ Seq( + // SPARK-5922 Adding a generalized diff(other: RDD[(VertexId, VD)]) to VertexRDD + ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.diff") ) case v if v.startsWith("1.2") => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org