This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d27cf97e48c [SPARK-38869][SQL] Respect table capability ACCEPT_ANY_SCHEMA in DEFAULT column resolution
d27cf97e48c is described below

commit d27cf97e48c19e3f6496ec0f272fb3288e992ba8
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Mon May 23 22:41:04 2022 +0800

    [SPARK-38869][SQL] Respect table capability ACCEPT_ANY_SCHEMA in DEFAULT column resolution
    
    ### What changes were proposed in this pull request?
    
    Respect the table capability ACCEPT_ANY_SCHEMA in DEFAULT column resolution by leaving tables that declare this capability unchanged by the resolution logic.
    
    ### Why are the changes needed?
    
    Tables with the ACCEPT_ANY_SCHEMA capability declare that they accept inserted rows of any schema, so we should not add extra columns to INSERT VALUES lists or projections to match DEFAULT columns in the target table schema.
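    
    For illustration, the analyzer-facing flag that gates this behavior is derived from the table's declared capabilities; a minimal sketch of that wiring (the `acceptsAnySchema` helper is hypothetical, not part of this patch):
    
    ```scala
    import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
    
    // Sketch: a write target should be skipped by DEFAULT column resolution
    // whenever its table advertises that it accepts rows of any schema.
    def acceptsAnySchema(table: Table): Boolean =
      table.capabilities().contains(TableCapability.ACCEPT_ANY_SCHEMA)
    ```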
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. DEFAULT column resolution now leaves tables with the ACCEPT_ANY_SCHEMA capability unchanged.
    
    ### How was this patch tested?
    
    This PR adds new unit test coverage in PlanResolutionSuite.
    
    Closes #36475 from dtenedor/default-any-schema.
    
    Authored-by: Daniel Tenedorio <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../catalyst/analysis/ResolveDefaultColumns.scala  | 11 +++--
 .../execution/command/PlanResolutionSuite.scala    | 57 ++++++++++++++++++++--
 2 files changed, 60 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index 792a0d8cf24..a91e4652ba4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -502,12 +502,16 @@ case class ResolveDefaultColumns(
    * Returns the schema for the target table of a DML command, looking into the catalog if needed.
    */
   private def getSchemaForTargetTable(table: LogicalPlan): Option[StructType] = {
-    // Check if the target table is already resolved. If so, return the computed schema.
-    // Note that we use 'collectFirst' to descend past any SubqueryAlias nodes that may be present.
+    // First find the source relation. Note that we use 'collectFirst' to descend past any
+    // SubqueryAlias nodes that may be present.
     val source: Option[LogicalPlan] = table.collectFirst {
-      case r: NamedRelation => r
+      case r: NamedRelation if !r.skipSchemaResolution =>
+        // Here we only resolve DEFAULT columns for tables that require schema resolution
+        // during write operations.
+        r
       case r: UnresolvedCatalogRelation => r
     }
+    // Check if the target table is already resolved. If so, return the computed schema.
     source.map { r =>
       if (r.schema.fields.nonEmpty) {
         return Some(r.schema)
@@ -521,6 +525,7 @@ case class ResolveDefaultColumns(
       case Some(r: UnresolvedCatalogRelation) => r.tableMeta.identifier
       case _ => return None
     }
+
     val lookup: LogicalPlan = try {
       catalog.lookupRelation(tableName)
     } catch {
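
As the new comment above notes, 'collectFirst' is used so the rule can descend past SubqueryAlias wrappers around the target relation while deliberately not matching relations that skip schema resolution. A minimal sketch of that traversal (the `findTarget` helper and `rel` argument are illustrative, not part of this patch):

```scala
import org.apache.spark.sql.catalyst.analysis.NamedRelation
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}

// Sketch: collectFirst walks the plan top-down, so the pattern still reaches
// the underlying relation when the write target is wrapped in a SubqueryAlias.
def findTarget(rel: NamedRelation): Option[LogicalPlan] = {
  val plan: LogicalPlan = SubqueryAlias("t", rel)
  plan.collectFirst {
    case r: NamedRelation if !r.skipSchemaResolution => r
  }
}
```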
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 42103e54a9c..b6d41f39184 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -26,12 +26,12 @@ import org.mockito.invocation.InvocationOnMock
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{AnsiCast, AttributeReference, Cast, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
-import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.FakeV2Provider
@@ -73,6 +73,7 @@ class PlanResolutionSuite extends AnalysisTest {
 
   private val tableWithAcceptAnySchemaCapability: Table = {
     val t = mock(classOf[Table])
+    when(t.name()).thenReturn("v2TableWithAcceptAnySchemaCapability")
     when(t.schema()).thenReturn(new StructType().add("i", "int"))
     when(t.capabilities()).thenReturn(Collections.singleton(TableCapability.ACCEPT_ANY_SCHEMA))
     t
@@ -1024,6 +1025,9 @@ class PlanResolutionSuite extends AnalysisTest {
       val sql7 = s"UPDATE defaultvalues SET i=DEFAULT, s=DEFAULT"
       val sql8 = s"UPDATE $tblName SET name='Robert', age=32 WHERE p=DEFAULT"
       val sql9 = s"UPDATE defaultvalues2 SET i=DEFAULT"
+      // Note: 'i' is the correct column name, but since the table has ACCEPT_ANY_SCHEMA capability,
+      // DEFAULT column resolution should skip this table.
+      val sql10 = s"UPDATE v2TableWithAcceptAnySchemaCapability SET i=DEFAULT"
 
       val parsed1 = parseAndResolve(sql1)
       val parsed2 = parseAndResolve(sql2)
@@ -1033,6 +1037,7 @@ class PlanResolutionSuite extends AnalysisTest {
       val parsed6 = parseAndResolve(sql6)
       val parsed7 = parseAndResolve(sql7, true)
       val parsed9 = parseAndResolve(sql9, true)
+      val parsed10 = parseAndResolve(sql10)
 
       parsed1 match {
         case UpdateTable(
@@ -1146,6 +1151,23 @@ class PlanResolutionSuite extends AnalysisTest {
 
         case _ => fail("Expect UpdateTable, but got:\n" + parsed9.treeString)
       }
+
+      parsed10 match {
+        case u: UpdateTable =>
+          assert(u.assignments.size == 1)
+          u.assignments(0).key match {
+            case i: AttributeReference =>
+              assert(i.name == "i")
+          }
+          u.assignments(0).value match {
+            case d: UnresolvedAttribute =>
+              assert(d.name == "DEFAULT")
+          }
+
+        case _ =>
+          fail("Expect UpdateTable, but got:\n" + parsed10.treeString)
+      }
+
     }
 
     val sql1 = "UPDATE non_existing SET id=1"
@@ -1178,6 +1200,31 @@ class PlanResolutionSuite extends AnalysisTest {
     }
   }
 
+  test("SPARK-38869 INSERT INTO table with ACCEPT_ANY_SCHEMA capability") {
+    // Note: 'i' is the correct column name, but since the table has ACCEPT_ANY_SCHEMA capability,
+    // DEFAULT column resolution should skip this table.
+    val sql1 = s"INSERT INTO v2TableWithAcceptAnySchemaCapability VALUES(DEFAULT)"
+    val sql2 = s"INSERT INTO v2TableWithAcceptAnySchemaCapability SELECT DEFAULT"
+    val parsed1 = parseAndResolve(sql1)
+    val parsed2 = parseAndResolve(sql2)
+    parsed1 match {
+      case InsertIntoStatement(
+        _, _, _,
+        UnresolvedInlineTable(_, Seq(Seq(UnresolvedAttribute(Seq("DEFAULT"))))),
+        _, _) =>
+
+      case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
+    }
+    parsed2 match {
+      case InsertIntoStatement(
+        _, _, _,
+        Project(Seq(UnresolvedAttribute(Seq("DEFAULT"))), _),
+        _, _) =>
+
+      case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
+    }
+  }
+
   test("alter table: alter column") {
     Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach 
{
       case (tblName, useV1Command) =>
@@ -1906,9 +1953,9 @@ class PlanResolutionSuite extends AnalysisTest {
          |MERGE INTO v2TableWithAcceptAnySchemaCapability AS target
          |USING v2Table AS source
          |ON target.i = source.i
         |WHEN MATCHED AND (target.s='delete') THEN DELETE
         |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
-         |WHEN NOT MATCHED AND (target.s='insert')
+         |WHEN NOT MATCHED AND (target.s=DEFAULT)
          |  THEN INSERT (target.i, target.s) values (source.i, source.s)
        """.stripMargin
 
@@ -1924,7 +1971,7 @@ class PlanResolutionSuite extends AnalysisTest {
               updateAssigns)),
           Seq(
             InsertAction(
-              Some(EqualTo(il: UnresolvedAttribute, StringLiteral("insert"))),
+              Some(EqualTo(il: UnresolvedAttribute, UnresolvedAttribute(Seq("DEFAULT")))),
               insertAssigns))) =>
         assert(l.name == "target.i" && r.name == "source.i")
         assert(dl.name == "target.s")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
