This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 347c45efa5 [spark] Support merge schema in MERGE INTO (#7789)
347c45efa5 is described below

commit 347c45efa52082dadb2b99f4ad730f9123f1a7ca
Author: Zouxxyy <[email protected]>
AuthorDate: Mon May 11 16:22:59 2026 +0800

    [spark] Support merge schema in MERGE INTO (#7789)
    
    Add schema evolution support for MERGE INTO and fix nested-field
    alignment.
    
    - With `spark.paimon.write.merge-schema=true`, `UPDATE *` / `INSERT *`
    evolve the target schema with new source columns. For the newly added
    columns, star clauses pull values from the source by name; explicit
    clauses fill them with NULL.
    - A `FROM_STAR` `TreeNodeTag` preserves the original star intent, so a
    fully-listed explicit clause is not mistaken for `*`.
    - `AssignmentAlignmentHelper` now reorders nested struct / array<struct>
    / map fields by name.
---
 .../analysis/AssignmentAlignmentHelper.scala       | 180 --------
 .../analysis/PaimonMergeIntoResolverBase.scala     |   5 +-
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala |  24 +-
 .../analysis/AssignmentAlignmentHelper.scala       | 104 ++++-
 .../analysis/MergeSchemaEvolutionHelper.scala      | 153 +++++++
 .../catalyst/analysis/PaimonMergeActionTags.scala  |  46 ++
 .../catalyst/analysis/PaimonMergeIntoBase.scala    |  91 ++--
 .../analysis/PaimonMergeIntoResolverBase.scala     |   5 +-
 .../paimon/spark/commands/SchemaHelper.scala       | 127 +++--
 .../apache/spark/sql/paimon/shims/SparkShim.scala  |  13 +-
 .../sql/MergeIntoNotMatchedBySourceTest.scala      |  37 ++
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  | 510 +++++++++++++++++++++
 .../apache/spark/sql/paimon/shims/Spark3Shim.scala |  24 +-
 .../analysis/Spark41MergeIntoRewrite.scala         |  18 +-
 .../apache/spark/sql/paimon/shims/Spark4Shim.scala |  24 +-
 15 files changed, 1059 insertions(+), 302 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
deleted file mode 100644
index f61ed71b31..0000000000
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.catalyst.analysis
-
-import 
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
-import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, 
InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction, 
UpdateStarAction}
-import org.apache.spark.sql.types.StructType
-
-trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
-
-  private lazy val resolver = conf.resolver
-
-  /**
-   * @param ref
-   *   attribute reference seq, e.g. a => Seq["a"], s.c1 => Seq["s", "c1"]
-   * @param expr
-   *   update expression
-   */
-  private case class AttrUpdate(ref: Seq[String], expr: Expression)
-
-  /**
-   * Generate aligned expressions, only supports PrimitiveType and StructType. 
For example, if attrs
-   * are [a int, b int, s struct(c1 int, c2 int)] and update assignments are 
[a = 1, s.c1 = 2], will
-   * return [1, b, struct(2, c2)].
-   * @param attrs
-   *   target attrs
-   * @param assignments
-   *   update assignments
-   * @return
-   *   aligned expressions
-   */
-  protected def generateAlignedExpressions(
-      attrs: Seq[Attribute],
-      assignments: Seq[Assignment],
-      isInsert: Boolean = false): Seq[Expression] = {
-    val attrUpdates = assignments.map(a => AttrUpdate(toRefSeq(a.key), 
a.value))
-    recursiveAlignUpdates(attrs, attrUpdates, Nil, isInsert)
-  }
-
-  protected def alignAssignments(
-      attrs: Seq[Attribute],
-      assignments: Seq[Assignment],
-      isInsert: Boolean = false): Seq[Assignment] = {
-    generateAlignedExpressions(attrs, assignments, isInsert).zip(attrs).map {
-      case (expression, field) => Assignment(field, expression)
-    }
-  }
-
-  /**
-   * Align assignments in a MergeAction based on the target table's output 
attributes.
-   *   - DeleteAction: returns as-is
-   *   - UpdateAction: aligns assignments for update
-   *   - InsertAction: aligns assignments for insert
-   */
-  protected def alignMergeAction(action: MergeAction, targetOutput: 
Seq[Attribute]): MergeAction = {
-    action match {
-      case d @ DeleteAction(_) => d
-      case u @ PaimonUpdateAction(_, assignments) =>
-        u.copy(assignments = alignAssignments(targetOutput, assignments))
-      case i @ InsertAction(_, assignments) =>
-        i.copy(assignments = alignAssignments(targetOutput, assignments, 
isInsert = true))
-      case _: UpdateStarAction =>
-        throw new RuntimeException("UpdateStarAction should not be here.")
-      case _: InsertStarAction =>
-        throw new RuntimeException("InsertStarAction should not be here.")
-      case _ =>
-        throw new RuntimeException(s"Can't recognize this action: $action")
-    }
-  }
-
-  private def recursiveAlignUpdates(
-      targetAttrs: Seq[NamedExpression],
-      updates: Seq[AttrUpdate],
-      namePrefix: Seq[String] = Nil,
-      isInsert: Boolean = false): Seq[Expression] = {
-
-    // build aligned updated expression for each target attr
-    targetAttrs.map {
-      targetAttr =>
-        val headMatchedUpdates = updates.filter(u => resolver(u.ref.head, 
targetAttr.name))
-        if (headMatchedUpdates.isEmpty) {
-          if (isInsert) {
-            // For Insert, use default value or NULL for missing columns
-            getDefaultValueOrNull(targetAttr)
-          } else {
-            // For Update, return the attr as is
-            targetAttr
-          }
-        } else {
-          val exactMatchedUpdate = headMatchedUpdates.find(_.ref.size == 1)
-          if (exactMatchedUpdate.isDefined) {
-            if (headMatchedUpdates.size == 1) {
-              // when an exact match (no nested fields) occurs, it must be the 
only match, then return it's expr
-              castIfNeeded(exactMatchedUpdate.get.expr, targetAttr.dataType)
-            } else {
-              // otherwise, there must be conflicting updates, for example:
-              // - update the same attr multiple times
-              // - update a struct attr and its fields at the same time (e.g. 
s and s.c1)
-              val conflictingAttrNames =
-                headMatchedUpdates.map(u => (namePrefix ++ 
u.ref).mkString(".")).distinct
-              throw new UnsupportedOperationException(
-                s"Conflicting update/insert on attrs: 
${conflictingAttrNames.mkString(", ")}"
-              )
-            }
-          } else {
-            targetAttr.dataType match {
-              case StructType(fields) =>
-                val fieldExprs = fields.zipWithIndex.map {
-                  case (field, ordinal) =>
-                    Alias(GetStructField(targetAttr, ordinal, 
Some(field.name)), field.name)()
-                }
-                val newUpdates = updates.map(u => u.copy(ref = u.ref.tail))
-                // process StructType's nested fields recursively
-                val updatedFieldExprs =
-                  recursiveAlignUpdates(
-                    fieldExprs,
-                    newUpdates,
-                    namePrefix :+ targetAttr.name,
-                    isInsert)
-
-                // build updated struct expression
-                CreateNamedStruct(fields.zip(updatedFieldExprs).flatMap {
-                  case (field, expr) =>
-                    Seq(Literal(field.name), expr)
-                })
-              case _ =>
-                // can't reach here
-                throw new UnsupportedOperationException("")
-            }
-          }
-        }
-    }
-  }
-
-  /** Get the default value expression for an attribute, or NULL if no default 
value is defined. */
-  private def getDefaultValueOrNull(attr: NamedExpression): Expression = {
-    attr match {
-      case a: Attribute if 
a.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY) =>
-        val defaultValueStr = 
a.metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
-        parseAndResolveDefaultValue(defaultValueStr, a)
-      case _ =>
-        Literal(null, attr.dataType)
-    }
-  }
-
-  /** Parse the default value string and resolve it to an expression. */
-  private def parseAndResolveDefaultValue(defaultValueStr: String, attr: 
Attribute): Expression = {
-    try {
-      val spark = SparkSession.active
-      val parsed = 
spark.sessionState.sqlParser.parseExpression(defaultValueStr)
-      castIfNeeded(parsed, attr.dataType)
-    } catch {
-      case _: Exception =>
-        // If parsing fails, fall back to NULL
-        Literal(null, attr.dataType)
-    }
-  }
-
-}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
index ba516ddbe8..4222c54d6e 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
@@ -71,7 +71,8 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
         }
         val resolvedAssignments =
           resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
-        UpdateAction(resolvedCond, resolvedAssignments)
+        // Tag so merge-schema can distinguish `UPDATE *` from a fully-listed 
explicit clause.
+        PaimonMergeActionTags.markFromStar(UpdateAction(resolvedCond, 
resolvedAssignments))
       case action =>
         throw new RuntimeException(s"Can't recognize this action: $action")
     }
@@ -97,7 +98,7 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
         }
         val resolvedAssignments =
           resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
-        InsertAction(resolvedCond, resolvedAssignments)
+        PaimonMergeActionTags.markFromStar(InsertAction(resolvedCond, 
resolvedAssignments))
       case action =>
         throw new RuntimeException(s"Can't recognize this action: $action")
     }
diff --git 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 3b581e325f..11fdfbd579 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -29,10 +29,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, 
SubstituteUnresolvedOrdinals}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, 
LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, 
UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows.Keep
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, 
Identifier, Table,
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SparkFormatTable
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, 
MetadataLogFileIndex}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataTypes, StructType, VariantType}
@@ -138,6 +139,25 @@ class Spark4Shim extends SparkShim {
       withSchemaEvolution)
   }
 
+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      table: Table,
+      output: Seq[AttributeReference]): DataSourceV2Relation = {
+    relation.copy(table = table, output = output)
+  }
+
+  override def copyUpdateAction(
+      action: UpdateAction,
+      assignments: Seq[Assignment]): UpdateAction = {
+    action.copy(assignments = assignments)
+  }
+
+  override def copyInsertAction(
+      action: InsertAction,
+      assignments: Seq[Assignment]): InsertAction = {
+    action.copy(assignments = assignments)
+  }
+
   // Spark 4.0 still has `SubstituteUnresolvedOrdinals` (Spark 4.1 removed it 
because the new
   // resolver framework handles ordinals inline). `PaimonViewResolver` applies 
the shim's early
   // rules to the parsed view text before storing, so we must substitute 
`ORDER BY 1` →
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
index f61ed71b31..f85baf846a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/AssignmentAlignmentHelper.scala
@@ -21,11 +21,12 @@ package org.apache.paimon.spark.catalyst.analysis
 import 
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonUtils, SparkSession}
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
CreateNamedStruct, Expression, GetStructField, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, 
Attribute, CreateNamedStruct, Expression, GetStructField, If, IsNull, 
LambdaFunction, Literal, MapFromArrays, MapKeys, MapValues, NamedExpression, 
NamedLambdaVariable}
 import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, 
InsertAction, InsertStarAction, MergeAction, MergeIntoTable, UpdateAction, 
UpdateStarAction}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
 
 trait AssignmentAlignmentHelper extends SQLConfHelper with ExpressionHelper {
 
@@ -67,26 +68,23 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with 
ExpressionHelper {
     }
   }
 
-  /**
-   * Align assignments in a MergeAction based on the target table's output 
attributes.
-   *   - DeleteAction: returns as-is
-   *   - UpdateAction: aligns assignments for update
-   *   - InsertAction: aligns assignments for insert
-   */
+  /** Align a MergeAction's assignments to target output. Star actions must 
already be expanded. */
   protected def alignMergeAction(action: MergeAction, targetOutput: 
Seq[Attribute]): MergeAction = {
-    action match {
+    // `copyXxxAction` rebuilds the node and drops tags; re-carry FROM_STAR so 
merge-schema works.
+    val aligned = action match {
       case d @ DeleteAction(_) => d
       case u @ PaimonUpdateAction(_, assignments) =>
-        u.copy(assignments = alignAssignments(targetOutput, assignments))
+        SparkShimLoader.shim.copyUpdateAction(u, 
alignAssignments(targetOutput, assignments))
       case i @ InsertAction(_, assignments) =>
-        i.copy(assignments = alignAssignments(targetOutput, assignments, 
isInsert = true))
-      case _: UpdateStarAction =>
-        throw new RuntimeException("UpdateStarAction should not be here.")
-      case _: InsertStarAction =>
-        throw new RuntimeException("InsertStarAction should not be here.")
+        SparkShimLoader.shim.copyInsertAction(
+          i,
+          alignAssignments(targetOutput, assignments, isInsert = true))
+      case _: UpdateStarAction | _: InsertStarAction =>
+        throw new RuntimeException(s"Star action should already be expanded: 
$action")
       case _ =>
         throw new RuntimeException(s"Can't recognize this action: $action")
     }
+    PaimonMergeActionTags.carryFromStar(action, aligned)
   }
 
   private def recursiveAlignUpdates(
@@ -112,7 +110,7 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with 
ExpressionHelper {
           if (exactMatchedUpdate.isDefined) {
             if (headMatchedUpdates.size == 1) {
               // when an exact match (no nested fields) occurs, it must be the 
only match, then return it's expr
-              castIfNeeded(exactMatchedUpdate.get.expr, targetAttr.dataType)
+              resolveByNameAndCast(exactMatchedUpdate.get.expr, 
targetAttr.dataType)
             } else {
               // otherwise, there must be conflicting updates, for example:
               // - update the same attr multiple times
@@ -177,4 +175,76 @@ trait AssignmentAlignmentHelper extends SQLConfHelper with 
ExpressionHelper {
     }
   }
 
+  /**
+   * Resolve an assignment value expression by-name against the target type, 
then cast if needed.
+   * Recursively reorders nested type fields (Struct, Array, Map and any 
combination) by name to
+   * match target field order before casting. This is consistent with Spark's 
native MERGE INTO
+   * behavior (see TableOutputResolver.resolveUpdate).
+   */
+  private def resolveByNameAndCast(expression: Expression, targetType: 
DataType): Expression = {
+    if (PaimonUtils.sameType(expression.dataType, targetType)) {
+      // Types already structurally identical — no reordering needed.
+      // This guarantees idempotence when the rule is applied multiple times.
+      castIfNeeded(expression, targetType)
+    } else {
+      val reordered = reorderFieldsByName(expression, expression.dataType, 
targetType)
+      castIfNeeded(reordered, targetType)
+    }
+  }
+
+  /**
+   * Recursively reorder nested type fields by name to match target type's 
field order. Supports
+   * StructType, ArrayType and MapType in any nesting combination. Returns the 
original expression
+   * if no reordering is needed.
+   */
+  private def reorderFieldsByName(
+      expression: Expression,
+      sourceType: DataType,
+      targetType: DataType): Expression = {
+    (sourceType, targetType) match {
+      case (s: StructType, t: StructType) if s != t =>
+        reorderStructByName(expression, s, t)
+      case (ArrayType(sElem, sNull), ArrayType(tElem, _)) if sElem != tElem =>
+        val elementVar = NamedLambdaVariable("element", sElem, sNull)
+        val reordered = reorderFieldsByName(elementVar, sElem, tElem)
+        ArrayTransform(expression, LambdaFunction(reordered, Seq(elementVar)))
+      case (MapType(sKey, sVal, sValNull), MapType(tKey, tVal, _))
+          if sKey != tKey || sVal != tVal =>
+        val keyVar = NamedLambdaVariable("key", sKey, nullable = false)
+        val valVar = NamedLambdaVariable("value", sVal, sValNull)
+        val reorderedKey = reorderFieldsByName(keyVar, sKey, tKey)
+        val reorderedVal = reorderFieldsByName(valVar, sVal, tVal)
+        val newKeys = ArrayTransform(MapKeys(expression), 
LambdaFunction(reorderedKey, Seq(keyVar)))
+        val newVals =
+          ArrayTransform(MapValues(expression), LambdaFunction(reorderedVal, 
Seq(valVar)))
+        MapFromArrays(newKeys, newVals)
+      case _ =>
+        expression
+    }
+  }
+
+  /** Reorder source struct fields to match target field order by name, 
recursing into nested types. */
+  private def reorderStructByName(
+      expression: Expression,
+      sourceStruct: StructType,
+      targetStruct: StructType): Expression = {
+    val reorderedFields = targetStruct.map {
+      targetField =>
+        sourceStruct.fields.zipWithIndex.find(_._1.name == targetField.name) 
match {
+          case Some((sourceField, sourceIdx)) =>
+            val fieldExpr = GetStructField(expression, sourceIdx, 
Some(sourceField.name))
+            val reordered =
+              reorderFieldsByName(fieldExpr, sourceField.dataType, 
targetField.dataType)
+            Alias(reordered, targetField.name)()
+          case None =>
+            Alias(Literal(null, targetField.dataType), targetField.name)()
+        }
+    }
+    val struct = CreateNamedStruct(reorderedFields.flatMap(a => 
Seq(Literal(a.name), a.child)))
+    if (expression.nullable) {
+      If(IsNull(expression), Literal(null, struct.dataType), struct)
+    } else {
+      struct
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
new file mode 100644
index 0000000000..9e44f8c311
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.spark.{SparkTable, SparkTypeUtils}
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.commands.SchemaHelper
+import org.apache.paimon.spark.schema.SparkSystemColumns
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, DeleteAction, 
InsertAction, MergeAction, MergeIntoTable, UpdateAction}
+import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.types.{StructField, StructType}
+
+/**
+ * Shared MERGE INTO `merge-schema=true` evolution logic. Mixed in by both the 
postHoc V1 path
+ * ([[PaimonMergeIntoBase]]) and the Spark 4.1 Resolution-batch rewrite for 
pure append-only tables
+ * (`Spark41MergeIntoRewrite`). Only fires when at least one action was 
authored as `UPDATE *` /
+ * `INSERT *`, tracked via [[PaimonMergeActionTags]].
+ */
+trait MergeSchemaEvolutionHelper extends ExpressionHelper {
+
+  /**
+   * @param resolveNotMatchedBySource
+   *   how to resolve `notMatchedBySourceActions` on Spark 3.2+/4.x; the 
version-specific shim is
+   *   supplied by the caller.
+   */
+  protected def evolveTargetIfNeeded(
+      merge: MergeIntoTable,
+      relation: DataSourceV2Relation,
+      v2Table: SparkTable,
+      spark: SparkSession,
+      resolveNotMatchedBySource: MergeIntoTable => Seq[MergeAction])
+      : Option[(MergeIntoTable, DataSourceV2Relation, SparkTable)] = {
+    if (!OptionUtils.writeMergeSchemaEnabled()) return None
+
+    val notMatchedBySourceActions = resolveNotMatchedBySource(merge)
+    val allActions = merge.matchedActions ++ merge.notMatchedActions ++ 
notMatchedBySourceActions
+    if (!allActions.exists(PaimonMergeActionTags.isFromStar)) return None
+
+    val fileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
+    val sourceSchema = StructType(
+      merge.sourceTable.output.map(a => StructField(a.name, a.dataType, 
a.nullable)))
+    val filteredSourceSchema = 
SparkSystemColumns.filterSparkSystemColumns(sourceSchema)
+    val allowExplicitCast = OptionUtils.writeMergeSchemaExplicitCastEnabled()
+    val updatedFileStoreTable = SchemaHelper
+      .mergeAndCommitSchema(fileStoreTable, filteredSourceSchema, 
allowExplicitCast)
+      .getOrElse(return None)
+
+    // Invalidate Spark catalog cache so subsequent queries see the new schema.
+    for (catalog <- relation.catalog; ident <- relation.identifier) {
+      catalog.asInstanceOf[TableCatalog].invalidateTable(ident)
+    }
+
+    val updatedV2Table = v2Table.copy(table = updatedFileStoreTable)
+    val resolver = spark.sessionState.conf.resolver
+    val mergedSparkSchema =
+      
SparkTypeUtils.fromPaimonRowType(updatedFileStoreTable.schema().logicalRowType())
+    val newOutput = buildEvolvedOutput(mergedSparkSchema, relation.output, 
resolver)
+    val updatedRelation =
+      SparkShimLoader.shim.copyDataSourceV2Relation(relation, updatedV2Table, 
newOutput)
+    val updatedTargetTable = merge.targetTable.transform {
+      case r: DataSourceV2Relation if r eq relation => updatedRelation
+    }
+
+    val newAttrs = {
+      val oldNames = relation.output.map(_.name).toSet
+      newOutput.filterNot(a => oldNames.exists(resolver(_, a.name)))
+    }
+    val expand = expandAction(newAttrs, merge.sourceTable.output, resolver) _
+
+    val updatedMerge = SparkShimLoader.shim.createMergeIntoTable(
+      updatedTargetTable,
+      merge.sourceTable,
+      merge.mergeCondition,
+      merge.matchedActions.map(expand),
+      merge.notMatchedActions.map(expand),
+      notMatchedBySourceActions.map(expand),
+      withSchemaEvolution = false
+    )
+    Some((updatedMerge, updatedRelation, updatedV2Table))
+  }
+
+  /** Rebuild the relation's output: reuse existing attribute ids, fabricate 
ones for new fields. */
+  private def buildEvolvedOutput(
+      mergedSparkSchema: StructType,
+      oldOutput: Seq[Attribute],
+      resolver: Resolver): Seq[AttributeReference] = {
+    mergedSparkSchema.map {
+      field =>
+        oldOutput.find(a => resolver(a.name, field.name)) match {
+          case Some(existing: AttributeReference) =>
+            existing.copy(dataType = field.dataType, nullable = 
field.nullable)(
+              exprId = existing.exprId,
+              qualifier = existing.qualifier)
+          case _ => AttributeReference(field.name, field.dataType, 
field.nullable)()
+        }
+    }
+  }
+
+  /**
+   * Append assignments for newly-added columns and re-tag the action. Star 
clauses pull values from
+   * source by name; explicit clauses fill NULL.
+   */
+  private def expandAction(
+      newAttrs: Seq[AttributeReference],
+      sourceOutput: Seq[Attribute],
+      resolver: Resolver)(action: MergeAction): MergeAction = {
+    val fromStar = PaimonMergeActionTags.isFromStar(action)
+    val newAssignments = newAttrs.map {
+      attr =>
+        val value: Expression = if (fromStar) {
+          sourceOutput
+            .find(s => resolver(s.name, attr.name))
+            .map(s => castIfNeeded(s, attr.dataType))
+            .getOrElse(Literal(null, attr.dataType))
+        } else {
+          Literal(null, attr.dataType)
+        }
+        Assignment(attr, value)
+    }
+    val expanded = action match {
+      case i: InsertAction =>
+        SparkShimLoader.shim.copyInsertAction(i, i.assignments ++ 
newAssignments)
+      case u: UpdateAction =>
+        SparkShimLoader.shim.copyUpdateAction(u, u.assignments ++ 
newAssignments)
+      case d: DeleteAction => d
+    }
+    PaimonMergeActionTags.carryFromStar(action, expanded)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeActionTags.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeActionTags.scala
new file mode 100644
index 0000000000..04e283319e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeActionTags.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.MergeAction
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+
+/**
+ * Marks a `MergeAction` that originated from `INSERT *` / `UPDATE *`. After 
resolution a star
+ * action is indistinguishable from an explicit clause that happens to list 
every column, but
+ * merge-schema needs the original intent to decide between pulling new source 
columns and filling
+ * NULL. Tags do not survive fresh constructor calls — rebuild sites must 
re-tag via
+ * `carryFromStar`.
+ */
+object PaimonMergeActionTags {
+
+  val FROM_STAR: TreeNodeTag[Boolean] = 
TreeNodeTag[Boolean]("paimon.merge.fromStar")
+
+  def isFromStar(action: MergeAction): Boolean =
+    action.getTagValue(FROM_STAR).contains(true)
+
+  def markFromStar[T <: MergeAction](action: T): T = {
+    action.setTagValue(FROM_STAR, true)
+    action
+  }
+
+  def carryFromStar[T <: MergeAction](source: MergeAction, target: T): T = {
+    if (isFromStar(source)) markFromStar(target) else target
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
index 759fde6a71..747b0477a5 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoBase.scala
@@ -33,78 +33,85 @@ trait PaimonMergeIntoBase
   extends Rule[LogicalPlan]
   with RowLevelHelper
   with ExpressionHelper
-  with AssignmentAlignmentHelper {
+  with AssignmentAlignmentHelper
+  with MergeSchemaEvolutionHelper {
 
   val spark: SparkSession
 
   override val operation: RowLevelOp = MergeInto
 
   def apply(plan: LogicalPlan): LogicalPlan = {
-    // Spark 4.1 moved RewriteMergeIntoTable from the "DML rewrite" batch into 
the main Resolution
-    // batch, which marks the plan analyzed before the Post-Hoc Resolution 
batch runs.
-    // `plan.resolveOperators` then short-circuits on the already-analyzed 
MERGE node and the
-    // physical planner rejects it with "Table does not support MERGE INTO 
TABLE". Use
-    // `transformDown` (which unconditionally visits every node) guarded by
-    // `AnalysisHelper.allowInvokingTransformsInAnalyzer` so the in-analyzer 
assertion does not
-    // trip. Pure append-only Paimon tables on Spark 4.1+ are handled earlier 
in the Resolution
-    // batch by `Spark41MergeIntoRewrite`, so by the time this postHoc rule 
fires the aligned
-    // `MergeIntoTable` node is already a `ReplaceData` / `AppendData` plan 
for that case; this
-    // rule only sees MERGEs that still need to become the V1 command.
+    // Spark 4.1 marks the plan analyzed before postHoc runs, so 
`transformDown` is needed to
+    // bypass `resolveOperators`'s short-circuit. Pure append-only tables on 
4.1+ are handled
+    // earlier by `Spark41MergeIntoRewrite` and never reach here.
     AnalysisHelper.allowInvokingTransformsInAnalyzer {
       plan.transformDown {
         case merge: MergeIntoTable
             if merge.resolved && 
PaimonRelation.isPaimonTable(merge.targetTable) =>
           val relation = PaimonRelation.getPaimonRelation(merge.targetTable)
-          val v2Table = relation.table.asInstanceOf[SparkTable]
-          val dataEvolutionEnabled = v2Table.coreOptions.dataEvolutionEnabled()
-          val targetOutput = relation.output
+          var v2Table = relation.table.asInstanceOf[SparkTable]
 
           checkPaimonTable(v2Table.getTable)
           checkCondition(merge.mergeCondition)
-          merge.matchedActions.flatMap(_.condition).foreach(checkCondition)
-          merge.notMatchedActions.flatMap(_.condition).foreach(checkCondition)
+          (merge.matchedActions ++ merge.notMatchedActions)
+            .flatMap(_.condition)
+            .foreach(checkCondition)
 
-          val updateActions = merge.matchedActions.collect { case a: 
UpdateAction => a }
           val primaryKeys = v2Table.getTable.primaryKeys().asScala.toSeq
           if (primaryKeys.nonEmpty) {
+            val updateActions = merge.matchedActions.collect { case a: 
UpdateAction => a }
             checkUpdateActionValidity(
-              AttributeSet(targetOutput),
+              AttributeSet(relation.output),
               merge.mergeCondition,
               updateActions,
               primaryKeys)
           }
 
-          val alignedMergeIntoTable = alignMergeIntoTable(merge, targetOutput)
+          // Commit schema changes before alignment so the aligned plan sees 
new columns.
+          val (resolvedMerge, targetOutput) =
+            evolveTargetIfNeeded(merge, relation, v2Table, spark, 
resolveNotMatchedBySourceActions)
+              .map { case (m, r, t) => v2Table = t; (m, r.output) }
+              .getOrElse((merge, relation.output))
 
-          if (!shouldFallbackToV1MergeInto(alignedMergeIntoTable)) {
-            alignedMergeIntoTable
+          val aligned = alignMergeIntoTable(resolvedMerge, targetOutput)
+
+          if (!shouldFallbackToV1MergeInto(aligned)) {
+            aligned
           } else {
-            if (dataEvolutionEnabled) {
-              MergeIntoPaimonDataEvolutionTable(
-                v2Table,
-                merge.targetTable,
-                merge.sourceTable,
-                merge.mergeCondition,
-                alignedMergeIntoTable.matchedActions,
-                alignedMergeIntoTable.notMatchedActions,
-                resolveNotMatchedBySourceActions(alignedMergeIntoTable)
-              )
-            } else {
-              MergeIntoPaimonTable(
-                v2Table,
-                merge.targetTable,
-                merge.sourceTable,
-                merge.mergeCondition,
-                alignedMergeIntoTable.matchedActions,
-                alignedMergeIntoTable.notMatchedActions,
-                resolveNotMatchedBySourceActions(alignedMergeIntoTable)
-              )
-            }
+            buildV1Command(v2Table, resolvedMerge, aligned)
           }
       }
     }
   }
 
+  /**
+   * Builds the V1 merge command for a MERGE that must fall back from the aligned plan.
+   *
+   * Target, source and merge condition come from `resolvedMerge`; the matched / not-matched
+   * actions come from `aligned`. The command class is chosen by the table's data-evolution flag.
+   */
+  private def buildV1Command(
+      v2Table: SparkTable,
+      resolvedMerge: MergeIntoTable,
+      aligned: MergeIntoTable): LogicalPlan = {
+    val bySourceActions = resolveNotMatchedBySourceActions(aligned)
+    val useDataEvolution = v2Table.coreOptions.dataEvolutionEnabled()
+    val target = resolvedMerge.targetTable
+    val source = resolvedMerge.sourceTable
+    val condition = resolvedMerge.mergeCondition
+    if (!useDataEvolution) {
+      MergeIntoPaimonTable(
+        v2Table,
+        target,
+        source,
+        condition,
+        aligned.matchedActions,
+        aligned.notMatchedActions,
+        bySourceActions)
+    } else {
+      MergeIntoPaimonDataEvolutionTable(
+        v2Table,
+        target,
+        source,
+        condition,
+        aligned.matchedActions,
+        aligned.notMatchedActions,
+        bySourceActions)
+    }
+  }
+
   private def checkCondition(condition: Expression): Unit = {
     if (!condition.resolved) {
       throw new RuntimeException(s"Condition $condition should have been 
resolved.")
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
index ba516ddbe8..4222c54d6e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonMergeIntoResolverBase.scala
@@ -71,7 +71,8 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
         }
         val resolvedAssignments =
           resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
-        UpdateAction(resolvedCond, resolvedAssignments)
+        // Tag so merge-schema can distinguish `UPDATE *` from a fully-listed 
explicit clause.
+        PaimonMergeActionTags.markFromStar(UpdateAction(resolvedCond, 
resolvedAssignments))
       case action =>
         throw new RuntimeException(s"Can't recognize this action: $action")
     }
@@ -97,7 +98,7 @@ trait PaimonMergeIntoResolverBase extends ExpressionHelper {
         }
         val resolvedAssignments =
           resolveAssignments(resolve, assignments, merge, SOURCE_ONLY)
-        InsertAction(resolvedCond, resolvedAssignments)
+        PaimonMergeActionTags.markFromStar(InsertAction(resolvedCond, 
resolvedAssignments))
       case action =>
         throw new RuntimeException(s"Can't recognize this action: $action")
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
index db77b19ca0..1416602a5f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -26,8 +26,8 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.{Column, DataFrame, PaimonUtils, SparkSession}
-import org.apache.spark.sql.functions.{col, lit, struct}
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.functions.{col, lit, struct, transform, 
transform_values}
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, 
StructType}
 
 import scala.collection.JavaConverters._
 
@@ -41,21 +41,66 @@ private[spark] trait SchemaHelper extends 
WithFileStoreTable {
 
+  /**
+   * Returns `input` shaped to the schema that will be written.
+   *
+   * Spark system columns are filtered out of the input schema before it is handed to
+   * `mergeSchema(dataSchema, options)`. When the resulting write schema differs in type from the
+   * filtered data schema, the columns are aligned by name with the session resolver and
+   * selected; otherwise `input` is returned unchanged.
+   */
   def mergeSchema(sparkSession: SparkSession, input: DataFrame, options: 
Options): DataFrame = {
     val dataSchema = SparkSystemColumns.filterSparkSystemColumns(input.schema)
-    val newTableSchema = mergeSchema(input.schema, options)
-    if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
+    val writeSchema = mergeSchema(dataSchema, options)
+    if (!PaimonUtils.sameType(writeSchema, dataSchema)) {
       val resolve = sparkSession.sessionState.conf.resolver
-      val cols = alignColumns(newTableSchema, dataSchema, resolve)
+      val cols = SchemaHelper.alignColumns(writeSchema, dataSchema, resolve)
       input.select(cols: _*)
     } else {
       input
     }
   }
 
+  /**
+   * Computes the schema to write for `dataSchema`, evolving the table when merge-schema is on.
+   *
+   * Merge-schema is enabled by either the per-write option or the session-level setting. When it
+   * is disabled, `dataSchema` is returned exactly as passed in. When it is enabled, Spark system
+   * columns are filtered out, the remainder is merged into the table schema, and `newTable` is
+   * updated if the merge produced a new table.
+   *
+   * @param dataSchema schema of the incoming data
+   * @param options write options consulted for the merge-schema and explicit-cast flags
+   * @return the table's logical row type as a Spark schema when it differs in type from the
+   *   filtered data schema, otherwise the filtered data schema
+   */
+  def mergeSchema(dataSchema: StructType, options: Options): StructType = {
+    val mergeSchemaEnabled =
+      options.get(SparkConnectorOptions.MERGE_SCHEMA) || OptionUtils.writeMergeSchemaEnabled()
+    if (!mergeSchemaEnabled) {
+      dataSchema
+    } else {
+      val filteredDataSchema = SparkSystemColumns.filterSparkSystemColumns(dataSchema)
+      val allowExplicitCast =
+        options.get(SparkConnectorOptions.EXPLICIT_CAST) ||
+          OptionUtils.writeMergeSchemaExplicitCastEnabled()
+      SchemaHelper.mergeAndCommitSchema(table, filteredDataSchema, allowExplicitCast).foreach {
+        updatedTable => newTable = Some(updatedTable)
+      }
+
+      // NOTE(review): `table` is declared outside this view; presumably it resolves to the
+      // table stored in `newTable` just above. Confirm against WithFileStoreTable.
+      val writeSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
+      if (!PaimonUtils.sameType(writeSchema, filteredDataSchema)) {
+        writeSchema
+      } else {
+        filteredDataSchema
+      }
+    }
+  }
+
+  /** Replaces `newTable` with a copy of the current table made with the given options. */
+  def updateTableWithOptions(options: Map[String, String]): Unit = {
+    val reconfigured = table.copy(options.asJava)
+    newTable = Some(reconfigured)
+  }
+}
+
+private[spark] object SchemaHelper {
+
+  /**
+   * Merge the given dataSchema into the table's schema. If the schema changed, commit the change
+   * and return the updated table; otherwise return None.
+   *
+   * @param table table whose store performs the merge
+   * @param dataSchema Spark schema of the incoming data, converted to a Paimon row type first
+   * @param allowExplicitCast forwarded unchanged to the store's `mergeSchema`
+   */
+  def mergeAndCommitSchema(
+      table: FileStoreTable,
+      dataSchema: StructType,
+      allowExplicitCast: Boolean): Option[FileStoreTable] = {
+    val incomingRowType = SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
+    val schemaChanged = table.store().mergeSchema(incomingRowType, allowExplicitCast)
+    if (schemaChanged) Some(table.copyWithLatestSchema()) else None
+  }
+
   /**
    * Recursively align columns from dataSchema to targetSchema by name. For 
nested struct fields,
    * reorder and fill nulls for missing sub-fields.
    */
-  private def alignColumns(
+  def alignColumns(
       targetSchema: StructType,
       dataSchema: StructType,
       resolve: (String, String) => Boolean): Seq[Column] = {
@@ -77,54 +122,38 @@ private[spark] trait SchemaHelper extends 
WithFileStoreTable {
       resolve: (String, String) => Boolean): Column = {
     (sourceType, targetField.dataType) match {
       case (s: StructType, t: StructType) if !PaimonUtils.sameType(s, t) =>
-        val subCols = t.map {
-          subTargetField =>
-            s.find(f => resolve(f.name, subTargetField.name)) match {
-              case Some(subDataField) =>
-                alignColumn(
-                  sourceCol.getField(subDataField.name),
-                  subDataField.dataType,
-                  subTargetField,
-                  resolve)
-              case _ =>
-                lit(null).cast(subTargetField.dataType).as(subTargetField.name)
-            }
-        }
-        struct(subCols: _*).as(targetField.name)
+        alignStruct(sourceCol, s, t, resolve).as(targetField.name)
+      case (ArrayType(s: StructType, _), ArrayType(t: StructType, _))
+          if !PaimonUtils.sameType(s, t) =>
+        transform(sourceCol, elem => alignStruct(elem, s, t, resolve))
+          .as(targetField.name)
+      case (MapType(sKey, sVal: StructType, _), MapType(tKey, tVal: 
StructType, _))
+          if !PaimonUtils.sameType(sVal, tVal) =>
+        transform_values(sourceCol, (_, v) => alignStruct(v, sVal, tVal, 
resolve))
+          .as(targetField.name)
       case _ =>
         sourceCol.as(targetField.name)
     }
   }
 
-  def mergeSchema(dataSchema: StructType, options: Options): StructType = {
-    val mergeSchemaEnabled =
-      options.get(SparkConnectorOptions.MERGE_SCHEMA) || 
OptionUtils.writeMergeSchemaEnabled()
-    if (!mergeSchemaEnabled) {
-      return dataSchema
-    }
-
-    val filteredDataSchema = 
SparkSystemColumns.filterSparkSystemColumns(dataSchema)
-    val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST) 
|| OptionUtils
-      .writeMergeSchemaExplicitCastEnabled()
-    mergeAndCommitSchema(filteredDataSchema, allowExplicitCast)
-
-    val writeSchema = 
SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
-
-    if (!PaimonUtils.sameType(writeSchema, filteredDataSchema)) {
-      writeSchema
-    } else {
-      filteredDataSchema
-    }
-  }
-
-  private def mergeAndCommitSchema(dataSchema: StructType, allowExplicitCast: 
Boolean): Unit = {
-    val dataRowType = 
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
-    if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
-      newTable = Some(table.copyWithLatestSchema())
+  /**
+   * Rebuilds a struct column so its fields follow `targetType`, matching source fields by name
+   * through `resolve`. Matched fields are aligned recursively via `alignColumn`; target fields
+   * with no match in `sourceType` become typed NULLs. The returned struct carries no alias.
+   */
+  private def alignStruct(
+      sourceCol: Column,
+      sourceType: StructType,
+      targetType: StructType,
+      resolve: (String, String) => Boolean): Column = {
+    val alignedFields = targetType.map {
+      wanted =>
+        sourceType
+          .find(candidate => resolve(candidate.name, wanted.name))
+          .fold(lit(null).cast(wanted.dataType).as(wanted.name)) {
+            found =>
+              alignColumn(sourceCol.getField(found.name), found.dataType, wanted, resolve)
+          }
     }
-  }
-
-  def updateTableWithOptions(options: Map[String, String]): Unit = {
-    newTable = Some(table.copy(options.asJava))
+    struct(alignedFields: _*)
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
index 7a541a4514..3ceb494396 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/paimon/shims/SparkShim.scala
@@ -27,11 +27,12 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, 
LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
SubqueryAlias, UnresolvedWith, UpdateAction}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.StructType
 
 import java.util.{Map => JMap}
@@ -88,6 +89,16 @@ trait SparkShim {
       notMatchedBySourceActions: Seq[MergeAction],
       withSchemaEvolution: Boolean): MergeIntoTable
 
+  /**
+   * Version-specific hook for rebuilding a `DataSourceV2Relation`.
+   *
+   * NOTE(review): presumably returns a copy of `relation` bound to `table` and exposing
+   * `output`; the implementations live in the per-version shims, so confirm there.
+   */
+  def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      table: Table,
+      output: 
Seq[org.apache.spark.sql.catalyst.expressions.AttributeReference])
+      : DataSourceV2Relation
+
+  /**
+   * Version-specific hook for rebuilding an `UpdateAction` with new assignments.
+   *
+   * NOTE(review): presumably keeps the rest of `action` and swaps in `assignments`; whether
+   * tree-node tags are carried over is not visible here, so confirm in the shim implementations.
+   */
+  def copyUpdateAction(action: UpdateAction, assignments: Seq[Assignment]): 
UpdateAction
+
+  /**
+   * Version-specific hook for rebuilding an `InsertAction` with new assignments.
+   *
+   * NOTE(review): presumably keeps the rest of `action` and swaps in `assignments`; whether
+   * tree-node tags are carried over is not visible here, so confirm in the shim implementations.
+   */
+  def copyInsertAction(action: InsertAction, assignments: Seq[Assignment]): 
InsertAction
+
   /**
    * Returns the list of "early" substitution rules Paimon needs to apply on a 
parsed view plan.
    * Spark 3.x exposes both `CTESubstitution` and 
`SubstituteUnresolvedOrdinals`, but 4.1 removed
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala
index 2c46af0aed..d2c822789c 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoNotMatchedBySourceTest.scala
@@ -183,4 +183,41 @@ trait MergeIntoNotMatchedBySourceTest extends 
PaimonSparkTestBase with PaimonTab
       )
     }
   }
+
+  test("Paimon MergeInto: merge-schema with not matched by source") {
+    withTable("source", "target") {
+      // Scope the flag with `withSparkSQLConf`, the helper the merge-schema tests in
+      // MergeIntoTableTestBase use, instead of a hand-written set / unset pair.
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "a INT, b STRING", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2'), (3, 'v3')")
+
+        createTable("source", "a INT, b STRING, c INT", Seq("a"))
+        spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (4, 'u4', 40)")
+
+        spark.sql("""
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |  UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |  INSERT *
+                    |WHEN NOT MATCHED BY SOURCE AND a = 2 THEN
+                    |  UPDATE SET b = 'updated'
+                    |WHEN NOT MATCHED BY SOURCE THEN
+                    |  DELETE
+                    |""".stripMargin)
+
+        // a=1: matched, UPDATE SET * => (1, 'u1', 10)
+        // a=2: not matched by source, a=2, UPDATE SET b='updated' => (2, 'updated', null)
+        // a=3: not matched by source, DELETE => removed
+        // a=4: not matched, INSERT * => (4, 'u4', 40)
+        checkAnswer(
+          spark.sql("SELECT * FROM target ORDER BY a"),
+          Seq(Row(1, "u1", 10), Row(2, "updated", null), Row(4, "u4", 40)))
+      }
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index 6c7ee12761..b00e911c07 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -783,6 +783,483 @@ abstract class MergeIntoTableTestBase extends 
PaimonSparkTestBase with PaimonTab
       checkAnswer(sql("SELECT * FROM target ORDER BY a, b"), Seq(Row(1, 
"Eve"), Row(2, "Bob")))
     }
   }
+
+  test("Paimon MergeInto: merge-schema with explicit update columns") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "a INT, b STRING", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+        createTable("source", "a INT, b STRING, c INT", Seq("a"))
+        spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+        // Both clauses are explicit (no star), so the schema must NOT evolve: c is never added.
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET b = source.b
+                    |WHEN NOT MATCHED THEN
+                    |INSERT (a, b) VALUES (source.a, source.b)
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        val expected = Seq(Row(1, "u1"), Row(2, "v2"), Row(3, "u3"))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with partial explicit update only (no star semantics)") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "a INT, b STRING", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+        createTable("source", "a INT, b STRING, c INT", Seq("a"))
+        spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+        // A lone MATCHED clause with an explicit assignment: no star, so the schema stays put.
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET b = source.b
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        val expected = Seq(Row(1, "u1"), Row(2, "v2"))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with star update and explicit insert") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "a INT, b STRING", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+        createTable("source", "a INT, b STRING, c INT", Seq("a"))
+        spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |INSERT (a, b) VALUES (source.a, source.b)
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        // UPDATE * evolves the schema; the explicit INSERT never mentions c, so a=3 gets c=null.
+        val expected = Seq(Row(1, "u1", 10), Row(2, "v2", null), Row(3, "u3", null))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with explicit update and star insert") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "a INT, b STRING", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+        createTable("source", "a INT, b STRING, c INT", Seq("a"))
+        spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET b = source.b
+                    |WHEN NOT MATCHED THEN
+                    |INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        // INSERT * evolves the schema; the explicit UPDATE only touches b, so a=1 keeps c=null.
+        val expected = Seq(Row(1, "u1", null), Row(2, "v2", null), Row(3, "u3", 30))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with star update and insert") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "a INT, b STRING", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+        createTable("source", "a INT, b STRING, c INT", Seq("a"))
+        spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        // UPDATE SET * writes every column, including the new source column c.
+        val expected = Seq(Row(1, "u1", 10), Row(2, "v2", null), Row(3, "u3", 30))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema disabled should not add new columns") {
+    withTable("source", "target") {
+      // With the flag off, even star clauses must leave the target at two columns.
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "false") {
+        createTable("target", "a INT, b STRING", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+        createTable("source", "a INT, b STRING, c INT", Seq("a"))
+        spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        val expected = Seq(Row(1, "u1"), Row(2, "v2"), Row(3, "u3"))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with DV table") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        // Same star / star scenario, but the target has deletion vectors enabled.
+        val dvProps = Map("deletion-vectors.enabled" -> "true")
+        createTable("target", "a INT, b STRING", Seq("a"), extraProps = dvProps)
+        spark.sql("INSERT INTO target VALUES (1, 'v1'), (2, 'v2')")
+
+        createTable("source", "a INT, b STRING, c INT", Seq("a"))
+        spark.sql("INSERT INTO source VALUES (1, 'u1', 10), (3, 'u3', 30)")
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        val expected = Seq(Row(1, "u1", 10), Row(2, "v2", null), Row(3, "u3", 30))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with nested struct new fields") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "a INT, info STRUCT<f1 STRING, f2 STRING>", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, struct('v1a', 'v2a')), (2, struct('v1b', 'v2b'))")
+
+        // The source struct carries one extra sub-field, f3.
+        createTable("source", "a INT, info STRUCT<f1 STRING, f2 STRING, f3 STRING>", Seq("a"))
+        spark.sql(
+          "INSERT INTO source VALUES (1, struct('u1a', 'u2a', 'u3a')), (3, struct('u1c', 'u2c', 'u3c'))")
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        // The untouched row a=2 reads f3 as null.
+        val expected = Seq(
+          Row(1, Row("u1a", "u2a", "u3a")),
+          Row(2, Row("v1b", "v2b", null)),
+          Row(3, Row("u1c", "u2c", "u3c")))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with new top-level and nested fields") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "a INT, info STRUCT<f1 STRING>", Seq("a"))
+        spark.sql("INSERT INTO target VALUES (1, struct('v1')), (2, struct('v2'))")
+
+        // The source adds a nested field (info.f2) and a top-level column (extra) at once.
+        createTable("source", "a INT, info STRUCT<f1 STRING, f2 STRING>, extra STRING", Seq("a"))
+        spark.sql(
+          "INSERT INTO source VALUES (1, struct('u1', 'new1'), 'e1'), (3, struct('u3', 'new3'), 'e3')")
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.a = source.a
+                    |WHEN MATCHED THEN
+                    |UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        val expected = Seq(
+          Row(1, Row("u1", "new1"), "e1"),
+          Row(2, Row("v2", null), null),
+          Row(3, Row("u3", "new3"), "e3"))
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY a"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema complex scenario with multiple clauses and conditions") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable("target", "id INT, name STRING, score INT", Seq("id"))
+        spark.sql(
+          "INSERT INTO target VALUES (1, 'Alice', 80), (2, 'Bob', 70), (3, 'Carol', 60), (4, 'Dave', 50)")
+
+        createTable(
+          "source",
+          "id INT, name STRING, score INT, grade STRING, info STRUCT<city STRING, zip STRING>",
+          Seq("id"))
+        spark.sql("""INSERT INTO source VALUES
+                    |(1, 'Alice_v2', 95, 'A', struct('Beijing', '100000')),
+                    |(2, 'Bob_v2', 55, 'C', struct('Shanghai', '200000')),
+                    |(5, 'Eve', 88, 'B', struct('Hangzhou', '310000')),
+                    |(6, 'Frank', 42, 'D', struct('Nanjing', '210000'))""".stripMargin)
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.id = source.id
+                    |WHEN MATCHED AND source.score >= 60 THEN
+                    |  UPDATE SET *
+                    |WHEN MATCHED AND source.score < 60 THEN
+                    |  DELETE
+                    |WHEN NOT MATCHED AND source.score >= 50 THEN
+                    |  INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        // id=1 is updated, id=2 is deleted (score < 60), id=5 is inserted, id=6 is filtered out
+        // by the insert condition; ids 3 and 4 are untouched and read the new columns as null.
+        val expected = Seq(
+          Row(1, "Alice_v2", 95, "A", Row("Beijing", "100000")),
+          Row(3, "Carol", 60, null, null),
+          Row(4, "Dave", 50, null, null),
+          Row(5, "Eve", 88, "B", Row("Hangzhou", "310000"))
+        )
+        checkAnswer(spark.sql("SELECT * FROM target ORDER BY id"), expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: struct field reorder by name without merge-schema") {
+    withTable("source", "target") {
+      createTable(
+        "target",
+        "id INT, name STRING, address STRUCT<city STRING, zip STRING>",
+        Seq("id"))
+      spark.sql("""INSERT INTO target VALUES
+                  |(1, 'Alice', struct('Shanghai', '200000')),
+                  |(2, 'Bob', struct('Beijing', '100000'))""".stripMargin)
+
+      // The source lists the struct sub-fields in reverse order, so alignment must go by name
+      // rather than by position.
+      createTable(
+        "source",
+        "id INT, name STRING, address STRUCT<zip STRING, city STRING>",
+        Seq("id"))
+      spark.sql("""INSERT INTO source VALUES
+                  |(1, 'Alice_v2', struct('210000', 'Nanjing')),
+                  |(3, 'Carol', struct('310000', 'Hangzhou'))""".stripMargin)
+
+      val mergeSql = """
+                  |MERGE INTO target
+                  |USING source
+                  |ON target.id = source.id
+                  |WHEN MATCHED THEN
+                  |UPDATE SET *
+                  |WHEN NOT MATCHED THEN
+                  |INSERT *
+                  |""".stripMargin
+      spark.sql(mergeSql)
+
+      val expected = Seq(
+        Row(1, "Alice_v2", Row("Nanjing", "210000")),
+        Row(2, "Bob", Row("Beijing", "100000")),
+        Row(3, "Carol", Row("Hangzhou", "310000"))
+      )
+      checkAnswer(spark.sql("SELECT id, name, address FROM target ORDER BY id"), expected)
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with struct field change") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable(
+          "target",
+          "id INT, name STRING, address STRUCT<city STRING, zip STRING>",
+          Seq("id"))
+        spark.sql("""INSERT INTO target VALUES
+                    |(1, 'Alice', struct('Shanghai', '200000')),
+                    |(2, 'Bob', struct('Beijing', '100000'))""".stripMargin)
+
+        // Source struct: reversed order (zip, city), a new sub-field 'country', and a new
+        // top-level column 'extra'.
+        createTable(
+          "source",
+          "id INT, name STRING, address STRUCT<zip STRING, city STRING, country STRING>, extra STRING",
+          Seq("id"))
+        spark.sql("""INSERT INTO source VALUES
+                    |(1, 'Alice_v2', struct('210000', 'Nanjing', 'CN'), 'e1'),
+                    |(3, 'Carol', struct('310000', 'Hangzhou', 'CN'), 'e3')""".stripMargin)
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.id = source.id
+                    |WHEN MATCHED THEN
+                    |UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        val expected = Seq(
+          Row(1, "Alice_v2", Row("Nanjing", "210000", "CN"), "e1"),
+          Row(2, "Bob", Row("Beijing", "100000", null), null),
+          Row(3, "Carol", Row("Hangzhou", "310000", "CN"), "e3")
+        )
+        checkAnswer(
+          spark.sql("SELECT id, name, address, extra FROM target ORDER BY id"),
+          expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with array of struct field reorder and new top-level col") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable(
+          "target",
+          "id INT, name STRING, items ARRAY<STRUCT<code STRING, quantity INT>>",
+          Seq("id"))
+        spark.sql("""INSERT INTO target VALUES
+                    |(1, 'Alice', array(struct('A01', 10), struct('A02', 20))),
+                    |(2, 'Bob', array(struct('B01', 30)))""".stripMargin)
+
+        // Source array<struct>: reversed sub-field order (quantity, code) plus a new top-level
+        // column 'extra'.
+        createTable(
+          "source",
+          "id INT, name STRING, items ARRAY<STRUCT<quantity INT, code STRING>>, extra STRING",
+          Seq("id"))
+        spark.sql("""INSERT INTO source VALUES
+                    |(1, 'Alice_v2', array(struct(15, 'A01')), 'e1'),
+                    |(3, 'Carol', array(struct(5, 'C01'), struct(8, 'C02')), 'e3')""".stripMargin)
+
+        val mergeSql = """
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.id = source.id
+                    |WHEN MATCHED THEN
+                    |UPDATE SET *
+                    |WHEN NOT MATCHED THEN
+                    |INSERT *
+                    |""".stripMargin
+        spark.sql(mergeSql)
+
+        val expected = Seq(
+          Row(1, "Alice_v2", Seq(Row("A01", 15)), "e1"),
+          Row(2, "Bob", Seq(Row("B01", 30)), null),
+          Row(3, "Carol", Seq(Row("C01", 5), Row("C02", 8)), "e3")
+        )
+        checkAnswer(
+          spark.sql("SELECT id, name, items, extra FROM target ORDER BY id"),
+          expected)
+      }
+    }
+  }
+
+  test("Paimon MergeInto: struct field reorder when target has fields absent from source") {
+    withTable("source", "target") {
+      // The target struct has three sub-fields; the source struct has only two of them, in a
+      // different order.
+      createTable("target", "id INT, info STRUCT<x INT, y INT, z INT>", Seq("id"))
+      spark.sql("INSERT INTO target VALUES (1, struct(1, 2, 3)), (2, struct(4, 5, 6))")
+
+      createTable("source", "id INT, info STRUCT<y INT, x INT>", Seq("id"))
+      spark.sql("INSERT INTO source VALUES (1, struct(20, 10)), (3, struct(80, 70))")
+
+      val mergeSql = """
+                  |MERGE INTO target
+                  |USING source
+                  |ON target.id = source.id
+                  |WHEN MATCHED THEN
+                  |UPDATE SET target.info = source.info
+                  |WHEN NOT MATCHED THEN
+                  |INSERT (id, info) VALUES (source.id, source.info)
+                  |""".stripMargin
+      spark.sql(mergeSql)
+
+      // Rows written from the source read z as null; the untouched row id=2 keeps its z.
+      val expected = Seq(
+        Row(1, Row(10, 20, null)),
+        Row(2, Row(4, 5, 6)),
+        Row(3, Row(70, 80, null))
+      )
+      checkAnswer(spark.sql("SELECT * FROM target ORDER BY id"), expected)
+    }
+  }
+
+  // Explicit-column clauses on a MAP<STRING, STRUCT> column: the expected rows show the value
+  // struct's fields matched by name (target order `a, b`) although the source declares `b, a`.
+  test("Paimon MergeInto: map with struct value field reorder") {
+    withTable("source", "target") {
+      createTable("target", "id INT, props MAP<STRING, STRUCT<a INT, b 
STRING>>", Seq("id"))
+      spark.sql(
+        "INSERT INTO target VALUES (1, map('k1', struct(1, 'v1'))), (2, 
map('k2', struct(2, 'v2')))")
+
+      // Source map value struct has reversed sub-field order (b, a).
+      createTable("source", "id INT, props MAP<STRING, STRUCT<b STRING, a 
INT>>", Seq("id"))
+      spark.sql(
+        "INSERT INTO source VALUES (1, map('k1', struct('u1', 10))), (3, 
map('k3', struct('u3', 30)))")
+
+      spark.sql("""
+                  |MERGE INTO target
+                  |USING source
+                  |ON target.id = source.id
+                  |WHEN MATCHED THEN
+                  |UPDATE SET target.props = source.props
+                  |WHEN NOT MATCHED THEN
+                  |INSERT (id, props) VALUES (source.id, source.props)
+                  |""".stripMargin)
+
+      // id 1 takes the source value for `k1`, id 3 is inserted, id 2 has no source match.
+      checkAnswer(
+        spark.sql("SELECT * FROM target ORDER BY id"),
+        Seq(
+          Row(1, Map("k1" -> Row(10, "u1"))),
+          Row(2, Map("k2" -> Row(2, "v2"))),
+          Row(3, Map("k3" -> Row(30, "u3")))
+        )
+      )
+    }
+  }
 }
 
 trait MergeIntoPrimaryKeyTableTest extends PaimonSparkTestBase with 
PaimonPrimaryKeyTable {
@@ -829,6 +1306,8 @@ trait MergeIntoPrimaryKeyTableTest extends 
PaimonSparkTestBase with PaimonPrimar
 
 trait MergeIntoAppendTableTest extends PaimonSparkTestBase with 
PaimonAppendTable {
 
+  import testImplicits._
+
   test("Paimon MergeInto: non pk table commit kind") {
     withTable("s", "t") {
       createTable("s", "id INT, b INT, c INT", Seq("id"))
@@ -970,6 +1449,37 @@ trait MergeIntoAppendTableTest extends 
PaimonSparkTestBase with PaimonAppendTabl
     }
   }
 
+  // The source `status_id` is a String column (built from a local Seq) while the target column
+  // is INT; the expected rows assert integer values for both the UPDATE and the INSERT path.
+  test("Paimon MergeInto: type cast STRING to INT in update and insert") {
+    withTable("target") {
+      createTable("target", "id INT, name STRING, status_id INT", Seq("id"))
+      spark.sql("INSERT INTO target VALUES (1, 'Alice', 100), (2, 'Bob', 200), 
(3, 'Charlie', 300)")
+
+      Seq((1, "Alice_Updated", "111"), (2, "Bob_Updated", "222"), (4, 
"David_New", "400"))
+        .toDF("id", "name", "status_id")
+        .createTempView("source")
+      // "source" is a temp view and is not listed in withTable above, so it is dropped
+      // explicitly in the finally block even when the merge or the check fails.
+      try {
+        spark.sql(
+          """
+            |MERGE INTO target
+            |USING source
+            |ON target.id = source.id
+            |WHEN MATCHED THEN
+            |  UPDATE SET target.id = source.id, target.name = source.name, 
target.status_id = source.status_id
+            |WHEN NOT MATCHED THEN
+            |  INSERT (id, name, status_id) VALUES (source.id, source.name, 
source.status_id)
+            |""".stripMargin)
+
+        // id 3 has no source match and keeps its original row.
+        checkAnswer(
+          spark.sql("SELECT * FROM target ORDER BY id"),
+          Row(1, "Alice_Updated", 111) :: Row(2, "Bob_Updated", 222) ::
+            Row(3, "Charlie", 300) :: Row(4, "David_New", 400) :: Nil
+        )
+      } finally {
+        spark.catalog.dropTempView("source")
+      }
+    }
+  }
+
   def createPositiveRandomInt(): Int = {
     val random = new Random()
     val positiveInt = random.nextInt()
diff --git 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
index 5f989f525d..c71b3df923 100644
--- 
a/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
+++ 
b/paimon-spark/paimon-spark3-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark3Shim.scala
@@ -29,10 +29,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CTESubstitution, 
SubstituteUnresolvedOrdinals}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, 
LogicalPlan, MergeAction, MergeIntoTable, SubqueryAlias, UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
SubqueryAlias, UnresolvedWith, UpdateAction}
 // NOTE: `MergeRows` / `MergeRows.Keep` were introduced in Spark 3.4. We 
access them only via
 // reflection inside the `mergeRowsKeep*` method bodies so that loading 
`Spark3Shim` does not fail
 // on Spark 3.2 / 3.3 runtimes that still ship `paimon-spark3-common` (the 
module targets 3.5.8 at
@@ -43,6 +43,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, 
Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SparkFormatTable
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.{FileStreamSink, 
MetadataLogFileIndex}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -119,6 +120,25 @@ class Spark3Shim extends SparkShim {
       notMatchedBySourceActions)
   }
 
+  /**
+   * Returns a copy of `relation` whose `table` and `output` are replaced by the given values;
+   * every other field is carried over unchanged by `copy`.
+   */
+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      table: Table,
+      output: Seq[AttributeReference]): DataSourceV2Relation =
+    relation.copy(table = table, output = output)
+
+  /** Returns a copy of `action` with its assignments replaced by `assignments`. */
+  override def copyUpdateAction(
+      action: UpdateAction,
+      assignments: Seq[Assignment]): UpdateAction =
+    action.copy(assignments = assignments)
+
+  /** Returns a copy of `action` with its assignments replaced by `assignments`. */
+  override def copyInsertAction(
+      action: InsertAction,
+      assignments: Seq[Assignment]): InsertAction =
+    action.copy(assignments = assignments)
+
   override def earlyBatchRules(): Seq[Rule[LogicalPlan]] =
     Seq(CTESubstitution, SubstituteUnresolvedOrdinals)
 
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
index 6b50691717..921092572e 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/Spark41MergeIntoRewrite.scala
@@ -18,9 +18,10 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
-import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.catalyst.analysis.{AssignmentAlignmentHelper, 
MergeSchemaEvolutionHelper, PaimonRelation}
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, 
AttributeReference, Exists, Expression, IsNotNull, Literal, MetadataAttribute, 
MonotonicallyIncreasingID, OuterReference, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, 
TrueLiteral}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -80,6 +81,7 @@ object Spark41MergeIntoRewrite
   extends RewriteRowLevelCommand
   with PredicateHelper
   with AssignmentAlignmentHelper
+  with MergeSchemaEvolutionHelper
   with PureAppendOnlyScope {
 
   final private val ROW_FROM_SOURCE = "__row_from_source"
@@ -92,11 +94,21 @@ object Spark41MergeIntoRewrite
         case m: MergeIntoTable
             if m.resolved && m.rewritable && !m.needSchemaEvolution &&
               targetsPureAppendOnly(m.targetTable) =>
-          rewrite(alignMergeIntoTable(m))
+          // Pure append-only tables never reach the postHoc 
`PaimonMergeIntoBase`, so evolve here.
+          rewrite(alignMergeIntoTable(evolveSchemaIfPaimon(m)))
       }
     }
   }
 
+  /**
+   * Runs `evolveTargetIfNeeded` for a Paimon target and returns the first element of its
+   * result. Returns `m` unchanged when the target is not a Paimon table or when
+   * `evolveTargetIfNeeded` yields nothing.
+   */
+  private def evolveSchemaIfPaimon(m: MergeIntoTable): MergeIntoTable = {
+    if (PaimonRelation.isPaimonTable(m.targetTable)) {
+      val paimonRelation = PaimonRelation.getPaimonRelation(m.targetTable)
+      // NOTE(review): unchecked cast - assumes a Paimon relation is always backed by a
+      // SparkTable; confirm against PaimonRelation.isPaimonTable.
+      val sparkTable = paimonRelation.table.asInstanceOf[SparkTable]
+      val evolved = evolveTargetIfNeeded(
+        m,
+        paimonRelation,
+        sparkTable,
+        SparkSession.active,
+        _.notMatchedBySourceActions)
+      evolved.map(_._1).getOrElse(m)
+    } else {
+      m
+    }
+  }
+
   /* 
-------------------------------------------------------------------------------------------
 *
    * Dispatcher mirroring `RewriteMergeIntoTable.apply`'s three 
`ReplaceData`/`AppendData`
    * branches. The `SupportsDelta` branch from Spark is intentionally omitted.
diff --git 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
index 01b6d8d79b..b66b896927 100644
--- 
a/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
+++ 
b/paimon-spark/paimon-spark4-common/src/main/scala/org/apache/spark/sql/paimon/shims/Spark4Shim.scala
@@ -29,10 +29,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.CTESubstitution
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, CTERelationRef, 
LogicalPlan, MergeAction, MergeIntoTable, MergeRows, SubqueryAlias, 
UnresolvedWith}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Assignment, 
CTERelationRef, InsertAction, LogicalPlan, MergeAction, MergeIntoTable, 
MergeRows, SubqueryAlias, UnresolvedWith, UpdateAction}
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows.{Copy, Insert, 
Keep, Update}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.ArrayData
@@ -40,6 +40,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, 
Identifier, Table,
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.execution.SparkFormatTable
 import org.apache.spark.sql.execution.datasources.{PartitioningAwareFileIndex, 
PartitionSpec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.runtime.MetadataLogFileIndex
 import org.apache.spark.sql.execution.streaming.sinks.FileStreamSink
 import org.apache.spark.sql.internal.SQLConf
@@ -122,6 +123,25 @@ class Spark4Shim extends SparkShim {
       withSchemaEvolution)
   }
 
+  /**
+   * Returns a copy of `relation` whose `table` and `output` are replaced by the given values;
+   * every other field is carried over unchanged by `copy`.
+   */
+  override def copyDataSourceV2Relation(
+      relation: DataSourceV2Relation,
+      table: Table,
+      output: Seq[AttributeReference]): DataSourceV2Relation =
+    relation.copy(table = table, output = output)
+
+  /** Returns a copy of `action` with its assignments replaced by `assignments`. */
+  override def copyUpdateAction(
+      action: UpdateAction,
+      assignments: Seq[Assignment]): UpdateAction =
+    action.copy(assignments = assignments)
+
+  /** Returns a copy of `action` with its assignments replaced by `assignments`. */
+  override def copyInsertAction(
+      action: InsertAction,
+      assignments: Seq[Assignment]): InsertAction =
+    action.copy(assignments = assignments)
+
   override def earlyBatchRules(): Seq[Rule[LogicalPlan]] = Seq(CTESubstitution)
 
   override def mergeRowsKeepCopy(condition: Expression, output: 
Seq[Expression]): AnyRef =

Reply via email to