Repository: spark Updated Branches: refs/heads/branch-2.1 cbbe21777 -> 6916ddc38
[SPARK-18674][SQL] improve the error message of using join

## What changes were proposed in this pull request?

The current error message of USING join is quite confusing, for example:

```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]

scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]

scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: using columns ['c1] can not be resolved given input columns: [c1, c2] ;;
'Join UsingJoin(Inner,List('c1))
:- Project [value#1 AS c1#3]
:  +- LocalRelation [value#1]
+- Project [value#7 AS c2#9]
   +- LocalRelation [value#7]
```

after this PR, it becomes:

```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]

scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]

scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: USING column `c1` can not be resolved with the right join side, the right output is: [c2];
```

## How was this patch tested?

updated tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #16100 from cloud-fan/natural.
(cherry picked from commit e6534847100670a22b3b191a0f9d924fab7f3c02) Signed-off-by: Herman van Hovell <hvanhov...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6916ddc3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6916ddc3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6916ddc3 Branch: refs/heads/branch-2.1 Commit: 6916ddc385fc33fa390e541300ca2bb1dbd0599c Parents: cbbe217 Author: Wenchen Fan <wenc...@databricks.com> Authored: Thu Dec 1 11:53:12 2016 -0800 Committer: Herman van Hovell <hvanhov...@databricks.com> Committed: Thu Dec 1 11:53:39 2016 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 28 ++++-------- .../sql/catalyst/analysis/CheckAnalysis.scala | 6 --- .../spark/sql/catalyst/parser/AstBuilder.scala | 5 +-- .../spark/sql/catalyst/plans/joinTypes.scala | 2 +- .../analysis/ResolveNaturalJoinSuite.scala | 47 +++++++++----------- .../sql/catalyst/parser/PlanParserSuite.scala | 2 +- .../scala/org/apache/spark/sql/Dataset.scala | 2 +- 7 files changed, 34 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index e576d53..372a121 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1899,15 +1899,7 @@ class Analyzer( override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case j @ Join(left, right, 
UsingJoin(joinType, usingCols), condition) if left.resolved && right.resolved && j.duplicateResolved => - // Resolve the column names referenced in using clause from both the legs of join. - val lCols = usingCols.flatMap(col => left.resolveQuoted(col.name, resolver)) - val rCols = usingCols.flatMap(col => right.resolveQuoted(col.name, resolver)) - if ((lCols.length == usingCols.length) && (rCols.length == usingCols.length)) { - val joinNames = lCols.map(exp => exp.name) - commonNaturalJoinProcessing(left, right, joinType, joinNames, None) - } else { - j - } + commonNaturalJoinProcessing(left, right, joinType, usingCols, None) case j @ Join(left, right, NaturalJoin(joinType), condition) if j.resolvedExceptNatural => // find common column names from both sides val joinNames = left.output.map(_.name).intersect(right.output.map(_.name)) @@ -1922,18 +1914,16 @@ class Analyzer( joinNames: Seq[String], condition: Option[Expression]) = { val leftKeys = joinNames.map { keyName => - val joinColumn = left.output.find(attr => resolver(attr.name, keyName)) - assert( - joinColumn.isDefined, - s"$keyName should exist in ${left.output.map(_.name).mkString(",")}") - joinColumn.get + left.output.find(attr => resolver(attr.name, keyName)).getOrElse { + throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " + + s"left join side, the left output is: [${left.output.map(_.name).mkString(", ")}]") + } } val rightKeys = joinNames.map { keyName => - val joinColumn = right.output.find(attr => resolver(attr.name, keyName)) - assert( - joinColumn.isDefined, - s"$keyName should exist in ${right.output.map(_.name).mkString(",")}") - joinColumn.get + right.output.find(attr => resolver(attr.name, keyName)).getOrElse { + throw new AnalysisException(s"USING column `$keyName` can not be resolved with the " + + s"right join side, the right output is: [${right.output.map(_.name).mkString(", ")}]") + } } val joinPairs = leftKeys.zip(rightKeys) 
http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index db41752..235a799 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -190,12 +190,6 @@ trait CheckAnalysis extends PredicateHelper { case e => } - case j @ Join(_, _, UsingJoin(_, cols), _) => - val from = operator.inputSet.map(_.name).mkString(", ") - failAnalysis( - s"using columns [${cols.mkString(",")}] " + - s"can not be resolved given input columns: [$from] ") - case j @ Join(_, _, _, Some(condition)) if condition.dataType != BooleanType => failAnalysis( s"join condition '${condition.sql}' " + http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 2006844..06f0f5b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -570,10 +570,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { // Resolve the join type and join condition val (joinType, condition) = Option(join.joinCriteria) match { case Some(c) if c.USING != null => - val columns = c.identifier.asScala.map { column => - UnresolvedAttribute.quoted(column.getText) - } - 
(UsingJoin(baseJoinType, columns), None) + (UsingJoin(baseJoinType, c.identifier.asScala.map(_.getText)), None) case Some(c) if c.booleanExpression != null => (baseJoinType, Option(expression(c.booleanExpression))) case None if join.NATURAL != null => http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala index 61e083e..853e9f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala @@ -100,7 +100,7 @@ case class NaturalJoin(tpe: JoinType) extends JoinType { override def sql: String = "NATURAL " + tpe.sql } -case class UsingJoin(tpe: JoinType, usingColumns: Seq[UnresolvedAttribute]) extends JoinType { +case class UsingJoin(tpe: JoinType, usingColumns: Seq[String]) extends JoinType { require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, LeftAnti).contains(tpe), "Unsupported using join type " + tpe) override def sql: String = "USING " + tpe.sql http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala index 100ec4d..1421d36 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala @@ -38,7 
+38,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("natural/using inner join") { val naturalPlan = r1.join(r2, NaturalJoin(Inner), None) - val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None) + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None) val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c) checkAnalysis(naturalPlan, expected) checkAnalysis(usingPlan, expected) @@ -46,7 +46,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("natural/using left join") { val naturalPlan = r1.join(r2, NaturalJoin(LeftOuter), None) - val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq(UnresolvedAttribute("a"))), None) + val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq("a")), None) val expected = r1.join(r2, LeftOuter, Some(EqualTo(a, a))).select(a, b, c) checkAnalysis(naturalPlan, expected) checkAnalysis(usingPlan, expected) @@ -54,7 +54,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("natural/using right join") { val naturalPlan = r1.join(r2, NaturalJoin(RightOuter), None) - val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq(UnresolvedAttribute("a"))), None) + val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq("a")), None) val expected = r1.join(r2, RightOuter, Some(EqualTo(a, a))).select(a, b, c) checkAnalysis(naturalPlan, expected) checkAnalysis(usingPlan, expected) @@ -62,7 +62,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("natural/using full outer join") { val naturalPlan = r1.join(r2, NaturalJoin(FullOuter), None) - val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq(UnresolvedAttribute("a"))), None) + val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq("a")), None) val expected = r1.join(r2, FullOuter, Some(EqualTo(a, a))).select( Alias(Coalesce(Seq(a, a)), "a")(), b, c) checkAnalysis(naturalPlan, expected) @@ -71,7 +71,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("natural/using inner join with no nullability") { val 
naturalPlan = r3.join(r4, NaturalJoin(Inner), None) - val usingPlan = r3.join(r4, UsingJoin(Inner, Seq(UnresolvedAttribute("b"))), None) + val usingPlan = r3.join(r4, UsingJoin(Inner, Seq("b")), None) val expected = r3.join(r4, Inner, Some(EqualTo(bNotNull, bNotNull))).select( bNotNull, aNotNull, cNotNull) checkAnalysis(naturalPlan, expected) @@ -80,7 +80,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("natural/using left join with no nullability") { val naturalPlan = r3.join(r4, NaturalJoin(LeftOuter), None) - val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq(UnresolvedAttribute("b"))), None) + val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq("b")), None) val expected = r3.join(r4, LeftOuter, Some(EqualTo(bNotNull, bNotNull))).select( bNotNull, aNotNull, c) checkAnalysis(naturalPlan, expected) @@ -89,7 +89,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("natural/using right join with no nullability") { val naturalPlan = r3.join(r4, NaturalJoin(RightOuter), None) - val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq(UnresolvedAttribute("b"))), None) + val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq("b")), None) val expected = r3.join(r4, RightOuter, Some(EqualTo(bNotNull, bNotNull))).select( bNotNull, a, cNotNull) checkAnalysis(naturalPlan, expected) @@ -98,7 +98,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest { test("natural/using full outer join with no nullability") { val naturalPlan = r3.join(r4, NaturalJoin(FullOuter), None) - val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq(UnresolvedAttribute("b"))), None) + val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq("b")), None) val expected = r3.join(r4, FullOuter, Some(EqualTo(bNotNull, bNotNull))).select( Alias(Coalesce(Seq(b, b)), "b")(), a, c) checkAnalysis(naturalPlan, expected) @@ -106,40 +106,35 @@ class ResolveNaturalJoinSuite extends AnalysisTest { } test("using unresolved attribute") { - val usingPlan = r1.join(r2, UsingJoin(Inner, 
Seq(UnresolvedAttribute("d"))), None) - val error = intercept[AnalysisException] { - SimpleAnalyzer.checkAnalysis(usingPlan) - } - assert(error.message.contains( - "using columns ['d] can not be resolved given input columns: [b, a, c]")) + assertAnalysisError( + r1.join(r2, UsingJoin(Inner, Seq("d"))), + "USING column `d` can not be resolved with the left join side" :: Nil) + assertAnalysisError( + r1.join(r2, UsingJoin(Inner, Seq("b"))), + "USING column `b` can not be resolved with the right join side" :: Nil) } test("using join with a case sensitive analyzer") { val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c) - { - val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None) - checkAnalysis(usingPlan, expected, caseSensitive = true) - } + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None) + checkAnalysis(usingPlan, expected, caseSensitive = true) - { - val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None) - assertAnalysisError( - usingPlan, - Seq("using columns ['A] can not be resolved given input columns: [b, a, c, a]")) - } + assertAnalysisError( + r1.join(r2, UsingJoin(Inner, Seq("A"))), + "USING column `A` can not be resolved with the left join side" :: Nil) } test("using join with a case insensitive analyzer") { val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c) { - val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("a"))), None) + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None) checkAnalysis(usingPlan, expected, caseSensitive = false) } { - val usingPlan = r1.join(r2, UsingJoin(Inner, Seq(UnresolvedAttribute("A"))), None) + val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("A")), None) checkAnalysis(usingPlan, expected, caseSensitive = false) } } http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index e5f1f7b..304beb1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -348,7 +348,7 @@ class PlanParserSuite extends PlanTest { val testUsingJoin = (sql: String, jt: JoinType) => { assertEqual( s"select * from t $sql u using(a, b)", - table("t").join(table("u"), UsingJoin(jt, Seq('a.attr, 'b.attr)), None).select(star())) + table("t").join(table("u"), UsingJoin(jt, Seq("a", "b")), None).select(star())) } val testAll = Seq(testUnconditionalJoin, testConditionalJoin, testNaturalJoin, testUsingJoin) val testExistence = Seq(testUnconditionalJoin, testConditionalJoin, testUsingJoin) http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index fcc02e5..133f633 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -776,7 +776,7 @@ class Dataset[T] private[sql]( Join( joined.left, joined.right, - UsingJoin(JoinType(joinType), usingColumns.map(UnresolvedAttribute(_))), + UsingJoin(JoinType(joinType), usingColumns), None) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org