This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 079594ae976 Revert "[SPARK-43313][SQL] Adding missing column DEFAULT values for MERGE INSERT actions"
079594ae976 is described below

commit 079594ae976b377459ad09d864106734ef65c32d
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Thu May 18 18:39:38 2023 +0800

    Revert "[SPARK-43313][SQL] Adding missing column DEFAULT values for MERGE INSERT actions"

    This reverts commit 3a0e6bde2aaa11e1165f4fde040ff02e1743795e.
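For context on the revert: SPARK-43313 expanded MERGE ... WHEN NOT MATCHED THEN INSERT
actions during analysis so that target columns left unassigned received their declared
DEFAULT values. A minimal sketch of the statement shape that was affected, assuming a
catalog whose tables support both MERGE INTO and column DEFAULTs (all names below are
illustrative, not taken from this patch):

    spark.sql("CREATE TABLE target (a INT, b STRING, c STRING DEFAULT 'c')")
    spark.sql("CREATE TABLE source (d INT, e STRING)")
    // With SPARK-43313 in place, the unassigned column `c` below was expanded to an
    // extra `c = DEFAULT` assignment during analysis; this revert removes that expansion.
    spark.sql("""
      MERGE INTO target t
      USING source s
      ON t.a = s.d
      WHEN NOT MATCHED THEN INSERT (a, b) VALUES (s.d, s.e)
    """)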
---
 .../connector/write/SupportsCustomSchemaWrite.java |  38 ------
 .../catalyst/analysis/ResolveDefaultColumns.scala  |  56 +-------
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |  32 +----
 .../analysis/ResolveDefaultColumnsSuite.scala      | 151 +--------------------
 .../org/apache/spark/sql/sources/InsertSuite.scala |   4 +-
 5 files changed, 16 insertions(+), 265 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsCustomSchemaWrite.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsCustomSchemaWrite.java
deleted file mode 100644
index 9435625a1c4..00000000000
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/SupportsCustomSchemaWrite.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.connector.write;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * Trait for tables that support custom schemas for write operations including INSERT INTO commands
- * whose target table columns have explicit or implicit default values.
- *
- * @since 3.4.1
- */
-@Evolving
-public interface SupportsCustomSchemaWrite {
-  /**
-   * Represents a table with a custom schema to use for resolving DEFAULT column references when
-   * inserting into the table. For example, this can be useful for excluding hidden pseudocolumns.
-   *
-   * @return the new schema to use for this process.
-   */
-  StructType customSchemaForInserts();
-}
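Connectors implemented the interface deleted above roughly as in the following sketch,
which mirrors the TableWithCustomInsertSchema helper in the removed test suite further
down. The class name and the single hidden pseudocolumn are hypothetical, and the code
no longer compiles once this commit removes the interface:

    import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
    import org.apache.spark.sql.connector.write.SupportsCustomSchemaWrite
    import org.apache.spark.sql.types.StructType

    // A table that hides one trailing internal pseudocolumn from INSERT resolution.
    class TableHidingPseudocolumn(fullSchema: StructType) extends Table
        with SupportsCustomSchemaWrite {
      override def name: String = "t"
      override def schema: StructType = fullSchema
      override def capabilities(): java.util.Set[TableCapability] =
        new java.util.HashSet[TableCapability]()
      // Resolve DEFAULT column references against all but the last column.
      override def customSchemaForInserts: StructType =
        StructType(fullSchema.fields.dropRight(1))
    }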
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index 13e9866645a..d028d4ff596 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -18,14 +18,13 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.write.SupportsCustomSchemaWrite
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -215,11 +214,8 @@ case class ResolveDefaultColumns(
         throw QueryCompilationErrors.defaultReferencesNotAllowedInMergeCondition()
       }
     }
-    val columnsWithDefaults = ArrayBuffer.empty[String]
     val defaultExpressions: Seq[Expression] = schema.fields.map {
-      case f if f.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
-        columnsWithDefaults.append(normalizeFieldName(f.name))
-        analyze(f, "MERGE")
+      case f if f.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) => analyze(f, "MERGE")
       case _ => Literal(null)
     }
     val columnNamesToExpressions: Map[String, Expression] =
@@ -232,8 +228,7 @@ case class ResolveDefaultColumns(
       }.getOrElse(action)
     }
     val newNotMatchedActions: Seq[MergeAction] = m.notMatchedActions.map { action: MergeAction =>
-      val expanded = addMissingDefaultValuesForMergeAction(action, m, columnsWithDefaults.toSeq)
-      replaceExplicitDefaultValuesInMergeAction(expanded, columnNamesToExpressions).map { r =>
+      replaceExplicitDefaultValuesInMergeAction(action, columnNamesToExpressions).map { r =>
         replaced = true
         r
       }.getOrElse(action)
@@ -254,38 +249,6 @@ case class ResolveDefaultColumns(
     }
   }
-  /** Adds a new expressions to a merge action to generate missing default column values. */
-  def addMissingDefaultValuesForMergeAction(
-      action: MergeAction,
-      m: MergeIntoTable,
-      columnNamesWithDefaults: Seq[String]): MergeAction = {
-    action match {
-      case i: InsertAction =>
-        val targetColumns: Set[String] = i.assignments.map(_.key).flatMap { expr =>
-          expr match {
-            case a: AttributeReference => Seq(normalizeFieldName(a.name))
-            case u: UnresolvedAttribute => Seq(u.nameParts.map(normalizeFieldName).mkString("."))
-            case _ => Seq()
-          }
-        }.toSet
-        val targetTable: String = m.targetTable match {
-          case SubqueryAlias(id, _) => id.name
-          case d: DataSourceV2Relation => d.name
-        }
-        val missingColumnNamesWithDefaults = columnNamesWithDefaults.filter { name =>
-          !targetColumns.contains(normalizeFieldName(name)) &&
-            !targetColumns.contains(
-              s"${normalizeFieldName(targetTable)}.${normalizeFieldName(name)}")
-        }
-        val newAssignments: Seq[Assignment] = missingColumnNamesWithDefaults.map { key =>
-          Assignment(UnresolvedAttribute(key), UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
-        }
-        i.copy(assignments = i.assignments ++ newAssignments)
-      case _ =>
-        action
-    }
-  }
-
   /**
    * Replaces unresolved DEFAULT column references with corresponding values in one action of a
    * MERGE INTO command.
    */
@@ -584,14 +547,9 @@ case class ResolveDefaultColumns(
       name: String => colNamesToFields.getOrElse(normalizeFieldName(name), return None)
     }
     val userSpecifiedColNames: Set[String] = userSpecifiedCols.toSet
-      .map(normalizeFieldName)
     val nonUserSpecifiedFields: Seq[StructField] = schema.fields.filter {
-      field => !userSpecifiedColNames.contains(
-        normalizeFieldName(
-          field.name
-        )
-      )
+      field => !userSpecifiedColNames.contains(field.name)
     }
     Some(StructType(userSpecifiedFields ++
       getStructFieldsForDefaultExpressions(nonUserSpecifiedFields)))
@@ -631,10 +589,8 @@ case class ResolveDefaultColumns(
     resolved.collectFirst {
       case r: UnresolvedCatalogRelation =>
         r.tableMeta.schema
-      case DataSourceV2Relation(table: SupportsCustomSchemaWrite, _, _, _, _) =>
-        table.customSchemaForInserts
-      case r: NamedRelation if !r.skipSchemaResolution =>
-        r.schema
+      case d: DataSourceV2Relation if !d.skipSchemaResolution && !d.isStreaming =>
+        CatalogV2Util.v2ColumnsToStructType(d.table.columns())
       case v: View if v.isTempViewStoringAnalyzedPlan =>
         v.schema
     }
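The gist of the removed addMissingDefaultValuesForMergeAction above, stripped of
Catalyst types, is the stand-alone sketch below: plain strings stand in for
Assignment/UnresolvedAttribute nodes, and lower-casing stands in for normalizeFieldName
under case-insensitive resolution (both simplifications are assumptions of this sketch):

    // Columns that declare a DEFAULT but are not assigned by the INSERT action,
    // checking both bare ("c") and table-qualified ("target.c") spellings.
    def missingDefaults(
        assignedColumns: Set[String],
        columnsWithDefaults: Seq[String],
        targetTable: String): Seq[String] = {
      val assigned = assignedColumns.map(_.toLowerCase)
      columnsWithDefaults.filterNot { col =>
        assigned.contains(col.toLowerCase) ||
          assigned.contains(s"${targetTable.toLowerCase}.${col.toLowerCase}")
      }
    }

    // missingDefaults(Set("a", "testRelation.b"), Seq("a", "b", "c"), "testRelation")
    // returns Seq("c"); the removed rule then appended a `c = DEFAULT` assignment.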
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 8c7e2ad4f1d..d0287cc602b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.util
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.SparkThrowable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis._
@@ -219,33 +218,10 @@ object ResolveDefaultColumns {
     } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
       Cast(analyzed, dataType)
     } else {
-      // If the provided default value is a literal of a wider type than the target column, but the
-      // literal value fits within the narrower type, just coerce it for convenience. Exclude
-      // boolean/array/struct/map types from consideration for this type coercion to avoid
-      // surprising behavior like interpreting "false" as integer zero.
-      val result = if (analyzed.isInstanceOf[Literal] &&
-        !Seq(dataType, analyzed.dataType).exists(_ match {
-          case _: BooleanType | _: ArrayType | _: StructType | _: MapType => true
-          case _ => false
-        })) {
-        try {
-          val casted = Cast(analyzed, dataType, evalMode = EvalMode.TRY).eval()
-          if (casted != null) {
-            Some(Literal(casted, dataType))
-          } else {
-            None
-          }
-        } catch {
-          case _: SparkThrowable | _: RuntimeException =>
-            None
-        }
-      } else None
-      result.getOrElse {
-        throw new AnalysisException(
-          s"Failed to execute $statementType command because the destination table column " +
-          s"$colName has a DEFAULT value with type $dataType, but the " +
-          s"statement provided a value of incompatible type ${analyzed.dataType}")
-      }
+      throw new AnalysisException(
+        s"Failed to execute $statementType command because the destination table column " +
+        s"$colName has a DEFAULT value with type $dataType, but the " +
+        s"statement provided a value of incompatible type ${analyzed.dataType}")
     }
   }
   /**
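The convenience coercion removed above behaved like a TRY-style cast of the literal
default to the column's type, keeping the value only when it survives the cast. The
same semantics can be observed through Spark's public try_cast expression (an analogy,
not the rule's actual code path; assumes a running SparkSession named `spark` on
Spark 3.2 or later):

    spark.sql("SELECT try_cast(42L AS INT)").show()   // 42: fits, so it was coerced
    spark.sql("SELECT try_cast('abc' AS INT)").show() // NULL: no fit, analysis error
    // Booleans, arrays, structs and maps were excluded outright, so even though
    // try_cast('true' AS BOOLEAN) succeeds, `a boolean default 'true'` was rejected.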
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala
index ba52ac995b7..ea0dc9d603d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala
@@ -17,16 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.spark.sql.{AnalysisException, QueryTest}
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
-import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
-import org.apache.spark.sql.connector.write.SupportsCustomSchemaWrite
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType, TimestampType}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.types.{StructField, StructType, TimestampType}
 
 class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
   val rule = ResolveDefaultColumns(null)
@@ -98,142 +93,4 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
-
-  test("SPARK-43313: Column default values with implicit coercion from provided values") {
-    withDatabase("demos") {
-      sql("create database demos")
-      withTable("demos.test_ts") {
-        // If the provided default value is a literal of a wider type than the target column, but
-        // the literal value fits within the narrower type, just coerce it for convenience.
-        sql(
-          """create table demos.test_ts (
-            |a int default 42L,
-            |b timestamp_ntz default '2022-01-02',
-            |c date default '2022-01-03',
-            |f float default 0D
-            |) using parquet""".stripMargin)
-        sql("insert into demos.test_ts(a) values (default)")
-        checkAnswer(spark.table("demos.test_ts"),
-          sql("select 42, timestamp_ntz'2022-01-02', date'2022-01-03', 0f"))
-        // If the provided default value is a literal of a different type than the target column
-        // such that no coercion is possible, throw an error.
-        Seq(
-          "create table demos.test_ts_other (a int default 'abc') using parquet",
-          "create table demos.test_ts_other (a timestamp default '2022-01-02') using parquet",
-          "create table demos.test_ts_other (a boolean default 'true') using parquet",
-          "create table demos.test_ts_other (a int default true) using parquet"
-        ).foreach { command =>
-          assert(intercept[AnalysisException](sql(command))
-            .getMessage.contains("statement provided a value of incompatible type"))
-        }
-      }
-    }
-  }
-
-  /**
-   * This is a new relation type that defines the 'customSchemaForInserts' method.
-   * Its implementation drops the last table column as it represents an internal pseudocolumn.
-   */
-  case class TableWithCustomInsertSchema(output: Seq[Attribute], numMetadataColumns: Int)
-    extends Table with SupportsCustomSchemaWrite {
-    override def name: String = "t"
-    override def schema: StructType = StructType.fromAttributes(output)
-    override def capabilities(): java.util.Set[TableCapability] =
-      new java.util.HashSet[TableCapability]()
-    override def customSchemaForInserts: StructType =
-      StructType(schema.fields.dropRight(numMetadataColumns))
-  }
-
-  /** Helper method to generate a DSV2 relation using the above table type. */
-  private def relationWithCustomInsertSchema(
-      output: Seq[AttributeReference], numMetadataColumns: Int): DataSourceV2Relation = {
-    DataSourceV2Relation(
-      TableWithCustomInsertSchema(output, numMetadataColumns),
-      output,
-      catalog = None,
-      identifier = None,
-      options = CaseInsensitiveStringMap.empty)
-  }
-
-  test("SPARK-43313: Add missing default values for MERGE INSERT actions") {
-    val testRelation = SubqueryAlias(
-      "testRelation",
-      relationWithCustomInsertSchema(Seq(
-        AttributeReference(
-          "a",
-          StringType,
-          true,
-          new MetadataBuilder()
-            .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'a'")
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'a'")
-            .build())(),
-        AttributeReference(
-          "b",
-          StringType,
-          true,
-          new MetadataBuilder()
-            .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'b'")
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'b'")
-            .build())(),
-        AttributeReference(
-          "c",
-          StringType,
-          true,
-          new MetadataBuilder()
-            .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'c'")
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'c'")
-            .build())(),
-        AttributeReference(
-          "pseudocolumn",
-          StringType,
-          true,
-          new MetadataBuilder()
-            .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, "'pseudocolumn'")
-            .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, "'pseudocolumn'")
-            .build())()),
-      numMetadataColumns = 1))
-    val testRelation2 =
-      SubqueryAlias(
-        "testRelation2",
-        relationWithCustomInsertSchema(Seq(
-          AttributeReference("d", StringType)(),
-          AttributeReference("e", StringType)(),
-          AttributeReference("f", StringType)()),
-        numMetadataColumns = 0))
-    val mergePlan = MergeIntoTable(
-      targetTable = testRelation,
-      sourceTable = testRelation2,
-      mergeCondition = EqualTo(testRelation.output.head, testRelation2.output.head),
-      matchedActions = Seq(DeleteAction(None)),
-      notMatchedActions = Seq(
-        InsertAction(
-          condition = None,
-          assignments = Seq(
-            Assignment(
-              key = UnresolvedAttribute("a"),
-              value = UnresolvedAttribute("DEFAULT")),
-            Assignment(
-              key = UnresolvedAttribute(Seq("testRelation", "b")),
-              value = Literal("xyz"))))),
-      notMatchedBySourceActions = Seq(DeleteAction(None)))
-    // Run the 'addMissingDefaultValuesForMergeAction' method of the 'ResolveDefaultColumns' rule
-    // on an MERGE INSERT action with two assignments, one to the target table's column 'a' and
-    // another to the target table's column 'b'.
-    val columnNamesWithDefaults = Seq("a", "b", "c")
-    val actualMergeAction =
-      rule.apply(mergePlan).asInstanceOf[MergeIntoTable].notMatchedActions.head
-    val expectedMergeAction =
-      InsertAction(
-        condition = None,
-        assignments = Seq(
-          Assignment(key = UnresolvedAttribute("a"), value = Literal("a")),
-          Assignment(key = UnresolvedAttribute(Seq("testRelation", "b")), value = Literal("xyz")),
-          Assignment(key = UnresolvedAttribute("c"), value = Literal("c"))))
-    assert(expectedMergeAction == actualMergeAction)
-    // Run the same method on another MERGE DELETE action. There is no change because this method
-    // only operates on MERGE INSERT actions.
-    assert(rule.addMissingDefaultValuesForMergeAction(
-      mergePlan.matchedActions.head, mergePlan, columnNamesWithDefaults) ==
-      mergePlan.matchedActions.head)
-  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 7d86a60815c..2207661478d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1044,8 +1044,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   test("SPARK-38336 INSERT INTO statements with tables with default columns: negative tests") {
     object Errors {
       val COMMON_SUBSTRING = " has a DEFAULT value"
+      val COLUMN_DEFAULT_NOT_FOUND = "`default` cannot be resolved."
       val BAD_SUBQUERY = "subquery expressions are not allowed in DEFAULT values"
-      val TARGET_TABLE_NOT_FOUND = "The table or view `t` cannot be found"
     }
     // The default value fails to analyze.
     withTable("t") {
@@ -1096,7 +1096,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withTable("t") {
       assert(intercept[AnalysisException] {
         sql("insert into t values(false, default)")
-      }.getMessage.contains(Errors.TARGET_TABLE_NOT_FOUND))
+      }.getMessage.contains(Errors.COLUMN_DEFAULT_NOT_FOUND))
     }
     // The default value parses but the type is not coercible.
     withTable("t") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org