This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fa1b9dc31f [core] Diff nested schema changes inside ARRAY and MAP value types (#7843)
fa1b9dc31f is described below

commit fa1b9dc31fdeb71ac6479bff6779b93d8848c12a
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 13 22:06:24 2026 +0800

    [core] Diff nested schema changes inside ARRAY and MAP value types (#7843)
    
    Follow-up of #7789. Extend `SchemaMergingUtils.diffSchemaChanges` to
    emit precise `AddColumn` for new struct fields nested inside `ARRAY`
    element / `MAP` value, instead of a coarse `UpdateColumnType` rewriting
    the whole nested type.
    
    Falls back to `UpdateColumnType` when a nested field is removed or a map
    key type changes.
---
 .../apache/paimon/schema/SchemaMergingUtils.java   |  74 +++++++--
 .../paimon/schema/SchemaMergingUtilsTest.java      | 179 +++++++++++++++++++++
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  | 179 +++++++++++++++++++++
 3 files changed, 421 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
index 1ed9bad7e6..990087d792 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -39,6 +39,9 @@ import java.util.stream.Collectors;
 /** The util class for merging the schemas. */
 public class SchemaMergingUtils {
 
+    public static final String ARRAY_ELEMENT_FIELD_NAME = "element";
+    public static final String MAP_VALUE_FIELD_NAME = "value";
+
     public static TableSchema mergeSchemas(
             TableSchema currentTableSchema, RowType targetType, boolean 
allowExplicitCast) {
         RowType currentType = currentTableSchema.logicalRowType();
@@ -266,19 +269,68 @@ public class SchemaMergingUtils {
                 changes.add(
                         SchemaChange.addColumn(
                                 fieldNames, newField.type(), 
newField.description(), null));
-            } else if (!oldField.type().equals(newField.type())) {
-                // type changed — check if it's a nested struct change
-                if (oldField.type() instanceof RowType && newField.type() 
instanceof RowType) {
-                    diffFields(
-                            ((RowType) oldField.type()).getFields(),
-                            ((RowType) newField.type()).getFields(),
-                            fieldNames,
-                            changes);
-                } else {
-                    changes.add(SchemaChange.updateColumnType(fieldNames, 
newField.type(), true));
-                }
+            } else if (!oldField.type().equals(newField.type())
+                    && !diffNestedTypeChanges(
+                            oldField.type(), newField.type(), fieldNames, 
changes)) {
+                changes.add(SchemaChange.updateColumnType(fieldNames, 
newField.type(), true));
+            }
+        }
+    }
+
+    /**
+     * Returns true only when the type difference has been fully represented by nested schema
+     * changes. Returns false to let the caller fall back to {@link SchemaChange.UpdateColumnType}.
+     */
+    private static boolean diffNestedTypeChanges(
+            DataType oldType, DataType newType, String[] fieldNames, 
List<SchemaChange> changes) {
+        List<SchemaChange> stagedChanges = new ArrayList<>();
+        boolean handled = diffNestedTypeChangesInner(oldType, newType, 
fieldNames, stagedChanges);
+        if (handled) {
+            changes.addAll(stagedChanges);
+        }
+        return handled;
+    }
+
+    private static boolean diffNestedTypeChangesInner(
+            DataType oldType, DataType newType, String[] fieldNames, 
List<SchemaChange> changes) {
+        if (oldType instanceof RowType && newType instanceof RowType) {
+            List<DataField> oldFields = ((RowType) oldType).getFields();
+            List<DataField> newFields = ((RowType) newType).getFields();
+            if (hasRemovedFields(oldFields, newFields)) {
+                return false;
+            }
+            diffFields(oldFields, newFields, fieldNames, changes);
+            return true;
+        } else if (oldType instanceof ArrayType && newType instanceof 
ArrayType) {
+            return diffNestedTypeChanges(
+                    ((ArrayType) oldType).getElementType(),
+                    ((ArrayType) newType).getElementType(),
+                    appendFieldName(fieldNames, ARRAY_ELEMENT_FIELD_NAME),
+                    changes);
+        } else if (oldType instanceof MapType && newType instanceof MapType) {
+            MapType oldMapType = (MapType) oldType;
+            MapType newMapType = (MapType) newType;
+            if (!oldMapType.getKeyType().equals(newMapType.getKeyType())) {
+                return false;
+            }
+            return diffNestedTypeChanges(
+                    oldMapType.getValueType(),
+                    newMapType.getValueType(),
+                    appendFieldName(fieldNames, MAP_VALUE_FIELD_NAME),
+                    changes);
+        }
+        return false;
+    }
+
+    private static boolean hasRemovedFields(List<DataField> oldFields, 
List<DataField> newFields) {
+        Map<String, DataField> newFieldMap =
+                newFields.stream().collect(Collectors.toMap(DataField::name, 
Function.identity()));
+        for (DataField oldField : oldFields) {
+            if (!newFieldMap.containsKey(oldField.name())) {
+                return true;
             }
         }
+        return false;
     }
 
     private static String[] appendFieldName(String[] parentNames, String 
fieldName) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
index bfc6dd7aa3..ff04a167a7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
@@ -190,6 +190,185 @@ public class SchemaMergingUtilsTest {
         assertThat(r6.getFieldCount()).isEqualTo(8);
     }
 
+    @Test
+    public void testDiffNestedSchemaChangesInArrayAndMap() {
+        DataField arrayField =
+                new DataField(
+                        0,
+                        "items",
+                        new ArrayType(
+                                new RowType(
+                                        Lists.newArrayList(
+                                                new DataField(1, "f1", new 
IntType()),
+                                                new DataField(
+                                                        2,
+                                                        "f2",
+                                                        new VarCharType(
+                                                                
VarCharType.MAX_LENGTH))))));
+        DataField mapField =
+                new DataField(
+                        3,
+                        "attributes",
+                        new MapType(
+                                new VarCharType(VarCharType.MAX_LENGTH),
+                                new RowType(
+                                        Lists.newArrayList(
+                                                new DataField(4, "v1", new 
IntType())))));
+        TableSchema oldSchema =
+                new TableSchema(
+                        0,
+                        Lists.newArrayList(arrayField, mapField),
+                        4,
+                        Lists.newArrayList(),
+                        Lists.newArrayList(),
+                        new HashMap<>(),
+                        "");
+
+        DataField evolvedArrayField =
+                new DataField(
+                        0,
+                        "items",
+                        new ArrayType(
+                                new RowType(
+                                        Lists.newArrayList(
+                                                new DataField(1, "f1", new 
IntType()),
+                                                new DataField(
+                                                        2,
+                                                        "f2",
+                                                        new 
VarCharType(VarCharType.MAX_LENGTH)),
+                                                new DataField(
+                                                        5,
+                                                        "f3",
+                                                        new VarCharType(
+                                                                
VarCharType.MAX_LENGTH))))));
+        DataField evolvedMapField =
+                new DataField(
+                        3,
+                        "attributes",
+                        new MapType(
+                                new VarCharType(VarCharType.MAX_LENGTH),
+                                new RowType(
+                                        Lists.newArrayList(
+                                                new DataField(4, "v1", new 
IntType()),
+                                                new DataField(6, "v2", new 
BigIntType())))));
+        TableSchema newSchema =
+                new TableSchema(
+                        1,
+                        Lists.newArrayList(evolvedArrayField, evolvedMapField),
+                        6,
+                        Lists.newArrayList(),
+                        Lists.newArrayList(),
+                        new HashMap<>(),
+                        "");
+
+        List<SchemaChange> changes = 
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+
+        assertThat(changes).hasSize(2);
+        SchemaChange.AddColumn addArrayNestedField = (SchemaChange.AddColumn) 
changes.get(0);
+        assertThat(addArrayNestedField.fieldNames())
+                .containsExactly("items", 
SchemaMergingUtils.ARRAY_ELEMENT_FIELD_NAME, "f3");
+        assertThat(addArrayNestedField.dataType())
+                .isEqualTo(new VarCharType(VarCharType.MAX_LENGTH));
+        SchemaChange.AddColumn addMapValueNestedField = 
(SchemaChange.AddColumn) changes.get(1);
+        assertThat(addMapValueNestedField.fieldNames())
+                .containsExactly("attributes", 
SchemaMergingUtils.MAP_VALUE_FIELD_NAME, "v2");
+        assertThat(addMapValueNestedField.dataType()).isEqualTo(new 
BigIntType());
+    }
+
+    @Test
+    public void testDiffNestedSchemaChangesDoesNotTreatMapKeyAsValueChange() {
+        DataField mapField =
+                new DataField(
+                        0,
+                        "attributes",
+                        new MapType(
+                                new RowType(
+                                        Lists.newArrayList(new DataField(1, 
"k1", new IntType()))),
+                                new RowType(
+                                        Lists.newArrayList(
+                                                new DataField(2, "v1", new 
IntType())))));
+        TableSchema oldSchema =
+                new TableSchema(
+                        0,
+                        Lists.newArrayList(mapField),
+                        2,
+                        Lists.newArrayList(),
+                        Lists.newArrayList(),
+                        new HashMap<>(),
+                        "");
+
+        DataType evolvedMapType =
+                new MapType(
+                        new RowType(
+                                Lists.newArrayList(
+                                        new DataField(1, "k1", new IntType()),
+                                        new DataField(3, "k2", new 
BigIntType()))),
+                        new RowType(
+                                Lists.newArrayList(
+                                        new DataField(2, "v1", new IntType()),
+                                        new DataField(4, "v2", new 
BigIntType()))));
+        TableSchema newSchema =
+                new TableSchema(
+                        1,
+                        Lists.newArrayList(new DataField(0, "attributes", 
evolvedMapType)),
+                        4,
+                        Lists.newArrayList(),
+                        Lists.newArrayList(),
+                        new HashMap<>(),
+                        "");
+
+        List<SchemaChange> changes = 
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+
+        assertThat(changes).hasSize(1);
+        SchemaChange.UpdateColumnType updateMapType =
+                (SchemaChange.UpdateColumnType) changes.get(0);
+        assertThat(updateMapType.fieldNames()).containsExactly("attributes");
+        assertThat(updateMapType.newDataType()).isEqualTo(evolvedMapType);
+    }
+
+    @Test
+    public void 
testDiffNestedSchemaChangesFallsBackToTypeUpdateWhenNestedFieldRemoved() {
+        DataType itemsType =
+                new ArrayType(
+                        new RowType(
+                                Lists.newArrayList(
+                                        new DataField(1, "f1", new IntType()),
+                                        new DataField(
+                                                2,
+                                                "f2",
+                                                new 
VarCharType(VarCharType.MAX_LENGTH)))));
+        TableSchema oldSchema =
+                new TableSchema(
+                        0,
+                        Lists.newArrayList(new DataField(0, "items", 
itemsType)),
+                        2,
+                        Lists.newArrayList(),
+                        Lists.newArrayList(),
+                        new HashMap<>(),
+                        "");
+
+        DataType evolvedItemsType =
+                new ArrayType(
+                        new RowType(Lists.newArrayList(new DataField(1, "f1", 
new IntType()))));
+        TableSchema newSchema =
+                new TableSchema(
+                        1,
+                        Lists.newArrayList(new DataField(0, "items", 
evolvedItemsType)),
+                        2,
+                        Lists.newArrayList(),
+                        Lists.newArrayList(),
+                        new HashMap<>(),
+                        "");
+
+        List<SchemaChange> changes = 
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+
+        assertThat(changes).hasSize(1);
+        SchemaChange.UpdateColumnType updateItemsType =
+                (SchemaChange.UpdateColumnType) changes.get(0);
+        assertThat(updateItemsType.fieldNames()).containsExactly("items");
+        assertThat(updateItemsType.newDataType()).isEqualTo(evolvedItemsType);
+    }
+
     @Test
     public void testMergeArrayTypes() {
         AtomicInteger highestFieldId = new AtomicInteger(1);
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index b00e911c07..b328fa1867 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -1199,6 +1199,185 @@ abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with PaimonTab
     }
   }
 
+  test("Paimon MergeInto: merge-schema with star and new nested array struct 
field") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable(
+          "source",
+          """
+            |id STRING,
+            |items ARRAY<STRUCT<
+            |  text: STRING,
+            |  lang: STRING,
+            |  `0`: STRUCT<extra_url: STRING>,
+            |  url: STRING>>,
+            |payload STRING
+            |""".stripMargin,
+          Seq("id")
+        )
+        spark.sql("""
+                    |INSERT INTO source VALUES
+                    |  ('1', array(named_struct(
+                    |    'text', 'updated-text',
+                    |    'lang', 'en',
+                    |    '0', named_struct('extra_url', 
'https://example.com/extra'),
+                    |    'url', 'https://example.com/item')), 'updated'),
+                    |  ('2', array(named_struct(
+                    |    'text', 'inserted-text',
+                    |    'lang', 'en',
+                    |    '0', named_struct('extra_url', 
'https://example.com/extra'),
+                    |    'url', 'https://example.com/item')), 'inserted')
+                    |""".stripMargin)
+
+        createTable(
+          "target",
+          """
+            |id STRING,
+            |payload STRING,
+            |items ARRAY<STRUCT<lang: STRING, text: STRING, url: STRING>>
+            |""".stripMargin,
+          Seq("id")
+        )
+
+        spark.sql("""
+                    |INSERT INTO target VALUES (
+                    |  '1',
+                    |  'old',
+                    |  array(named_struct('lang', 'old-lang', 'text', 
'old-text', 'url', 'old-url'))
+                    |)
+                    |""".stripMargin)
+
+        spark.sql("""
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.id = source.id
+                    |WHEN MATCHED THEN UPDATE SET *
+                    |WHEN NOT MATCHED THEN INSERT *
+                    |""".stripMargin)
+
+        checkAnswer(
+          spark.sql("""
+                      |SELECT
+                      |  id,
+                      |  payload,
+                      |  items[0].lang,
+                      |  items[0].text,
+                      |  items[0].url,
+                      |  items[0].`0`.extra_url
+                      |FROM target
+                      |ORDER BY id
+                      |""".stripMargin),
+          Seq(
+            Row(
+              "1",
+              "updated",
+              "en",
+              "updated-text",
+              "https://example.com/item",
+              "https://example.com/extra"),
+            Row(
+              "2",
+              "inserted",
+              "en",
+              "inserted-text",
+              "https://example.com/item",
+              "https://example.com/extra")
+          )
+        )
+      }
+    }
+  }
+
+  test("Paimon MergeInto: merge-schema with star and new nested map value 
struct field") {
+    withTable("source", "target") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        createTable(
+          "source",
+          """
+            |id STRING,
+            |attributes MAP<STRING, STRUCT<
+            |  text: STRING,
+            |  lang: STRING,
+            |  `0`: STRUCT<extra_url: STRING>,
+            |  url: STRING>>,
+            |payload STRING
+            |""".stripMargin,
+          Seq("id")
+        )
+        spark.sql("""
+                    |INSERT INTO source VALUES
+                    |  ('1', map('main', named_struct(
+                    |    'text', 'updated-text',
+                    |    'lang', 'en',
+                    |    '0', named_struct('extra_url', 
'https://example.com/extra'),
+                    |    'url', 'https://example.com/item')), 'updated'),
+                    |  ('2', map('main', named_struct(
+                    |    'text', 'inserted-text',
+                    |    'lang', 'en',
+                    |    '0', named_struct('extra_url', 
'https://example.com/extra'),
+                    |    'url', 'https://example.com/item')), 'inserted')
+                    |""".stripMargin)
+
+        createTable(
+          "target",
+          """
+            |id STRING,
+            |payload STRING,
+            |attributes MAP<STRING, STRUCT<lang: STRING, text: STRING, url: 
STRING>>
+            |""".stripMargin,
+          Seq("id")
+        )
+
+        spark.sql(
+          """
+            |INSERT INTO target VALUES (
+            |  '1',
+            |  'old',
+            |  map('main', named_struct('lang', 'old-lang', 'text', 
'old-text', 'url', 'old-url'))
+            |)
+            |""".stripMargin)
+
+        spark.sql("""
+                    |MERGE INTO target
+                    |USING source
+                    |ON target.id = source.id
+                    |WHEN MATCHED THEN UPDATE SET *
+                    |WHEN NOT MATCHED THEN INSERT *
+                    |""".stripMargin)
+
+        checkAnswer(
+          spark.sql("""
+                      |SELECT
+                      |  id,
+                      |  payload,
+                      |  attributes['main'].lang,
+                      |  attributes['main'].text,
+                      |  attributes['main'].url,
+                      |  attributes['main'].`0`.extra_url
+                      |FROM target
+                      |ORDER BY id
+                      |""".stripMargin),
+          Seq(
+            Row(
+              "1",
+              "updated",
+              "en",
+              "updated-text",
+              "https://example.com/item",
+              "https://example.com/extra"),
+            Row(
+              "2",
+              "inserted",
+              "en",
+              "inserted-text",
+              "https://example.com/item",
+              "https://example.com/extra")
+          )
+        )
+      }
+    }
+  }
+
   test("Paimon MergeInto: struct field reorder when target has fields absent 
from source") {
     withTable("source", "target") {
       // Target struct has 3 sub-fields; source struct only has 2 of them in a 
different order.

Reply via email to