This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c52adb97f1 [spark] Case-insensitive field matching when casting nested 
struct on write. (#7743)
c52adb97f1 is described below

commit c52adb97f1ff219ec68b393f900a66a15b8b4648
Author: Junrui Lee <[email protected]>
AuthorDate: Thu May 14 21:29:50 2026 +0800

    [spark] Case-insensitive field matching when casting nested struct on 
write. (#7743)
    
    When writing BY NAME into a Paimon table whose nested struct field names
    differ from the source only in case, the insert fails with "field does
    not exist", because the lookup is case-sensitive and ignores
    spark.sql.caseSensitive.
    
    Example:
    
    ```
    CREATE TABLE t1 (id INT, info STRUCT<Age: INT, Name: STRING>);
    CREATE TABLE t2 (id INT, info STRUCT<name: STRING, age: INT>);
    INSERT INTO t1 VALUES (1, struct(30, 'Alice'));
    INSERT INTO t2 BY NAME SELECT * FROM t1;  -- fails
    ```
---
 .../spark/catalyst/analysis/PaimonAnalysis.scala   | 119 +++++++++++++++++----
 .../spark/sql/InsertOverwriteTableTestBase.scala   | 110 +++++++++++++++++++
 2 files changed, 207 insertions(+), 22 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 55d72013d6..6009b6a807 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -43,7 +43,7 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
   import DataSourceV2Implicits._
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsDown {
 
-    case a @ PaimonV2WriteCommand(table) if !paimonWriteResolved(a.query, 
table) =>
+    case a @ PaimonV2WriteCommand(table) if !paimonWriteResolved(a.query, 
table, a.isByName) =>
       val mergeSchemaEnabled =
         
writeOptions(a).get(SparkConnectorOptions.MERGE_SCHEMA.key()).contains("true") 
||
           OptionUtils.writeMergeSchemaEnabled()
@@ -77,13 +77,16 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
     }
   }
 
-  private def paimonWriteResolved(query: LogicalPlan, table: NamedRelation): 
Boolean = {
+  private def paimonWriteResolved(
+      query: LogicalPlan,
+      table: NamedRelation,
+      isByName: Boolean): Boolean = {
     query.output.size == table.output.size &&
     query.output.zip(table.output).forall {
       case (inAttr, outAttr) =>
         val inType = 
CharVarcharUtils.getRawType(inAttr.metadata).getOrElse(inAttr.dataType)
         val outType = 
CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType)
-        inAttr.name == outAttr.name && schemaCompatible(inType, outType)
+        inAttr.name == outAttr.name && schemaCompatible(inType, outType, 
isByName)
     }
   }
 
@@ -176,21 +179,42 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
     Project(project, query)
   }
 
-  private def schemaCompatible(dataSchema: DataType, tableSchema: DataType): 
Boolean = {
+  private def schemaCompatible(
+      dataSchema: DataType,
+      tableSchema: DataType,
+      checkFieldNames: Boolean): Boolean = {
     (dataSchema, tableSchema) match {
       case (s1: StructType, s2: StructType) =>
-        s1.zip(s2).forall { case (d1, d2) => schemaCompatible(d1.dataType, 
d2.dataType) }
+        s1.length == s2.length &&
+        (!checkFieldNames ||
+          (!hasResolverConflicts(s1) &&
+            !hasResolverConflicts(s2) &&
+            structFieldsResolved(s1, s2))) &&
+        s1.zip(s2).forall {
+          case (d1, d2) => schemaCompatible(d1.dataType, d2.dataType, 
checkFieldNames)
+        }
       case (a1: ArrayType, a2: ArrayType) =>
         // todo: support array type nullable evaluation
-        schemaCompatible(a1.elementType, a2.elementType)
+        schemaCompatible(a1.elementType, a2.elementType, checkFieldNames)
       case (m1: MapType, m2: MapType) =>
         m1.valueContainsNull == m2.valueContainsNull &&
-        schemaCompatible(m1.keyType, m2.keyType) &&
-        schemaCompatible(m1.valueType, m2.valueType)
+        schemaCompatible(m1.keyType, m2.keyType, checkFieldNames) &&
+        schemaCompatible(m1.valueType, m2.valueType, checkFieldNames)
       case (d1, d2) => d1 == d2
     }
   }
 
+  private def structFieldsResolved(source: StructType, target: StructType): 
Boolean = {
+    source.zip(target).forall {
+      case (sourceField, targetField) =>
+        conf.resolver(sourceField.name, targetField.name)
+    }
+  }
+
+  private def hasResolverConflicts(struct: StructType): Boolean = {
+    struct.fields.combinations(2).exists { case Array(a, b) => 
conf.resolver(a.name, b.name) }
+  }
+
   private def addCastToColumn(
       attr: Attribute,
       targetAttr: Attribute,
@@ -223,34 +247,76 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
       parent: NamedExpression,
       source: StructType,
       target: StructType): NamedExpression = {
+    // Reject target fields that collide under the current resolver
+    // (e.g. `name` and `Name` with case-insensitive resolution), otherwise
+    // we would silently map both target fields to the same source field.
+    val targetConflicts = target.fields
+      .combinations(2)
+      .collect {
+        case Array(a, b) if conf.resolver(a.name, b.name) => (a.name, b.name)
+      }
+      .toSeq
+    if (targetConflicts.nonEmpty) {
+      throw new RuntimeException(
+        "Cannot write incompatible data: nested struct has conflicting target 
field names: " +
+          targetConflicts.map { case (a, b) => s"`$a` vs `$b`" }.mkString(", 
") + ".")
+    }
+
+    // Single pass: resolve each target field to its source match(es) and track
+    // which source indices were consumed, so we can detect extras without
+    // rescanning source and target repeatedly.
+    val sourceWithIndex = source.fields.zipWithIndex
+    val consumed = mutable.BitSet.empty
+    val resolved = target.fields.map {
+      tgt =>
+        val matches = sourceWithIndex.filter { case (f, _) => 
conf.resolver(f.name, tgt.name) }
+        matches.foreach { case (_, i) => consumed += i }
+        (tgt, matches)
+    }
+
     // If source struct has fields not in target, reject so that merge-schema
     // can handle the evolution instead of silently dropping the extra fields.
-    val targetFieldNames = target.fieldNames.toSet
-    val extraFields = source.fieldNames.filterNot(targetFieldNames.contains)
+    val extraFields = sourceWithIndex.collect {
+      case (f, i) if !consumed(i) => f.name
+    }
     if (extraFields.nonEmpty) {
       throw new RuntimeException(
         s"Cannot write incompatible data: nested struct has extra fields: 
${extraFields.mkString(", ")}.")
     }
 
-    val fields = target.map {
-      case targetField @ StructField(name, nested: StructType, _, _) =>
-        val sourceIndex = source.fieldIndex(name)
-        val sourceField = source(sourceIndex)
-        sourceField.dataType match {
-          case s: StructType =>
-            val subField = castStructField(parent, sourceIndex, 
sourceField.name, targetField)
+    val fields = resolved.map {
+      case (targetField, matches) =>
+        val (sourceIndex, sourceField) = resolveSingleSourceField(matches, 
targetField.name, source)
+        (targetField.dataType, sourceField.dataType) match {
+          case (nested: StructType, s: StructType) =>
+            val subField = extractStructField(parent, sourceIndex, 
sourceField.name, targetField)
             addCastToStructByName(subField, s, nested)
-          case o =>
+          case (_: StructType, o) =>
             throw new RuntimeException(s"Can not support to cast $o to 
StructType.")
+          case _ =>
+            castStructField(parent, sourceIndex, sourceField.name, targetField)
         }
-      case targetField =>
-        val sourceIndex = source.fieldIndex(targetField.name)
-        val sourceField = source(sourceIndex)
-        castStructField(parent, sourceIndex, sourceField.name, targetField)
     }
     structAlias(fields, parent)
   }
 
+  private def resolveSingleSourceField(
+      matches: Array[(StructField, Int)],
+      name: String,
+      source: StructType): (Int, StructField) = {
+    if (matches.length == 1) {
+      val (field, index) = matches(0)
+      (index, field)
+    } else if (matches.isEmpty) {
+      throw new RuntimeException(
+        s"""Field "$name" does not exist in source struct type: 
${source.simpleString}.""")
+    } else {
+      throw new RuntimeException(
+        s"""Cannot resolve nested field "$name" due to name conflicts: """ +
+          matches.map(_._1.name).mkString(", ") + ".")
+    }
+  }
+
   private def addCastToStructByPosition(
       parent: NamedExpression,
       source: StructType,
@@ -300,6 +366,15 @@ class PaimonAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
       targetField.name)(explicitMetadata = Option(targetField.metadata))
   }
 
+  private def extractStructField(
+      parent: NamedExpression,
+      i: Int,
+      sourceFieldName: String,
+      targetField: StructField): NamedExpression = {
+    Alias(GetStructField(parent, i, Option(sourceFieldName)), 
targetField.name)(
+      explicitMetadata = Option(targetField.metadata))
+  }
+
   private def castToArrayStruct(
       parent: NamedExpression,
       source: StructType,
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index 92e2c3ee19..baecdaf997 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -126,6 +126,116 @@ abstract class InsertOverwriteTableTestBase extends 
PaimonSparkTestBase {
       }
   }
 
+  test("Paimon: insert by name with case-insensitive nested struct field 
matching") {
+    assume(gteqSpark3_5)
+    withTable("t1", "t2") {
+      // Source struct field order / types differ from target so 
paimonWriteResolved
+      // falls through schemaCompatible and we actually exercise 
addCastToStructByName.
+      spark.sql("""CREATE TABLE t1 (id INT NOT NULL, info STRUCT<Age: INT, 
Name: STRING>)
+                  |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+      spark.sql("""CREATE TABLE t2 (id INT NOT NULL, info STRUCT<name: STRING, 
age: INT>)
+                  |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+
+      sql("INSERT INTO t1 VALUES (1, struct(30, 'Alice')), (2, struct(25, 
'Bob'))")
+
+      sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
+      checkAnswer(
+        sql("SELECT * FROM t2 ORDER BY id"),
+        Row(1, Row("Alice", 30)) :: Row(2, Row("Bob", 25)) :: Nil)
+    }
+  }
+
+  test("Paimon: insert by name reorders same-type nested struct fields") {
+    assume(gteqSpark3_5)
+    withTable("t1", "t2") {
+      spark.sql("""CREATE TABLE t1 (id INT NOT NULL, info STRUCT<Nick: STRING, 
Name: STRING>)
+                  |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+      spark.sql("""CREATE TABLE t2 (id INT NOT NULL, info STRUCT<name: STRING, 
nick: STRING>)
+                  |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+
+      sql("INSERT INTO t1 VALUES (1, struct('Ally', 'Alice'))")
+
+      sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
+      checkAnswer(sql("SELECT * FROM t2"), Row(1, Row("Alice", "Ally")) :: Nil)
+    }
+  }
+
+  test("Paimon: insert by name with case-insensitive matching inside 
array<struct<...>>") {
+    assume(gteqSpark3_5)
+    withTable("t1", "t2") {
+      spark.sql("""CREATE TABLE t1 (id INT NOT NULL, items ARRAY<STRUCT<Age: 
INT, Name: STRING>>)
+                  |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+      spark.sql("""CREATE TABLE t2 (id INT NOT NULL, items ARRAY<STRUCT<name: 
STRING, age: INT>>)
+                  |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+
+      sql("INSERT INTO t1 VALUES (1, array(struct(30, 'Alice'), struct(25, 
'Bob')))")
+
+      sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
+      checkAnswer(sql("SELECT * FROM t2"), Row(1, Seq(Row("Alice", 30), 
Row("Bob", 25))) :: Nil)
+    }
+  }
+
+  test("Paimon: insert by name with case-insensitive matching inside nested 
struct") {
+    assume(gteqSpark3_5)
+    withTable("t1", "t2") {
+      spark.sql("""CREATE TABLE t1 (
+                  |  id INT NOT NULL,
+                  |  info STRUCT<details: STRUCT<Age: INT, Name: STRING>>)
+                  |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+      spark.sql("""CREATE TABLE t2 (
+                  |  id INT NOT NULL,
+                  |  info STRUCT<details: STRUCT<name: STRING, age: INT>>)
+                  |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+
+      sql("INSERT INTO t1 VALUES (1, struct(struct(30, 'Alice')))")
+
+      sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
+      checkAnswer(sql("SELECT * FROM t2"), Row(1, Row(Row("Alice", 30))) :: 
Nil)
+    }
+  }
+
+  test("Paimon: insert by name rejects ambiguous source nested struct fields") 
{
+    assume(gteqSpark3_5)
+    withSparkSQLConf("spark.sql.caseSensitive" -> "true") {
+      withTable("t1", "t2") {
+        // source has both `name` and `Name`, legal when session is 
case-sensitive
+        spark.sql("""CREATE TABLE t1 (id INT NOT NULL, info STRUCT<name: 
STRING, Name: STRING>)
+                    |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+        spark.sql("""CREATE TABLE t2 (id INT NOT NULL, info STRUCT<name: 
STRING>)
+                    |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+        sql("INSERT INTO t1 VALUES (1, struct('Alice', 'Bob'))")
+
+        withSparkSQLConf("spark.sql.caseSensitive" -> "false") {
+          val msg = intercept[Exception] {
+            sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
+          }.getMessage
+          assert(msg.contains("name conflicts"))
+        }
+      }
+    }
+  }
+
+  test("Paimon: insert by name rejects nested struct target fields colliding 
under resolver") {
+    assume(gteqSpark3_5)
+    withSparkSQLConf("spark.sql.caseSensitive" -> "true") {
+      withTable("t1", "t2") {
+        spark.sql("""CREATE TABLE t1 (id INT NOT NULL, info STRUCT<name: 
STRING>)
+                    |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+        // target has both `name` and `Name`, legal when session is 
case-sensitive
+        spark.sql("""CREATE TABLE t2 (id INT NOT NULL, info STRUCT<name: 
STRING, Name: STRING>)
+                    |TBLPROPERTIES ('write-only' = 'true')""".stripMargin)
+        sql("INSERT INTO t1 VALUES (1, struct('Alice'))")
+
+        withSparkSQLConf("spark.sql.caseSensitive" -> "false") {
+          val msg = intercept[Exception] {
+            sql("INSERT INTO t2 BY NAME SELECT * FROM t1")
+          }.getMessage
+          assert(msg.contains("conflicting target field names"))
+        }
+      }
+    }
+  }
+
   withPk.foreach {
     hasPk =>
       bucketModes.foreach {

Reply via email to