Repository: spark Updated Branches: refs/heads/master f5abd2712 -> 10b59ba23
[SPARK-2428][SQL] Add except and intersect methods to SchemaRDD. Author: Takuya UESHIN <[email protected]> Closes #1355 from ueshin/issues/SPARK-2428 and squashes the following commits: b6fa264 [Takuya UESHIN] Add except and intersect methods to SchemaRDD. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10b59ba2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10b59ba2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10b59ba2 Branch: refs/heads/master Commit: 10b59ba230cb426f2a5d43cd0a4964a556e24c3f Parents: f5abd27 Author: Takuya UESHIN <[email protected]> Authored: Thu Jul 10 19:27:24 2014 -0700 Committer: Michael Armbrust <[email protected]> Committed: Thu Jul 10 19:27:24 2014 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/SchemaRDD.scala | 20 +++++++++++++++++++ .../org/apache/spark/sql/DslQuerySuite.scala | 21 ++++++++++++++++++++ 2 files changed, 41 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/10b59ba2/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 8bcfc7c..0c95b66 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -257,6 +257,26 @@ class SchemaRDD( new SchemaRDD(sqlContext, Union(logicalPlan, otherPlan.logicalPlan)) /** + * Performs a relational except on two SchemaRDDs + * + * @param otherPlan the [[SchemaRDD]] that should be excepted from this one. + * + * @group Query + */ + def except(otherPlan: SchemaRDD): SchemaRDD = + new SchemaRDD(sqlContext, Except(logicalPlan, otherPlan.logicalPlan)) + + /** + * Performs a relational intersect on two SchemaRDDs + * + * @param otherPlan the [[SchemaRDD]] that should be intersected with this one. + * + * @group Query + */ + def intersect(otherPlan: SchemaRDD): SchemaRDD = + new SchemaRDD(sqlContext, Intersect(logicalPlan, otherPlan.logicalPlan)) + + /** * Filters tuples using a function over the value of the specified column. * * {{{ http://git-wip-us.apache.org/repos/asf/spark/blob/10b59ba2/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala index 04ac008..68dae58 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala @@ -168,4 +168,25 @@ class DslQuerySuite extends QueryTest { test("zero count") { assert(emptyTableData.count() === 0) } + + test("except") { + checkAnswer( + lowerCaseData.except(upperCaseData), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer(lowerCaseData.except(lowerCaseData), Nil) + checkAnswer(upperCaseData.except(upperCaseData), Nil) + } + + test("intersect") { + checkAnswer( + lowerCaseData.intersect(lowerCaseData), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer(lowerCaseData.intersect(upperCaseData), Nil) + } }
