This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fa1b9dc31f [core] Diff nested schema changes inside ARRAY and MAP
value types (#7843)
fa1b9dc31f is described below
commit fa1b9dc31fdeb71ac6479bff6779b93d8848c12a
Author: Zouxxyy <[email protected]>
AuthorDate: Wed May 13 22:06:24 2026 +0800
[core] Diff nested schema changes inside ARRAY and MAP value types (#7843)
Follow-up to #7789. Extend `SchemaMergingUtils.diffSchemaChanges` to
emit precise `AddColumn` changes for new struct fields nested inside an
`ARRAY` element type or a `MAP` value type, instead of a coarse
`UpdateColumnType` that rewrites the whole nested type.
The diff still falls back to `UpdateColumnType` when a nested field is
removed or a map key type changes.
---
.../apache/paimon/schema/SchemaMergingUtils.java | 74 +++++++--
.../paimon/schema/SchemaMergingUtilsTest.java | 179 +++++++++++++++++++++
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 179 +++++++++++++++++++++
3 files changed, 421 insertions(+), 11 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
index 1ed9bad7e6..990087d792 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -39,6 +39,9 @@ import java.util.stream.Collectors;
/** The util class for merging the schemas. */
public class SchemaMergingUtils {
+ public static final String ARRAY_ELEMENT_FIELD_NAME = "element";
+ public static final String MAP_VALUE_FIELD_NAME = "value";
+
public static TableSchema mergeSchemas(
TableSchema currentTableSchema, RowType targetType, boolean
allowExplicitCast) {
RowType currentType = currentTableSchema.logicalRowType();
@@ -266,19 +269,68 @@ public class SchemaMergingUtils {
changes.add(
SchemaChange.addColumn(
fieldNames, newField.type(),
newField.description(), null));
- } else if (!oldField.type().equals(newField.type())) {
- // type changed — check if it's a nested struct change
- if (oldField.type() instanceof RowType && newField.type()
instanceof RowType) {
- diffFields(
- ((RowType) oldField.type()).getFields(),
- ((RowType) newField.type()).getFields(),
- fieldNames,
- changes);
- } else {
- changes.add(SchemaChange.updateColumnType(fieldNames,
newField.type(), true));
- }
+ } else if (!oldField.type().equals(newField.type())
+ && !diffNestedTypeChanges(
+ oldField.type(), newField.type(), fieldNames,
changes)) {
+ changes.add(SchemaChange.updateColumnType(fieldNames,
newField.type(), true));
+ }
+ }
+ }
+
+ /**
+ * Returns true only when the type difference has been fully represented
by nested schema
+ * changes. Returns false to let the caller fall back to {@link
SchemaChange.UpdateColumnType}.
+ */
+ private static boolean diffNestedTypeChanges(
+ DataType oldType, DataType newType, String[] fieldNames,
List<SchemaChange> changes) {
+ List<SchemaChange> stagedChanges = new ArrayList<>();
+ boolean handled = diffNestedTypeChangesInner(oldType, newType,
fieldNames, stagedChanges);
+ if (handled) {
+ changes.addAll(stagedChanges);
+ }
+ return handled;
+ }
+
+ private static boolean diffNestedTypeChangesInner(
+ DataType oldType, DataType newType, String[] fieldNames,
List<SchemaChange> changes) {
+ if (oldType instanceof RowType && newType instanceof RowType) {
+ List<DataField> oldFields = ((RowType) oldType).getFields();
+ List<DataField> newFields = ((RowType) newType).getFields();
+ if (hasRemovedFields(oldFields, newFields)) {
+ return false;
+ }
+ diffFields(oldFields, newFields, fieldNames, changes);
+ return true;
+ } else if (oldType instanceof ArrayType && newType instanceof
ArrayType) {
+ return diffNestedTypeChanges(
+ ((ArrayType) oldType).getElementType(),
+ ((ArrayType) newType).getElementType(),
+ appendFieldName(fieldNames, ARRAY_ELEMENT_FIELD_NAME),
+ changes);
+ } else if (oldType instanceof MapType && newType instanceof MapType) {
+ MapType oldMapType = (MapType) oldType;
+ MapType newMapType = (MapType) newType;
+ if (!oldMapType.getKeyType().equals(newMapType.getKeyType())) {
+ return false;
+ }
+ return diffNestedTypeChanges(
+ oldMapType.getValueType(),
+ newMapType.getValueType(),
+ appendFieldName(fieldNames, MAP_VALUE_FIELD_NAME),
+ changes);
+ }
+ return false;
+ }
+
+ private static boolean hasRemovedFields(List<DataField> oldFields,
List<DataField> newFields) {
+ Map<String, DataField> newFieldMap =
+ newFields.stream().collect(Collectors.toMap(DataField::name,
Function.identity()));
+ for (DataField oldField : oldFields) {
+ if (!newFieldMap.containsKey(oldField.name())) {
+ return true;
}
}
+ return false;
}
private static String[] appendFieldName(String[] parentNames, String
fieldName) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
index bfc6dd7aa3..ff04a167a7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
@@ -190,6 +190,185 @@ public class SchemaMergingUtilsTest {
assertThat(r6.getFieldCount()).isEqualTo(8);
}
+ @Test
+ public void testDiffNestedSchemaChangesInArrayAndMap() {
+ DataField arrayField =
+ new DataField(
+ 0,
+ "items",
+ new ArrayType(
+ new RowType(
+ Lists.newArrayList(
+ new DataField(1, "f1", new
IntType()),
+ new DataField(
+ 2,
+ "f2",
+ new VarCharType(
+
VarCharType.MAX_LENGTH))))));
+ DataField mapField =
+ new DataField(
+ 3,
+ "attributes",
+ new MapType(
+ new VarCharType(VarCharType.MAX_LENGTH),
+ new RowType(
+ Lists.newArrayList(
+ new DataField(4, "v1", new
IntType())))));
+ TableSchema oldSchema =
+ new TableSchema(
+ 0,
+ Lists.newArrayList(arrayField, mapField),
+ 4,
+ Lists.newArrayList(),
+ Lists.newArrayList(),
+ new HashMap<>(),
+ "");
+
+ DataField evolvedArrayField =
+ new DataField(
+ 0,
+ "items",
+ new ArrayType(
+ new RowType(
+ Lists.newArrayList(
+ new DataField(1, "f1", new
IntType()),
+ new DataField(
+ 2,
+ "f2",
+ new
VarCharType(VarCharType.MAX_LENGTH)),
+ new DataField(
+ 5,
+ "f3",
+ new VarCharType(
+
VarCharType.MAX_LENGTH))))));
+ DataField evolvedMapField =
+ new DataField(
+ 3,
+ "attributes",
+ new MapType(
+ new VarCharType(VarCharType.MAX_LENGTH),
+ new RowType(
+ Lists.newArrayList(
+ new DataField(4, "v1", new
IntType()),
+ new DataField(6, "v2", new
BigIntType())))));
+ TableSchema newSchema =
+ new TableSchema(
+ 1,
+ Lists.newArrayList(evolvedArrayField, evolvedMapField),
+ 6,
+ Lists.newArrayList(),
+ Lists.newArrayList(),
+ new HashMap<>(),
+ "");
+
+ List<SchemaChange> changes =
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+
+ assertThat(changes).hasSize(2);
+ SchemaChange.AddColumn addArrayNestedField = (SchemaChange.AddColumn)
changes.get(0);
+ assertThat(addArrayNestedField.fieldNames())
+ .containsExactly("items",
SchemaMergingUtils.ARRAY_ELEMENT_FIELD_NAME, "f3");
+ assertThat(addArrayNestedField.dataType())
+ .isEqualTo(new VarCharType(VarCharType.MAX_LENGTH));
+ SchemaChange.AddColumn addMapValueNestedField =
(SchemaChange.AddColumn) changes.get(1);
+ assertThat(addMapValueNestedField.fieldNames())
+ .containsExactly("attributes",
SchemaMergingUtils.MAP_VALUE_FIELD_NAME, "v2");
+ assertThat(addMapValueNestedField.dataType()).isEqualTo(new
BigIntType());
+ }
+
+ @Test
+ public void testDiffNestedSchemaChangesDoesNotTreatMapKeyAsValueChange() {
+ DataField mapField =
+ new DataField(
+ 0,
+ "attributes",
+ new MapType(
+ new RowType(
+ Lists.newArrayList(new DataField(1,
"k1", new IntType()))),
+ new RowType(
+ Lists.newArrayList(
+ new DataField(2, "v1", new
IntType())))));
+ TableSchema oldSchema =
+ new TableSchema(
+ 0,
+ Lists.newArrayList(mapField),
+ 2,
+ Lists.newArrayList(),
+ Lists.newArrayList(),
+ new HashMap<>(),
+ "");
+
+ DataType evolvedMapType =
+ new MapType(
+ new RowType(
+ Lists.newArrayList(
+ new DataField(1, "k1", new IntType()),
+ new DataField(3, "k2", new
BigIntType()))),
+ new RowType(
+ Lists.newArrayList(
+ new DataField(2, "v1", new IntType()),
+ new DataField(4, "v2", new
BigIntType()))));
+ TableSchema newSchema =
+ new TableSchema(
+ 1,
+ Lists.newArrayList(new DataField(0, "attributes",
evolvedMapType)),
+ 4,
+ Lists.newArrayList(),
+ Lists.newArrayList(),
+ new HashMap<>(),
+ "");
+
+ List<SchemaChange> changes =
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+
+ assertThat(changes).hasSize(1);
+ SchemaChange.UpdateColumnType updateMapType =
+ (SchemaChange.UpdateColumnType) changes.get(0);
+ assertThat(updateMapType.fieldNames()).containsExactly("attributes");
+ assertThat(updateMapType.newDataType()).isEqualTo(evolvedMapType);
+ }
+
+ @Test
+ public void
testDiffNestedSchemaChangesFallsBackToTypeUpdateWhenNestedFieldRemoved() {
+ DataType itemsType =
+ new ArrayType(
+ new RowType(
+ Lists.newArrayList(
+ new DataField(1, "f1", new IntType()),
+ new DataField(
+ 2,
+ "f2",
+ new
VarCharType(VarCharType.MAX_LENGTH)))));
+ TableSchema oldSchema =
+ new TableSchema(
+ 0,
+ Lists.newArrayList(new DataField(0, "items",
itemsType)),
+ 2,
+ Lists.newArrayList(),
+ Lists.newArrayList(),
+ new HashMap<>(),
+ "");
+
+ DataType evolvedItemsType =
+ new ArrayType(
+ new RowType(Lists.newArrayList(new DataField(1, "f1",
new IntType()))));
+ TableSchema newSchema =
+ new TableSchema(
+ 1,
+ Lists.newArrayList(new DataField(0, "items",
evolvedItemsType)),
+ 2,
+ Lists.newArrayList(),
+ Lists.newArrayList(),
+ new HashMap<>(),
+ "");
+
+ List<SchemaChange> changes =
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+
+ assertThat(changes).hasSize(1);
+ SchemaChange.UpdateColumnType updateItemsType =
+ (SchemaChange.UpdateColumnType) changes.get(0);
+ assertThat(updateItemsType.fieldNames()).containsExactly("items");
+ assertThat(updateItemsType.newDataType()).isEqualTo(evolvedItemsType);
+ }
+
@Test
public void testMergeArrayTypes() {
AtomicInteger highestFieldId = new AtomicInteger(1);
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index b00e911c07..b328fa1867 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -1199,6 +1199,185 @@ abstract class MergeIntoTableTestBase extends
PaimonSparkTestBase with PaimonTab
}
}
+ test("Paimon MergeInto: merge-schema with star and new nested array struct
field") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable(
+ "source",
+ """
+ |id STRING,
+ |items ARRAY<STRUCT<
+ | text: STRING,
+ | lang: STRING,
+ | `0`: STRUCT<extra_url: STRING>,
+ | url: STRING>>,
+ |payload STRING
+ |""".stripMargin,
+ Seq("id")
+ )
+ spark.sql("""
+ |INSERT INTO source VALUES
+ | ('1', array(named_struct(
+ | 'text', 'updated-text',
+ | 'lang', 'en',
+ | '0', named_struct('extra_url',
'https://example.com/extra'),
+ | 'url', 'https://example.com/item')), 'updated'),
+ | ('2', array(named_struct(
+ | 'text', 'inserted-text',
+ | 'lang', 'en',
+ | '0', named_struct('extra_url',
'https://example.com/extra'),
+ | 'url', 'https://example.com/item')), 'inserted')
+ |""".stripMargin)
+
+ createTable(
+ "target",
+ """
+ |id STRING,
+ |payload STRING,
+ |items ARRAY<STRUCT<lang: STRING, text: STRING, url: STRING>>
+ |""".stripMargin,
+ Seq("id")
+ )
+
+ spark.sql("""
+ |INSERT INTO target VALUES (
+ | '1',
+ | 'old',
+ | array(named_struct('lang', 'old-lang', 'text',
'old-text', 'url', 'old-url'))
+ |)
+ |""".stripMargin)
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN UPDATE SET *
+ |WHEN NOT MATCHED THEN INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("""
+ |SELECT
+ | id,
+ | payload,
+ | items[0].lang,
+ | items[0].text,
+ | items[0].url,
+ | items[0].`0`.extra_url
+ |FROM target
+ |ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(
+ "1",
+ "updated",
+ "en",
+ "updated-text",
+ "https://example.com/item",
+ "https://example.com/extra"),
+ Row(
+ "2",
+ "inserted",
+ "en",
+ "inserted-text",
+ "https://example.com/item",
+ "https://example.com/extra")
+ )
+ )
+ }
+ }
+ }
+
+ test("Paimon MergeInto: merge-schema with star and new nested map value
struct field") {
+ withTable("source", "target") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ createTable(
+ "source",
+ """
+ |id STRING,
+ |attributes MAP<STRING, STRUCT<
+ | text: STRING,
+ | lang: STRING,
+ | `0`: STRUCT<extra_url: STRING>,
+ | url: STRING>>,
+ |payload STRING
+ |""".stripMargin,
+ Seq("id")
+ )
+ spark.sql("""
+ |INSERT INTO source VALUES
+ | ('1', map('main', named_struct(
+ | 'text', 'updated-text',
+ | 'lang', 'en',
+ | '0', named_struct('extra_url',
'https://example.com/extra'),
+ | 'url', 'https://example.com/item')), 'updated'),
+ | ('2', map('main', named_struct(
+ | 'text', 'inserted-text',
+ | 'lang', 'en',
+ | '0', named_struct('extra_url',
'https://example.com/extra'),
+ | 'url', 'https://example.com/item')), 'inserted')
+ |""".stripMargin)
+
+ createTable(
+ "target",
+ """
+ |id STRING,
+ |payload STRING,
+ |attributes MAP<STRING, STRUCT<lang: STRING, text: STRING, url:
STRING>>
+ |""".stripMargin,
+ Seq("id")
+ )
+
+ spark.sql(
+ """
+ |INSERT INTO target VALUES (
+ | '1',
+ | 'old',
+ | map('main', named_struct('lang', 'old-lang', 'text',
'old-text', 'url', 'old-url'))
+ |)
+ |""".stripMargin)
+
+ spark.sql("""
+ |MERGE INTO target
+ |USING source
+ |ON target.id = source.id
+ |WHEN MATCHED THEN UPDATE SET *
+ |WHEN NOT MATCHED THEN INSERT *
+ |""".stripMargin)
+
+ checkAnswer(
+ spark.sql("""
+ |SELECT
+ | id,
+ | payload,
+ | attributes['main'].lang,
+ | attributes['main'].text,
+ | attributes['main'].url,
+ | attributes['main'].`0`.extra_url
+ |FROM target
+ |ORDER BY id
+ |""".stripMargin),
+ Seq(
+ Row(
+ "1",
+ "updated",
+ "en",
+ "updated-text",
+ "https://example.com/item",
+ "https://example.com/extra"),
+ Row(
+ "2",
+ "inserted",
+ "en",
+ "inserted-text",
+ "https://example.com/item",
+ "https://example.com/extra")
+ )
+ )
+ }
+ }
+ }
+
test("Paimon MergeInto: struct field reorder when target has fields absent
from source") {
withTable("source", "target") {
// Target struct has 3 sub-fields; source struct only has 2 of them in a
different order.