Repository: spark Updated Branches: refs/heads/master 4352a2fda -> 50561f439
[SPARK-2235][SQL]Spark SQL basicOperator add Intersect operator Hi all, I want to submit a basic operator, Intersect. For example, in SQL: select * from table1 intersect select * from table2. I want to use this operator to support this functionality in Spark SQL. This operator will return the intersection of the RDDs produced by its two child SparkPlans. JIRA: https://issues.apache.org/jira/browse/SPARK-2235 Author: Yanjie Gao <[email protected]> Author: YanjieGao <[email protected]> Closes #1150 from YanjieGao/patch-5 and squashes the following commits: 4629afe [YanjieGao] reformat the code bdc2ac0 [YanjieGao] reformat the code as Michael's suggestion 3b29ad6 [YanjieGao] Merge remote branch 'upstream/master' into patch-5 1cfbfe6 [YanjieGao] refomat some files ea78f33 [YanjieGao] resolve conflict and add annotation on basicOperator and remove HiveQl 0c7cca5 [YanjieGao] modify format problem a802ca8 [YanjieGao] Merge remote branch 'upstream/master' into patch-5 5e374c7 [YanjieGao] resolve conflict in SparkStrategies and basicOperator f7961f6 [Yanjie Gao] update the line less than bdc4a05 [Yanjie Gao] Update basicOperators.scala 0b49837 [Yanjie Gao] delete the annotation f1288b4 [Yanjie Gao] delete annotation e2b64be [Yanjie Gao] Update basicOperators.scala 4dd453e [Yanjie Gao] Update SQLQuerySuite.scala 790765d [Yanjie Gao] Update SparkStrategies.scala ac73e60 [Yanjie Gao] Update basicOperators.scala d4ac5e5 [Yanjie Gao] Update HiveQl.scala 61e88e7 [Yanjie Gao] Update SqlParser.scala 469f099 [Yanjie Gao] Update basicOperators.scala e5bff61 [Yanjie Gao] Spark SQL basicOperator add Intersect operator Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50561f43 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50561f43 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50561f43 Branch: refs/heads/master Commit: 50561f4396be7d641feb2a7a54a374d294628231 Parents: 4352a2f Author: Yanjie Gao <[email protected]> 
Authored: Mon Jul 7 19:40:04 2014 -0700 Committer: Michael Armbrust <[email protected]> Committed: Mon Jul 7 19:40:04 2014 -0700 ---------------------------------------------------------------------- .../org/apache/spark/sql/catalyst/SqlParser.scala | 2 ++ .../sql/catalyst/plans/logical/basicOperators.scala | 5 +++++ .../apache/spark/sql/execution/SparkStrategies.scala | 2 ++ .../apache/spark/sql/execution/basicOperators.scala | 13 +++++++++++++ .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 11 +++++++++++ 5 files changed, 33 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index ecb1112..e5653c5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -118,6 +118,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") protected val WHERE = Keyword("WHERE") + protected val INTERSECT = Keyword("INTERSECT") protected val EXCEPT = Keyword("EXCEPT") @@ -140,6 +141,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected lazy val query: Parser[LogicalPlan] = ( select * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | + INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) 
http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index 0728fa7..1537de2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -210,3 +210,8 @@ case class Distinct(child: LogicalPlan) extends UnaryNode { case object NoRelation extends LeafNode { override def output = Nil } + +case class Intersect(left: LogicalPlan, right: LogicalPlan) extends BinaryNode { + override def output = left.output + override def references = Set.empty +} http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 9e036e1..7080074 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -275,6 +275,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Union(unionChildren.map(planLater))(sqlContext) :: Nil case logical.Except(left,right) => execution.Except(planLater(left),planLater(right)) :: Nil + case logical.Intersect(left, right) => + execution.Intersect(planLater(left), planLater(right)) :: Nil case logical.Generate(generator, join, outer, _, child) => execution.Generate(generator, join = join, 
outer = outer, planLater(child)) :: Nil case logical.NoRelation => http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 4b59e0b..e8816f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -220,3 +220,16 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode { } } +/** + * :: DeveloperApi :: + * Returns the rows in left that also appear in right using the built in spark + * intersection function. + */ +@DeveloperApi +case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode { + override def output = children.head.output + + override def execute() = { + left.execute().map(_.copy()).intersection(right.execute().map(_.copy())) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/50561f43/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 5c6701e..fa1f32f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -385,6 +385,17 @@ class SQLQuerySuite extends QueryTest { sql("SELECT * FROM upperCaseData EXCEPT SELECT * FROM upperCaseData "), Nil) } + test("INTERSECT") { + checkAnswer( + sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM lowerCaseData"), + (1, "a") :: + (2, "b") :: + (3, "c") :: + (4, "d") :: Nil) + checkAnswer( + sql("SELECT * FROM lowerCaseData 
INTERSECT SELECT * FROM upperCaseData"), Nil) + } + test("SET commands semantics using sql()") { TestSQLContext.settings.synchronized { clear()
