This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 104e43b75f4f [SPARK-55903][SQL] Simplify MERGE Schema Evolution and Check Write Privileges
104e43b75f4f is described below
commit 104e43b75f4f106b03c63f0f0e5fd18d69b1cdf9
Author: Szehon Ho <[email protected]>
AuthorDate: Wed Mar 11 14:31:21 2026 +0100
[SPARK-55903][SQL] Simplify MERGE Schema Evolution and Check Write Privileges
### What changes were proposed in this pull request?
Some simplifications for MERGE INTO schema evolution and minor bug fixes (a sketch of the new flow follows this list):
- The biggest cleanup is getting rid of 'needSchemaEvolution', 'canEvaluateSchemaEvolution', and 'changesForSchemaEvolution'. This three-part state existed because the rule that evaluates schema evolution needed the analyzer to first resolve every reference it could (except the unresolved target references that schema evolution itself would satisfy), so a separate 'canEvaluateSchemaEvolution' state had to block evaluation until that happened. Now, the code has a simple 'pendingChanges' that is [...]
- Load the table from the catalog with write privileges; previously this was not done, so schema evolution could be performed without the required privilege
- Catch and wrap exceptions from the catalog properly
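For illustration, here is a self-contained Scala sketch of the simplified flow. This is a toy model, not the Spark code: Column, Table, TableCatalog, MergePlan, and the privilege strings are simplified stand-ins, and the real logic lives in ResolveMergeIntoSchemaEvolution and MergeIntoTable.pendingSchemaChanges in the diff below.

    import scala.util.control.NonFatal

    final case class Column(name: String, dataType: String)
    final case class Table(columns: Seq[Column])
    final case class TableChange(desc: String)

    trait TableCatalog {
      def alterTable(ident: String, changes: Seq[TableChange]): Unit
      // Stand-in for catalog.loadTable(ident, writePrivileges): the table is
      // now reloaded WITH the write privileges the MERGE actions require.
      def loadTable(ident: String, writePrivileges: Set[String]): Table
    }

    final case class MergePlan(ident: String, target: Table, source: Table) {
      // The single recomputable value that replaces needSchemaEvolution /
      // canEvaluateSchemaEvolution / changesForSchemaEvolution: source columns
      // missing from the target become pending ADD COLUMN changes.
      def pendingSchemaChanges: Seq[TableChange] = {
        val targetNames = target.columns.map(_.name).toSet
        source.columns.collect {
          case c if !targetNames.contains(c.name) =>
            TableChange(s"ADD COLUMN ${c.name} ${c.dataType}")
        }
      }
    }

    def resolveSchemaEvolution(catalog: TableCatalog, merge: MergePlan): MergePlan = {
      if (merge.pendingSchemaChanges.isEmpty) return merge // nothing to evolve
      try {
        catalog.alterTable(merge.ident, merge.pendingSchemaChanges)
      } catch {
        case NonFatal(e) => // wrap catalog failures instead of leaking them raw
          throw new IllegalStateException(s"Schema evolution failed for ${merge.ident}", e)
      }
      // Reload the evolved table with write privileges and recompute the state.
      val evolved =
        merge.copy(target = catalog.loadTable(merge.ident, Set("INSERT", "UPDATE", "DELETE")))
      if (evolved.pendingSchemaChanges.nonEmpty) {
        // The catalog did not support or only partially applied the changes.
        throw new IllegalStateException(
          s"Unsupported changes: ${evolved.pendingSchemaChanges.mkString("; ")}")
      }
      evolved
    }

The point is that pendingSchemaChanges is derived on demand: non-empty means evolution is still needed, and emptiness after the reload confirms the catalog applied everything.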
### Why are the changes needed?
Discussed the state of Spark 4.1 schema evolution with aokolnychyi, and he suggested these changes because the current code is confusing. Not using the write privileges is also wrong.
### Does this PR introduce _any_ user-facing change?
Write privileges are now enforced (for any system that uses DSV2 privileges). Error messages are changed.
### How was this patch tested?
Ran existing unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54704 from szehon-ho/anton_refactor_merge_1.
Authored-by: Szehon Ho <[email protected]>
Signed-off-by: Anton Okolnychyi <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 24 +++--
.../spark/sql/catalyst/analysis/Analyzer.scala | 15 +--
.../analysis/ResolveMergeIntoSchemaEvolution.scala | 101 ++++++++++---------
.../ResolveRowLevelCommandAssignments.scala | 29 ++++--
.../catalyst/analysis/RewriteMergeIntoTable.scala | 12 +--
.../sql/catalyst/plans/logical/v2Commands.scala | 109 ++++++++++++---------
.../spark/sql/catalyst/types/DataTypeUtils.scala | 6 ++
.../spark/sql/errors/QueryCompilationErrors.scala | 26 +++--
.../datasources/v2/DataSourceV2Relation.scala | 4 +-
.../MergeIntoSchemaEvolutionExtraSQLTests.scala | 4 +-
10 files changed, 199 insertions(+), 131 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 5db07c4cea52..38be710e9865 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6782,6 +6782,24 @@
],
"sqlState" : "0A000"
},
+ "UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES" : {
+ "message" : [
+ "Automatic schema evolution failed for table <tableName>."
+ ],
+ "subClass" : {
+ "FAILED_EVOLUTION" : {
+ "message" : [
+ "The catalog returned the error: <detail>"
+ ]
+ },
+ "PARTIAL_EVOLUTION" : {
+ "message" : [
+ "The catalog did not support or only partially applied the following
changes: <changes>."
+ ]
+ }
+ },
+ "sqlState" : "42000"
+ },
"UNSUPPORTED_CALL" : {
"message" : [
"Cannot call the method \"<methodName>\" of the class \"<className>\"."
@@ -7658,12 +7676,6 @@
},
"sqlState" : "0A000"
},
- "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION" : {
- "message" : [
- "Operation could not apply the following schema changes to table
<tableName> because the catalog did not support or only partially applied them:
<changes>."
- ],
- "sqlState" : "42000"
- },
"UNSUPPORTED_TABLE_CHANGE_IN_JDBC_CATALOG" : {
"message" : [
"The table change <change> is not supported for the JDBC catalog on
table <tableName>. Supported changes include: AddColumn, RenameColumn,
DeleteColumn, UpdateColumnType, UpdateColumnNullability."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 12fc0f0a09fa..e6d1d6da06b6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1565,8 +1565,7 @@ class Analyzer(
UpdateAction(
resolvedUpdateCondition,
// The update value can access columns from both target and source tables.
- resolveAssignments(assignments, m, MergeResolvePolicy.BOTH,
- throws = throws),
+ resolveAssignments(assignments, m, MergeResolvePolicy.BOTH, throws),
fromStar)
case UpdateStarAction(updateCondition) =>
// Expand star to top level source columns. If source has less columns than target,
@@ -1587,8 +1586,7 @@ class Analyzer(
UpdateAction(
updateCondition.map(resolveExpressionByPlanChildren(_, m)),
// For UPDATE *, the value must be from source table.
- resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
- throws = throws),
+ resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE, throws),
fromStar = true)
case o => o
}
@@ -1600,8 +1598,7 @@ class Analyzer(
resolveExpressionByPlanOutput(_, m.sourceTable))
InsertAction(
resolvedInsertCondition,
- resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
- throws = throws))
+ resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE, throws))
case InsertStarAction(insertCondition) =>
// The insert action is used when not matched, so its condition and value can only
// access columns from the source table.
@@ -1624,8 +1621,7 @@ class Analyzer(
}
InsertAction(
resolvedInsertCondition,
- resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
- throws = throws))
+ resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE, throws))
case o => o
}
val newNotMatchedBySourceActions = m.notMatchedBySourceActions.map {
@@ -1639,8 +1635,7 @@ class Analyzer(
UpdateAction(
resolvedUpdateCondition,
// The update value can access columns from the target table only.
- resolveAssignments(assignments, m, MergeResolvePolicy.TARGET,
- throws = throws),
+ resolveAssignments(assignments, m, MergeResolvePolicy.TARGET, throws),
fromStar)
case o => o
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
index bbb8e7852b2c..ff8f08adb5aa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
@@ -17,18 +17,19 @@
package org.apache.spark.sql.catalyst.analysis
-import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+import scala.util.control.NonFatal
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.AttributeMap
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog, TableChange}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, TableChange}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.StructType
-
+import org.apache.spark.sql.execution.datasources.v2.ExtractV2CatalogAndIdentifier
/**
* A rule that resolves schema evolution for MERGE INTO.
@@ -40,46 +41,58 @@ object ResolveMergeIntoSchemaEvolution extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
// This rule should run only if all assignments are resolved, except those
// that will be satisfied by schema evolution
- case m@MergeIntoTable(_, _, _, _, _, _, _) if m.evaluateSchemaEvolution =>
- val changes = m.changesForSchemaEvolution
- if (changes.isEmpty) {
- m
- } else {
- val finalAttrMapping = ArrayBuffer.empty[(Attribute, Attribute)]
- val newTarget = m.targetTable.transform {
- case r: DataSourceV2Relation =>
- val referencedSourceSchema = MergeIntoTable.sourceSchemaForSchemaEvolution(m)
- val newTarget = performSchemaEvolution(r, referencedSourceSchema, changes)
- val oldTargetOutput = m.targetTable.output
- val newTargetOutput = newTarget.output
- val attributeMapping = oldTargetOutput.zip(newTargetOutput)
- finalAttrMapping ++= attributeMapping
- newTarget
- }
- val res = m.copy(targetTable = newTarget)
- res.rewriteAttrs(AttributeMap(finalAttrMapping.toSeq))
+ case m: MergeIntoTable if m.pendingSchemaChanges.nonEmpty =>
+ EliminateSubqueryAliases(m.targetTable) match {
+ case ExtractV2CatalogAndIdentifier(catalog, ident) =>
+ evolveSchema(catalog, ident, m.pendingSchemaChanges)
+ val writePrivileges = MergeIntoTable.getWritePrivileges(m)
+ val newTable = catalog.loadTable(ident, writePrivileges.toSet.asJava)
+ val mergeWithNewTarget = replaceMergeTarget(m, newTable)
+
+ val remainingChanges = mergeWithNewTarget.pendingSchemaChanges
+ if (remainingChanges.nonEmpty) {
+ throw QueryCompilationErrors.unsupportedAutoSchemaEvolutionChangesError(
+ catalog, ident, remainingChanges)
+ }
+
+ mergeWithNewTarget
+ case _ =>
+ m
}
}
- private def performSchemaEvolution(
- relation: DataSourceV2Relation,
- referencedSourceSchema: StructType,
- changes: Array[TableChange]): DataSourceV2Relation = {
- (relation.catalog, relation.identifier) match {
- case (Some(c: TableCatalog), Some(i)) =>
- c.alterTable(i, changes: _*)
- val newTable = c.loadTable(i)
- val newSchema = CatalogV2Util.v2ColumnsToStructType(newTable.columns())
- // Check if there are any remaining changes not applied.
- val remainingChanges = MergeIntoTable.schemaChanges(newSchema, referencedSourceSchema)
- if (remainingChanges.nonEmpty) {
- throw QueryCompilationErrors.unsupportedTableChangesInAutoSchemaEvolutionError(
- remainingChanges, i.toQualifiedNameParts(c))
- }
- relation.copy(table = newTable, output = DataTypeUtils.toAttributes(newSchema))
- case _ => logWarning(s"Schema Evolution enabled but data source $relation " +
- s"does not support it, skipping.")
- relation
+ private def evolveSchema(
+ catalog: TableCatalog,
+ ident: Identifier,
+ changes: Seq[TableChange]): Unit = {
+ try {
+ catalog.alterTable(ident, changes: _*)
+ } catch {
+ case e: IllegalArgumentException if !e.isInstanceOf[SparkThrowable] =>
+ throw QueryExecutionErrors.unsupportedTableChangeError(e)
+ case NonFatal(e) =>
+ throw QueryCompilationErrors.failedAutoSchemaEvolutionError(
+ catalog, ident, e)
+ }
+ }
+
+ private def replaceMergeTarget(
+ merge: MergeIntoTable,
+ newTable: Table): MergeIntoTable = {
+ val oldOutput = merge.targetTable.output
+ val newOutput = DataTypeUtils.toAttributes(newTable.columns)
+ val newTargetTable = merge.targetTable.transform {
+ case r: DataSourceV2Relation => r.copy(table = newTable, output = newOutput)
}
+ val mergeWithNewTargetTable = merge.copy(targetTable = newTargetTable)
+ rewriteAttrs(mergeWithNewTargetTable, oldOutput, newOutput)
+ }
+
+ private def rewriteAttrs[T <: LogicalPlan](
+ plan: T,
+ oldOutput: Seq[Attribute],
+ newOutput: Seq[Attribute]): T = {
+ val attrMap = AttributeMap(oldOutput.zip(newOutput))
+ plan.rewriteAttrs(attrMap).asInstanceOf[T]
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
index bf1016ba8268..76035ea819ff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
/**
@@ -50,24 +49,36 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] {
case u: UpdateTable if !u.skipSchemaResolution && u.resolved && !u.aligned =>
resolveAssignments(u)
- case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned &&
- !m.needSchemaEvolution =>
+ case m: MergeIntoTable if m.rewritable && shouldAlignAssignments(m) && containsFinalSchema(m) =>
validateStoreAssignmentPolicy()
- val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes && m.withSchemaEvolution
+ val coerceNestedTypes = conf.coerceMergeNestedTypes && m.withSchemaEvolution
m.copy(
targetTable = cleanAttrMetadata(m.targetTable),
- matchedActions = alignActions(m.targetTable.output, m.matchedActions,
+ matchedActions = alignActions(
+ m.targetTable.output,
+ m.matchedActions,
coerceNestedTypes),
- notMatchedActions = alignActions(m.targetTable.output, m.notMatchedActions,
+ notMatchedActions = alignActions(
+ m.targetTable.output,
+ m.notMatchedActions,
coerceNestedTypes),
- notMatchedBySourceActions = alignActions(m.targetTable.output, m.notMatchedBySourceActions,
+ notMatchedBySourceActions = alignActions(
+ m.targetTable.output,
+ m.notMatchedBySourceActions,
coerceNestedTypes))
- case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && !m.aligned
- && !m.needSchemaEvolution =>
+ case m: MergeIntoTable if shouldAlignAssignments(m) && containsFinalSchema(m) =>
resolveAssignments(m)
}
+ private def shouldAlignAssignments(m: MergeIntoTable): Boolean = {
+ !m.skipSchemaResolution && m.resolved && !m.aligned
+ }
+
+ private def containsFinalSchema(m: MergeIntoTable): Boolean = {
+ !m.schemaEvolutionEnabled || (m.schemaEvolutionReady && m.pendingSchemaChanges.isEmpty)
+ }
+
private def validateStoreAssignmentPolicy(): Unit = {
// SPARK-28730: LEGACY store assignment policy is disallowed in data source v2
if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 9675ee232786..8ff734c7a9a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -44,9 +44,10 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
private final val ROW_FROM_TARGET = "__row_from_target"
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ // aligned is false when schema evolution is pending (see ResolveRowLevelCommandAssignments)
case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
- notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
- !m.needSchemaEvolution && matchedActions.isEmpty && notMatchedActions.size == 1 &&
+ notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
+ matchedActions.isEmpty && notMatchedActions.size == 1 &&
notMatchedBySourceActions.isEmpty =>
EliminateSubqueryAliases(aliasedTable) match {
@@ -79,8 +80,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
}
case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
- notMatchedBySourceActions, _)
- if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution &&
+ notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned &&
matchedActions.isEmpty && notMatchedBySourceActions.isEmpty =>
EliminateSubqueryAliases(aliasedTable) match {
@@ -121,9 +121,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand with PredicateHelper
}
case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, notMatchedActions,
- notMatchedBySourceActions, _)
- if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>
-
+ notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned =>
EliminateSubqueryAliases(aliasedTable) match {
case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
validateMergeIntoConditions(m)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 06a4d85a856c..a38e067861d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -893,67 +893,57 @@ case class MergeIntoTable(
case _ => false
}
- lazy val needSchemaEvolution: Boolean =
- evaluateSchemaEvolution && changesForSchemaEvolution.nonEmpty
-
- lazy val evaluateSchemaEvolution: Boolean =
- schemaEvolutionEnabled &&
- canEvaluateSchemaEvolution
+ lazy val pendingSchemaChanges: Seq[TableChange] = {
+ if (schemaEvolutionEnabled && schemaEvolutionReady) {
+ val referencedSourceSchema = MergeIntoTable.sourceSchemaForSchemaEvolution(this)
+ MergeIntoTable.schemaChanges(targetTable.schema, referencedSourceSchema).toSeq
+ } else {
+ Seq.empty
+ }
+ }
lazy val schemaEvolutionEnabled: Boolean = withSchemaEvolution && {
EliminateSubqueryAliases(targetTable) match {
- case r: DataSourceV2Relation if r.autoSchemaEvolution() => true
+ case r: DataSourceV2Relation if r.autoSchemaEvolution => true
case _ => false
}
}
// Guard that assignments are either resolved or candidates for evolution before
// evaluating schema evolution. We need to use resolved assignment values to check
- // candidates, see MergeIntoTable.sourceSchemaForSchemaEvolution for details.
- lazy val canEvaluateSchemaEvolution: Boolean = {
- if ((!targetTable.resolved) || (!sourceTable.resolved)) {
- false
- } else {
- val actions = matchedActions ++ notMatchedActions
- val hasStarActions = actions.exists {
- case _: UpdateStarAction => true
- case _: InsertStarAction => true
- case _ => false
- }
- if (hasStarActions) {
- // need to resolve star actions first
- false
- } else {
- val assignments = actions.collect {
- case a: UpdateAction => a.assignments
- case a: InsertAction => a.assignments
- }.flatten
- val sourcePaths = DataTypeUtils.extractAllFieldPaths(sourceTable.schema)
- assignments.forall { assignment =>
- assignment.resolved ||
- (assignment.value.resolved && sourcePaths.exists {
- path => MergeIntoTable.isEqual(assignment, path)
- })
- }
- }
- }
+ // candidates, see MergeIntoTable.isSchemaEvolutionCandidate for details.
+ lazy val schemaEvolutionReady: Boolean = {
+ targetTable.resolved && sourceTable.resolved && actionsSchemaEvolutionReady
}
- private lazy val sourceSchemaForEvolution: StructType =
- MergeIntoTable.sourceSchemaForSchemaEvolution(this)
-
- lazy val changesForSchemaEvolution: Array[TableChange] =
- MergeIntoTable.schemaChanges(targetTable.schema, sourceSchemaForEvolution)
+ private def actionsSchemaEvolutionReady: Boolean = {
+ val actions = matchedActions ++ notMatchedActions
+ actions.forall {
+ case a: UpdateAction => MergeIntoTable.areSchemaEvolutionReady(a.assignments, sourceTable)
+ case a: InsertAction => MergeIntoTable.areSchemaEvolutionReady(a.assignments, sourceTable)
+ case _ => false
+ }
+ }
override def left: LogicalPlan = targetTable
override def right: LogicalPlan = sourceTable
+
override protected def withNewChildrenInternal(
- newLeft: LogicalPlan, newRight: LogicalPlan): MergeIntoTable =
+ newLeft: LogicalPlan,
+ newRight: LogicalPlan): MergeIntoTable = {
copy(targetTable = newLeft, sourceTable = newRight)
+ }
}
object MergeIntoTable {
+ def getWritePrivileges(merge: MergeIntoTable): Seq[TableWritePrivilege] = {
+ getWritePrivileges(
+ merge.matchedActions,
+ merge.notMatchedActions,
+ merge.notMatchedBySourceActions)
+ }
+
def getWritePrivileges(
matchedActions: Iterable[MergeAction],
notMatchedActions: Iterable[MergeAction],
@@ -997,7 +987,7 @@ object MergeIntoTable {
// Identify the newly added fields and append to the end
val currentFieldMap = toFieldMap(currentFields)
val adds = newFields.filterNot(f => currentFieldMap.contains(f.name))
- .map(f => TableChange.addColumn(fieldPath ++ Set(f.name), f.dataType))
+ .map(f => TableChange.addColumn(fieldPath ++ Seq(f.name), f.dataType))
updates ++ adds
@@ -1106,11 +1096,40 @@ object MergeIntoTable {
// Example: UPDATE SET target.a = source.a
private def isEqual(assignment: Assignment, sourceFieldPath: Seq[String]): Boolean = {
// key must be a non-qualified field path that may be added to target schema via evolution
- val assignmenKeyExpr = extractFieldPath(assignment.key, allowUnresolved = true)
+ val assignmentKeyExpr = extractFieldPath(assignment.key, allowUnresolved = true)
// value should always be resolved (from source)
val assignmentValueExpr = extractFieldPath(assignment.value, allowUnresolved = false)
- assignmenKeyExpr == assignmentValueExpr &&
- assignmenKeyExpr == sourceFieldPath
+ assignmentKeyExpr == assignmentValueExpr && assignmentKeyExpr == sourceFieldPath
+ }
+
+ private def areSchemaEvolutionReady(
+ assignments: Seq[Assignment],
+ source: LogicalPlan): Boolean = {
+ assignments.forall(assign => assign.resolved || isSchemaEvolutionCandidate(assign, source))
+ }
+
+ private def isSchemaEvolutionCandidate(assignment: Assignment, source: LogicalPlan): Boolean = {
+ assignment.value.resolved && isSameColumnAssignment(assignment, source)
+ }
+
+ // Helper method to check if an assignment key is equal to a source column
+ // and if the assignment value is that same source column.
+ //
+ // Top-level example: UPDATE SET target.a = source.a
+ // key: AttributeReference("a", ...) -> path Seq("a")
+ // value: AttributeReference("a", ...) from source
+ //
+ // Nested example: UPDATE SET addr.city = source.addr.city
+ // key: GetStructField(GetStructField(AttributeReference("addr", ...), 0), 1)
+ // value: GetStructField(GetStructField(AttributeReference("addr", ...), 0), 1) from source
+ //
+ // references contains only root attributes, so subsetOf(source.outputSet) works for both.
+ private def isSameColumnAssignment(assignment: Assignment, source: LogicalPlan): Boolean = {
+ // key must be a non-qualified field path that may be added to target schema via evolution
+ val keyPath = extractFieldPath(assignment.key, allowUnresolved = true)
+ // value should always be resolved (from source)
+ val valuePath = extractFieldPath(assignment.value, allowUnresolved = false)
+ keyPath == valuePath && assignment.value.references.subsetOf(source.outputSet)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
index 20f2f97fea6c..9c17fd9064b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
@@ -20,6 +20,8 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
import org.apache.spark.sql.catalyst.util.CollationFactory
import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
+import org.apache.spark.sql.connector.catalog.Column
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, STRICT}
@@ -238,6 +240,10 @@ object DataTypeUtils {
schema.map(toAttribute)
}
+ def toAttributes(columns: Array[Column]): Seq[AttributeReference] = {
+ toAttributes(CatalogV2Util.v2ColumnsToStructType(columns))
+ }
+
def fromAttributes(attributes: Seq[Attribute]): StructType =
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index a784276200c0..2d0445b9fb42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3514,14 +3514,28 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
"change" -> change.toString, "tableName" -> toSQLId(sanitizedTableName)))
}
- def unsupportedTableChangesInAutoSchemaEvolutionError(
- changes: Array[TableChange], tableName: Seq[String]): Throwable = {
- val sanitizedTableName = tableName.map(_.replaceAll("\"", ""))
- val changesDesc = changes.map(_.toString).mkString("; ")
+ def unsupportedAutoSchemaEvolutionChangesError(
+ catalog: CatalogPlugin,
+ ident: Identifier,
+ remainingChanges: Seq[TableChange]): Throwable = {
+ new AnalysisException(
+ errorClass = "UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES.PARTIAL_EVOLUTION",
+ messageParameters = Map(
+ "tableName" -> toSQLId(ident.toQualifiedNameParts(catalog)),
+ "changes" -> remainingChanges.mkString("; ")))
+ }
+
+ def failedAutoSchemaEvolutionError(
+ catalog: CatalogPlugin,
+ ident: Identifier,
+ cause: Throwable): Throwable = {
+ val detail = Option(cause.getMessage).getOrElse("Unknown error")
new AnalysisException(
- errorClass = "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
+ errorClass = "UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES.FAILED_EVOLUTION",
messageParameters = Map(
- "changes" -> changesDesc, "tableName" -> toSQLId(sanitizedTableName)))
+ "tableName" -> toSQLId(ident.toQualifiedNameParts(catalog)),
+ "detail" -> detail),
+ cause = Some(cause))
}
def pathOptionNotSetCorrectlyWhenReadingError(): Throwable = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 970ab1a32710..8eb2f958b447 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -138,8 +138,8 @@ case class DataSourceV2Relation(
}
}
- def autoSchemaEvolution(): Boolean =
- table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
+ def autoSchemaEvolution: Boolean =
+ table.capabilities.contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
def isVersioned: Boolean = table.version != null
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
index 3bbeb0db1b74..8565c0b31c0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
@@ -135,8 +135,8 @@ trait MergeIntoSchemaEvolutionExtraSQLTests extends RowLevelOperationSuiteBase {
|VALUES (s.pk, s.salary, s.dep, s.active)
|""".stripMargin)
}
- assert(ex.getCondition === "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
- s"Expected error class UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION but got: " +
+ assert(ex.getCondition.startsWith("UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES"),
+ s"Expected error class UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES but got: " +
s"${ex.getCondition}. Message: ${ex.getMessage}")
assert(ex.getMessageParameters.get("tableName") != null,
s"Error message should mention table name: ${ex.getMessage}")