This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 079594ae976 Revert "[SPARK-43313][SQL] Adding missing column DEFAULT values for MERGE INSERT actions"
079594ae976 is described below

commit 079594ae976b377459ad09d864106734ef65c32d
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu May 18 18:39:38 2023 +0800

    Revert "[SPARK-43313][SQL] Adding missing column DEFAULT values for MERGE 
INSERT actions"
    
    This reverts commit 3a0e6bde2aaa11e1165f4fde040ff02e1743795e.
---
 .../connector/write/SupportsCustomSchemaWrite.java |  38 ------
 .../catalyst/analysis/ResolveDefaultColumns.scala  |  56 +-------
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |  32 +----
 .../analysis/ResolveDefaultColumnsSuite.scala      | 151 +--------------------
 .../org/apache/spark/sql/sources/InsertSuite.scala |   4 +-
 5 files changed, 16 insertions(+), 265 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsCustomSchemaWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsCustomSchemaWrite.java
deleted file mode 100644
index 9435625a1c4..00000000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsCustomSchemaWrite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector.write;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * Trait for tables that support custom schemas for write operations including INSERT INTO commands
- * whose target table columns have explicit or implicit default values.
- *
- * @since 3.4.1
- */
-@Evolving
-public interface SupportsCustomSchemaWrite {
-    /**
-     * Represents a table with a custom schema to use for resolving DEFAULT column references when
-     * inserting into the table. For example, this can be useful for excluding hidden pseudocolumns.
-     *
-     * @return the new schema to use for this process.
-     */
-    StructType customSchemaForInserts();
-}
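
For context, here is a minimal Scala sketch of how a connector table could have implemented the interface deleted above; it mirrors the TableWithCustomInsertSchema test helper that this revert also removes further down. The class name, column names, and the single trailing pseudocolumn are illustrative assumptions, not code from any shipped connector.

    import java.util
    import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
    import org.apache.spark.sql.connector.write.SupportsCustomSchemaWrite
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Hypothetical table whose last column is an internal pseudocolumn.
    class TableWithHiddenColumn extends Table with SupportsCustomSchemaWrite {
      override def name: String = "t"
      override def schema: StructType = StructType(Seq(
        StructField("a", StringType),
        StructField("b", StringType),
        StructField("_row_id", StringType)))  // hidden pseudocolumn
      override def capabilities(): util.Set[TableCapability] =
        new util.HashSet[TableCapability]()
      // Hide the pseudocolumn when DEFAULT columns are resolved for INSERTs.
      override def customSchemaForInserts: StructType =
        StructType(schema.fields.dropRight(1))
    }
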
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index 13e9866645a..d028d4ff596 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.write.SupportsCustomSchemaWrite
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -215,11 +214,8 @@ case class ResolveDefaultColumns(
        throw QueryCompilationErrors.defaultReferencesNotAllowedInMergeCondition()
       }
     }
-    val columnsWithDefaults = ArrayBuffer.empty[String]
     val defaultExpressions: Seq[Expression] = schema.fields.map {
-      case f if f.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
-        columnsWithDefaults.append(normalizeFieldName(f.name))
-        analyze(f, "MERGE")
+      case f if f.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) => analyze(f, "MERGE")
       case _ => Literal(null)
     }
     val columnNamesToExpressions: Map[String, Expression] =
@@ -232,8 +228,7 @@ case class ResolveDefaultColumns(
       }.getOrElse(action)
     }
     val newNotMatchedActions: Seq[MergeAction] = m.notMatchedActions.map { action: MergeAction =>
-      val expanded = addMissingDefaultValuesForMergeAction(action, m, columnsWithDefaults.toSeq)
-      replaceExplicitDefaultValuesInMergeAction(expanded, columnNamesToExpressions).map { r =>
+      replaceExplicitDefaultValuesInMergeAction(action, columnNamesToExpressions).map { r =>
         replaced = true
         r
       }.getOrElse(action)
@@ -254,38 +249,6 @@ case class ResolveDefaultColumns(
     }
   }
 
-  /** Adds a new expressions to a merge action to generate missing default column values. */
-  def addMissingDefaultValuesForMergeAction(
-      action: MergeAction,
-      m: MergeIntoTable,
-      columnNamesWithDefaults: Seq[String]): MergeAction = {
-    action match {
-      case i: InsertAction =>
-        val targetColumns: Set[String] = i.assignments.map(_.key).flatMap { expr =>
-          expr match {
-            case a: AttributeReference => Seq(normalizeFieldName(a.name))
-            case u: UnresolvedAttribute => Seq(u.nameParts.map(normalizeFieldName).mkString("."))
-            case _ => Seq()
-          }
-        }.toSet
-        val targetTable: String = m.targetTable match {
-          case SubqueryAlias(id, _) => id.name
-          case d: DataSourceV2Relation => d.name
-        }
-        val missingColumnNamesWithDefaults = columnNamesWithDefaults.filter { name =>
-          !targetColumns.contains(normalizeFieldName(name)) &&
-            !targetColumns.contains(
-              s"${normalizeFieldName(targetTable)}.${normalizeFieldName(name)}")
-        }
-        val newAssignments: Seq[Assignment] = missingColumnNamesWithDefaults.map { key =>
-          Assignment(UnresolvedAttribute(key), UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
-        }
-        i.copy(assignments = i.assignments ++ newAssignments)
-      case _ =>
-        action
-    }
-  }
-
   /**
   * Replaces unresolved DEFAULT column references with corresponding values in one action of a
    * MERGE INTO command.
@@ -584,14 +547,9 @@ case class ResolveDefaultColumns(
         name: String => colNamesToFields.getOrElse(normalizeFieldName(name), return None)
       }
     val userSpecifiedColNames: Set[String] = userSpecifiedCols.toSet
-      .map(normalizeFieldName)
     val nonUserSpecifiedFields: Seq[StructField] =
       schema.fields.filter {
-        field => !userSpecifiedColNames.contains(
-          normalizeFieldName(
-            field.name
-          )
-        )
+        field => !userSpecifiedColNames.contains(field.name)
       }
     Some(StructType(userSpecifiedFields ++
       getStructFieldsForDefaultExpressions(nonUserSpecifiedFields)))
@@ -631,10 +589,8 @@ case class ResolveDefaultColumns(
     resolved.collectFirst {
       case r: UnresolvedCatalogRelation =>
         r.tableMeta.schema
-      case DataSourceV2Relation(table: SupportsCustomSchemaWrite, _, _, _, _) =>
-        table.customSchemaForInserts
-      case r: NamedRelation if !r.skipSchemaResolution =>
-        r.schema
+      case d: DataSourceV2Relation if !d.skipSchemaResolution && !d.isStreaming =>
+        CatalogV2Util.v2ColumnsToStructType(d.table.columns())
       case v: View if v.isTempViewStoringAnalyzedPlan =>
         v.schema
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 8c7e2ad4f1d..d0287cc602b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
@@ -219,33 +218,10 @@ object ResolveDefaultColumns {
     } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
       Cast(analyzed, dataType)
     } else {
-      // If the provided default value is a literal of a wider type than the target column, but the
-      // literal value fits within the narrower type, just coerce it for convenience. Exclude
-      // boolean/array/struct/map types from consideration for this type coercion to avoid
-      // surprising behavior like interpreting "false" as integer zero.
-      val result = if (analyzed.isInstanceOf[Literal] &&
-        !Seq(dataType, analyzed.dataType).exists(_ match {
-          case _: BooleanType | _: ArrayType | _: StructType | _: MapType => true
-          case _ => false
-        })) {
-        try {
-          val casted = Cast(analyzed, dataType, evalMode = EvalMode.TRY).eval()
-          if (casted != null) {
-            Some(Literal(casted, dataType))
-          } else {
-            None
-          }
-        } catch {
-          case _: SparkThrowable | _: RuntimeException =>
-            None
-        }
-      } else None
-      result.getOrElse {
-        throw new AnalysisException(
-          s"Failed to execute $statementType command because the destination 
table column " +
-            s"$colName has a DEFAULT value with type $dataType, but the " +
-            s"statement provided a value of incompatible type 
${analyzed.dataType}")
-      }
+      throw new AnalysisException(
+        s"Failed to execute $statementType command because the destination 
table column " +
+          s"$colName has a DEFAULT value with type $dataType, but the " +
+          s"statement provided a value of incompatible type 
${analyzed.dataType}")
     }
   }
   /**
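
The block removed above is the whole of the best-effort literal narrowing that SPARK-43313 had introduced. As a rough, self-contained sketch of what that path did, using the same Cast/EvalMode.TRY calls that appear in the removed lines (the 42L-to-int example comes from the test deleted below; variable names here are illustrative):

    import org.apache.spark.sql.catalyst.expressions.{Cast, EvalMode, Literal}
    import org.apache.spark.sql.types.IntegerType

    // A bigint literal (e.g. "a int default 42L") whose value fits in the
    // narrower int target column.
    val analyzed = Literal(42L)
    // TRY cast: evaluates to null instead of throwing when the value does not fit.
    val casted = Cast(analyzed, IntegerType, evalMode = EvalMode.TRY).eval()
    val coerced = if (casted != null) Some(Literal(casted, IntegerType)) else None
    // Some(Literal(42, IntegerType)) here; None meant falling through to the
    // "incompatible type" AnalysisException, which is the only path the revert keeps.
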
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala
index ba52ac995b7..ea0dc9d603d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala
@@ -17,16 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
-import org.apache.spark.sql.connector.write.SupportsCustomSchemaWrite
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType, TimestampType}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
 
 class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
   val rule = ResolveDefaultColumns(null)
@@ -98,142 +93,4 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
-
-  test("SPARK-43313: Column default values with implicit coercion from 
provided values") {
-    withDatabase("demos") {
-      sql("create database demos")
-      withTable("demos.test_ts") {
-        // If the provided default value is a literal of a wider type than the target column, but
-        // the literal value fits within the narrower type, just coerce it for convenience.
-        sql(
-          """create table demos.test_ts (
-            |a int default 42L,
-            |b timestamp_ntz default '2022-01-02',
-            |c date default '2022-01-03',
-            |f float default 0D
-            |) using parquet""".stripMargin)
-        sql("insert into demos.test_ts(a) values (default)")
-        checkAnswer(spark.table("demos.test_ts"),
-          sql("select 42, timestamp_ntz'2022-01-02', date'2022-01-03', 0f"))
-        // If the provided default value is a literal of a different type than the target column
-        // such that no coercion is possible, throw an error.
-        Seq(
-          "create table demos.test_ts_other (a int default 'abc') using 
parquet",
-          "create table demos.test_ts_other (a timestamp default '2022-01-02') 
using parquet",
-          "create table demos.test_ts_other (a boolean default 'true') using 
parquet",
-          "create table demos.test_ts_other (a int default true) using parquet"
-        ).foreach { command =>
-          assert(intercept[AnalysisException](sql(command))
-            .getMessage.contains("statement provided a value of incompatible type"))
-        }
-      }
-    }
-  }
-
-  /**
-   * This is a new relation type that defines the 'customSchemaForInserts' method.
-   * Its implementation drops the last table column as it represents an internal pseudocolumn.
-   */
-  case class TableWithCustomInsertSchema(output: Seq[Attribute], numMetadataColumns: Int)
-    extends Table with SupportsCustomSchemaWrite {
-    override def name: String = "t"
-    override def schema: StructType = StructType.fromAttributes(output)
-    override def capabilities(): java.util.Set[TableCapability] =
-      new java.util.HashSet[TableCapability]()
-    override def customSchemaForInserts: StructType =
-      StructType(schema.fields.dropRight(numMetadataColumns))
-  }
-
-  /** Helper method to generate a DSV2 relation using the above table type. */
-  private def relationWithCustomInsertSchema(
-      output: Seq[AttributeReference], numMetadataColumns: Int): DataSourceV2Relation = {
-    DataSourceV2Relation(
-      TableWithCustomInsertSchema(output, numMetadataColumns),
-      output,
-      catalog = None,
-      identifier = None,
-      options = CaseInsensitiveStringMap.empty)
-  }
-
-  test("SPARK-43313: Add missing default values for MERGE INSERT actions") {
-    val testRelation = SubqueryAlias(
-      "testRelation",
-      relationWithCustomInsertSchema(Seq(
-        AttributeReference(
-          "a",
-          StringType,
-          true,
-          new MetadataBuilder()
-            .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'a'")
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'a'")
-            .build())(),
-        AttributeReference(
-          "b",
-          StringType,
-          true,
-          new MetadataBuilder()
-            .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'b'")
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'b'")
-            .build())(),
-        AttributeReference(
-          "c",
-          StringType,
-          true,
-          new MetadataBuilder()
-            .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'c'")
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'c'")
-            .build())(),
-        AttributeReference(
-          "pseudocolumn",
-          StringType,
-          true,
-          new MetadataBuilder()
-            .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'pseudocolumn'")
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'pseudocolumn'")
-            .build())()),
-        numMetadataColumns = 1))
-    val testRelation2 =
-      SubqueryAlias(
-        "testRelation2",
-        relationWithCustomInsertSchema(Seq(
-          AttributeReference("d", StringType)(),
-          AttributeReference("e", StringType)(),
-          AttributeReference("f", StringType)()),
-        numMetadataColumns = 0))
-    val mergePlan = MergeIntoTable(
-      targetTable = testRelation,
-      sourceTable = testRelation2,
-      mergeCondition = EqualTo(testRelation.output.head, testRelation2.output.head),
-      matchedActions = Seq(DeleteAction(None)),
-      notMatchedActions = Seq(
-        InsertAction(
-          condition = None,
-          assignments = Seq(
-            Assignment(
-              key = UnresolvedAttribute("a"),
-              value = UnresolvedAttribute("DEFAULT")),
-            Assignment(
-              key = UnresolvedAttribute(Seq("testRelation", "b")),
-              value = Literal("xyz"))))),
-      notMatchedBySourceActions = Seq(DeleteAction(None)))
-    // Run the 'addMissingDefaultValuesForMergeAction' method of the 'ResolveDefaultColumns' rule
-    // on an MERGE INSERT action with two assignments, one to the target table's column 'a' and
-    // another to the target table's column 'b'.
-    val columnNamesWithDefaults = Seq("a", "b", "c")
-    val actualMergeAction =
-      rule.apply(mergePlan).asInstanceOf[MergeIntoTable].notMatchedActions.head
-    val expectedMergeAction =
-      InsertAction(
-        condition = None,
-        assignments = Seq(
-          Assignment(key = UnresolvedAttribute("a"), value = Literal("a")),
-          Assignment(key = UnresolvedAttribute(Seq("testRelation", "b")), value = Literal("xyz")),
-          Assignment(key = UnresolvedAttribute("c"), value = Literal("c"))))
-    assert(expectedMergeAction == actualMergeAction)
-    // Run the same method on another MERGE DELETE action. There is no change because this method
-    // only operates on MERGE INSERT actions.
-    assert(rule.addMissingDefaultValuesForMergeAction(
-      mergePlan.matchedActions.head, mergePlan, columnNamesWithDefaults) ==
-      mergePlan.matchedActions.head)
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 7d86a60815c..2207661478d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1044,8 +1044,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   test("SPARK-38336 INSERT INTO statements with tables with default columns: 
negative tests") {
     object Errors {
       val COMMON_SUBSTRING = " has a DEFAULT value"
+      val COLUMN_DEFAULT_NOT_FOUND = "`default` cannot be resolved."
       val BAD_SUBQUERY = "subquery expressions are not allowed in DEFAULT values"
-      val TARGET_TABLE_NOT_FOUND = "The table or view `t` cannot be found"
     }
     // The default value fails to analyze.
     withTable("t") {
@@ -1096,7 +1096,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withTable("t") {
       assert(intercept[AnalysisException] {
         sql("insert into t values(false, default)")
-      }.getMessage.contains(Errors.TARGET_TABLE_NOT_FOUND))
+      }.getMessage.contains(Errors.COLUMN_DEFAULT_NOT_FOUND))
     }
     // The default value parses but the type is not coercible.
     withTable("t") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
