This is an automated email from the ASF dual-hosted git repository.

hope pushed a commit to branch release-1.4
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 12c9c9a441a979639d5f6a41a91f2b3d01bb67d0
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Mar 27 08:48:51 2026 +0800

    [spark] Fix nested struct field mapping for V2 write merge schema (#7542)
---
 .../paimon/spark/SparkInternalRowWrapper.java      |  18 +++-
 .../paimon/spark/sql/V2WriteMergeSchemaTest.scala  | 108 +++++++++++++++++++++
 2 files changed, 123 insertions(+), 3 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index d2e70b9ed3..ffd077741c 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -63,6 +63,7 @@ public class SparkInternalRowWrapper implements InternalRow, 
Serializable {
     private final int length;
     @Nullable private final UriReaderFactory uriReaderFactory;
     @Nullable private final int[] fieldIndexMap;
+    @Nullable private final StructType dataSchema;
 
     private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
 
@@ -77,6 +78,7 @@ public class SparkInternalRowWrapper implements InternalRow, 
Serializable {
             CatalogContext catalogContext) {
         this.tableSchema = tableSchema;
         this.length = length;
+        this.dataSchema = dataSchema;
         this.fieldIndexMap =
                 dataSchema != null ? buildFieldIndexMap(tableSchema, 
dataSchema) : null;
         this.uriReaderFactory = new UriReaderFactory(catalogContext);
@@ -240,7 +242,11 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
 
     @Override
     public Blob getBlob(int pos) {
-        byte[] bytes = internalRow.getBinary(pos);
+        int actualPos = getActualFieldPosition(pos);
+        if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
+            return null;
+        }
+        byte[] bytes = internalRow.getBinary(actualPos);
         boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
         if (blobDes) {
             BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
@@ -284,8 +290,14 @@ public class SparkInternalRowWrapper implements 
InternalRow, Serializable {
         if (actualPos == -1 || internalRow.isNullAt(actualPos)) {
             return null;
         }
-        return new SparkInternalRowWrapper(
-                        (StructType) 
tableSchema.fields()[actualPos].dataType(), numFields)
+        StructType nestedTableSchema = (StructType) 
tableSchema.fields()[pos].dataType();
+        if (dataSchema != null) {
+            StructType nestedDataSchema = (StructType) 
dataSchema.fields()[actualPos].dataType();
+            int dataNumFields = nestedDataSchema.size();
+            return new SparkInternalRowWrapper(nestedTableSchema, numFields, 
nestedDataSchema, null)
+                    .replace(internalRow.getStruct(actualPos, dataNumFields));
+        }
+        return new SparkInternalRowWrapper(nestedTableSchema, numFields)
                 .replace(internalRow.getStruct(actualPos, numFields));
     }
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
index 0b6e589d9f..c73048f7d0 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/V2WriteMergeSchemaTest.scala
@@ -316,4 +316,112 @@ class V2WriteMergeSchemaTest extends PaimonSparkTestBase {
     }
   }
 
+  test("Write merge schema: nested struct with new fields by sql") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, info STRUCT<key1 STRUCT<key2 STRING, key3 
STRING>>)")
+      sql("INSERT INTO t VALUES (1, struct(struct('v2a', 'v3a')))")
+      sql("INSERT INTO t VALUES (2, struct(struct('v2b', 'v3b')))")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(1, Row(Row("v2a", "v3a"))), Row(2, Row(Row("v2b", "v3b"))))
+      )
+
+      sql(
+        "INSERT INTO t BY NAME " +
+          "SELECT 3 AS id, " +
+          "named_struct('key1', named_struct('key2', 'v2c', 'key4', 'v4c', 
'key3', 'v3c')) AS info")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(
+          Row(1, Row(Row("v2a", "v3a", null))),
+          Row(2, Row(Row("v2b", "v3b", null))),
+          Row(3, Row(Row("v2c", "v3c", "v4c"))))
+      )
+    }
+  }
+
+  test("Write merge schema: deeply nested struct with new fields") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, data STRUCT<a STRUCT<b STRUCT<c1 STRING, c2 
STRING>>>)")
+      sql("INSERT INTO t VALUES (1, struct(struct(struct('c1v', 'c2v'))))")
+
+      sql(
+        "INSERT INTO t BY NAME " +
+          "SELECT 2 AS id, " +
+          "named_struct('a', named_struct('b', named_struct('c1', 'c1v2', 
'c3', 'c3v2', 'c2', 'c2v2'))) AS data")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(
+          Row(1, Row(Row(Row("c1v", "c2v", null)))),
+          Row(2, Row(Row(Row("c1v2", "c2v2", "c3v2")))))
+      )
+    }
+  }
+
+  test("Write merge schema: nested struct new fields and top-level new column 
together") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, info STRUCT<f1 STRING, f2 STRING>)")
+      sql("INSERT INTO t VALUES (1, struct('a', 'b'))")
+
+      sql(
+        "INSERT INTO t BY NAME " +
+          "SELECT 2 AS id, " +
+          "named_struct('f1', 'c', 'f3', 'd', 'f2', 'e') AS info, " +
+          "'top' AS extra")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(1, Row("a", "b", null), null), Row(2, Row("c", "e", "d"), 
"top"))
+      )
+    }
+  }
+
+  test("Write merge schema: nested struct with missing fields") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, info STRUCT<f1 STRING, f2 STRING, f3 
STRING>)")
+      sql("INSERT INTO t VALUES (1, struct('a', 'b', 'c'))")
+
+      sql(
+        "INSERT INTO t BY NAME " +
+          "SELECT 2 AS id, " +
+          "named_struct('f2', 'y', 'f3', 'z') AS info")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(1, Row("a", "b", "c")), Row(2, Row(null, "y", "z")))
+      )
+
+      sql(
+        "INSERT INTO t BY NAME " +
+          "SELECT 3 AS id, " +
+          "named_struct('f1', 'x', 'f4', 'w', 'f3', 'z') AS info")
+
+      sql(
+        "INSERT INTO t BY NAME " +
+          "SELECT 4 AS id, " +
+          "named_struct('f2', 'p', 'f3', 'q', 'f4', 'r') AS info")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(
+          Row(1, Row("a", "b", "c", null)),
+          Row(2, Row(null, "y", "z", null)),
+          Row(3, Row("x", null, "z", "w")),
+          Row(4, Row(null, "p", "q", "r")))
+      )
+    }
+  }
+
+  test("Write merge schema: nested struct with type evolution") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, info STRUCT<f1 INT, f2 STRING>)")
+      sql("INSERT INTO t VALUES (1, struct(10, 'a'))")
+
+      sql(
+        "INSERT INTO t BY NAME " +
+          "SELECT 2 AS id, " +
+          "named_struct('f1', cast(20 as bigint), 'f3', 'new', 'f2', 'b') AS 
info")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(1, Row(10L, "a", null)), Row(2, Row(20L, "b", "new")))
+      )
+    }
+  }
 }

Reply via email to