This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 104e43b75f4f [SPARK-55903][SQL] Simplify MERGE Schema Evolution and 
Check Write Privileges
104e43b75f4f is described below

commit 104e43b75f4f106b03c63f0f0e5fd18d69b1cdf9
Author: Szehon Ho <[email protected]>
AuthorDate: Wed Mar 11 14:31:21 2026 +0100

    [SPARK-55903][SQL] Simplify MERGE Schema Evolution and Check Write 
Privileges
    
    ### What changes were proposed in this pull request?
    Some simplification for Merge Into Schema Evolution and minor bug fixes
    - The biggest cleanup is getting rid of 'needSchemaEvolution', 
'canEvaluateSchemaEvolution', 'changesForSchemaEvolution'.  The three-part 
state is because the rule to evaluate schema evolution needed the analyzer to 
resolve all the references it can (minus the un-resolved target references in 
the query that would be solved by schema evolution).  Thus it needed a weird 
'canEvaluateSchemaEvolution' state to block until that happened.  Now, the code 
has a simple 'pendingChanges' that is  [...]
    - Load the table from catalog with write Privileges, previously it was not 
doing so and could be performing schema evolution without the privilege
    - Catch and wrap the exception properly
    
    ### Why are the changes needed?
    Discussed with aokolnychyi on the state of Spark 4.1 schema evolution , and 
he suggested these changes as the code is currently confusing.  Not using the 
write privileges is also wrong.
    
    ### Does this PR introduce _any_ user-facing change?
    Write privilege is now enforced (if any system used DSV2 privileges).  
Error message are changed.
    
    ### How was this patch tested?
    Run existing unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #54704 from szehon-ho/anton_refactor_merge_1.
    
    Authored-by: Szehon Ho <[email protected]>
    Signed-off-by: Anton Okolnychyi <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  24 +++--
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  15 +--
 .../analysis/ResolveMergeIntoSchemaEvolution.scala | 101 ++++++++++---------
 .../ResolveRowLevelCommandAssignments.scala        |  29 ++++--
 .../catalyst/analysis/RewriteMergeIntoTable.scala  |  12 +--
 .../sql/catalyst/plans/logical/v2Commands.scala    | 109 ++++++++++++---------
 .../spark/sql/catalyst/types/DataTypeUtils.scala   |   6 ++
 .../spark/sql/errors/QueryCompilationErrors.scala  |  26 +++--
 .../datasources/v2/DataSourceV2Relation.scala      |   4 +-
 .../MergeIntoSchemaEvolutionExtraSQLTests.scala    |   4 +-
 10 files changed, 199 insertions(+), 131 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 5db07c4cea52..38be710e9865 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -6782,6 +6782,24 @@
     ],
     "sqlState" : "0A000"
   },
+  "UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES" : {
+    "message" : [
+      "Automatic schema evolution failed for table <tableName>."
+    ],
+    "subClass" : {
+      "FAILED_EVOLUTION" : {
+        "message" : [
+          "The catalog returned the error: <detail>"
+        ]
+      },
+      "PARTIAL_EVOLUTION" : {
+        "message" : [
+          "The catalog did not support or only partially applied the following 
changes: <changes>."
+        ]
+      }
+    },
+    "sqlState" : "42000"
+  },
   "UNSUPPORTED_CALL" : {
     "message" : [
       "Cannot call the method \"<methodName>\" of the class \"<className>\"."
@@ -7658,12 +7676,6 @@
     },
     "sqlState" : "0A000"
   },
-  "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION" : {
-    "message" : [
-      "Operation could not apply the following schema changes to table 
<tableName> because the catalog did not support or only partially applied them: 
<changes>."
-    ],
-    "sqlState" : "42000"
-  },
   "UNSUPPORTED_TABLE_CHANGE_IN_JDBC_CATALOG" : {
     "message" : [
       "The table change <change> is not supported for the JDBC catalog on 
table <tableName>. Supported changes include: AddColumn, RenameColumn, 
DeleteColumn, UpdateColumnType, UpdateColumnNullability."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 12fc0f0a09fa..e6d1d6da06b6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1565,8 +1565,7 @@ class Analyzer(
                 UpdateAction(
                   resolvedUpdateCondition,
                   // The update value can access columns from both target and 
source tables.
-                  resolveAssignments(assignments, m, MergeResolvePolicy.BOTH,
-                    throws = throws),
+                  resolveAssignments(assignments, m, MergeResolvePolicy.BOTH, 
throws),
                   fromStar)
               case UpdateStarAction(updateCondition) =>
                 // Expand star to top level source columns.  If source has 
less columns than target,
@@ -1587,8 +1586,7 @@ class Analyzer(
                 UpdateAction(
                   updateCondition.map(resolveExpressionByPlanChildren(_, m)),
                   // For UPDATE *, the value must be from source table.
-                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
-                    throws = throws),
+                  resolveAssignments(assignments, m, 
MergeResolvePolicy.SOURCE, throws),
                   fromStar = true)
               case o => o
             }
@@ -1600,8 +1598,7 @@ class Analyzer(
                   resolveExpressionByPlanOutput(_, m.sourceTable))
                 InsertAction(
                   resolvedInsertCondition,
-                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
-                    throws = throws))
+                  resolveAssignments(assignments, m, 
MergeResolvePolicy.SOURCE, throws))
               case InsertStarAction(insertCondition) =>
                 // The insert action is used when not matched, so its 
condition and value can only
                 // access columns from the source table.
@@ -1624,8 +1621,7 @@ class Analyzer(
                 }
                 InsertAction(
                   resolvedInsertCondition,
-                  resolveAssignments(assignments, m, MergeResolvePolicy.SOURCE,
-                    throws = throws))
+                  resolveAssignments(assignments, m, 
MergeResolvePolicy.SOURCE, throws))
               case o => o
             }
             val newNotMatchedBySourceActions = m.notMatchedBySourceActions.map 
{
@@ -1639,8 +1635,7 @@ class Analyzer(
                 UpdateAction(
                   resolvedUpdateCondition,
                   // The update value can access columns from the target table 
only.
-                  resolveAssignments(assignments, m, MergeResolvePolicy.TARGET,
-                    throws = throws),
+                  resolveAssignments(assignments, m, 
MergeResolvePolicy.TARGET, throws),
                   fromStar)
               case o => o
             }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
index bbb8e7852b2c..ff8f08adb5aa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoSchemaEvolution.scala
@@ -17,18 +17,19 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import scala.collection.mutable.ArrayBuffer
+import scala.jdk.CollectionConverters._
+import scala.util.control.NonFatal
 
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.SparkThrowable
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog, 
TableChange}
-import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, 
TableCatalog, TableChange}
+import org.apache.spark.sql.errors.{QueryCompilationErrors, 
QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.StructType
-
+import 
org.apache.spark.sql.execution.datasources.v2.ExtractV2CatalogAndIdentifier
 
 /**
  * A rule that resolves schema evolution for MERGE INTO.
@@ -40,46 +41,58 @@ object ResolveMergeIntoSchemaEvolution extends 
Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     // This rule should run only if all assignments are resolved, except those
     // that will be satisfied by schema evolution
-    case m@MergeIntoTable(_, _, _, _, _, _, _) if m.evaluateSchemaEvolution =>
-      val changes = m.changesForSchemaEvolution
-      if (changes.isEmpty) {
-        m
-      } else {
-        val finalAttrMapping = ArrayBuffer.empty[(Attribute, Attribute)]
-        val newTarget = m.targetTable.transform {
-          case r: DataSourceV2Relation =>
-            val referencedSourceSchema = 
MergeIntoTable.sourceSchemaForSchemaEvolution(m)
-            val newTarget = performSchemaEvolution(r, referencedSourceSchema, 
changes)
-            val oldTargetOutput = m.targetTable.output
-            val newTargetOutput = newTarget.output
-            val attributeMapping = oldTargetOutput.zip(newTargetOutput)
-            finalAttrMapping ++= attributeMapping
-            newTarget
-        }
-        val res = m.copy(targetTable = newTarget)
-        res.rewriteAttrs(AttributeMap(finalAttrMapping.toSeq))
+    case m: MergeIntoTable if m.pendingSchemaChanges.nonEmpty =>
+      EliminateSubqueryAliases(m.targetTable) match {
+        case ExtractV2CatalogAndIdentifier(catalog, ident) =>
+          evolveSchema(catalog, ident, m.pendingSchemaChanges)
+          val writePrivileges = MergeIntoTable.getWritePrivileges(m)
+          val newTable = catalog.loadTable(ident, writePrivileges.toSet.asJava)
+          val mergeWithNewTarget = replaceMergeTarget(m, newTable)
+
+          val remainingChanges = mergeWithNewTarget.pendingSchemaChanges
+          if (remainingChanges.nonEmpty) {
+            throw 
QueryCompilationErrors.unsupportedAutoSchemaEvolutionChangesError(
+              catalog, ident, remainingChanges)
+          }
+
+          mergeWithNewTarget
+        case _ =>
+          m
       }
   }
 
-  private def performSchemaEvolution(
-      relation: DataSourceV2Relation,
-      referencedSourceSchema: StructType,
-      changes: Array[TableChange]): DataSourceV2Relation = {
-    (relation.catalog, relation.identifier) match {
-      case (Some(c: TableCatalog), Some(i)) =>
-        c.alterTable(i, changes: _*)
-        val newTable = c.loadTable(i)
-        val newSchema = CatalogV2Util.v2ColumnsToStructType(newTable.columns())
-        // Check if there are any remaining changes not applied.
-        val remainingChanges = MergeIntoTable.schemaChanges(newSchema, 
referencedSourceSchema)
-        if (remainingChanges.nonEmpty) {
-          throw 
QueryCompilationErrors.unsupportedTableChangesInAutoSchemaEvolutionError(
-            remainingChanges, i.toQualifiedNameParts(c))
-        }
-        relation.copy(table = newTable, output = 
DataTypeUtils.toAttributes(newSchema))
-      case _ => logWarning(s"Schema Evolution enabled but data source 
$relation " +
-        s"does not support it, skipping.")
-        relation
+  private def evolveSchema(
+      catalog: TableCatalog,
+      ident: Identifier,
+      changes: Seq[TableChange]): Unit = {
+    try {
+      catalog.alterTable(ident, changes: _*)
+    } catch {
+      case e: IllegalArgumentException if !e.isInstanceOf[SparkThrowable] =>
+        throw QueryExecutionErrors.unsupportedTableChangeError(e)
+      case NonFatal(e) =>
+        throw QueryCompilationErrors.failedAutoSchemaEvolutionError(
+          catalog, ident, e)
+    }
+  }
+
+  private def replaceMergeTarget(
+      merge: MergeIntoTable,
+      newTable: Table): MergeIntoTable = {
+    val oldOutput = merge.targetTable.output
+    val newOutput = DataTypeUtils.toAttributes(newTable.columns)
+    val newTargetTable = merge.targetTable.transform {
+      case r: DataSourceV2Relation => r.copy(table = newTable, output = 
newOutput)
     }
+    val mergeWithNewTargetTable = merge.copy(targetTable = newTargetTable)
+    rewriteAttrs(mergeWithNewTargetTable, oldOutput, newOutput)
+  }
+
+  private def rewriteAttrs[T <: LogicalPlan](
+      plan: T,
+      oldOutput: Seq[Attribute],
+      newOutput: Seq[Attribute]): T = {
+    val attrMap = AttributeMap(oldOutput.zip(newOutput))
+    plan.rewriteAttrs(attrMap).asInstanceOf[T]
   }
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
index bf1016ba8268..76035ea819ff 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.COMMAND
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 
 /**
@@ -50,24 +49,36 @@ object ResolveRowLevelCommandAssignments extends 
Rule[LogicalPlan] {
     case u: UpdateTable if !u.skipSchemaResolution && u.resolved && !u.aligned 
=>
       resolveAssignments(u)
 
-    case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && 
m.rewritable && !m.aligned &&
-      !m.needSchemaEvolution =>
+    case m: MergeIntoTable if m.rewritable && shouldAlignAssignments(m) && 
containsFinalSchema(m) =>
       validateStoreAssignmentPolicy()
-      val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes && 
m.withSchemaEvolution
+      val coerceNestedTypes = conf.coerceMergeNestedTypes && 
m.withSchemaEvolution
       m.copy(
         targetTable = cleanAttrMetadata(m.targetTable),
-        matchedActions = alignActions(m.targetTable.output, m.matchedActions,
+        matchedActions = alignActions(
+          m.targetTable.output,
+          m.matchedActions,
           coerceNestedTypes),
-        notMatchedActions = alignActions(m.targetTable.output, 
m.notMatchedActions,
+        notMatchedActions = alignActions(
+          m.targetTable.output,
+          m.notMatchedActions,
           coerceNestedTypes),
-        notMatchedBySourceActions = alignActions(m.targetTable.output, 
m.notMatchedBySourceActions,
+        notMatchedBySourceActions = alignActions(
+          m.targetTable.output,
+          m.notMatchedBySourceActions,
           coerceNestedTypes))
 
-    case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && 
!m.aligned
-      && !m.needSchemaEvolution =>
+    case m: MergeIntoTable if shouldAlignAssignments(m) && 
containsFinalSchema(m) =>
       resolveAssignments(m)
   }
 
+  private def shouldAlignAssignments(m: MergeIntoTable): Boolean = {
+    !m.skipSchemaResolution && m.resolved && !m.aligned
+  }
+
+  private def containsFinalSchema(m: MergeIntoTable): Boolean = {
+    !m.schemaEvolutionEnabled || (m.schemaEvolutionReady && 
m.pendingSchemaChanges.isEmpty)
+  }
+
   private def validateStoreAssignmentPolicy(): Unit = {
     // SPARK-28730: LEGACY store assignment policy is disallowed in data 
source v2
     if (conf.storeAssignmentPolicy == StoreAssignmentPolicy.LEGACY) {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index 9675ee232786..8ff734c7a9a0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -44,9 +44,10 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand 
with PredicateHelper
   private final val ROW_FROM_TARGET = "__row_from_target"
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    // aligned is false when schema evolution is pending (see 
ResolveRowLevelCommandAssignments)
     case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, 
notMatchedActions,
-      notMatchedBySourceActions, _) if m.resolved && m.rewritable && m.aligned 
&&
-        !m.needSchemaEvolution && matchedActions.isEmpty && 
notMatchedActions.size == 1 &&
+        notMatchedBySourceActions, _) if m.resolved && m.rewritable && 
m.aligned &&
+        matchedActions.isEmpty && notMatchedActions.size == 1 &&
         notMatchedBySourceActions.isEmpty =>
 
       EliminateSubqueryAliases(aliasedTable) match {
@@ -79,8 +80,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand 
with PredicateHelper
       }
 
     case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, 
notMatchedActions,
-        notMatchedBySourceActions, _)
-      if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution &&
+        notMatchedBySourceActions, _) if m.resolved && m.rewritable && 
m.aligned &&
         matchedActions.isEmpty && notMatchedBySourceActions.isEmpty =>
 
       EliminateSubqueryAliases(aliasedTable) match {
@@ -121,9 +121,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand 
with PredicateHelper
       }
 
     case m @ MergeIntoTable(aliasedTable, source, cond, matchedActions, 
notMatchedActions,
-        notMatchedBySourceActions, _)
-      if m.resolved && m.rewritable && m.aligned && !m.needSchemaEvolution =>
-
+        notMatchedBySourceActions, _) if m.resolved && m.rewritable && 
m.aligned =>
       EliminateSubqueryAliases(aliasedTable) match {
         case r @ ExtractV2Table(tbl: SupportsRowLevelOperations) =>
           validateMergeIntoConditions(m)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 06a4d85a856c..a38e067861d4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -893,67 +893,57 @@ case class MergeIntoTable(
     case _ => false
   }
 
-  lazy val needSchemaEvolution: Boolean =
-    evaluateSchemaEvolution && changesForSchemaEvolution.nonEmpty
-
-  lazy val evaluateSchemaEvolution: Boolean =
-    schemaEvolutionEnabled &&
-      canEvaluateSchemaEvolution
+  lazy val pendingSchemaChanges: Seq[TableChange] = {
+    if (schemaEvolutionEnabled && schemaEvolutionReady) {
+      val referencedSourceSchema = 
MergeIntoTable.sourceSchemaForSchemaEvolution(this)
+      MergeIntoTable.schemaChanges(targetTable.schema, 
referencedSourceSchema).toSeq
+    } else {
+      Seq.empty
+    }
+  }
 
   lazy val schemaEvolutionEnabled: Boolean = withSchemaEvolution && {
     EliminateSubqueryAliases(targetTable) match {
-      case r: DataSourceV2Relation if r.autoSchemaEvolution() => true
+      case r: DataSourceV2Relation if r.autoSchemaEvolution => true
       case _ => false
     }
   }
 
   // Guard that assignments are either resolved or candidates for evolution 
before
   // evaluating schema evolution. We need to use resolved assignment values to 
check
-  // candidates, see MergeIntoTable.sourceSchemaForSchemaEvolution for details.
-  lazy val canEvaluateSchemaEvolution: Boolean = {
-    if ((!targetTable.resolved) || (!sourceTable.resolved)) {
-      false
-    } else {
-      val actions = matchedActions ++ notMatchedActions
-      val hasStarActions = actions.exists {
-        case _: UpdateStarAction => true
-        case _: InsertStarAction => true
-        case _ => false
-      }
-      if (hasStarActions) {
-        // need to resolve star actions first
-        false
-      } else {
-        val assignments = actions.collect {
-          case a: UpdateAction => a.assignments
-          case a: InsertAction => a.assignments
-        }.flatten
-        val sourcePaths = 
DataTypeUtils.extractAllFieldPaths(sourceTable.schema)
-        assignments.forall { assignment =>
-          assignment.resolved ||
-            (assignment.value.resolved && sourcePaths.exists {
-              path => MergeIntoTable.isEqual(assignment, path)
-            })
-        }
-      }
-    }
+  // candidates, see MergeIntoTable.isSchemaEvolutionCandidate for details.
+  lazy val schemaEvolutionReady: Boolean = {
+    targetTable.resolved && sourceTable.resolved && actionsSchemaEvolutionReady
   }
 
-  private lazy val sourceSchemaForEvolution: StructType =
-    MergeIntoTable.sourceSchemaForSchemaEvolution(this)
-
-  lazy val changesForSchemaEvolution: Array[TableChange] =
-    MergeIntoTable.schemaChanges(targetTable.schema, sourceSchemaForEvolution)
+  private def actionsSchemaEvolutionReady: Boolean = {
+    val actions = matchedActions ++ notMatchedActions
+    actions.forall {
+      case a: UpdateAction => 
MergeIntoTable.areSchemaEvolutionReady(a.assignments, sourceTable)
+      case a: InsertAction => 
MergeIntoTable.areSchemaEvolutionReady(a.assignments, sourceTable)
+      case _ => false
+    }
+  }
 
   override def left: LogicalPlan = targetTable
   override def right: LogicalPlan = sourceTable
+
   override protected def withNewChildrenInternal(
-      newLeft: LogicalPlan, newRight: LogicalPlan): MergeIntoTable =
+      newLeft: LogicalPlan,
+      newRight: LogicalPlan): MergeIntoTable = {
     copy(targetTable = newLeft, sourceTable = newRight)
+  }
 }
 
 object MergeIntoTable {
 
+  def getWritePrivileges(merge: MergeIntoTable): Seq[TableWritePrivilege] = {
+    getWritePrivileges(
+      merge.matchedActions,
+      merge.notMatchedActions,
+      merge.notMatchedBySourceActions)
+  }
+
   def getWritePrivileges(
       matchedActions: Iterable[MergeAction],
       notMatchedActions: Iterable[MergeAction],
@@ -997,7 +987,7 @@ object MergeIntoTable {
         // Identify the newly added fields and append to the end
         val currentFieldMap = toFieldMap(currentFields)
         val adds = newFields.filterNot(f => currentFieldMap.contains(f.name))
-          .map(f => TableChange.addColumn(fieldPath ++ Set(f.name), 
f.dataType))
+          .map(f => TableChange.addColumn(fieldPath ++ Seq(f.name), 
f.dataType))
 
         updates ++ adds
 
@@ -1106,11 +1096,40 @@ object MergeIntoTable {
   // Example: UPDATE SET target.a = source.a
   private def isEqual(assignment: Assignment, sourceFieldPath: Seq[String]): 
Boolean = {
     // key must be a non-qualified field path that may be added to target 
schema via evolution
-    val assignmenKeyExpr = extractFieldPath(assignment.key, allowUnresolved = 
true)
+    val assignmentKeyExpr = extractFieldPath(assignment.key, allowUnresolved = 
true)
     // value should always be resolved (from source)
     val assignmentValueExpr = extractFieldPath(assignment.value, 
allowUnresolved = false)
-    assignmenKeyExpr == assignmentValueExpr &&
-      assignmenKeyExpr == sourceFieldPath
+    assignmentKeyExpr == assignmentValueExpr && assignmentKeyExpr == 
sourceFieldPath
+  }
+
+  private def areSchemaEvolutionReady(
+      assignments: Seq[Assignment],
+      source: LogicalPlan): Boolean = {
+    assignments.forall(assign => assign.resolved || 
isSchemaEvolutionCandidate(assign, source))
+  }
+
+  private def isSchemaEvolutionCandidate(assignment: Assignment, source: 
LogicalPlan): Boolean = {
+    assignment.value.resolved && isSameColumnAssignment(assignment, source)
+  }
+
+  // Helper method to check if an assignment key is equal to a source column
+  // and if the assignment value is that same source column.
+  //
+  // Top-level example: UPDATE SET target.a = source.a
+  //   key:   AttributeReference("a", ...) -> path Seq("a")
+  //   value: AttributeReference("a", ...) from source
+  //
+  // Nested example: UPDATE SET addr.city = source.addr.city
+  //   key:   GetStructField(GetStructField(AttributeReference("addr", ...), 
0), 1)
+  //   value: GetStructField(GetStructField(AttributeReference("addr", ...), 
0), 1) from source
+  //
+  // references contains only root attributes, so subsetOf(source.outputSet) 
works for both.
+  private def isSameColumnAssignment(assignment: Assignment, source: 
LogicalPlan): Boolean = {
+    // key must be a non-qualified field path that may be added to target 
schema via evolution
+    val keyPath = extractFieldPath(assignment.key, allowUnresolved = true)
+    // value should always be resolved (from source)
+    val valuePath = extractFieldPath(assignment.value, allowUnresolved = false)
+    keyPath == valuePath && 
assignment.value.references.subsetOf(source.outputSet)
   }
 }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
index 20f2f97fea6c..9c17fd9064b1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/DataTypeUtils.scala
@@ -20,6 +20,8 @@ import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.CollationFactory
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
+import org.apache.spark.sql.connector.catalog.CatalogV2Util
+import org.apache.spark.sql.connector.catalog.Column
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy.{ANSI, 
STRICT}
@@ -238,6 +240,10 @@ object DataTypeUtils {
     schema.map(toAttribute)
   }
 
+  def toAttributes(columns: Array[Column]): Seq[AttributeReference] = {
+    toAttributes(CatalogV2Util.v2ColumnsToStructType(columns))
+  }
+
   def fromAttributes(attributes: Seq[Attribute]): StructType =
     StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, 
a.metadata)))
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index a784276200c0..2d0445b9fb42 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3514,14 +3514,28 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
         "change" -> change.toString, "tableName" -> 
toSQLId(sanitizedTableName)))
   }
 
-  def unsupportedTableChangesInAutoSchemaEvolutionError(
-      changes: Array[TableChange], tableName: Seq[String]): Throwable = {
-    val sanitizedTableName = tableName.map(_.replaceAll("\"", ""))
-    val changesDesc = changes.map(_.toString).mkString("; ")
+  def unsupportedAutoSchemaEvolutionChangesError(
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      remainingChanges: Seq[TableChange]): Throwable = {
+    new AnalysisException(
+      errorClass = 
"UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES.PARTIAL_EVOLUTION",
+      messageParameters = Map(
+        "tableName" -> toSQLId(ident.toQualifiedNameParts(catalog)),
+        "changes" -> remainingChanges.mkString("; ")))
+  }
+
+  def failedAutoSchemaEvolutionError(
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      cause: Throwable): Throwable = {
+    val detail = Option(cause.getMessage).getOrElse("Unknown error")
     new AnalysisException(
-      errorClass = "UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
+      errorClass = 
"UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES.FAILED_EVOLUTION",
       messageParameters = Map(
-        "changes" -> changesDesc, "tableName" -> toSQLId(sanitizedTableName)))
+        "tableName" -> toSQLId(ident.toQualifiedNameParts(catalog)),
+        "detail" -> detail),
+      cause = Some(cause))
   }
 
   def pathOptionNotSetCorrectlyWhenReadingError(): Throwable = {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 970ab1a32710..8eb2f958b447 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -138,8 +138,8 @@ case class DataSourceV2Relation(
     }
   }
 
-  def autoSchemaEvolution(): Boolean =
-    table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
+  def autoSchemaEvolution: Boolean =
+    table.capabilities.contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
 
   def isVersioned: Boolean = table.version != null
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
index 3bbeb0db1b74..8565c0b31c0c 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoSchemaEvolutionExtraSQLTests.scala
@@ -135,8 +135,8 @@ trait MergeIntoSchemaEvolutionExtraSQLTests extends 
RowLevelOperationSuiteBase {
                |VALUES (s.pk, s.salary, s.dep, s.active)
                |""".stripMargin)
         }
-        assert(ex.getCondition === 
"UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION",
-          s"Expected error class 
UNSUPPORTED_TABLE_CHANGES_IN_AUTO_SCHEMA_EVOLUTION but got: " +
+        
assert(ex.getCondition.startsWith("UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES"),
+          s"Expected error class UNSUPPORTED_AUTO_SCHEMA_EVOLUTION_CHANGES but 
got: " +
             s"${ex.getCondition}. Message: ${ex.getMessage}")
         assert(ex.getMessageParameters.get("tableName") != null,
           s"Error message should mention table name: ${ex.getMessage}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to