This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 701d9e0d79 [spark] Unify Spark write column alignment via PaimonOutputResolver (#7928)
701d9e0d79 is described below

commit 701d9e0d79b045f41ee4b3a7ce54a295d0c06e16
Author: Zouxxyy <[email protected]>
AuthorDate: Fri May 22 10:40:11 2026 +0800

    [spark] Unify Spark write column alignment via PaimonOutputResolver (#7928)
    
    Replace the ad-hoc column-alignment logic in `PaimonAnalysis` (a
    parallel, top-level-only implementation of `TableOutputResolver`) with a
    dedicated `PaimonOutputResolver` — a trimmed fork of `TableOutputResolver`
    with `mergeSchemaEnabled` support that applies uniformly at every nesting
    depth.
    
    - Missing target fields → NULL-fill at any depth
    - Extra source fields → rejected when `mergeSchemaEnabled=false`; otherwise
      kept inline so `SchemaHelper.mergeSchema` evolves the table at write time
    - Nullable downcast → `AssertNotNull`; type mismatch → tagged `Cast`
    
    `paimonWriteResolved` now mirrors Spark's V2 `outputResolved` strictness
    (name + type + nullability, including nested). A `PAIMON_WRITE_RESOLVED`
    `TreeNodeTag` prevents re-entry when extras are kept inline.
---
 .../spark/catalyst/analysis/PaimonAnalysis.scala   | 361 ++-----------------
 .../catalyst/analysis/PaimonOutputResolver.scala   | 385 +++++++++++++++++++++
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |   4 +
 .../paimon/spark/SparkSchemaEvolutionITCase.java   |   4 +-
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  | 115 +++---
 .../paimon/spark/sql/DataFrameWriteTestBase.scala  | 270 ++++++++++++++-
 .../spark/sql/InsertOverwriteTableTestBase.scala   |  36 +-
 .../paimon/spark/sql/V2WriteMergeSchemaTest.scala  |   2 +-
 .../paimon/spark/sql/WriteMergeSchemaTest.scala    |  50 ++-
 9 files changed, 804 insertions(+), 423 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 6009b6a807..d3ff947ce7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -26,30 +26,39 @@ import 
org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, Paimon
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.FileStoreTable
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{PaimonUtils, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, ResolvedTable}
-import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, 
Attribute, CreateStruct, Expression, GetArrayItem, GetStructField, If, IsNull, 
LambdaFunction, Literal, NamedExpression, NamedLambdaVariable}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, 
DataSourceV2Relation}
-import org.apache.spark.sql.types._
-
-import scala.collection.mutable
 
 class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   import DataSourceV2Implicits._
+  import PaimonAnalysis._
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDown {
 
-    case a @ PaimonV2WriteCommand(table) if !paimonWriteResolved(a.query, 
table, a.isByName) =>
+    case a @ PaimonV2WriteCommand(table)
+        if !paimonWriteResolved(a.query, table) &&
+          a.query.getTagValue(PAIMON_WRITE_RESOLVED).isEmpty =>
       val mergeSchemaEnabled =
         
writeOptions(a).get(SparkConnectorOptions.MERGE_SCHEMA.key()).contains("true") 
||
           OptionUtils.writeMergeSchemaEnabled()
-      resolveQueryColumns(a.query, table, a.isByName, mergeSchemaEnabled) 
match {
-        case Some(newQuery) if newQuery != a.query => 
Compatibility.withNewQuery(a, newQuery)
-        case _ => a
+      val newQuery = PaimonOutputResolver.resolveOutputColumns(
+        table.name,
+        table.output,
+        a.query,
+        a.isByName,
+        conf,
+        mergeSchemaEnabled)
+      if (newQuery ne a.query) {
+        // Tag to short-circuit the next Analyzer pass; otherwise inline-kept 
extras would loop.
+        newQuery.setTagValue(PAIMON_WRITE_RESOLVED, ())
+        Compatibility.withNewQuery(a, newQuery)
+      } else {
+        a
       }
 
     case o @ PaimonDynamicPartitionOverwrite(r, d) if o.resolved =>
@@ -77,337 +86,25 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
     }
   }
 
-  private def paimonWriteResolved(
-      query: LogicalPlan,
-      table: NamedRelation,
-      isByName: Boolean): Boolean = {
+  // Mirrors Spark's V2WriteCommand `outputResolved` strictness: query and 
table outputs must match
+  // by name, position, type (ignoring nullable compatibility), and 
nullability. Any nested
+  // structural differences also have to be reconciled before we declare the 
write resolved.
+  private def paimonWriteResolved(query: LogicalPlan, table: NamedRelation): 
Boolean = {
     query.output.size == table.output.size &&
     query.output.zip(table.output).forall {
       case (inAttr, outAttr) =>
         val inType = 
CharVarcharUtils.getRawType(inAttr.metadata).getOrElse(inAttr.dataType)
         val outType = 
CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
-        inAttr.name == outAttr.name && schemaCompatible(inType, outType, 
isByName)
-    }
-  }
-
-  private def resolveQueryColumns(
-      query: LogicalPlan,
-      table: NamedRelation,
-      byName: Boolean,
-      mergeSchemaEnabled: Boolean = false): Option[LogicalPlan] = {
-    // More details see: `TableOutputResolver#resolveOutputColumns`
-    if (byName) {
-      try {
-        Option.apply(resolveQueryColumnsByName(query, table))
-      } catch {
-        case e: Exception =>
-          // Merge schema is effective only when using byName mode.
-          // Schema validation is skipped here, because schema validation or 
merging will be
-          // done during insertion when mergeSchemaEnabled.
-          if (mergeSchemaEnabled) {
-            Option.empty
-          } else {
-            throw e
-          }
-      }
-    } else {
-      Option.apply(resolveQueryColumnsByPosition(query, table))
-    }
-  }
-
-  private def resolveQueryColumnsByName(query: LogicalPlan, table: 
NamedRelation): LogicalPlan = {
-    val inputCols = query.output
-    val expectedCols = table.output
-    if (inputCols.size > expectedCols.size) {
-      throw new RuntimeException(
-        s"Cannot write incompatible data for the table `${table.name}`, " +
-          "the number of data columns don't match with the table schema's.")
-    }
-
-    val matchedCols = mutable.HashSet.empty[String]
-    val reorderedCols = expectedCols.map {
-      expectedCol =>
-        val matched = inputCols.filter(col => conf.resolver(col.name, 
expectedCol.name))
-        if (matched.isEmpty) {
-          // TODO: Support Spark default value framework if Paimon supports to 
change default values.
-          if (!expectedCol.nullable) {
-            throw new RuntimeException(
-              s"Cannot write incompatible data for the table `${table.name}`, 
" +
-                s"due to non-nullable column `${expectedCol.name}` has no 
specified value.")
-          }
-          Alias(Literal(null, expectedCol.dataType), expectedCol.name)()
-        } else if (matched.length > 1) {
-          throw new RuntimeException(
-            s"Cannot write incompatible data for the table `${table.name}`, 
due to column name conflicts: ${matched
-                .mkString(", ")}.")
-        } else {
-          matchedCols += matched.head.name
-          val matchedCol = matched.head
-          addCastToColumn(matchedCol, expectedCol, isByName = true)
-        }
-    }
-
-    assert(reorderedCols.length == expectedCols.length)
-    if (matchedCols.size < inputCols.length) {
-      val extraCols = inputCols
-        .filterNot(col => matchedCols.contains(col.name))
-        .map(col => s"${toSQLId(col.name)}")
-        .mkString(", ")
-      // There are seme unknown column names
-      throw new RuntimeException(
-        s"Cannot write incompatible data for the table `${table.name}`, due to 
unknown column names: $extraCols.")
-    }
-    Project(reorderedCols, query)
-  }
-
-  private def resolveQueryColumnsByPosition(
-      query: LogicalPlan,
-      table: NamedRelation): LogicalPlan = {
-    val expectedCols = table.output
-    val queryCols = query.output
-    if (queryCols.size != expectedCols.size) {
-      throw new RuntimeException(
-        s"Cannot write incompatible data for the table `${table.name}`, " +
-          "the number of data columns don't match with the table schema's.")
-    }
-
-    val project = queryCols.zipWithIndex.map {
-      case (attr, i) =>
-        val targetAttr = expectedCols(i)
-        addCastToColumn(attr, targetAttr, isByName = false)
-    }
-    Project(project, query)
-  }
-
-  private def schemaCompatible(
-      dataSchema: DataType,
-      tableSchema: DataType,
-      checkFieldNames: Boolean): Boolean = {
-    (dataSchema, tableSchema) match {
-      case (s1: StructType, s2: StructType) =>
-        s1.length == s2.length &&
-        (!checkFieldNames ||
-          (!hasResolverConflicts(s1) &&
-            !hasResolverConflicts(s2) &&
-            structFieldsResolved(s1, s2))) &&
-        s1.zip(s2).forall {
-          case (d1, d2) => schemaCompatible(d1.dataType, d2.dataType, 
checkFieldNames)
-        }
-      case (a1: ArrayType, a2: ArrayType) =>
-        // todo: support array type nullable evaluation
-        schemaCompatible(a1.elementType, a2.elementType, checkFieldNames)
-      case (m1: MapType, m2: MapType) =>
-        m1.valueContainsNull == m2.valueContainsNull &&
-        schemaCompatible(m1.keyType, m2.keyType, checkFieldNames) &&
-        schemaCompatible(m1.valueType, m2.valueType, checkFieldNames)
-      case (d1, d2) => d1 == d2
-    }
-  }
-
-  private def structFieldsResolved(source: StructType, target: StructType): 
Boolean = {
-    source.zip(target).forall {
-      case (sourceField, targetField) =>
-        conf.resolver(sourceField.name, targetField.name)
-    }
-  }
-
-  private def hasResolverConflicts(struct: StructType): Boolean = {
-    struct.fields.combinations(2).exists { case Array(a, b) => 
conf.resolver(a.name, b.name) }
-  }
-
-  private def addCastToColumn(
-      attr: Attribute,
-      targetAttr: Attribute,
-      isByName: Boolean): NamedExpression = {
-    val expr = (attr.dataType, targetAttr.dataType) match {
-      case (s, t) if s == t =>
-        attr
-      case (s: StructType, t: StructType) if s != t =>
-        if (isByName) {
-          addCastToStructByName(attr, s, t)
-        } else {
-          addCastToStructByPosition(attr, s, t)
-        }
-      case (ArrayType(s: StructType, sNull: Boolean), ArrayType(t: StructType, 
_: Boolean))
-          if s != t =>
-        val castToStructFunc = if (isByName) {
-          addCastToStructByName _
-        } else {
-          addCastToStructByPosition _
-        }
-        castToArrayStruct(attr, s, t, sNull, castToStructFunc)
-      case _ =>
-        cast(attr, targetAttr.dataType)
-    }
-    Alias(stringLengthCheck(expr, targetAttr.metadata), 
targetAttr.name)(explicitMetadata =
-      Option(targetAttr.metadata))
-  }
-
-  private def addCastToStructByName(
-      parent: NamedExpression,
-      source: StructType,
-      target: StructType): NamedExpression = {
-    // Reject target fields that collide under the current resolver
-    // (e.g. `name` and `Name` with case-insensitive resolution), otherwise
-    // we would silently map both target fields to the same source field.
-    val targetConflicts = target.fields
-      .combinations(2)
-      .collect {
-        case Array(a, b) if conf.resolver(a.name, b.name) => (a.name, b.name)
-      }
-      .toSeq
-    if (targetConflicts.nonEmpty) {
-      throw new RuntimeException(
-        "Cannot write incompatible data: nested struct has conflicting target 
field names: " +
-          targetConflicts.map { case (a, b) => s"`$a` vs `$b`" }.mkString(", 
") + ".")
-    }
-
-    // Single pass: resolve each target field to its source match(es) and track
-    // which source indices were consumed, so we can detect extras without
-    // rescanning source and target repeatedly.
-    val sourceWithIndex = source.fields.zipWithIndex
-    val consumed = mutable.BitSet.empty
-    val resolved = target.fields.map {
-      tgt =>
-        val matches = sourceWithIndex.filter { case (f, _) => 
conf.resolver(f.name, tgt.name) }
-        matches.foreach { case (_, i) => consumed += i }
-        (tgt, matches)
-    }
-
-    // If source struct has fields not in target, reject so that merge-schema
-    // can handle the evolution instead of silently dropping the extra fields.
-    val extraFields = sourceWithIndex.collect {
-      case (f, i) if !consumed(i) => f.name
-    }
-    if (extraFields.nonEmpty) {
-      throw new RuntimeException(
-        s"Cannot write incompatible data: nested struct has extra fields: 
${extraFields.mkString(", ")}.")
-    }
-
-    val fields = resolved.map {
-      case (targetField, matches) =>
-        val (sourceIndex, sourceField) = resolveSingleSourceField(matches, 
targetField.name, source)
-        (targetField.dataType, sourceField.dataType) match {
-          case (nested: StructType, s: StructType) =>
-            val subField = extractStructField(parent, sourceIndex, 
sourceField.name, targetField)
-            addCastToStructByName(subField, s, nested)
-          case (_: StructType, o) =>
-            throw new RuntimeException(s"Can not support to cast $o to 
StructType.")
-          case _ =>
-            castStructField(parent, sourceIndex, sourceField.name, targetField)
-        }
-    }
-    structAlias(fields, parent)
-  }
-
-  private def resolveSingleSourceField(
-      matches: Array[(StructField, Int)],
-      name: String,
-      source: StructType): (Int, StructField) = {
-    if (matches.length == 1) {
-      val (field, index) = matches(0)
-      (index, field)
-    } else if (matches.isEmpty) {
-      throw new RuntimeException(
-        s"""Field "$name" does not exist in source struct type: 
${source.simpleString}.""")
-    } else {
-      throw new RuntimeException(
-        s"""Cannot resolve nested field "$name" due to name conflicts: """ +
-          matches.map(_._1.name).mkString(", ") + ".")
+        inAttr.name == outAttr.name &&
+        PaimonUtils.equalsIgnoreCompatibleNullability(inType, outType) &&
+        (outAttr.nullable || !inAttr.nullable)
     }
   }
 
-  private def addCastToStructByPosition(
-      parent: NamedExpression,
-      source: StructType,
-      target: StructType): NamedExpression = {
-    if (source.length != target.length) {
-      throw new RuntimeException("The number of fields in source and target is 
not same.")
-    }
-
-    val fields = target.zipWithIndex.map {
-      case (targetField @ StructField(_, nested: StructType, _, _), i) =>
-        val sourceField = source(i)
-        sourceField.dataType match {
-          case s: StructType =>
-            val subField = castStructField(parent, i, sourceField.name, 
targetField)
-            addCastToStructByPosition(subField, s, nested)
-          case o =>
-            throw new RuntimeException(s"Can not support to cast $o to 
StructType.")
-        }
-      case (targetField, i) =>
-        val sourceField = source(i)
-        castStructField(parent, i, sourceField.name, targetField)
-    }
-    structAlias(fields, parent)
-  }
-
-  private def structAlias(
-      fields: Seq[NamedExpression],
-      parent: NamedExpression): NamedExpression = {
-    val struct = CreateStruct(fields)
-    val res = if (parent.nullable) {
-      If(IsNull(parent), Literal(null, struct.dataType), struct)
-    } else {
-      struct
-    }
-    Alias(res, parent.name)(parent.exprId, parent.qualifier, 
Option(parent.metadata))
-  }
-
-  private def castStructField(
-      parent: NamedExpression,
-      i: Int,
-      sourceFieldName: String,
-      targetField: StructField): NamedExpression = {
-    Alias(
-      stringLengthCheck(
-        cast(GetStructField(parent, i, Option(sourceFieldName)), 
targetField.dataType),
-        targetField.metadata),
-      targetField.name)(explicitMetadata = Option(targetField.metadata))
-  }
-
-  private def extractStructField(
-      parent: NamedExpression,
-      i: Int,
-      sourceFieldName: String,
-      targetField: StructField): NamedExpression = {
-    Alias(GetStructField(parent, i, Option(sourceFieldName)), 
targetField.name)(
-      explicitMetadata = Option(targetField.metadata))
-  }
-
-  private def castToArrayStruct(
-      parent: NamedExpression,
-      source: StructType,
-      target: StructType,
-      sourceNullable: Boolean,
-      castToStructFunc: (NamedExpression, StructType, StructType) => 
NamedExpression
-  ): Expression = {
-    val structConverter: (Expression, Expression) => Expression = (_, i) =>
-      castToStructFunc(Alias(GetArrayItem(parent, i), i.toString)(), source, 
target)
-    val transformLambdaFunc = {
-      val elementVar = NamedLambdaVariable("elementVar", source, 
sourceNullable)
-      val indexVar = NamedLambdaVariable("indexVar", IntegerType, false)
-      LambdaFunction(structConverter(elementVar, indexVar), Seq(elementVar, 
indexVar))
-    }
-    ArrayTransform(parent, transformLambdaFunc)
-  }
-
-  private def cast(expr: Expression, dataType: DataType): Expression = {
-    val cast = Compatibility.cast(expr, dataType, 
Option(conf.sessionLocalTimeZone))
-    cast.setTagValue(Compatibility.castByTableInsertionTag, ())
-    cast
-  }
+}
 
-  private def stringLengthCheck(expr: Expression, metadata: Metadata): 
Expression = {
-    if (!conf.charVarcharAsString) {
-      CharVarcharUtils
-        .getRawType(metadata)
-        .map(rawType => CharVarcharUtils.stringLengthCheck(expr, rawType))
-        .getOrElse(expr)
-    } else {
-      expr
-    }
-  }
+object PaimonAnalysis {
+  val PAIMON_WRITE_RESOLVED: TreeNodeTag[Unit] = 
TreeNodeTag[Unit]("paimon.write.resolved")
 }
 
 case class PaimonPostHocResolutionRules(session: SparkSession) extends 
Rule[LogicalPlan] {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonOutputResolver.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonOutputResolver.scala
new file mode 100644
index 0000000000..7a8d4a878d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonOutputResolver.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.analysis
+
+import org.apache.paimon.spark.catalyst.Compatibility
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+import scala.collection.mutable
+
+/**
+ * Trimmed fork of Spark's `TableOutputResolver` with `mergeSchemaEnabled` for 
schema evolution.
+ *
+ *   - Missing target fields: NULL-fill at any depth.
+ *   - Extra source fields: throw when `mergeSchemaEnabled = false`; otherwise 
kept in the
+ *     projection at any depth so `SchemaHelper.mergeSchema` evolves the table 
at write time. Safe
+ *     because `PaimonSparkTableBase` advertises `ACCEPT_ANY_SCHEMA`, 
short-circuiting Spark's
+ *     `outputResolved` check.
+ *   - Nullable downcast: wrap with `AssertNotNull`.
+ *   - Type mismatch: wrap with `Compatibility.cast` tagged for downstream 
explicit-cast check.
+ */
+object PaimonOutputResolver {
+
+  def resolveOutputColumns(
+      tableName: String,
+      expected: Seq[Attribute],
+      query: LogicalPlan,
+      byName: Boolean,
+      conf: SQLConf,
+      mergeSchemaEnabled: Boolean): LogicalPlan = {
+    val resolved: Seq[NamedExpression] = if (byName) {
+      reorderColumnsByName(tableName, query.output, expected, conf, 
mergeSchemaEnabled, Nil)
+    } else {
+      resolveColumnsByPosition(tableName, query.output, expected, conf, Nil)
+    }
+    if (resolved == query.output) {
+      query
+    } else {
+      Project(resolved, query)
+    }
+  }
+
+  private def reorderColumnsByName(
+      tableName: String,
+      inputCols: Seq[NamedExpression],
+      expectedCols: Seq[Attribute],
+      conf: SQLConf,
+      mergeSchemaEnabled: Boolean,
+      colPath: Seq[String]): Seq[NamedExpression] = {
+    val isTopLevel = colPath.isEmpty
+    val matchedNames = mutable.HashSet.empty[String]
+    val reordered = expectedCols.map {
+      expectedCol =>
+        val matches = inputCols.filter(col => conf.resolver(col.name, 
expectedCol.name))
+        val newColPath = colPath :+ expectedCol.name
+        if (matches.isEmpty) {
+          nullFill(expectedCol)
+        } else if (matches.length > 1) {
+          throw new RuntimeException(
+            s"Cannot write to `$tableName`, " +
+              s"due to ambiguous column name `${newColPath.mkString(".")}`.")
+        } else {
+          matchedNames += matches.head.name
+          val actualExpectedCol = expectedCol.withDataType {
+            
CharVarcharUtils.getRawType(expectedCol.metadata).getOrElse(expectedCol.dataType)
+          }
+          resolveField(
+            tableName,
+            matches.head,
+            actualExpectedCol,
+            byName = true,
+            conf,
+            mergeSchemaEnabled,
+            newColPath)
+        }
+    }
+
+    if (matchedNames.size < inputCols.length) {
+      val extras = inputCols.filterNot(col => matchedNames.contains(col.name))
+      if (!mergeSchemaEnabled) {
+        val extrasStr = extras.map(c => s"`${c.name}`").mkString(", ")
+        val msg = if (isTopLevel) {
+          s"Cannot write to `$tableName`, extra columns: $extrasStr"
+        } else {
+          s"Cannot write to `$tableName`, " +
+            s"extra struct fields at `${colPath.mkString(".")}`: $extrasStr"
+        }
+        throw new RuntimeException(msg)
+      }
+      reordered ++ extras.map(preserveAsAlias)
+    } else {
+      reordered
+    }
+  }
+
+  private def resolveColumnsByPosition(
+      tableName: String,
+      inputCols: Seq[NamedExpression],
+      expectedCols: Seq[Attribute],
+      conf: SQLConf,
+      colPath: Seq[String]): Seq[NamedExpression] = {
+    val actualExpectedCols = expectedCols.map {
+      attr =>
+        attr.withDataType {
+          CharVarcharUtils.getRawType(attr.metadata).getOrElse(attr.dataType)
+        }
+    }
+    if (inputCols.size != actualExpectedCols.size) {
+      val where = if (colPath.isEmpty) {
+        s"`$tableName`"
+      } else {
+        s"`$tableName` at `${colPath.mkString(".")}`"
+      }
+      throw new RuntimeException(s"Cannot write to $where, " +
+        s"the number of data columns (${inputCols.size}) doesn't match the 
table schema's (${actualExpectedCols.size}).")
+    }
+    inputCols.zip(actualExpectedCols).map {
+      case (inputCol, expectedCol) =>
+        resolveField(
+          tableName,
+          inputCol,
+          expectedCol,
+          byName = false,
+          conf,
+          mergeSchemaEnabled = false,
+          colPath :+ expectedCol.name)
+    }
+  }
+
+  private def resolveField(
+      tableName: String,
+      input: NamedExpression,
+      expected: Attribute,
+      byName: Boolean,
+      conf: SQLConf,
+      mergeSchemaEnabled: Boolean,
+      colPath: Seq[String]): NamedExpression = {
+    (input.dataType, expected.dataType) match {
+      case (sourceType: StructType, targetType: StructType) =>
+        resolveStructType(
+          tableName,
+          input,
+          sourceType,
+          expected,
+          targetType,
+          byName,
+          conf,
+          mergeSchemaEnabled,
+          colPath)
+      case (sourceType: ArrayType, targetType: ArrayType) =>
+        resolveArrayType(
+          tableName,
+          input,
+          sourceType,
+          expected,
+          targetType,
+          byName,
+          conf,
+          mergeSchemaEnabled,
+          colPath)
+      case (sourceType: MapType, targetType: MapType) =>
+        resolveMapType(
+          tableName,
+          input,
+          sourceType,
+          expected,
+          targetType,
+          byName,
+          conf,
+          mergeSchemaEnabled,
+          colPath)
+      case _ =>
+        checkField(input, expected, conf, colPath)
+    }
+  }
+
+  private def resolveStructType(
+      tableName: String,
+      input: Expression,
+      sourceType: StructType,
+      expected: Attribute,
+      targetType: StructType,
+      byName: Boolean,
+      conf: SQLConf,
+      mergeSchemaEnabled: Boolean,
+      colPath: Seq[String]): NamedExpression = {
+    val nullCheckedInput = checkNullability(input, expected, colPath)
+    val fields = sourceType.zipWithIndex.map {
+      case (f, i) =>
+        Alias(GetStructField(nullCheckedInput, i, Option(f.name)), f.name)()
+    }
+    val resolved = if (byName) {
+      reorderColumnsByName(
+        tableName,
+        fields,
+        toAttributes(targetType),
+        conf,
+        mergeSchemaEnabled,
+        colPath)
+    } else {
+      resolveColumnsByPosition(tableName, fields, toAttributes(targetType), 
conf, colPath)
+    }
+    val struct = CreateStruct(resolved)
+    val res = if (nullCheckedInput.nullable) {
+      If(IsNull(nullCheckedInput), Literal(null, struct.dataType), struct)
+    } else {
+      struct
+    }
+    Alias(res, expected.name)(explicitMetadata = Option(expected.metadata))
+  }
+
+  private def resolveArrayType(
+      tableName: String,
+      input: Expression,
+      sourceType: ArrayType,
+      expected: Attribute,
+      targetType: ArrayType,
+      byName: Boolean,
+      conf: SQLConf,
+      mergeSchemaEnabled: Boolean,
+      colPath: Seq[String]): NamedExpression = {
+    val nullCheckedInput = checkNullability(input, expected, colPath)
+    val param = NamedLambdaVariable("element", sourceType.elementType, 
sourceType.containsNull)
+    val fakeAttr =
+      AttributeReference("element", targetType.elementType, 
targetType.containsNull)()
+    val resolved = if (byName) {
+      reorderColumnsByName(tableName, Seq(param), Seq(fakeAttr), conf, 
mergeSchemaEnabled, colPath)
+    } else {
+      resolveColumnsByPosition(tableName, Seq(param), Seq(fakeAttr), conf, 
colPath)
+    }
+    assert(resolved.length == 1)
+    val elementExpr = stripOuterAlias(resolved.head)
+    val transformed = if (elementExpr.fastEquals(param)) {
+      nullCheckedInput
+    } else {
+      ArrayTransform(nullCheckedInput, LambdaFunction(elementExpr, Seq(param)))
+    }
+    Alias(transformed, expected.name)(explicitMetadata = 
Option(expected.metadata))
+  }
+
+  private def resolveMapType(
+      tableName: String,
+      input: Expression,
+      sourceType: MapType,
+      expected: Attribute,
+      targetType: MapType,
+      byName: Boolean,
+      conf: SQLConf,
+      mergeSchemaEnabled: Boolean,
+      colPath: Seq[String]): NamedExpression = {
+    val nullCheckedInput = checkNullability(input, expected, colPath)
+    val keyParam = NamedLambdaVariable("key", sourceType.keyType, nullable = 
false)
+    val fakeKeyAttr = AttributeReference("key", targetType.keyType, nullable = 
false)()
+    val resolvedKey = if (byName) {
+      reorderColumnsByName(
+        tableName,
+        Seq(keyParam),
+        Seq(fakeKeyAttr),
+        conf,
+        mergeSchemaEnabled,
+        colPath)
+    } else {
+      resolveColumnsByPosition(tableName, Seq(keyParam), Seq(fakeKeyAttr), 
conf, colPath)
+    }
+
+    val valueParam =
+      NamedLambdaVariable("value", sourceType.valueType, 
sourceType.valueContainsNull)
+    val fakeValueAttr =
+      AttributeReference("value", targetType.valueType, 
targetType.valueContainsNull)()
+    val resolvedValue = if (byName) {
+      reorderColumnsByName(
+        tableName,
+        Seq(valueParam),
+        Seq(fakeValueAttr),
+        conf,
+        mergeSchemaEnabled,
+        colPath)
+    } else {
+      resolveColumnsByPosition(tableName, Seq(valueParam), Seq(fakeValueAttr), 
conf, colPath)
+    }
+
+    assert(resolvedKey.length == 1 && resolvedValue.length == 1)
+    val keyExpr = stripOuterAlias(resolvedKey.head)
+    val valueExpr = stripOuterAlias(resolvedValue.head)
+    val transformed = if (keyExpr.fastEquals(keyParam) && 
valueExpr.fastEquals(valueParam)) {
+      nullCheckedInput
+    } else {
+      val newKeys = if (keyExpr.fastEquals(keyParam)) {
+        MapKeys(nullCheckedInput)
+      } else {
+        ArrayTransform(MapKeys(nullCheckedInput), LambdaFunction(keyExpr, 
Seq(keyParam)))
+      }
+      val newValues = if (valueExpr.fastEquals(valueParam)) {
+        MapValues(nullCheckedInput)
+      } else {
+        ArrayTransform(MapValues(nullCheckedInput), LambdaFunction(valueExpr, 
Seq(valueParam)))
+      }
+      MapFromArrays(newKeys, newValues)
+    }
+    Alias(transformed, expected.name)(explicitMetadata = 
Option(expected.metadata))
+  }
+
+  private def checkField(
+      input: NamedExpression,
+      expected: Attribute,
+      conf: SQLConf,
+      colPath: Seq[String]): NamedExpression = {
+    val attrTypeHasCharVarchar = 
CharVarcharUtils.hasCharVarchar(expected.dataType)
+    val attrTypeWithoutCharVarchar = if (attrTypeHasCharVarchar) {
+      CharVarcharUtils.replaceCharVarcharWithString(expected.dataType)
+    } else {
+      expected.dataType
+    }
+    val casted = if (input.dataType == attrTypeWithoutCharVarchar) {
+      input
+    } else {
+      addCast(input, attrTypeWithoutCharVarchar, conf)
+    }
+    val withStrLenCheck = if (conf.charVarcharAsString || 
!attrTypeHasCharVarchar) {
+      casted
+    } else {
+      CharVarcharUtils.stringLengthCheck(casted, expected.dataType)
+    }
+    val nullChecked = checkNullability(withStrLenCheck, expected, colPath)
+    Alias(nullChecked, expected.name)(explicitMetadata = 
Option(expected.metadata))
+  }
+
+  private def checkNullability(
+      input: Expression,
+      expected: Attribute,
+      colPath: Seq[String]): Expression = {
+    if (input.nullable && !expected.nullable) {
+      AssertNotNull(input, colPath)
+    } else {
+      input
+    }
+  }
+
+  private def addCast(expr: Expression, dataType: DataType, conf: SQLConf): 
Expression = {
+    val cast = Compatibility.cast(expr, dataType, 
Option(conf.sessionLocalTimeZone))
+    cast.setTagValue(Compatibility.castByTableInsertionTag, ())
+    cast
+  }
+
+  private def nullFill(expected: Attribute): NamedExpression = {
+    Alias(Literal(null, expected.dataType), expected.name)(
+      explicitMetadata = Option(expected.metadata))
+  }
+
+  private def preserveAsAlias(expr: NamedExpression): NamedExpression = expr 
match {
+    case a: Alias => a
+    case other =>
+      Alias(other, other.name)(explicitMetadata = Option(other.metadata))
+  }
+
+  private def stripOuterAlias(expr: Expression): Expression = expr match {
+    case a: Alias => a.child
+    case other => other
+  }
+
+  private def toAttributes(structType: StructType): Seq[Attribute] = {
+    structType.map(f => AttributeReference(f.name, f.dataType, f.nullable, 
f.metadata)())
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index fe96ad2e14..d8b70f68f2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -132,6 +132,10 @@ object PaimonUtils {
     left.sameType(right)
   }
 
+  /**
+   * Forwards to Spark's `DataType.equalsIgnoreCompatibleNullability(from, to)`.
+   *
+   * NOTE(review): presumably exposed here because the Spark method is only accessible from
+   * within the `org.apache.spark.sql` package -- confirm.
+   *
+   * @param from type of the value being written
+   * @param to type of the write target
+   * @return the result of the Spark comparison
+   */
+  def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
+    DataType.equalsIgnoreCompatibleNullability(from, to)
+  }
+
   def classIsLoadable(clazz: String): Boolean = {
     SparkUtils.classIsLoadable(clazz)
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
index 1cdf210d89..fc8b4adb6e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkSchemaEvolutionITCase.java
@@ -415,7 +415,7 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                         () -> {
                             writeTable("testAlterColumnType", "(1, null, 
'a')");
                         })
-                .hasMessageContaining("Cannot write null to non-null 
column(b)");
+                .hasStackTraceContaining("value appeared in non-nullable 
field");
 
         List<Row> beforeAlter = spark.sql("SHOW CREATE TABLE 
testAlterColumnType").collectAsList();
         assertThat(beforeAlter.toString())
@@ -437,7 +437,7 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                         () -> {
                             writeTable("testAlterColumnType", "(1, null, 
'a')");
                         })
-                .hasMessageContaining("Cannot write null to non-null 
column(b)");
+                .hasStackTraceContaining("value appeared in non-nullable 
field");
     }
 
     @Test
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index 47a280868d..b44561a09c 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -23,7 +23,6 @@ import org.apache.paimon.schema.Schema
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.types.DataTypes
 
-import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
 import org.junit.jupiter.api.Assertions
@@ -41,10 +40,10 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
     withTable("T") {
       sql("CREATE TABLE T (id INT NOT NULL, name STRING)")
 
-      val e1 = intercept[SparkException] {
+      val e1 = intercept[Exception] {
         sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (null, "c")""")
       }
-      Assertions.assertTrue(e1.getMessage().contains("Cannot write null to 
non-null column"))
+      Assertions.assertTrue(e1.getMessage().contains("value appeared in 
non-nullable field"))
 
       sql("""INSERT INTO T VALUES (1, "a"), (2, "b"), (3, null)""")
       checkAnswer(
@@ -65,15 +64,15 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
             |TBLPROPERTIES ('primary-key' = 'id,pt')
             |""".stripMargin)
 
-      val e1 = intercept[SparkException] {
+      val e1 = intercept[Exception] {
         sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", null)""")
       }
-      Assertions.assertTrue(e1.getMessage().contains("Cannot write null to 
non-null column"))
+      Assertions.assertTrue(e1.getMessage().contains("value appeared in 
non-nullable field"))
 
-      val e2 = intercept[SparkException] {
+      val e2 = intercept[Exception] {
         sql("""INSERT INTO T VALUES (1, "a", "pt1"), (null, "b", "pt2")""")
       }
-      Assertions.assertTrue(e2.getMessage().contains("Cannot write null to 
non-null column"))
+      Assertions.assertTrue(e2.getMessage().contains("value appeared in 
non-nullable field"))
 
       sql("""INSERT INTO T VALUES (1, "a", "pt1"), (2, "b", "pt1"), (3, null, 
"pt2")""")
       checkAnswer(
@@ -340,65 +339,69 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
   test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT on partitioned table") {
     assume(gteqSpark3_4)
     withTable("t") {
-      sql("""
-            |CREATE TABLE t (id BIGINT, data STRING, pt STRING)
-            |USING paimon
-            |PARTITIONED BY (pt)
-            |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '2')
-            |""".stripMargin)
-      sql("INSERT INTO t VALUES (1, 'old', 'p0')")
-      val oldLocation = loadTable("t").location().toString
-      Seq((2L, "x2", "p1"), (3L, "x3", "p2"))
-        .toDF("id", "data", "pt")
-        .createOrReplaceTempView("source")
+      withTempView("source") {
+        sql("""
+              |CREATE TABLE t (id BIGINT, data STRING, pt STRING)
+              |USING paimon
+              |PARTITIONED BY (pt)
+              |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '2')
+              |""".stripMargin)
+        sql("INSERT INTO t VALUES (1, 'old', 'p0')")
+        val oldLocation = loadTable("t").location().toString
+        Seq((2L, "x2", "p1"), (3L, "x3", "p2"))
+          .toDF("id", "data", "pt")
+          .createOrReplaceTempView("source")
 
-      sql("""
-            |CREATE OR REPLACE TABLE t
-            |USING paimon
-            |PARTITIONED BY (pt)
-            |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '3')
-            |AS SELECT * FROM source
-            |""".stripMargin)
+        sql("""
+              |CREATE OR REPLACE TABLE t
+              |USING paimon
+              |PARTITIONED BY (pt)
+              |TBLPROPERTIES ('primary-key' = 'id,pt', 'bucket' = '3')
+              |AS SELECT * FROM source
+              |""".stripMargin)
 
-      val replaced = loadTable("t")
-      Assertions.assertEquals(oldLocation, replaced.location().toString)
-      Assertions.assertEquals("3", replaced.options().get("bucket"))
-      checkAnswer(
-        sql("SELECT * FROM t ORDER BY id"),
-        Seq((2L, "x2", "p1"), (3L, "x3", "p2")).toDF())
+        val replaced = loadTable("t")
+        Assertions.assertEquals(oldLocation, replaced.location().toString)
+        Assertions.assertEquals("3", replaced.options().get("bucket"))
+        checkAnswer(
+          sql("SELECT * FROM t ORDER BY id"),
+          Seq((2L, "x2", "p1"), (3L, "x3", "p2")).toDF())
+      }
     }
   }
 
   test("Paimon DDL: CREATE OR REPLACE TABLE AS SELECT supports incompatible 
schema") {
     assume(gteqSpark3_4)
     withTable("t") {
-      sql("""
-            |CREATE TABLE t (id BIGINT, data STRING)
-            |USING paimon
-            |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
-            |""".stripMargin)
-      sql("INSERT INTO t VALUES (1, 'old')")
-      val oldLocation = loadTable("t").location().toString
-      val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
-      Seq(("2", 20), ("3", 30)).toDF("id", 
"amount").createOrReplaceTempView("source")
+      withTempView("source") {
+        sql("""
+              |CREATE TABLE t (id BIGINT, data STRING)
+              |USING paimon
+              |TBLPROPERTIES ('primary-key' = 'id', 'bucket' = '2')
+              |""".stripMargin)
+        sql("INSERT INTO t VALUES (1, 'old')")
+        val oldLocation = loadTable("t").location().toString
+        val oldSnapshotId = loadTable("t").snapshotManager().latestSnapshotId()
+        Seq(("2", 20), ("3", 30)).toDF("id", 
"amount").createOrReplaceTempView("source")
 
-      sql("""
-            |CREATE OR REPLACE TABLE t
-            |USING paimon
-            |TBLPROPERTIES ('bucket' = '-1')
-            |AS SELECT * FROM source
-            |""".stripMargin)
+        sql("""
+              |CREATE OR REPLACE TABLE t
+              |USING paimon
+              |TBLPROPERTIES ('bucket' = '-1')
+              |AS SELECT * FROM source
+              |""".stripMargin)
 
-      val replaced = loadTable("t")
-      Assertions.assertEquals(oldLocation, replaced.location().toString)
-      Assertions.assertEquals("-1", replaced.options().get("bucket"))
-      Assertions.assertEquals(Seq("id", "amount"), 
spark.table("t").schema.fieldNames.toSeq)
-      Assertions.assertEquals("string", 
spark.table("t").schema("id").dataType.typeName)
-      Assertions.assertEquals("integer", 
spark.table("t").schema("amount").dataType.typeName)
-      checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(("2", 20), ("3", 
30)).toDF())
-      checkAnswer(
-        sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
-        Seq((1L, "old")).toDF())
+        val replaced = loadTable("t")
+        Assertions.assertEquals(oldLocation, replaced.location().toString)
+        Assertions.assertEquals("-1", replaced.options().get("bucket"))
+        Assertions.assertEquals(Seq("id", "amount"), 
spark.table("t").schema.fieldNames.toSeq)
+        Assertions.assertEquals("string", 
spark.table("t").schema("id").dataType.typeName)
+        Assertions.assertEquals("integer", 
spark.table("t").schema("amount").dataType.typeName)
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(("2", 20), ("3", 
30)).toDF())
+        checkAnswer(
+          sql(s"SELECT id, data FROM t VERSION AS OF $oldSnapshotId"),
+          Seq((1L, "old")).toDF())
+      }
     }
   }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
index 01def579ef..87d80cb897 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTestBase.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.junit.jupiter.api.Assertions
 
 import java.sql.{Date, Timestamp}
@@ -366,6 +366,130 @@ abstract class DataFrameWriteTestBase extends 
PaimonSparkTestBase {
       }
   }
 
+  // Runs once per value of `spark.paimon.write.use-v2-write`. `source` and `target` declare the
+  // innermost struct fields in opposite order (flag, label vs. label, flag). After the DataFrame
+  // append, every value must be readable under its own field name in `target`.
+  test("Paimon: DataFrameWrite.saveAsTable should recursively align nested fields by name") {
+    for (useV2Write <- Seq("true", "false")) {
+      withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+        withTable("source", "target") {
+          sql("""
+                |CREATE TABLE source (
+                |  id INT,
+                |  outer STRUCT<inner_arr: ARRAY<STRUCT<flag: BOOLEAN, label: STRING>>>,
+                |  nested_arr ARRAY<ARRAY<STRUCT<flag: BOOLEAN, label: STRING>>>,
+                |  nested_map MAP<STRING, STRUCT<flag: BOOLEAN, label: STRING>>
+                |) USING paimon
+                |""".stripMargin)
+
+          sql("""
+                |CREATE TABLE target (
+                |  id INT,
+                |  outer STRUCT<inner_arr: ARRAY<STRUCT<label: STRING, flag: BOOLEAN>>>,
+                |  nested_arr ARRAY<ARRAY<STRUCT<label: STRING, flag: BOOLEAN>>>,
+                |  nested_map MAP<STRING, STRUCT<label: STRING, flag: BOOLEAN>>
+                |) USING paimon
+                |""".stripMargin)
+
+          sql("""
+                |INSERT INTO source VALUES (
+                |  1,
+                |  named_struct('inner_arr', array(named_struct('flag', true, 'label', 'outer'))),
+                |  array(array(named_struct('flag', false, 'label', 'array'))),
+                |  map('k', named_struct('flag', true, 'label', 'map'))
+                |)
+                |""".stripMargin)
+
+          spark.table("source").write.format("paimon").mode("append").saveAsTable("target")
+
+          checkAnswer(
+            sql("""
+                  |SELECT
+                  |  outer.inner_arr[0].label,
+                  |  outer.inner_arr[0].flag,
+                  |  nested_arr[0][0].label,
+                  |  nested_arr[0][0].flag,
+                  |  nested_map['k'].label,
+                  |  nested_map['k'].flag
+                  |FROM target
+                  |""".stripMargin),
+            Seq(Row("outer", true, "array", false, "map", true))
+          )
+        }
+      }
+    }
+  }
+
+  // SQL `INSERT INTO target SELECT * FROM source` with ANSI mode off. `source` and `target`
+  // declare the innermost struct fields in opposite order, and the expected row shows that the
+  // values are mapped by position: each target `label` holds the source boolean as a string, and
+  // each target `flag` is NULL (the source label text presumably does not cast to BOOLEAN
+  // -- confirm).
+  test("Paimon: SQL insert should recursively keep nested fields by-position semantics") {
+    withSparkSQLConf("spark.sql.ansi.enabled" -> "false") {
+      withTable("source", "target") {
+        sql("""
+              |CREATE TABLE source (
+              |  id INT,
+              |  outer STRUCT<inner_arr: ARRAY<STRUCT<flag: BOOLEAN, label: STRING>>>,
+              |  nested_arr ARRAY<ARRAY<STRUCT<flag: BOOLEAN, label: STRING>>>,
+              |  nested_map MAP<STRING, STRUCT<flag: BOOLEAN, label: STRING>>
+              |) USING paimon
+              |""".stripMargin)
+
+        sql("""
+              |CREATE TABLE target (
+              |  id INT,
+              |  outer STRUCT<inner_arr: ARRAY<STRUCT<label: STRING, flag: BOOLEAN>>>,
+              |  nested_arr ARRAY<ARRAY<STRUCT<label: STRING, flag: BOOLEAN>>>,
+              |  nested_map MAP<STRING, STRUCT<label: STRING, flag: BOOLEAN>>
+              |) USING paimon
+              |""".stripMargin)
+
+        sql("""
+              |INSERT INTO source VALUES (
+              |  1,
+              |  named_struct('inner_arr', array(named_struct('flag', true, 'label', 'outer'))),
+              |  array(array(named_struct('flag', false, 'label', 'array'))),
+              |  map('k', named_struct('flag', true, 'label', 'map'))
+              |)
+              |""".stripMargin)
+
+        sql("INSERT INTO target SELECT * FROM source")
+
+        checkAnswer(
+          sql("""
+                |SELECT
+                |  outer.inner_arr[0].label,
+                |  outer.inner_arr[0].flag,
+                |  nested_arr[0][0].label,
+                |  nested_arr[0][0].flag,
+                |  nested_map['k'].label,
+                |  nested_map['k'].flag
+                |FROM target
+                |""".stripMargin),
+          Seq(Row("true", null, "false", null, "true", null))
+        )
+      }
+    }
+  }
+
+  // Covers both directions of a struct/primitive mismatch on the same column name: writing
+  // STRUCT<value: INT> into an INT column, then INT into a STRUCT<value: INT> column. Each append
+  // must throw, and the exception message must mention "cast" (case-insensitive).
+  test("Paimon: DataFrameWrite.saveAsTable should reject incompatible struct casts") {
+    withTable("source", "target") {
+      sql("CREATE TABLE source (id INT, payload STRUCT<value: INT>) USING paimon")
+      sql("CREATE TABLE target (id INT, payload INT) USING paimon")
+      sql("INSERT INTO source VALUES (1, named_struct('value', 1))")
+
+      val message = intercept[Exception] {
+        spark.table("source").write.format("paimon").mode("append").saveAsTable("target")
+      }.getMessage
+      assert(message.toLowerCase.contains("cast"))
+    }
+
+    withTable("source", "target") {
+      sql("CREATE TABLE source (id INT, payload INT) USING paimon")
+      sql("CREATE TABLE target (id INT, payload STRUCT<value: INT>) USING paimon")
+      sql("INSERT INTO source VALUES (1, 1)")
+
+      val message = intercept[Exception] {
+        spark.table("source").write.format("paimon").mode("append").saveAsTable("target")
+      }.getMessage
+      assert(message.toLowerCase.contains("cast"))
+    }
+  }
+
   withPk.foreach {
     hasPk =>
       bucketModes.foreach {
@@ -784,4 +908,148 @@ abstract class DataFrameWriteTestBase extends 
PaimonSparkTestBase {
         null) :: Nil
     checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3)
   }
+
+  // With `write.merge-schema` on and `spark.paimon.write.merge-schema.explicit-cast` enabled,
+  // BIGINT array elements nested two structs deep are appended to a table whose element type is
+  // STRING. The expected rows read the new elements back as strings and leave the existing row
+  // unchanged. Runs once per value of `spark.paimon.write.use-v2-write`.
+  test("Paimon write merge-schema conflict: deep nested array element bigint -> string") {
+    for (useV2Write <- Seq("true", "false")) {
+      withSparkSQLConf(
+        "spark.paimon.write.use-v2-write" -> useV2Write,
+        "spark.paimon.write.merge-schema.explicit-cast" -> "true") {
+        withTable("target") {
+          sql("""
+                |CREATE TABLE target (
+                |  id STRING,
+                |  data STRUCT<wind: STRUCT<impactPrts: ARRAY<STRING>>>
+                |) USING paimon
+                |""".stripMargin)
+          sql("""
+                |INSERT INTO target VALUES (
+                |  'r0',
+                |  named_struct('wind', named_struct('impactPrts', array('p0', 'p1')))
+                |)
+                |""".stripMargin)
+
+          val sourceSchema =
+            StructType.fromDDL("id STRING, data STRUCT<wind: STRUCT<impactPrts: ARRAY<BIGINT>>>")
+          val sourceDf = spark.createDataFrame(
+            java.util.Arrays.asList(Row("r1", Row(Row(java.util.Arrays.asList(10L, 20L))))),
+            sourceSchema)
+          sourceDf.write
+            .format("paimon")
+            .mode("append")
+            .option("write.merge-schema", "true")
+            .saveAsTable("target")
+
+          checkAnswer(
+            sql("SELECT id, data.wind.impactPrts FROM target ORDER BY id"),
+            Seq(
+              Row("r0", Array("p0", "p1")),
+              Row("r1", Array("10", "20"))
+            )
+          )
+        }
+      }
+    }
+  }
+
+  // A STRING value "2000" is appended to an existing BIGINT column of the same name with
+  // `write.merge-schema` on and explicit-cast enabled. The expected rows keep the column as
+  // BIGINT and read the new value back as 2000L. Runs once per value of
+  // `spark.paimon.write.use-v2-write`.
+  test("Paimon write merge-schema conflict: top-level same-name column string vs bigint") {
+    for (useV2Write <- Seq("true", "false")) {
+      withSparkSQLConf(
+        "spark.paimon.write.use-v2-write" -> useV2Write,
+        "spark.paimon.write.merge-schema.explicit-cast" -> "true") {
+        withTable("target") {
+          sql("CREATE TABLE target (id STRING, value BIGINT) USING paimon")
+          sql("INSERT INTO target VALUES ('r0', 1000L)")
+
+          val sourceSchema = StructType.fromDDL("id STRING, value STRING")
+          val sourceDf =
+            spark.createDataFrame(java.util.Arrays.asList(Row("r1", "2000")), sourceSchema)
+          sourceDf.write
+            .format("paimon")
+            .mode("append")
+            .option("write.merge-schema", "true")
+            .saveAsTable("target")
+
+          checkAnswer(
+            sql("SELECT id, value FROM target ORDER BY id"),
+            Seq(Row("r0", 1000L), Row("r1", 2000L))
+          )
+        }
+      }
+    }
+  }
+
+  // INT leaves at two array-of-struct depths (payload.inner.items[].prtId/seqNum and
+  // payload.inner.items[].missing[].port/terminal) are appended to BIGINT target fields with
+  // `write.merge-schema` on. Unlike the other merge-schema conflict tests, no explicit-cast conf
+  // is set here. The expected rows read every value back as Long. Runs once per value of
+  // `spark.paimon.write.use-v2-write`.
+  test("Paimon write merge-schema conflict: deep nested struct widening int -> bigint") {
+    for (useV2Write <- Seq("true", "false")) {
+      withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+        withTable("target") {
+          sql("""
+                |CREATE TABLE target (
+                |  id STRING,
+                |  payload STRUCT<
+                |    inner: STRUCT<
+                |      items: ARRAY<STRUCT<
+                |        prtId: BIGINT,
+                |        seqNum: BIGINT,
+                |        missing: ARRAY<STRUCT<
+                |          port: BIGINT,
+                |          terminal: BIGINT
+                |        >>
+                |      >>
+                |    >
+                |  >
+                |) USING paimon
+                |""".stripMargin)
+          sql("""
+                |INSERT INTO target VALUES (
+                |  'r0',
+                |  named_struct('inner', named_struct('items', array(named_struct(
+                |    'prtId', 10L,
+                |    'seqNum', 1L,
+                |    'missing', array(named_struct('port', 100L, 'terminal', 200L))
+                |  ))))
+                |)
+                |""".stripMargin)
+
+          val sourceSchema = StructType.fromDDL(
+            "id STRING, " +
+              "payload STRUCT<" +
+              "  inner: STRUCT<" +
+              "    items: ARRAY<STRUCT<" +
+              "      prtId: INT, " +
+              "      seqNum: INT, " +
+              "      missing: ARRAY<STRUCT<port: INT, terminal: INT>>" +
+              "    >>" +
+              "  >" +
+              ">")
+          val sourceDf = spark.createDataFrame(
+            java.util.Arrays.asList(Row(
+              "r1",
+              Row(
+                Row(java.util.Arrays.asList(Row(20, 2, java.util.Arrays.asList(Row(300, 400)))))))),
+            sourceSchema)
+          sourceDf.write
+            .format("paimon")
+            .mode("append")
+            .option("write.merge-schema", "true")
+            .saveAsTable("target")
+
+          checkAnswer(
+            sql("""
+                  |SELECT
+                  |  id,
+                  |  payload.inner.items[0].prtId,
+                  |  payload.inner.items[0].seqNum,
+                  |  payload.inner.items[0].missing[0].port,
+                  |  payload.inner.items[0].missing[0].terminal
+                  |FROM target ORDER BY id
+                  |""".stripMargin),
+            Seq(
+              Row("r0", 10L, 1L, 100L, 200L),
+              Row("r1", 20L, 2L, 300L, 400L)
+            )
+          )
+        }
+      }
+    }
+  }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index baecdaf997..2780a49e53 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -92,25 +92,22 @@ abstract class InsertOverwriteTableTestBase extends 
PaimonSparkTestBase {
                 val msg1 = intercept[Exception] {
                   sql("INSERT INTO TABLE t1 BY NAME SELECT col1, col2 as col1 
FROM t1")
                 }
-                assert(msg1.getMessage.contains("due to column name 
conflicts"))
+                assert(msg1.getMessage.contains("due to ambiguous column name 
`col1`"))
                 // name does not match
                 val msg2 = intercept[Exception] {
                   sql("INSERT INTO TABLE t1 BY NAME SELECT col1, col2 as colx 
FROM t1")
                 }
-                assert(msg2.getMessage.contains("due to unknown column names"))
+                assert(msg2.getMessage.contains("extra columns: `colx`"))
                 // query column size bigger than table's
                 val msg3 = intercept[Exception] {
                   sql("INSERT INTO TABLE t1 BY NAME SELECT col1, col2, col3, 
col4, col4 as col5 FROM t1")
                 }
-                assert(
-                  msg3.getMessage.contains(
-                    "the number of data columns don't match with the table 
schema"))
+                assert(msg3.getMessage.contains("extra columns: `col5`"))
                 // non-nullable column has no specified value
                 val msg4 = intercept[Exception] {
                   sql("INSERT INTO TABLE t2 BY NAME SELECT col2 FROM t2")
                 }
-                assert(
-                  msg4.getMessage.contains("non-nullable column `col1` has no 
specified value"))
+                assert(msg4.getMessage.contains("Cannot write null to non-null 
column(col1)"))
 
                 // by position
                 // column size does not match
@@ -119,7 +116,7 @@ abstract class InsertOverwriteTableTestBase extends 
PaimonSparkTestBase {
                 }
                 assert(
                   msg5.getMessage.contains(
-                    "the number of data columns don't match with the table 
schema"))
+                    "the number of data columns (1) doesn't match the table 
schema's (4)"))
               }
             }
           }
@@ -209,28 +206,7 @@ abstract class InsertOverwriteTableTestBase extends 
PaimonSparkTestBase {
           val msg = intercept[Exception] {
             sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
           }.getMessage
-          assert(msg.contains("name conflicts"))
-        }
-      }
-    }
-  }
-
-  test("Paimon: insert by name rejects nested struct target fields colliding 
under resolver") {
-    assume(gteqSpark3_5)
-    withSparkSQLConf("spark.sql.caseSensitive" -> "true") {
-      withTable("t1", "t2") {
-        spark.sql("""CREATE TABLE t1 (id INT NOT NULL, info STRUCT<name: 
STRING>)
-                    |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
-        // target has both `name` and `Name`, legal when session is 
case-sensitive
-        spark.sql("""CREATE TABLE t2 (id INT NOT NULL, info STRUCT<name: 
STRING, Name: STRING>)
-                    |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
-        sql("INSERT INTO t1 VALUES (1, struct('Alice'))")
-
-        withSparkSQLConf("spark.sql.caseSensitive" -> "false") {
-          val msg = intercept[Exception] {
-            sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
-          }.getMessage
-          assert(msg.contains("conflicting target field names"))
+          assert(msg.contains("due to ambiguous column name `info.name`"))
         }
       }
     }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
index c73048f7d0..2a17d6297e 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
@@ -106,7 +106,7 @@ class V2WriteMergeSchemaTest extends PaimonSparkTestBase {
         val error = intercept[RuntimeException] {
           spark.sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c")
         }.getMessage
-        assert(error.contains("the number of data columns don't match with the 
table schema's"))
+        assert(error.contains("extra columns: `c`"))
       }
     }
   }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
index 6be87a82fa..83b423438a 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
@@ -110,7 +110,7 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
         val error = intercept[RuntimeException] {
           spark.sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c")
         }.getMessage
-        assert(error.contains("the number of data columns don't match with the 
table schema's"))
+        assert(error.contains("extra columns"))
       }
     }
   }
@@ -498,4 +498,52 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
       }
     }
   }
+
+  // The source `items` element struct has fields f1..f5 only, while the table's element struct
+  // also declares f6. The DataFrame casts ids/a/b to the table types but passes `items` through
+  // unchanged, then appends with `write.merge-schema` on. The expected row has NULL for the
+  // missing f6.
+  test("Write merge schema: array of struct missing nested field by dataframe") {
+    withTable("t") {
+      sql("""
+            |CREATE TABLE t (
+            |  ids ARRAY<BIGINT>,
+            |  a BIGINT,
+            |  b BIGINT,
+            |  items ARRAY<STRUCT<
+            |    f1: INT,
+            |    f2: STRING,
+            |    f3: STRING,
+            |    f4: STRING,
+            |    f5: STRING,
+            |    f6: INT>>)
+            |""".stripMargin)
+
+      val sourceDataFrame = sql("""
+                                  |SELECT
+                                  |  array(1, 2) AS ids,
+                                  |  1 AS a,
+                                  |  2 AS b,
+                                  |  array(named_struct(
+                                  |    'f1', 10,
+                                  |    'f2', 'v2',
+                                  |    'f3', 'v3',
+                                  |    'f4', 'v4',
+                                  |    'f5', 'v5')) AS items
+                                  |""".stripMargin)
+
+      val alignedDataFrame = sourceDataFrame.select(
+        sourceDataFrame("ids").cast("ARRAY<BIGINT>").as("ids"),
+        sourceDataFrame("a").cast("BIGINT").as("a"),
+        sourceDataFrame("b").cast("BIGINT").as("b"),
+        sourceDataFrame("items")
+      )
+
+      alignedDataFrame.write
+        .format("paimon")
+        .mode("append")
+        .option("write.merge-schema", "true")
+        .saveAsTable("t")
+
+      checkAnswer(
+        sql("SELECT * FROM t"),
+        Seq(Row(Seq(1L, 2L), 1L, 2L, Seq(Row(10, "v2", "v3", "v4", "v5", null)))))
+    }
+  }
 }

Reply via email to