Repository: spark
Updated Branches:
  refs/heads/branch-2.1 cbbe21777 -> 6916ddc38


[SPARK-18674][SQL] improve the error message of using join

## What changes were proposed in this pull request?

The current error message of a USING join is quite confusing, for example:
```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]

scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]

scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: using columns ['c1] can not be resolved 
given input columns: [c1, c2] ;;
'Join UsingJoin(Inner,List('c1))
:- Project [value#1 AS c1#3]
:  +- LocalRelation [value#1]
+- Project [value#7 AS c2#9]
   +- LocalRelation [value#7]
```

After this PR, it becomes:
```
scala> val df1 = List(1,2,3).toDS.withColumnRenamed("value", "c1")
df1: org.apache.spark.sql.DataFrame = [c1: int]

scala> val df2 = List(1,2,3).toDS.withColumnRenamed("value", "c2")
df2: org.apache.spark.sql.DataFrame = [c2: int]

scala> df1.join(df2, usingColumn = "c1")
org.apache.spark.sql.AnalysisException: USING column `c1` can not be resolved 
with the right join side, the right output is: [c2];
```

## How was this patch tested?

updated tests

Author: Wenchen Fan <wenc...@databricks.com>

Closes #16100 from cloud-fan/natural.

(cherry picked from commit e6534847100670a22b3b191a0f9d924fab7f3c02)
Signed-off-by: Herman van Hovell <hvanhov...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6916ddc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6916ddc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6916ddc3

Branch: refs/heads/branch-2.1
Commit: 6916ddc385fc33fa390e541300ca2bb1dbd0599c
Parents: cbbe217
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Dec 1 11:53:12 2016 -0800
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Thu Dec 1 11:53:39 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 28 ++++--------
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  6 ---
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  5 +--
 .../spark/sql/catalyst/plans/joinTypes.scala    |  2 +-
 .../analysis/ResolveNaturalJoinSuite.scala      | 47 +++++++++-----------
 .../sql/catalyst/parser/PlanParserSuite.scala   |  2 +-
 .../scala/org/apache/spark/sql/Dataset.scala    |  2 +-
 7 files changed, 34 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e576d53..372a121 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1899,15 +1899,7 @@ class Analyzer(
     override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators 
{
       case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
           if left.resolved && right.resolved && j.duplicateResolved =>
-        // Resolve the column names referenced in using clause from both the 
legs of join.
-        val lCols = usingCols.flatMap(col => left.resolveQuoted(col.name, 
resolver))
-        val rCols = usingCols.flatMap(col => right.resolveQuoted(col.name, 
resolver))
-        if ((lCols.length == usingCols.length) && (rCols.length == 
usingCols.length)) {
-          val joinNames = lCols.map(exp => exp.name)
-          commonNaturalJoinProcessing(left, right, joinType, joinNames, None)
-        } else {
-          j
-        }
+        commonNaturalJoinProcessing(left, right, joinType, usingCols, None)
       case j @ Join(left, right, NaturalJoin(joinType), condition) if 
j.resolvedExceptNatural =>
         // find common column names from both sides
         val joinNames = 
left.output.map(_.name).intersect(right.output.map(_.name))
@@ -1922,18 +1914,16 @@ class Analyzer(
       joinNames: Seq[String],
       condition: Option[Expression]) = {
     val leftKeys = joinNames.map { keyName =>
-      val joinColumn = left.output.find(attr => resolver(attr.name, keyName))
-      assert(
-        joinColumn.isDefined,
-        s"$keyName should exist in ${left.output.map(_.name).mkString(",")}")
-      joinColumn.get
+      left.output.find(attr => resolver(attr.name, keyName)).getOrElse {
+        throw new AnalysisException(s"USING column `$keyName` can not be 
resolved with the " +
+          s"left join side, the left output is: 
[${left.output.map(_.name).mkString(", ")}]")
+      }
     }
     val rightKeys = joinNames.map { keyName =>
-      val joinColumn = right.output.find(attr => resolver(attr.name, keyName))
-      assert(
-        joinColumn.isDefined,
-        s"$keyName should exist in ${right.output.map(_.name).mkString(",")}")
-      joinColumn.get
+      right.output.find(attr => resolver(attr.name, keyName)).getOrElse {
+        throw new AnalysisException(s"USING column `$keyName` can not be 
resolved with the " +
+          s"right join side, the right output is: 
[${right.output.map(_.name).mkString(", ")}]")
+      }
     }
     val joinPairs = leftKeys.zip(rightKeys)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index db41752..235a799 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -190,12 +190,6 @@ trait CheckAnalysis extends PredicateHelper {
               case e =>
             }
 
-          case j @ Join(_, _, UsingJoin(_, cols), _) =>
-            val from = operator.inputSet.map(_.name).mkString(", ")
-            failAnalysis(
-              s"using columns [${cols.mkString(",")}] " +
-                s"can not be resolved given input columns: [$from] ")
-
           case j @ Join(_, _, _, Some(condition)) if condition.dataType != 
BooleanType =>
             failAnalysis(
               s"join condition '${condition.sql}' " +

http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2006844..06f0f5b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -570,10 +570,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
         // Resolve the join type and join condition
         val (joinType, condition) = Option(join.joinCriteria) match {
           case Some(c) if c.USING != null =>
-            val columns = c.identifier.asScala.map { column =>
-              UnresolvedAttribute.quoted(column.getText)
-            }
-            (UsingJoin(baseJoinType, columns), None)
+            (UsingJoin(baseJoinType, c.identifier.asScala.map(_.getText)), 
None)
           case Some(c) if c.booleanExpression != null =>
             (baseJoinType, Option(expression(c.booleanExpression)))
           case None if join.NATURAL != null =>

http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
index 61e083e..853e9f3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
@@ -100,7 +100,7 @@ case class NaturalJoin(tpe: JoinType) extends JoinType {
   override def sql: String = "NATURAL " + tpe.sql
 }
 
-case class UsingJoin(tpe: JoinType, usingColumns: Seq[UnresolvedAttribute]) 
extends JoinType {
+case class UsingJoin(tpe: JoinType, usingColumns: Seq[String]) extends 
JoinType {
   require(Seq(Inner, LeftOuter, LeftSemi, RightOuter, FullOuter, 
LeftAnti).contains(tpe),
     "Unsupported using join type " + tpe)
   override def sql: String = "USING " + tpe.sql

http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
index 100ec4d..1421d36 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveNaturalJoinSuite.scala
@@ -38,7 +38,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
 
   test("natural/using inner join") {
     val naturalPlan = r1.join(r2, NaturalJoin(Inner), None)
-    val usingPlan = r1.join(r2, UsingJoin(Inner, 
Seq(UnresolvedAttribute("a"))), None)
+    val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
     val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)
     checkAnalysis(naturalPlan, expected)
     checkAnalysis(usingPlan, expected)
@@ -46,7 +46,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
 
   test("natural/using left join") {
     val naturalPlan = r1.join(r2, NaturalJoin(LeftOuter), None)
-    val usingPlan = r1.join(r2, UsingJoin(LeftOuter, 
Seq(UnresolvedAttribute("a"))), None)
+    val usingPlan = r1.join(r2, UsingJoin(LeftOuter, Seq("a")), None)
     val expected = r1.join(r2, LeftOuter, Some(EqualTo(a, a))).select(a, b, c)
     checkAnalysis(naturalPlan, expected)
     checkAnalysis(usingPlan, expected)
@@ -54,7 +54,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
 
   test("natural/using right join") {
     val naturalPlan = r1.join(r2, NaturalJoin(RightOuter), None)
-    val usingPlan = r1.join(r2, UsingJoin(RightOuter, 
Seq(UnresolvedAttribute("a"))), None)
+    val usingPlan = r1.join(r2, UsingJoin(RightOuter, Seq("a")), None)
     val expected = r1.join(r2, RightOuter, Some(EqualTo(a, a))).select(a, b, c)
     checkAnalysis(naturalPlan, expected)
     checkAnalysis(usingPlan, expected)
@@ -62,7 +62,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
 
   test("natural/using full outer join") {
     val naturalPlan = r1.join(r2, NaturalJoin(FullOuter), None)
-    val usingPlan = r1.join(r2, UsingJoin(FullOuter, 
Seq(UnresolvedAttribute("a"))), None)
+    val usingPlan = r1.join(r2, UsingJoin(FullOuter, Seq("a")), None)
     val expected = r1.join(r2, FullOuter, Some(EqualTo(a, a))).select(
       Alias(Coalesce(Seq(a, a)), "a")(), b, c)
     checkAnalysis(naturalPlan, expected)
@@ -71,7 +71,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
 
   test("natural/using inner join with no nullability") {
     val naturalPlan = r3.join(r4, NaturalJoin(Inner), None)
-    val usingPlan = r3.join(r4, UsingJoin(Inner, 
Seq(UnresolvedAttribute("b"))), None)
+    val usingPlan = r3.join(r4, UsingJoin(Inner, Seq("b")), None)
     val expected = r3.join(r4, Inner, Some(EqualTo(bNotNull, 
bNotNull))).select(
       bNotNull, aNotNull, cNotNull)
     checkAnalysis(naturalPlan, expected)
@@ -80,7 +80,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
 
   test("natural/using left join with no nullability") {
     val naturalPlan = r3.join(r4, NaturalJoin(LeftOuter), None)
-    val usingPlan = r3.join(r4, UsingJoin(LeftOuter, 
Seq(UnresolvedAttribute("b"))), None)
+    val usingPlan = r3.join(r4, UsingJoin(LeftOuter, Seq("b")), None)
     val expected = r3.join(r4, LeftOuter, Some(EqualTo(bNotNull, 
bNotNull))).select(
       bNotNull, aNotNull, c)
     checkAnalysis(naturalPlan, expected)
@@ -89,7 +89,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
 
   test("natural/using right join with no nullability") {
     val naturalPlan = r3.join(r4, NaturalJoin(RightOuter), None)
-    val usingPlan = r3.join(r4, UsingJoin(RightOuter, 
Seq(UnresolvedAttribute("b"))), None)
+    val usingPlan = r3.join(r4, UsingJoin(RightOuter, Seq("b")), None)
     val expected = r3.join(r4, RightOuter, Some(EqualTo(bNotNull, 
bNotNull))).select(
       bNotNull, a, cNotNull)
     checkAnalysis(naturalPlan, expected)
@@ -98,7 +98,7 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
 
   test("natural/using full outer join with no nullability") {
     val naturalPlan = r3.join(r4, NaturalJoin(FullOuter), None)
-    val usingPlan = r3.join(r4, UsingJoin(FullOuter, 
Seq(UnresolvedAttribute("b"))), None)
+    val usingPlan = r3.join(r4, UsingJoin(FullOuter, Seq("b")), None)
     val expected = r3.join(r4, FullOuter, Some(EqualTo(bNotNull, 
bNotNull))).select(
       Alias(Coalesce(Seq(b, b)), "b")(), a, c)
     checkAnalysis(naturalPlan, expected)
@@ -106,40 +106,35 @@ class ResolveNaturalJoinSuite extends AnalysisTest {
   }
 
   test("using unresolved attribute") {
-    val usingPlan = r1.join(r2, UsingJoin(Inner, 
Seq(UnresolvedAttribute("d"))), None)
-    val error = intercept[AnalysisException] {
-      SimpleAnalyzer.checkAnalysis(usingPlan)
-    }
-    assert(error.message.contains(
-      "using columns ['d] can not be resolved given input columns: [b, a, c]"))
+    assertAnalysisError(
+      r1.join(r2, UsingJoin(Inner, Seq("d"))),
+      "USING column `d` can not be resolved with the left join side" :: Nil)
+    assertAnalysisError(
+      r1.join(r2, UsingJoin(Inner, Seq("b"))),
+      "USING column `b` can not be resolved with the right join side" :: Nil)
   }
 
   test("using join with a case sensitive analyzer") {
     val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)
 
-    {
-      val usingPlan = r1.join(r2, UsingJoin(Inner, 
Seq(UnresolvedAttribute("a"))), None)
-      checkAnalysis(usingPlan, expected, caseSensitive = true)
-    }
+    val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
+    checkAnalysis(usingPlan, expected, caseSensitive = true)
 
-    {
-      val usingPlan = r1.join(r2, UsingJoin(Inner, 
Seq(UnresolvedAttribute("A"))), None)
-      assertAnalysisError(
-        usingPlan,
-        Seq("using columns ['A] can not be resolved given input columns: [b, 
a, c, a]"))
-    }
+    assertAnalysisError(
+      r1.join(r2, UsingJoin(Inner, Seq("A"))),
+      "USING column `A` can not be resolved with the left join side" :: Nil)
   }
 
   test("using join with a case insensitive analyzer") {
     val expected = r1.join(r2, Inner, Some(EqualTo(a, a))).select(a, b, c)
 
     {
-      val usingPlan = r1.join(r2, UsingJoin(Inner, 
Seq(UnresolvedAttribute("a"))), None)
+      val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("a")), None)
       checkAnalysis(usingPlan, expected, caseSensitive = false)
     }
 
     {
-      val usingPlan = r1.join(r2, UsingJoin(Inner, 
Seq(UnresolvedAttribute("A"))), None)
+      val usingPlan = r1.join(r2, UsingJoin(Inner, Seq("A")), None)
       checkAnalysis(usingPlan, expected, caseSensitive = false)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index e5f1f7b..304beb1 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -348,7 +348,7 @@ class PlanParserSuite extends PlanTest {
     val testUsingJoin = (sql: String, jt: JoinType) => {
       assertEqual(
         s"select * from t $sql u using(a, b)",
-        table("t").join(table("u"), UsingJoin(jt, Seq('a.attr, 'b.attr)), 
None).select(star()))
+        table("t").join(table("u"), UsingJoin(jt, Seq("a", "b")), 
None).select(star()))
     }
     val testAll = Seq(testUnconditionalJoin, testConditionalJoin, 
testNaturalJoin, testUsingJoin)
     val testExistence = Seq(testUnconditionalJoin, testConditionalJoin, 
testUsingJoin)

http://git-wip-us.apache.org/repos/asf/spark/blob/6916ddc3/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index fcc02e5..133f633 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -776,7 +776,7 @@ class Dataset[T] private[sql](
       Join(
         joined.left,
         joined.right,
-        UsingJoin(JoinType(joinType), 
usingColumns.map(UnresolvedAttribute(_))),
+        UsingJoin(JoinType(joinType), usingColumns),
         None)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to