This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 1c23be3  [SPARK-31015][SQL] Star(*) expression fails when used with 
qualified column names for v2 tables
1c23be3 is described below

commit 1c23be3b8addaa5e15d29de788b906bcbdae953b
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Wed Mar 4 00:55:26 2020 +0800

    [SPARK-31015][SQL] Star(*) expression fails when used with qualified column 
names for v2 tables
    
    ### What changes were proposed in this pull request?
    
    For a v2 table created with `CREATE TABLE testcat.ns1.ns2.tbl (id bigint, 
name string) USING foo`, the following works as expected
    ```
    SELECT testcat.ns1.ns2.tbl.id FROM testcat.ns1.ns2.tbl
    ```
    , but a query with qualified column name with star(*)
    ```
    SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl
    [info]   org.apache.spark.sql.AnalysisException: cannot resolve 
'testcat.ns1.ns2.tbl.*' given input columns 'id, name';
    ```
    fails to resolve. And this PR proposes to fix this issue.
    
    ### Why are the changes needed?
    
    To fix a bug as describe above.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, now `SELECT testcat.ns1.ns2.tbl.* FROM testcat.ns1.ns2.tbl` works as 
expected.
    
    ### How was this patch tested?
    
    Added new test.
    
    Closes #27766 from imback82/fix_star_expression.
    
    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit c263c154080e54dd07aaa584913773314c3528e5)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/unresolved.scala   | 35 ++++++++--------------
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 27 +++++++++++++++++
 2 files changed, 40 insertions(+), 22 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 608f39c..6048d98 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -298,35 +298,26 @@ abstract class Star extends LeafExpression with 
NamedExpression {
 case class UnresolvedStar(target: Option[Seq[String]]) extends Star with 
Unevaluable {
 
   /**
-   * Returns true if the nameParts match the qualifier of the attribute
+   * Returns true if the nameParts is a subset of the last elements of 
qualifier of the attribute.
    *
-   * There are two checks: i) Check if the nameParts match the qualifier fully.
-   * E.g. SELECT db.t1.* FROM db1.t1   In this case, the nameParts is 
Seq("db1", "t1") and
-   * qualifier of the attribute is Seq("db1","t1")
-   * ii) If (i) is not true, then check if nameParts is only a single element 
and it
-   * matches the table portion of the qualifier
-   *
-   * E.g. SELECT t1.* FROM db1.t1  In this case nameParts is Seq("t1") and
-   * qualifier is Seq("db1","t1")
-   * SELECT a.* FROM db1.t1 AS a
-   * In this case nameParts is Seq("a") and qualifier for
-   * attribute is Seq("a")
+   * For example, the following should all return true:
+   *   - `SELECT ns1.ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns1", 
"ns2", "t") and
+   *     qualifier is Seq("ns1", "ns2", "t").
+   *   - `SELECT ns2.t.* FROM ns1.n2.t` where nameParts is Seq("ns2", "t") and
+   *     qualifier is Seq("ns1", "ns2", "t").
+   *   - `SELECT t.* FROM ns1.n2.t` where nameParts is Seq("t") and
+   *     qualifier is Seq("ns1", "ns2", "t").
    */
   private def matchedQualifier(
       attribute: Attribute,
       nameParts: Seq[String],
       resolver: Resolver): Boolean = {
-    val qualifierList = attribute.qualifier
-
-    val matched = nameParts.corresponds(qualifierList)(resolver) || {
-      // check if it matches the table portion of the qualifier
-      if (nameParts.length == 1 && qualifierList.nonEmpty) {
-        resolver(nameParts.head, qualifierList.last)
-      } else {
-        false
-      }
+    val qualifierList = if (nameParts.length == attribute.qualifier.length) {
+      attribute.qualifier
+    } else {
+      attribute.qualifier.takeRight(nameParts.length)
     }
-    matched
+    nameParts.corresponds(qualifierList)(resolver)
   }
 
   override def expand(
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index c074b335..bccdce7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2342,6 +2342,33 @@ class DataSourceV2SQLSuite
     assert(e2.message.contains("It is not allowed to add database prefix"))
   }
 
+  test("SPARK-31015: star expression should work for qualified column names 
for v2 tables") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, name string) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 'hello')")
+
+      def check(tbl: String): Unit = {
+        checkAnswer(sql(s"SELECT testcat.ns1.ns2.tbl.* FROM $tbl"), Row(1, 
"hello"))
+        checkAnswer(sql(s"SELECT ns1.ns2.tbl.* FROM $tbl"), Row(1, "hello"))
+        checkAnswer(sql(s"SELECT ns2.tbl.* FROM $tbl"), Row(1, "hello"))
+        checkAnswer(sql(s"SELECT tbl.* FROM $tbl"), Row(1, "hello"))
+      }
+
+      // Test with qualified table name "testcat.ns1.ns2.tbl".
+      check(t)
+
+      // Test if current catalog and namespace is respected in column 
resolution.
+      sql("USE testcat.ns1.ns2")
+      check("tbl")
+
+      val ex = intercept[AnalysisException] {
+        sql(s"SELECT ns1.ns2.ns3.tbl.* from $t")
+      }
+      assert(ex.getMessage.contains("cannot resolve 'ns1.ns2.ns3.tbl.*"))
+    }
+  }
+
   private def testV1Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to