This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d5fd71e43a9 [SPARK-39078][SQL] Support UPDATE commands with DEFAULT
values
d5fd71e43a9 is described below
commit d5fd71e43a98077abc8b226763f42f62c694715e
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Sat May 7 10:43:33 2022 +0800
[SPARK-39078][SQL] Support UPDATE commands with DEFAULT values
## What changes were proposed in this pull request?
DEFAULT columns are now supported in UPDATE commands.
Example:
```
CREATE TABLE t(
i STRING,
c CHAR(5),
d VARCHAR(5),
e STRING DEFAULT 'abc',
f BIGINT DEFAULT 42)
USING DELTA;
INSERT INTO t VALUES('1', '12345', '67890', 'def', 31);
UPDATE t SET c = DEFAULT WHERE i = '1';
SELECT * FROM t;
> "1", NULL, "67890", "def", 31
UPDATE t SET e = DEFAULT WHERE i = '1';
SELECT * FROM t;
> "1", NULL, "67890", "abc", 31
UPDATE t SET e = 'ghi' WHERE i = '1';
SELECT * FROM t;
> "1", NULL, "67890", "ghi", 31
```
### Why are the changes needed?
This makes UPDATE commands easier to use.
### Does this PR introduce _any_ user-facing change?
Yes.
## How was this patch tested?
This PR adds new unit test coverage.
Closes #36415 from dtenedor/default-updates.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../catalyst/analysis/ResolveDefaultColumns.scala | 202 ++++++++++++++++-----
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 5 -
.../spark/sql/errors/QueryCompilationErrors.scala | 25 +++
.../execution/command/PlanResolutionSuite.scala | 76 +++++++-
.../org/apache/spark/sql/sources/InsertSuite.scala | 10 +-
5 files changed, 266 insertions(+), 52 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index ffbe18a7dfa..9612713f593 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -33,7 +34,7 @@ import org.apache.spark.sql.types._
* This is a rule to process DEFAULT columns in statements such as
CREATE/REPLACE TABLE.
*
* Background: CREATE TABLE and ALTER TABLE invocations support setting column
default values for
- * later operations. Following INSERT, and INSERT MERGE commands may then
reference the value
+ * later operations. Following INSERT, UPDATE, and MERGE commands may then
reference the value
* using the DEFAULT keyword as needed.
*
* Example:
@@ -60,6 +61,8 @@ case class ResolveDefaultColumns(
case i@InsertIntoStatement(_, _, _, project: Project, _, _)
if !project.projectList.exists(_.isInstanceOf[Star]) =>
resolveDefaultColumnsForInsertFromProject(i)
+ case u: UpdateTable =>
+ resolveDefaultColumnsForUpdate(u)
}
}
@@ -98,23 +101,23 @@ case class ResolveDefaultColumns(
node = node.children(0)
}
val table = node.asInstanceOf[UnresolvedInlineTable]
- val insertTableSchemaWithoutPartitionColumns: StructType =
+ val insertTableSchemaWithoutPartitionColumns: Option[StructType] =
getInsertTableSchemaWithoutPartitionColumns(i)
- .getOrElse(return i)
- val regenerated: InsertIntoStatement =
- regenerateUserSpecifiedCols(i, insertTableSchemaWithoutPartitionColumns)
- val expanded: UnresolvedInlineTable =
- addMissingDefaultValuesForInsertFromInlineTable(
- table, insertTableSchemaWithoutPartitionColumns)
- val replaced: LogicalPlan =
- replaceExplicitDefaultValuesForInputOfInsertInto(
- analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
- .getOrElse(return i)
- node = replaced
- for (child <- children.reverse) {
- node = child.withNewChildren(Seq(node))
- }
- regenerated.copy(query = node)
+ insertTableSchemaWithoutPartitionColumns.map { schema: StructType =>
+ val regenerated: InsertIntoStatement =
+ regenerateUserSpecifiedCols(i, schema)
+ val expanded: UnresolvedInlineTable =
+ addMissingDefaultValuesForInsertFromInlineTable(table, schema)
+ val replaced: Option[LogicalPlan] =
+ replaceExplicitDefaultValuesForInputOfInsertInto(analyzer, schema,
expanded)
+ replaced.map { r: LogicalPlan =>
+ node = r
+ for (child <- children.reverse) {
+ node = child.withNewChildren(Seq(node))
+ }
+ regenerated.copy(query = node)
+ }.getOrElse(i)
+ }.getOrElse(i)
}
/**
@@ -122,20 +125,50 @@ case class ResolveDefaultColumns(
* projection.
*/
private def resolveDefaultColumnsForInsertFromProject(i:
InsertIntoStatement): LogicalPlan = {
- val insertTableSchemaWithoutPartitionColumns: StructType =
+ val insertTableSchemaWithoutPartitionColumns: Option[StructType] =
getInsertTableSchemaWithoutPartitionColumns(i)
- .getOrElse(return i)
- val regenerated: InsertIntoStatement =
- regenerateUserSpecifiedCols(i, insertTableSchemaWithoutPartitionColumns)
- val project: Project = i.query.asInstanceOf[Project]
- val expanded: Project =
- addMissingDefaultValuesForInsertFromProject(
- project, insertTableSchemaWithoutPartitionColumns)
- val replaced: LogicalPlan =
- replaceExplicitDefaultValuesForInputOfInsertInto(
- analyzer, insertTableSchemaWithoutPartitionColumns, expanded)
- .getOrElse(return i)
- regenerated.copy(query = replaced)
+ insertTableSchemaWithoutPartitionColumns.map { schema =>
+ val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i,
schema)
+ val project: Project = i.query.asInstanceOf[Project]
+ val expanded: Project =
+ addMissingDefaultValuesForInsertFromProject(project, schema)
+ val replaced: Option[LogicalPlan] =
+ replaceExplicitDefaultValuesForInputOfInsertInto(analyzer, schema,
expanded)
+ replaced.map { r =>
+ regenerated.copy(query = r)
+ }.getOrElse(i)
+ }.getOrElse(i)
+ }
+
+ /**
+ * Resolves DEFAULT column references for an UPDATE command.
+ */
+ private def resolveDefaultColumnsForUpdate(u: UpdateTable): LogicalPlan = {
+ // Return a more descriptive error message if the user tries to use a
DEFAULT column reference
+ // inside an UPDATE command's WHERE clause; this is not allowed.
+ u.condition.foreach { c: Expression =>
+ if (c.find(isExplicitDefaultColumn).isDefined) {
+ throw
QueryCompilationErrors.defaultReferencesNotAllowedInUpdateWhereClause()
+ }
+ }
+ val schemaForTargetTable: Option[StructType] =
getSchemaForTargetTable(u.table)
+ schemaForTargetTable.map { schema =>
+ val defaultExpressions: Seq[Expression] = schema.fields.map {
+ case f if f.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
+ analyze(analyzer, f, "UPDATE")
+ case _ => Literal(null)
+ }
+ // Create a map from each column name in the target table to its DEFAULT
expression.
+ val columnNamesToExpressions: Map[String, Expression] =
+ mapStructFieldNamesToExpressions(schema, defaultExpressions)
+ // For each assignment in the UPDATE command's SET clause with a DEFAULT
column reference on
+ // the right-hand side, look up the corresponding expression from the
above map.
+ val newAssignments: Option[Seq[Assignment]] =
+ replaceExplicitDefaultValuesForUpdateAssignments(u.assignments,
columnNamesToExpressions)
+ newAssignments.map { n =>
+ u.copy(assignments = n)
+ }.getOrElse(u)
+ }.getOrElse(u)
}
/**
@@ -260,7 +293,7 @@ case class ResolveDefaultColumns(
expr = row(i)
defaultExpr = if (i < defaultExpressions.size) defaultExpressions(i)
else Literal(null)
} yield replaceExplicitDefaultReferenceInExpression(
- expr, defaultExpr, DEFAULTS_IN_EXPRESSIONS_ERROR, false).map { e =>
+ expr, defaultExpr, CommandType.Insert, addAlias = false).map { e =>
replaced = true
e
}.getOrElse(expr)
@@ -286,7 +319,7 @@ case class ResolveDefaultColumns(
projectExpr = project.projectList(i)
defaultExpr = if (i < defaultExpressions.size) defaultExpressions(i)
else Literal(null)
} yield replaceExplicitDefaultReferenceInExpression(
- projectExpr, defaultExpr, DEFAULTS_IN_EXPRESSIONS_ERROR, true).map { e
=>
+ projectExpr, defaultExpr, CommandType.Insert, addAlias = true).map { e
=>
replaced = true
e.asInstanceOf[NamedExpression]
}.getOrElse(projectExpr)
@@ -298,19 +331,26 @@ case class ResolveDefaultColumns(
}
}
+ /**
+ * Represents a type of command we are currently processing.
+ */
+ private object CommandType extends Enumeration {
+ val Insert, Update = Value
+ }
+
/**
* Checks if a given input expression is an unresolved "DEFAULT" attribute
reference.
*
* @param input the input expression to examine.
* @param defaultExpr the default to return if [[input]] is an unresolved
"DEFAULT" reference.
- * @param complexDefaultError error if [[input]] is a complex expression
with "DEFAULT" inside.
+ * @param command the type of command we are currently processing.
* @param addAlias if true, wraps the result with an alias of the original
default column name.
* @return [[defaultExpr]] if [[input]] is an unresolved "DEFAULT" attribute
reference.
*/
private def replaceExplicitDefaultReferenceInExpression(
input: Expression,
defaultExpr: Expression,
- complexDefaultError: String,
+ command: CommandType.Value,
addAlias: Boolean): Option[Expression] = {
input match {
case a@Alias(u: UnresolvedAttribute, _)
@@ -325,7 +365,14 @@ case class ResolveDefaultColumns(
}
case expr@_
if expr.find(isExplicitDefaultColumn).isDefined =>
- throw new AnalysisException(complexDefaultError)
+ command match {
+ case CommandType.Insert =>
+ throw QueryCompilationErrors
+
.defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
+ case CommandType.Update =>
+ throw QueryCompilationErrors
+
.defaultReferencesNotAllowedInComplexExpressionsInUpdateSetClause()
+ }
case _ =>
None
}
@@ -344,16 +391,10 @@ case class ResolveDefaultColumns(
if (userSpecifiedCols.isEmpty) {
return Some(schema)
}
- def normalize(str: String) = {
- if (SQLConf.get.caseSensitiveAnalysis) str else str.toLowerCase()
- }
- val colNamesToFields: Map[String, StructField] =
- schema.fields.map {
- field: StructField => normalize(field.name) -> field
- }.toMap
+ val colNamesToFields: Map[String, StructField] =
mapStructFieldNamesToFields(schema)
val userSpecifiedFields: Seq[StructField] =
userSpecifiedCols.map {
- name: String => colNamesToFields.getOrElse(normalize(name), return
None)
+ name: String => colNamesToFields.getOrElse(normalizeFieldName(name),
return None)
}
val userSpecifiedColNames: Set[String] = userSpecifiedCols.toSet
val nonUserSpecifiedFields: Seq[StructField] =
@@ -364,10 +405,47 @@ case class ResolveDefaultColumns(
getStructFieldsForDefaultExpressions(nonUserSpecifiedFields)))
}
+ /**
+ * Normalizes a schema field name so that it is suitable for use in map lookups.
+ */
+ private def normalizeFieldName(str: String): String = {
+ if (SQLConf.get.caseSensitiveAnalysis) {
+ str
+ } else {
+ str.toLowerCase()
+ }
+ }
+
+ /**
+ * Returns a map of the names of fields in a schema to the fields themselves.
+ */
+ private def mapStructFieldNamesToFields(schema: StructType): Map[String,
StructField] = {
+ schema.fields.map {
+ field: StructField => normalizeFieldName(field.name) -> field
+ }.toMap
+ }
+
+ /**
+ * Returns a map of the names of fields in a schema to corresponding
expressions.
+ */
+ private def mapStructFieldNamesToExpressions(
+ schema: StructType,
+ expressions: Seq[Expression]): Map[String, Expression] = {
+ val namesToFields: Map[String, StructField] =
mapStructFieldNamesToFields(schema)
+ val namesAndExpressions: Seq[(String, Expression)] =
namesToFields.keys.toSeq.zip(expressions)
+ namesAndExpressions.toMap
+ }
+
/**
* Returns the schema for the target table of a DML command, looking into
the catalog if needed.
*/
private def getSchemaForTargetTable(table: LogicalPlan): Option[StructType]
= {
+ // Check if the target table is already resolved. If so, return the
computed schema.
+ table match {
+ case r: NamedRelation if r.schema.fields.nonEmpty => return
Some(r.schema)
+ case SubqueryAlias(_, r: NamedRelation) if r.schema.fields.nonEmpty =>
return Some (r.schema)
+ case _ =>
+ }
// Lookup the relation from the catalog by name. This either succeeds or
returns some "not
// found" error. In the latter cases, return out of this rule without
changing anything and let
// the analyzer return a proper error message elsewhere.
@@ -389,4 +467,42 @@ case class ResolveDefaultColumns(
case _ => None
}
}
+
+ /**
+ * Replaces unresolved DEFAULT column references with corresponding values
in a series of
+ * assignments in an UPDATE command.
+ */
+ private def replaceExplicitDefaultValuesForUpdateAssignments(
+ assignments: Seq[Assignment],
+ columnNamesToExpressions: Map[String, Expression]):
Option[Seq[Assignment]] = {
+ var replaced = false
+ val newAssignments: Seq[Assignment] =
+ for (assignment <- assignments) yield {
+ val destColName = assignment.key match {
+ case a: AttributeReference => a.name
+ case u: UnresolvedAttribute => u.nameParts.last
+ case _ => ""
+ }
+ val adjusted: String = normalizeFieldName(destColName)
+ val lookup: Option[Expression] = columnNamesToExpressions.get(adjusted)
+ val newValue: Expression = lookup.map { defaultExpr =>
+ val updated: Option[Expression] =
+ replaceExplicitDefaultReferenceInExpression(
+ assignment.value,
+ defaultExpr,
+ CommandType.Update,
+ addAlias = false)
+ updated.map { e =>
+ replaced = true
+ e
+ }.getOrElse(assignment.value)
+ }.getOrElse(assignment.value)
+ assignment.copy(value = newValue)
+ }
+ if (replaced) {
+ Some(newAssignments)
+ } else {
+ None
+ }
+ }
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index b7db9be114f..6e23ba46396 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -55,11 +55,6 @@ object ResolveDefaultColumns {
// Name of attributes representing explicit references to the value stored
in the above
// CURRENT_DEFAULT_COLUMN_METADATA.
val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
- // Return a more descriptive error message if the user tries to nest the
DEFAULT column reference
- // inside some other expression, such as DEFAULT + 1 (this is not allowed).
- val DEFAULTS_IN_EXPRESSIONS_ERROR = "Failed to execute INSERT INTO command
because the " +
- "VALUES list contains a DEFAULT column reference as part of another
expression; this is " +
- "not allowed"
/**
* Finds "current default" expressions in CREATE/REPLACE TABLE columns and
constant-folds them.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 151c43cd92d..3b167eeb417 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2407,4 +2407,29 @@ object QueryCompilationErrors extends QueryErrorsBase {
def noSuchFunctionError(database: String, funcInfo: String): Throwable = {
new AnalysisException(s"$database does not support function: $funcInfo")
}
+
+ // Return a more descriptive error message if the user tries to nest a
DEFAULT column reference
+ // inside some other expression (such as DEFAULT + 1) in an INSERT INTO
command's VALUES list;
+ // this is not allowed.
+ def defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList():
Throwable = {
+ new AnalysisException(
+ "Failed to execute INSERT INTO command because the VALUES list contains
a DEFAULT column " +
+ "reference as part of another expression; this is not allowed")
+ }
+
+ // Return a more descriptive error message if the user tries to nest a
DEFAULT column reference
+ // inside some other expression in an UPDATE command's SET clause; this is
not allowed.
+ def defaultReferencesNotAllowedInComplexExpressionsInUpdateSetClause():
Throwable = {
+ new AnalysisException(
+ "Failed to execute UPDATE command because the SET list contains a
DEFAULT column reference " +
+ "as part of another expression; this is not allowed")
+ }
+
+ // Return a more descriptive error message if the user tries to use a
DEFAULT column reference
+ // inside an UPDATE command's WHERE clause; this is not allowed.
+ def defaultReferencesNotAllowedInUpdateWhereClause(): Throwable = {
+ new AnalysisException(
+ "Failed to execute UPDATE command because the WHERE clause contains a
DEFAULT column " +
+ "reference; this is not allowed")
+ }
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 63d2b9d47f1..84b900f4cd7 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -33,14 +33,16 @@ import
org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn,
AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect,
DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction,
LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, Project,
SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias,
UnsetTableProperties, UpdateAction, UpdateTable}
import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
import org.apache.spark.sql.connector.FakeV2Provider
import org.apache.spark.sql.connector.catalog.{CatalogManager,
CatalogNotFoundException, Identifier, SupportsDelete, Table, TableCapability,
TableCatalog, V1Table}
import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.{CreateTable =>
CreateTableV1}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.sources.SimpleScanSource
-import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType,
IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{BooleanType, CharType, DoubleType,
IntegerType, LongType, MetadataBuilder, StringType, StructField, StructType}
class PlanResolutionSuite extends AnalysisTest {
import CatalystSqlParser._
@@ -83,6 +85,22 @@ class PlanResolutionSuite extends AnalysisTest {
t
}
+ private val defaultValues: Table = {
+ val t = mock(classOf[Table])
+ when(t.schema()).thenReturn(
+ new StructType()
+ .add("i", BooleanType, true,
+ new MetadataBuilder()
+
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "true")
+
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
"true").build())
+ .add("s", IntegerType, true,
+ new MetadataBuilder()
+
.putString(ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY, "42")
+
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY,
"42").build()))
+ when(t.partitioning()).thenReturn(Array.empty[Transform])
+ t
+ }
+
private val v1Table: V1Table = {
val t = mock(classOf[CatalogTable])
when(t.schema).thenReturn(new StructType()
@@ -118,6 +136,7 @@ class PlanResolutionSuite extends AnalysisTest {
case "tab1" => table1
case "tab2" => table2
case "charvarchar" => charVarcharTable
+ case "defaultvalues" => defaultValues
case name => throw new NoSuchTableException(name)
}
})
@@ -974,11 +993,19 @@ class PlanResolutionSuite extends AnalysisTest {
|SET t.age=32
|WHERE t.name IN (SELECT s.name FROM s)
""".stripMargin
+ val sql5 = s"UPDATE $tblName SET name=DEFAULT, age=DEFAULT"
+ // Note: 'i' and 's' are the names of the columns in 'tblName'.
+ val sql6 = s"UPDATE $tblName SET i=DEFAULT, s=DEFAULT"
+ val sql7 = s"UPDATE defaultvalues SET i=DEFAULT, s=DEFAULT"
+ val sql8 = s"UPDATE $tblName SET name='Robert', age=32 WHERE p=DEFAULT"
val parsed1 = parseAndResolve(sql1)
val parsed2 = parseAndResolve(sql2)
val parsed3 = parseAndResolve(sql3)
val parsed4 = parseAndResolve(sql4)
+ val parsed5 = parseAndResolve(sql5)
+ val parsed6 = parseAndResolve(sql6)
+ val parsed7 = parseAndResolve(sql7, true)
parsed1 match {
case UpdateTable(
@@ -1035,6 +1062,53 @@ class PlanResolutionSuite extends AnalysisTest {
case _ => fail("Expect UpdateTable, but got:\n" + parsed4.treeString)
}
+
+ parsed5 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
+ Seq(
+ Assignment(name: UnresolvedAttribute,
UnresolvedAttribute(Seq("DEFAULT"))),
+ Assignment(age: UnresolvedAttribute,
UnresolvedAttribute(Seq("DEFAULT")))),
+ None) =>
+ assert(name.name == "name")
+ assert(age.name == "age")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed5.treeString)
+ }
+
+ parsed6 match {
+ case UpdateTable(
+ AsDataSourceV2Relation(_),
+ Seq(
+ // Note that when resolving DEFAULT column references, the
analyzer will insert literal
+ // NULL values if the corresponding table does not define an
explicit default value for
+ // that column. This is intended.
+ Assignment(i: AttributeReference, AnsiCast(Literal(null, _),
IntegerType, _)),
+ Assignment(s: AttributeReference, AnsiCast(Literal(null, _),
StringType, _))),
+ None) =>
+ assert(i.name == "i")
+ assert(s.name == "s")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed6.treeString)
+ }
+
+ parsed7 match {
+ case UpdateTable(
+ _,
+ Seq(
+ Assignment(i: AttributeReference, Literal(true, BooleanType)),
+ Assignment(s: AttributeReference, Literal(42, IntegerType))),
+ None) =>
+ assert(i.name == "i")
+ assert(s.name == "s")
+
+ case _ => fail("Expect UpdateTable, but got:\n" + parsed7.treeString)
+ }
+
+ assert(intercept[AnalysisException] {
+ parseAndResolve(sql8)
+ }.getMessage.contains(
+
QueryCompilationErrors.defaultReferencesNotAllowedInUpdateWhereClause().getMessage))
}
val sql1 = "UPDATE non_existing SET id=1"
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index b312e65154a..a2237b377cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
@@ -1058,14 +1058,18 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
sql("create table t(i boolean, s bigint default 42) using parquet")
assert(intercept[AnalysisException] {
sql("insert into t values(false, default + 1)")
-
}.getMessage.contains(ResolveDefaultColumns.DEFAULTS_IN_EXPRESSIONS_ERROR))
+ }.getMessage.contains(
+
QueryCompilationErrors.defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
+ .getMessage))
}
// Explicit default values may not participate in complex expressions in
the SELECT query.
withTable("t") {
sql("create table t(i boolean, s bigint default 42) using parquet")
assert(intercept[AnalysisException] {
sql("insert into t select false, default + 1")
-
}.getMessage.contains(ResolveDefaultColumns.DEFAULTS_IN_EXPRESSIONS_ERROR))
+ }.getMessage.contains(
+
QueryCompilationErrors.defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
+ .getMessage))
}
// Explicit default values have a reasonable error path if the table is
not found.
withTable("t") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]