This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 852997d6ed6 [SPARK-38914][SQL] Allow user to insert specified columns into insertable view
852997d6ed6 is described below

commit 852997d6ed61f4e098803b96927e7cbbd24e3d7c
Author: morvenhuang <morven.hu...@gmail.com>
AuthorDate: Wed Apr 27 23:35:45 2022 +0800

    [SPARK-38914][SQL] Allow user to insert specified columns into insertable view
    
    ### What changes were proposed in this pull request?
    Allow users to insert specified columns into an insertable view, for example:
    ```
    CREATE TEMPORARY VIEW v1 (c1 int, c2 string) USING org.apache.spark.sql.json.DefaultSource OPTIONS (path 'json_dir')
    INSERT INTO v1(c1) VALUES(100)
    SELECT c1, c2 FROM v1;
    +---+----+
    | c1|  c2|
    +---+----+
    |100|null|
    +---+----+
    ```
    
    ### Why are the changes needed?
    The option spark.sql.defaultColumn.useNullsForMissingDefautValues allows us to insert specified columns into a table (https://issues.apache.org/jira/browse/SPARK-38795), but currently this option does not work for insertable views.
    
    To keep consistency with the behavior of INSERT INTO a table, we should also allow users to specify columns when running INSERT INTO a view.
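    
    For illustration, here is a minimal sketch of the existing table-side behavior that this change extends to insertable views (the table name and values are made up, it assumes DEFAULT columns are enabled, and the config below uses the corrected spelling introduced by this patch):
    ```scala
    // Illustrative snippet only, not part of this patch.
    // With the flag enabled, an INSERT that names only some columns fills the
    // omitted columns with NULL instead of failing analysis.
    spark.conf.set("spark.sql.defaultColumn.useNullsForMissingDefaultValues", "true")
    spark.sql("CREATE TABLE t (c1 INT, c2 STRING) USING parquet")
    spark.sql("INSERT INTO t(c1) VALUES (100)")   // c2 is omitted, so it is stored as NULL
    spark.sql("SELECT c1, c2 FROM t").show()
    // +---+----+
    // | c1|  c2|
    // +---+----+
    // |100|null|
    // +---+----+
    ```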
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Users can now specify a column list when running INSERT INTO an insertable view.
    
    ### How was this patch tested?
    New unit tests added.
    
    Closes #36212 from morvenhuang/SPARK-38914.
    
    Authored-by: morvenhuang <morven.hu...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../catalyst/analysis/ResolveDefaultColumns.scala  |  6 ++-
 .../org/apache/spark/sql/internal/SQLConf.scala    |  2 +-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 45 ++++++++++++++++++++++
 3 files changed, 51 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index 447769be300..422a1e422be 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -79,7 +79,8 @@ case class ResolveDefaultColumns(
           replaceExplicitDefaultColumnValues(analyzer, expanded).getOrElse(table)
         replaced
 
-      case i@InsertIntoStatement(_, _, _, project: Project, _, _) =>
+      case i@InsertIntoStatement(_, _, _, project: Project, _, _)
+        if !project.projectList.exists(_.isInstanceOf[Star]) =>
         enclosingInsert = Some(i)
         insertTableSchemaWithoutPartitionColumns = getInsertTableSchemaWithoutPartitionColumns
         val expanded: Project = addMissingDefaultColumnValues(project).getOrElse(project)
@@ -280,6 +281,9 @@ case class ResolveDefaultColumns(
       case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
         StructType(r.tableMeta.schema.fields.dropRight(
           enclosingInsert.get.partitionSpec.size))
+      case SubqueryAlias(_, r: View) if r.isTempView =>
+        StructType(r.schema.fields.dropRight(
+          enclosingInsert.get.partitionSpec.size))
       case _ => return None
     }
     // Rearrange the columns in the result schema to match the order of the explicit column list,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 0ba870d10e9..8876d780799 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2880,7 +2880,7 @@ object SQLConf {
       .createWithDefault(true)
 
   val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
-    buildConf("spark.sql.defaultColumn.useNullsForMissingDefautValues")
+    buildConf("spark.sql.defaultColumn.useNullsForMissingDefaultValues")
       .internal()
       .doc("When true, and DEFAULT columns are enabled, allow column 
definitions lacking " +
         "explicit default values to behave as if they had specified DEFAULT 
NULL instead. " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 04acedb7ead..1312353f537 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -856,6 +856,33 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("Allow user to insert specified columns into insertable view") {
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        (1 to 10).map(i => Row(i, null))
+      )
+
+      sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        (1 to 10).map(i => Row(i, null))
+      )
+
+      sql("INSERT OVERWRITE TABLE jsonTable(b) SELECT b FROM jt")
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        (1 to 10).map(i => Row(null, s"str$i"))
+      )
+    }
+
+    val message = intercept[AnalysisException] {
+      sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
+    }.getMessage
+    assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
+  }
+
   test("SPARK-38336 INSERT INTO statements with tables with default columns: 
positive tests") {
     // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no
     // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
@@ -1632,6 +1659,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("SELECT clause with star wildcard") {
+    withTable("t1") {
+      sql("CREATE TABLE t1(c1 int, c2 string) using parquet")
+      sql("INSERT INTO TABLE t1 select * from jt where a=1")
+      checkAnswer(spark.table("t1"), Row(1, "str1"))
+    }
+
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      withTable("t1") {
+        sql("CREATE TABLE t1(c1 int, c2 string, c3 int) using parquet")
+        sql("INSERT INTO TABLE t1 select * from jt where a=1")
+        checkAnswer(spark.table("t1"), Row(1, "str1", null))
+        sql("INSERT INTO TABLE t1 select *, 2 from jt where a=2")
+        checkAnswer(spark.table("t1"), Seq(Row(1, "str1", null), Row(2, "str2", 2)))
+      }
+    }
+  }
+
   test("SPARK-37294: insert ANSI intervals into a table partitioned by the 
interval columns") {
     val tbl = "interval_table"
     Seq(PartitionOverwriteMode.DYNAMIC, PartitionOverwriteMode.STATIC).foreach { mode =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
