This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 22f3fd344c [spark] Fix schema merge creating duplicate columns with 
case-mismatched names (#8034)
22f3fd344c is described below

commit 22f3fd344cb86bb39d8149b764109cdb5c24a70c
Author: Zouxxyy <[email protected]>
AuthorDate: Fri May 29 21:45:20 2026 +0800

    [spark] Fix schema merge creating duplicate columns with case-mismatched 
names (#8034)
    
    When `merge-schema` is enabled and source column names differ only in
    case from target columns (e.g. source `ID` vs target `id`),
    `SchemaMergingUtils` treats them as new columns due to case-sensitive
    `HashMap` lookups. This causes duplicate columns in the schema and makes
    the table unreadable (`Field names must be unique`).
    
    This PR adds a `caseSensitive` parameter through the schema merge chain
    (`SchemaMergingUtils` → `SchemaManager` → `FileStore` → Spark
    `SchemaHelper`), using `TreeMap(String.CASE_INSENSITIVE_ORDER)` for
    field matching when `caseSensitive=false`. Spark callers pass
    `spark.sql.caseSensitive` config (default `false`).
    
    Affects both `INSERT ... merge-schema=true` and `MERGE INTO ...
    merge-schema=true` paths.
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   4 +-
 .../src/main/java/org/apache/paimon/FileStore.java |   2 +-
 .../paimon/privilege/PrivilegedFileStore.java      |   4 +-
 .../org/apache/paimon/schema/SchemaManager.java    |   7 +-
 .../apache/paimon/schema/SchemaMergingUtils.java   | 154 ++++++----
 .../paimon/operation/FileStoreCommitTest.java      |   2 +-
 .../paimon/schema/SchemaMergingUtilsTest.java      | 333 +++++++++++++--------
 .../analysis/MergeSchemaEvolutionHelper.scala      |   3 +-
 .../paimon/spark/commands/SchemaHelper.scala       |  21 +-
 .../paimon/spark/sql/WriteMergeSchemaTest.scala    | 320 ++++++++++++++++++++
 10 files changed, 644 insertions(+), 206 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index cb3bae1dbe..e8153aee52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -271,9 +271,9 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
     }
 
     @Override
-    public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+    public boolean mergeSchema(RowType rowType, boolean allowExplicitCast, 
boolean caseSensitive) {
         return schemaManager.mergeSchema(
-                rowType, allowExplicitCast, 
catalogEnvironment.schemaModification());
+                rowType, allowExplicitCast, caseSensitive, 
catalogEnvironment.schemaModification());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 98905b47e5..7a917adde8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -117,7 +117,7 @@ public interface FileStore<T> {
 
     ServiceManager newServiceManager();
 
-    boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
+    boolean mergeSchema(RowType rowType, boolean allowExplicitCast, boolean 
caseSensitive);
 
     List<TagCallback> createTagCallbacks(FileStoreTable table);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 6ced52bbd1..f4e6922b98 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -219,9 +219,9 @@ public class PrivilegedFileStore<T> implements FileStore<T> 
{
     }
 
     @Override
-    public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+    public boolean mergeSchema(RowType rowType, boolean allowExplicitCast, 
boolean caseSensitive) {
         privilegeChecker.assertCanInsert(identifier);
-        return wrapped.mergeSchema(rowType, allowExplicitCast);
+        return wrapped.mergeSchema(rowType, allowExplicitCast, caseSensitive);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index bde20d66ab..8a4128e58d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -753,19 +753,22 @@ public class SchemaManager implements Serializable {
     public boolean mergeSchema(
             RowType rowType,
             boolean allowExplicitCast,
+            boolean caseSensitive,
             @Nullable SchemaModification schemaModification) {
         TableSchema current =
                 latest().orElseThrow(
                                 () ->
                                         new RuntimeException(
                                                 "It requires that the current 
schema to exist when calling 'mergeSchema'"));
-        TableSchema update = SchemaMergingUtils.mergeSchemas(current, rowType, 
allowExplicitCast);
+        TableSchema update =
+                SchemaMergingUtils.mergeSchemas(current, rowType, 
allowExplicitCast, caseSensitive);
         if (current.equals(update)) {
             return false;
         }
         try {
             if (schemaModification != null) {
-                List<SchemaChange> changes = 
SchemaMergingUtils.diffSchemaChanges(current, update);
+                List<SchemaChange> changes =
+                        SchemaMergingUtils.diffSchemaChanges(current, update, 
caseSensitive);
                 schemaModification.alterSchema(changes);
                 return true;
             } else {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
index 990087d792..d007b96db0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -30,11 +30,11 @@ import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 
 /** The util class for merging the schemas. */
 public class SchemaMergingUtils {
@@ -43,7 +43,10 @@ public class SchemaMergingUtils {
     public static final String MAP_VALUE_FIELD_NAME = "value";
 
     public static TableSchema mergeSchemas(
-            TableSchema currentTableSchema, RowType targetType, boolean 
allowExplicitCast) {
+            TableSchema currentTableSchema,
+            RowType targetType,
+            boolean allowExplicitCast,
+            boolean caseSensitive) {
         RowType currentType = currentTableSchema.logicalRowType();
         if (currentType.equals(targetType)) {
             return currentTableSchema;
@@ -51,7 +54,8 @@ public class SchemaMergingUtils {
 
         AtomicInteger highestFieldId = new 
AtomicInteger(currentTableSchema.highestFieldId());
         RowType newRowType =
-                mergeSchemas(currentType, targetType, highestFieldId, 
allowExplicitCast);
+                mergeSchemas(
+                        currentType, targetType, highestFieldId, 
allowExplicitCast, caseSensitive);
         if (newRowType.equals(currentType)) {
             // It happens if the `targetType` only changes `nullability` but 
we always respect the
             // current's.
@@ -72,8 +76,10 @@ public class SchemaMergingUtils {
             RowType tableSchema,
             RowType dataSchema,
             AtomicInteger highestFieldId,
-            boolean allowExplicitCast) {
-        return (RowType) merge(tableSchema, dataSchema, highestFieldId, 
allowExplicitCast);
+            boolean allowExplicitCast,
+            boolean caseSensitive) {
+        return (RowType)
+                merge(tableSchema, dataSchema, highestFieldId, 
allowExplicitCast, caseSensitive);
     }
 
     /**
@@ -92,7 +98,8 @@ public class SchemaMergingUtils {
             DataType base0,
             DataType update0,
             AtomicInteger highestFieldId,
-            boolean allowExplicitCast) {
+            boolean allowExplicitCast,
+            boolean caseSensitive) {
         // Here we try to merge the base0 and update0 without regard to the 
nullability,
         // and set the base0's nullability to the return's.
         DataType base = base0.copy(true);
@@ -103,45 +110,37 @@ public class SchemaMergingUtils {
         } else if (base instanceof RowType && update instanceof RowType) {
             List<DataField> baseFields = ((RowType) base).getFields();
             List<DataField> updateFields = ((RowType) update).getFields();
-            Map<String, DataField> updateFieldMap =
-                    updateFields.stream()
-                            .collect(Collectors.toMap(DataField::name, 
Function.identity()));
-            List<DataField> updatedFields =
-                    baseFields.stream()
-                            .map(
-                                    baseField -> {
-                                        if 
(updateFieldMap.containsKey(baseField.name())) {
-                                            DataField updateField =
-                                                    
updateFieldMap.get(baseField.name());
-                                            DataType updatedDataType =
-                                                    merge(
-                                                            baseField.type(),
-                                                            updateField.type(),
-                                                            highestFieldId,
-                                                            allowExplicitCast);
-                                            return new DataField(
-                                                    baseField.id(),
-                                                    baseField.name(),
-                                                    updatedDataType,
-                                                    baseField.description(),
-                                                    baseField.defaultValue());
-                                        } else {
-                                            return baseField;
-                                        }
-                                    })
-                            .collect(Collectors.toList());
+            Map<String, DataField> updateFieldMap = 
buildFieldMap(updateFields, caseSensitive);
+            List<DataField> updatedFields = new ArrayList<>();
+            for (DataField baseField : baseFields) {
+                if (updateFieldMap.containsKey(baseField.name())) {
+                    DataField updateField = 
updateFieldMap.get(baseField.name());
+                    DataType updatedDataType =
+                            merge(
+                                    baseField.type(),
+                                    updateField.type(),
+                                    highestFieldId,
+                                    allowExplicitCast,
+                                    caseSensitive);
+                    updatedFields.add(
+                            new DataField(
+                                    baseField.id(),
+                                    baseField.name(),
+                                    updatedDataType,
+                                    baseField.description(),
+                                    baseField.defaultValue()));
+                } else {
+                    updatedFields.add(baseField);
+                }
+            }
 
-            Map<String, DataField> baseFieldMap =
-                    baseFields.stream()
-                            .collect(Collectors.toMap(DataField::name, 
Function.identity()));
-            List<DataField> newFields =
-                    updateFields.stream()
-                            .filter(field -> 
!baseFieldMap.containsKey(field.name()))
-                            .map(field -> assignIdForNewField(field, 
highestFieldId))
-                            .map(field -> field.copy(true))
-                            .collect(Collectors.toList());
+            Map<String, DataField> baseFieldMap = buildFieldMap(baseFields, 
caseSensitive);
+            for (DataField field : updateFields) {
+                if (!baseFieldMap.containsKey(field.name())) {
+                    updatedFields.add(assignIdForNewField(field, 
highestFieldId).copy(true));
+                }
+            }
 
-            updatedFields.addAll(newFields);
             return new RowType(base0.isNullable(), updatedFields);
         } else if (base instanceof MapType && update instanceof MapType) {
             return new MapType(
@@ -150,12 +149,14 @@ public class SchemaMergingUtils {
                             ((MapType) base).getKeyType(),
                             ((MapType) update).getKeyType(),
                             highestFieldId,
-                            allowExplicitCast),
+                            allowExplicitCast,
+                            caseSensitive),
                     merge(
                             ((MapType) base).getValueType(),
                             ((MapType) update).getValueType(),
                             highestFieldId,
-                            allowExplicitCast));
+                            allowExplicitCast,
+                            caseSensitive));
         } else if (base instanceof ArrayType && update instanceof ArrayType) {
             return new ArrayType(
                     base0.isNullable(),
@@ -163,7 +164,8 @@ public class SchemaMergingUtils {
                             ((ArrayType) base).getElementType(),
                             ((ArrayType) update).getElementType(),
                             highestFieldId,
-                            allowExplicitCast));
+                            allowExplicitCast,
+                            caseSensitive));
         } else if (base instanceof MultisetType && update instanceof 
MultisetType) {
             return new MultisetType(
                     base0.isNullable(),
@@ -171,7 +173,8 @@ public class SchemaMergingUtils {
                             ((MultisetType) base).getElementType(),
                             ((MultisetType) update).getElementType(),
                             highestFieldId,
-                            allowExplicitCast));
+                            allowExplicitCast,
+                            caseSensitive));
         } else if (base instanceof DecimalType && update instanceof 
DecimalType) {
             if (((DecimalType) base).getScale() == ((DecimalType) 
update).getScale()) {
                 return new DecimalType(
@@ -243,13 +246,14 @@ public class SchemaMergingUtils {
      * This supports detecting added columns and type changes (including 
nested structs).
      */
     public static List<SchemaChange> diffSchemaChanges(
-            TableSchema oldSchema, TableSchema newSchema) {
+            TableSchema oldSchema, TableSchema newSchema, boolean 
caseSensitive) {
         List<SchemaChange> changes = new ArrayList<>();
         diffFields(
                 oldSchema.logicalRowType().getFields(),
                 newSchema.logicalRowType().getFields(),
                 new String[0],
-                changes);
+                changes,
+                caseSensitive);
         return changes;
     }
 
@@ -257,9 +261,9 @@ public class SchemaMergingUtils {
             List<DataField> oldFields,
             List<DataField> newFields,
             String[] parentNames,
-            List<SchemaChange> changes) {
-        Map<String, DataField> oldFieldMap =
-                oldFields.stream().collect(Collectors.toMap(DataField::name, 
Function.identity()));
+            List<SchemaChange> changes,
+            boolean caseSensitive) {
+        Map<String, DataField> oldFieldMap = buildFieldMap(oldFields, 
caseSensitive);
 
         for (DataField newField : newFields) {
             String[] fieldNames = appendFieldName(parentNames, 
newField.name());
@@ -271,7 +275,7 @@ public class SchemaMergingUtils {
                                 fieldNames, newField.type(), 
newField.description(), null));
             } else if (!oldField.type().equals(newField.type())
                     && !diffNestedTypeChanges(
-                            oldField.type(), newField.type(), fieldNames, 
changes)) {
+                            oldField.type(), newField.type(), fieldNames, 
changes, caseSensitive)) {
                 changes.add(SchemaChange.updateColumnType(fieldNames, 
newField.type(), true));
             }
         }
@@ -282,9 +286,15 @@ public class SchemaMergingUtils {
      * changes. Returns false to let the caller fall back to {@link 
SchemaChange.UpdateColumnType}.
      */
     private static boolean diffNestedTypeChanges(
-            DataType oldType, DataType newType, String[] fieldNames, 
List<SchemaChange> changes) {
+            DataType oldType,
+            DataType newType,
+            String[] fieldNames,
+            List<SchemaChange> changes,
+            boolean caseSensitive) {
         List<SchemaChange> stagedChanges = new ArrayList<>();
-        boolean handled = diffNestedTypeChangesInner(oldType, newType, 
fieldNames, stagedChanges);
+        boolean handled =
+                diffNestedTypeChangesInner(
+                        oldType, newType, fieldNames, stagedChanges, 
caseSensitive);
         if (handled) {
             changes.addAll(stagedChanges);
         }
@@ -292,21 +302,26 @@ public class SchemaMergingUtils {
     }
 
     private static boolean diffNestedTypeChangesInner(
-            DataType oldType, DataType newType, String[] fieldNames, 
List<SchemaChange> changes) {
+            DataType oldType,
+            DataType newType,
+            String[] fieldNames,
+            List<SchemaChange> changes,
+            boolean caseSensitive) {
         if (oldType instanceof RowType && newType instanceof RowType) {
             List<DataField> oldFields = ((RowType) oldType).getFields();
             List<DataField> newFields = ((RowType) newType).getFields();
-            if (hasRemovedFields(oldFields, newFields)) {
+            if (hasRemovedFields(oldFields, newFields, caseSensitive)) {
                 return false;
             }
-            diffFields(oldFields, newFields, fieldNames, changes);
+            diffFields(oldFields, newFields, fieldNames, changes, 
caseSensitive);
             return true;
         } else if (oldType instanceof ArrayType && newType instanceof 
ArrayType) {
             return diffNestedTypeChanges(
                     ((ArrayType) oldType).getElementType(),
                     ((ArrayType) newType).getElementType(),
                     appendFieldName(fieldNames, ARRAY_ELEMENT_FIELD_NAME),
-                    changes);
+                    changes,
+                    caseSensitive);
         } else if (oldType instanceof MapType && newType instanceof MapType) {
             MapType oldMapType = (MapType) oldType;
             MapType newMapType = (MapType) newType;
@@ -317,14 +332,15 @@ public class SchemaMergingUtils {
                     oldMapType.getValueType(),
                     newMapType.getValueType(),
                     appendFieldName(fieldNames, MAP_VALUE_FIELD_NAME),
-                    changes);
+                    changes,
+                    caseSensitive);
         }
         return false;
     }
 
-    private static boolean hasRemovedFields(List<DataField> oldFields, 
List<DataField> newFields) {
-        Map<String, DataField> newFieldMap =
-                newFields.stream().collect(Collectors.toMap(DataField::name, 
Function.identity()));
+    private static boolean hasRemovedFields(
+            List<DataField> oldFields, List<DataField> newFields, boolean 
caseSensitive) {
+        Map<String, DataField> newFieldMap = buildFieldMap(newFields, 
caseSensitive);
         for (DataField oldField : oldFields) {
             if (!newFieldMap.containsKey(oldField.name())) {
                 return true;
@@ -333,6 +349,16 @@ public class SchemaMergingUtils {
         return false;
     }
 
+    private static Map<String, DataField> buildFieldMap(
+            List<DataField> fields, boolean caseSensitive) {
+        Map<String, DataField> map =
+                caseSensitive ? new HashMap<>() : new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+        for (DataField field : fields) {
+            map.put(field.name(), field);
+        }
+        return map;
+    }
+
     private static String[] appendFieldName(String[] parentNames, String 
fieldName) {
         String[] result = new String[parentNames.length + 1];
         System.arraycopy(parentNames, 0, result, 0, parentNames.length);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 71eb081de8..496242db70 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -807,7 +807,7 @@ public class FileStoreCommitTest {
         ArrayList<DataField> newFields =
                 new 
ArrayList<>(TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields());
         newFields.add(new DataField(-1, "newField", DataTypes.INT()));
-        store.mergeSchema(new RowType(false, newFields), true);
+        store.mergeSchema(new RowType(false, newFields), true, true);
         store.commitData(generateDataList(10), gen::getPartition, kv -> 0);
         readStats = statsFileHandler.readStats();
         assertThat(readStats).isEmpty();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
index ff04a167a7..050a4f12d7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
@@ -83,7 +83,7 @@ public class SchemaMergingUtilsTest {
         DataField f = new DataField(-1, "f", fDataType);
         RowType t = new RowType(Lists.newArrayList(a, b, d, f));
 
-        TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t, 
false);
+        TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t, 
false, true);
         assertThat(merged.id()).isEqualTo(1);
         assertThat(merged.highestFieldId()).isEqualTo(6);
         assertThat(merged.primaryKeys()).containsExactlyInAnyOrder("a", "d");
@@ -110,7 +110,7 @@ public class SchemaMergingUtilsTest {
 
         // fake the RowType of data with different field sequences
         RowType t = new RowType(Lists.newArrayList(b, a));
-        TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t, 
false);
+        TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t, 
false, true);
         assertThat(merged.id()).isEqualTo(0);
     }
 
@@ -132,7 +132,7 @@ public class SchemaMergingUtilsTest {
         // Case 1: an additional field.
         DataField e = new DataField(-1, "e", new DateType());
         RowType t1 = new RowType(Lists.newArrayList(a, b, c, d, e));
-        RowType r1 = (RowType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false);
+        RowType r1 = (RowType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false, true);
         assertThat(highestFieldId.get()).isEqualTo(4);
         assertThat(r1.isNullable()).isTrue();
         assertThat(r1.getFieldCount()).isEqualTo(5);
@@ -143,7 +143,7 @@ public class SchemaMergingUtilsTest {
 
         // Case 2: two missing fields.
         RowType t2 = new RowType(Lists.newArrayList(a, c, e));
-        RowType r2 = SchemaMergingUtils.mergeSchemas(r1, t2, highestFieldId, 
false);
+        RowType r2 = SchemaMergingUtils.mergeSchemas(r1, t2, highestFieldId, 
false, true);
         assertThat(highestFieldId.get()).isEqualTo(4);
         assertThat(r2.getFieldCount()).isEqualTo(5);
         assertThat(r2.getTypeAt(3)).isEqualTo(d.type());
@@ -155,7 +155,7 @@ public class SchemaMergingUtilsTest {
         RowType fDataType = new RowType(Lists.newArrayList(f1, f2));
         DataField f = new DataField(-1, "f", fDataType);
         RowType t3 = new RowType(Lists.newArrayList(a, b, c, d, f));
-        RowType r3 = (RowType) SchemaMergingUtils.merge(r2, t3, 
highestFieldId, false);
+        RowType r3 = (RowType) SchemaMergingUtils.merge(r2, t3, 
highestFieldId, false, true);
         assertThat(highestFieldId.get()).isEqualTo(7);
         assertThat(r3.getFieldCount()).isEqualTo(6);
         RowType expectedFDataType = new 
RowType(Lists.newArrayList(f1.newId(5), f2.newId(6)));
@@ -168,7 +168,7 @@ public class SchemaMergingUtilsTest {
         RowType newFDataType = new RowType(Lists.newArrayList(f1, f2, f3));
         DataField newF = new DataField(-1, "f", newFDataType);
         RowType t4 = new RowType(Lists.newArrayList(a, b, c, d, e, newF));
-        RowType r4 = SchemaMergingUtils.mergeSchemas(r3, t4, highestFieldId, 
false);
+        RowType r4 = SchemaMergingUtils.mergeSchemas(r3, t4, highestFieldId, 
false, true);
         assertThat(highestFieldId.get()).isEqualTo(8);
         assertThat(r4.getFieldCount()).isEqualTo(6);
         RowType newExpectedFDataType =
@@ -178,14 +178,14 @@ public class SchemaMergingUtilsTest {
         // Case 5: a field that isn't compatible with the existing one.
         DataField newA = new DataField(-1, "a", new SmallIntType());
         RowType t5 = new RowType(Lists.newArrayList(newA, b, c, d, e, newF));
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(r4, t5, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(r4, t5, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // Case 6: all new-coming fields
         DataField g = new DataField(-1, "g", new TimeType());
         DataField h = new DataField(-1, "h", new TimeType());
         RowType t6 = new RowType(Lists.newArrayList(g, h));
-        RowType r6 = SchemaMergingUtils.mergeSchemas(r4, t6, highestFieldId, 
false);
+        RowType r6 = SchemaMergingUtils.mergeSchemas(r4, t6, highestFieldId, 
false, true);
         assertThat(highestFieldId.get()).isEqualTo(10);
         assertThat(r6.getFieldCount()).isEqualTo(8);
     }
@@ -261,7 +261,8 @@ public class SchemaMergingUtilsTest {
                         new HashMap<>(),
                         "");
 
-        List<SchemaChange> changes = 
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+        List<SchemaChange> changes =
+                SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema, 
true);
 
         assertThat(changes).hasSize(2);
         SchemaChange.AddColumn addArrayNestedField = (SchemaChange.AddColumn) 
changes.get(0);
@@ -317,7 +318,8 @@ public class SchemaMergingUtilsTest {
                         new HashMap<>(),
                         "");
 
-        List<SchemaChange> changes = 
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+        List<SchemaChange> changes =
+                SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema, 
true);
 
         assertThat(changes).hasSize(1);
         SchemaChange.UpdateColumnType updateMapType =
@@ -360,7 +362,8 @@ public class SchemaMergingUtilsTest {
                         new HashMap<>(),
                         "");
 
-        List<SchemaChange> changes = 
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+        List<SchemaChange> changes =
+                SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema, 
true);
 
         assertThat(changes).hasSize(1);
         SchemaChange.UpdateColumnType updateItemsType =
@@ -377,23 +380,25 @@ public class SchemaMergingUtilsTest {
 
         // the element types are same.
         DataType t1 = new ArrayType(true, new IntType());
-        ArrayType r1 = (ArrayType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false);
+        ArrayType r1 =
+                (ArrayType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false, true);
         assertThat(r1.isNullable()).isFalse();
         assertThat(r1.getElementType() instanceof IntType).isTrue();
 
         // the element types aren't same, but can be evolved safety.
         DataType t2 = new ArrayType(true, new BigIntType());
-        ArrayType r2 = (ArrayType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false);
+        ArrayType r2 =
+                (ArrayType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false, true);
         assertThat(r2.isNullable()).isFalse();
         assertThat(r2.getElementType() instanceof BigIntType).isTrue();
 
         // the element types aren't same, and can't be evolved safety.
         DataType t3 = new ArrayType(true, new SmallIntType());
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // the value type of target's isn't same to the source's, but the 
source type can be cast to
         // the target type explicitly.
-        ArrayType r3 = (ArrayType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true);
+        ArrayType r3 = (ArrayType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true, true);
         assertThat(r3.isNullable()).isFalse();
         assertThat(r3.getElementType() instanceof SmallIntType).isTrue();
     }
@@ -406,25 +411,25 @@ public class SchemaMergingUtilsTest {
 
         // both the key and value types are same to the source's.
         DataType t1 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new 
IntType());
-        MapType r1 = (MapType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false);
+        MapType r1 = (MapType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false, true);
         assertThat(r1.isNullable()).isTrue();
         assertThat(r1.getKeyType() instanceof VarCharType).isTrue();
         assertThat(r1.getValueType() instanceof IntType).isTrue();
 
         // the value type of target's isn't same to the source's, but can be 
evolved safety.
         DataType t2 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new 
DoubleType());
-        MapType r2 = (MapType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false);
+        MapType r2 = (MapType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false, true);
         assertThat(r2.isNullable()).isTrue();
         assertThat(r2.getKeyType() instanceof VarCharType).isTrue();
         assertThat(r2.getValueType() instanceof DoubleType).isTrue();
 
         // the value type of target's isn't same to the source's, and can't be 
evolved safety.
         DataType t3 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new 
SmallIntType());
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // the value type of target's isn't same to the source's, but the 
source type can be cast to
         // the target type explicitly.
-        MapType r3 = (MapType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true);
+        MapType r3 = (MapType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true, true);
         assertThat(r3.isNullable()).isTrue();
         assertThat(r3.getKeyType() instanceof VarCharType).isTrue();
         assertThat(r3.getValueType() instanceof SmallIntType).isTrue();
@@ -439,24 +444,25 @@ public class SchemaMergingUtilsTest {
         // the element types are same.
         DataType t1 = new MultisetType(true, new IntType());
         MultisetType r1 =
-                (MultisetType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false);
+                (MultisetType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false, true);
         assertThat(r1.isNullable()).isFalse();
         assertThat(r1.getElementType() instanceof IntType).isTrue();
 
         // the element types aren't same, but can be evolved safety.
         DataType t2 = new MultisetType(true, new BigIntType());
         MultisetType r2 =
-                (MultisetType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false);
+                (MultisetType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false, true);
         assertThat(r2.isNullable()).isFalse();
         assertThat(r2.getElementType() instanceof BigIntType).isTrue();
 
         // the element types aren't same, and can't be evolved safety.
         DataType t3 = new MultisetType(true, new SmallIntType());
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // the value type of target's isn't same to the source's, but the 
source type can be cast to
         // the target type explicitly.
-        MultisetType r3 = (MultisetType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true);
+        MultisetType r3 =
+                (MultisetType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true, true);
         assertThat(r3.isNullable()).isFalse();
         assertThat(r3.getElementType() instanceof SmallIntType).isTrue();
     }
@@ -467,26 +473,29 @@ public class SchemaMergingUtilsTest {
 
         DataType s1 = new DecimalType();
         DataType t1 = new DecimalType(10, 0);
-        DecimalType r1 = (DecimalType) SchemaMergingUtils.merge(s1, t1, 
highestFieldId, false);
+        DecimalType r1 =
+                (DecimalType) SchemaMergingUtils.merge(s1, t1, highestFieldId, 
false, true);
         assertThat(r1.isNullable()).isTrue();
         assertThat(r1.getPrecision()).isEqualTo(DecimalType.DEFAULT_PRECISION);
         assertThat(r1.getScale()).isEqualTo(DecimalType.DEFAULT_SCALE);
 
         DataType s2 = new DecimalType(5, 2);
         DataType t2 = new DecimalType(7, 3);
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s2, t2, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s2, t2, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         DataType s3 = new DecimalType(false, 5, 2);
         DataType t3 = new DecimalType(7, 2);
-        DecimalType r3 = (DecimalType) SchemaMergingUtils.merge(s3, t3, 
highestFieldId, false);
+        DecimalType r3 =
+                (DecimalType) SchemaMergingUtils.merge(s3, t3, highestFieldId, 
false, true);
         assertThat(r3.isNullable()).isFalse();
         assertThat(r3.getPrecision()).isEqualTo(7);
         assertThat(r3.getScale()).isEqualTo(2);
 
         DataType s4 = new DecimalType(7, 2);
         DataType t4 = new DecimalType(5, 2);
-        DecimalType r4 = (DecimalType) SchemaMergingUtils.merge(s4, t4, 
highestFieldId, false);
+        DecimalType r4 =
+                (DecimalType) SchemaMergingUtils.merge(s4, t4, highestFieldId, 
false, true);
         assertThat(r4.isNullable()).isTrue();
         assertThat(r4.getPrecision()).isEqualTo(7);
         assertThat(r4.getScale()).isEqualTo(2);
@@ -495,10 +504,12 @@ public class SchemaMergingUtilsTest {
         DataType dcmSource = new DecimalType();
         DataType iTarget = new IntType();
         assertThatThrownBy(
-                        () -> SchemaMergingUtils.merge(dcmSource, iTarget, 
highestFieldId, false))
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        dcmSource, iTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // DecimalType -> Other Numeric Type with allowExplicitCast = true
-        DataType res = SchemaMergingUtils.merge(dcmSource, iTarget, 
highestFieldId, true);
+        DataType res = SchemaMergingUtils.merge(dcmSource, iTarget, 
highestFieldId, true, true);
         assertThat(res instanceof IntType).isTrue();
     }
 
@@ -509,118 +520,122 @@ public class SchemaMergingUtilsTest {
         // BinaryType
         DataType s1 = new BinaryType(10);
         DataType t1 = new BinaryType(10);
-        BinaryType r1 = (BinaryType) SchemaMergingUtils.merge(s1, t1, 
highestFieldId, false);
+        BinaryType r1 = (BinaryType) SchemaMergingUtils.merge(s1, t1, 
highestFieldId, false, true);
         assertThat(r1.getLength()).isEqualTo(10);
 
         DataType s2 = new BinaryType(2);
         DataType t2 = new BinaryType();
         // smaller length
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s2, t2, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s2, t2, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // smaller length  with allowExplicitCast = true
-        BinaryType r2 = (BinaryType) SchemaMergingUtils.merge(s2, t2, 
highestFieldId, true);
+        BinaryType r2 = (BinaryType) SchemaMergingUtils.merge(s2, t2, 
highestFieldId, true, true);
         assertThat(r2.getLength()).isEqualTo(BinaryType.DEFAULT_LENGTH);
         // bigger length
         DataType t3 = new BinaryType(5);
-        BinaryType r3 = (BinaryType) SchemaMergingUtils.merge(s2, t3, 
highestFieldId, false);
+        BinaryType r3 = (BinaryType) SchemaMergingUtils.merge(s2, t3, 
highestFieldId, false, true);
         assertThat(r3.getLength()).isEqualTo(5);
 
         // VarCharType
         DataType s4 = new VarCharType();
         DataType t4 = new VarCharType(1);
-        VarCharType r4 = (VarCharType) SchemaMergingUtils.merge(s4, t4, 
highestFieldId, false);
+        VarCharType r4 =
+                (VarCharType) SchemaMergingUtils.merge(s4, t4, highestFieldId, 
false, true);
         assertThat(r4.getLength()).isEqualTo(VarCharType.DEFAULT_LENGTH);
 
         DataType s5 = new VarCharType(2);
         DataType t5 = new VarCharType();
         // smaller length
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s5, t5, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s5, t5, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // smaller length  with allowExplicitCast = true
-        VarCharType r5 = (VarCharType) SchemaMergingUtils.merge(s5, t5, 
highestFieldId, true);
+        VarCharType r5 = (VarCharType) SchemaMergingUtils.merge(s5, t5, 
highestFieldId, true, true);
         assertThat(r5.getLength()).isEqualTo(VarCharType.DEFAULT_LENGTH);
         // bigger length
         DataType t6 = new VarCharType(5);
-        VarCharType r6 = (VarCharType) SchemaMergingUtils.merge(s5, t6, 
highestFieldId, false);
+        VarCharType r6 =
+                (VarCharType) SchemaMergingUtils.merge(s5, t6, highestFieldId, 
false, true);
         assertThat(r6.getLength()).isEqualTo(5);
 
         // CharType
         DataType s7 = new CharType();
         DataType t7 = new CharType(1);
-        CharType r7 = (CharType) SchemaMergingUtils.merge(s7, t7, 
highestFieldId, false);
+        CharType r7 = (CharType) SchemaMergingUtils.merge(s7, t7, 
highestFieldId, false, true);
         assertThat(r7.getLength()).isEqualTo(CharType.DEFAULT_LENGTH);
 
         DataType s8 = new CharType(2);
         DataType t8 = new CharType();
         // smaller length
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s8, t8, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s8, t8, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // smaller length  with allowExplicitCast = true
-        CharType r8 = (CharType) SchemaMergingUtils.merge(s8, t8, 
highestFieldId, true);
+        CharType r8 = (CharType) SchemaMergingUtils.merge(s8, t8, 
highestFieldId, true, true);
         assertThat(r8.getLength()).isEqualTo(CharType.DEFAULT_LENGTH);
         // bigger length
         DataType t9 = new CharType(5);
-        CharType r9 = (CharType) SchemaMergingUtils.merge(s8, t9, 
highestFieldId, false);
+        CharType r9 = (CharType) SchemaMergingUtils.merge(s8, t9, 
highestFieldId, false, true);
         assertThat(r9.getLength()).isEqualTo(5);
 
         // VarBinaryType
         DataType s10 = new VarBinaryType();
         DataType t10 = new VarBinaryType(1);
         VarBinaryType r10 =
-                (VarBinaryType) SchemaMergingUtils.merge(s10, t10, 
highestFieldId, false);
+                (VarBinaryType) SchemaMergingUtils.merge(s10, t10, 
highestFieldId, false, true);
         assertThat(r10.getLength()).isEqualTo(VarBinaryType.DEFAULT_LENGTH);
 
         DataType s11 = new VarBinaryType(2);
         DataType t11 = new VarBinaryType();
         // smaller length
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s11, t11, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s11, t11, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // smaller length  with allowExplicitCast = true
         VarBinaryType r11 =
-                (VarBinaryType) SchemaMergingUtils.merge(s11, t11, 
highestFieldId, true);
+                (VarBinaryType) SchemaMergingUtils.merge(s11, t11, 
highestFieldId, true, true);
         assertThat(r11.getLength()).isEqualTo(VarBinaryType.DEFAULT_LENGTH);
         // bigger length
         DataType t12 = new VarBinaryType(5);
         VarBinaryType r12 =
-                (VarBinaryType) SchemaMergingUtils.merge(s11, t12, 
highestFieldId, false);
+                (VarBinaryType) SchemaMergingUtils.merge(s11, t12, 
highestFieldId, false, true);
         assertThat(r12.getLength()).isEqualTo(5);
 
         // CharType -> VarCharType
         DataType s13 = new CharType();
         DataType t13 = new VarCharType(10);
-        VarCharType r13 = (VarCharType) SchemaMergingUtils.merge(s13, t13, 
highestFieldId, false);
+        VarCharType r13 =
+                (VarCharType) SchemaMergingUtils.merge(s13, t13, 
highestFieldId, false, true);
         assertThat(r13.getLength()).isEqualTo(10);
 
         // VarCharType ->CharType
         DataType s14 = new VarCharType(10);
         DataType t14 = new CharType();
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s14, t14, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s14, t14, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
-        CharType r14 = (CharType) SchemaMergingUtils.merge(s14, t14, 
highestFieldId, true);
+        CharType r14 = (CharType) SchemaMergingUtils.merge(s14, t14, 
highestFieldId, true, true);
         assertThat(r14.getLength()).isEqualTo(CharType.DEFAULT_LENGTH);
 
         // BinaryType -> VarBinaryType
         DataType s15 = new BinaryType();
         DataType t15 = new VarBinaryType(10);
         VarBinaryType r15 =
-                (VarBinaryType) SchemaMergingUtils.merge(s15, t15, 
highestFieldId, false);
+                (VarBinaryType) SchemaMergingUtils.merge(s15, t15, 
highestFieldId, false, true);
         assertThat(r15.getLength()).isEqualTo(10);
 
         // VarBinaryType -> BinaryType
         DataType s16 = new VarBinaryType(10);
         DataType t16 = new BinaryType();
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s16, t16, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s16, t16, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
-        BinaryType r16 = (BinaryType) SchemaMergingUtils.merge(s16, t16, 
highestFieldId, true);
+        BinaryType r16 =
+                (BinaryType) SchemaMergingUtils.merge(s16, t16, 
highestFieldId, true, true);
         assertThat(r16.getLength()).isEqualTo(BinaryType.DEFAULT_LENGTH);
 
         // VarCharType -> VarBinaryType
         DataType s17 = new VarCharType(10);
         DataType t17 = new VarBinaryType();
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s17, t17, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s17, t17, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         VarBinaryType r17 =
-                (VarBinaryType) SchemaMergingUtils.merge(s17, t17, 
highestFieldId, true);
+                (VarBinaryType) SchemaMergingUtils.merge(s17, t17, 
highestFieldId, true, true);
         assertThat(r17.getLength()).isEqualTo(VarBinaryType.DEFAULT_LENGTH);
     }
 
@@ -632,7 +647,8 @@ public class SchemaMergingUtilsTest {
         DataType s1 = new LocalZonedTimestampType();
         DataType t1 = new LocalZonedTimestampType();
         LocalZonedTimestampType r1 =
-                (LocalZonedTimestampType) SchemaMergingUtils.merge(s1, t1, 
highestFieldId, false);
+                (LocalZonedTimestampType)
+                        SchemaMergingUtils.merge(s1, t1, highestFieldId, 
false, true);
         assertThat(r1.isNullable()).isTrue();
         
assertThat(r1.getPrecision()).isEqualTo(LocalZonedTimestampType.DEFAULT_PRECISION);
 
@@ -640,31 +656,38 @@ public class SchemaMergingUtilsTest {
         assertThatThrownBy(
                         () ->
                                 SchemaMergingUtils.merge(
-                                        s1, new LocalZonedTimestampType(3), 
highestFieldId, false))
+                                        s1,
+                                        new LocalZonedTimestampType(3),
+                                        highestFieldId,
+                                        false,
+                                        true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // higher precision
         DataType t2 = new LocalZonedTimestampType(6);
         LocalZonedTimestampType r2 =
-                (LocalZonedTimestampType) SchemaMergingUtils.merge(s1, t2, 
highestFieldId, false);
+                (LocalZonedTimestampType)
+                        SchemaMergingUtils.merge(s1, t2, highestFieldId, 
false, true);
         assertThat(r2.getPrecision()).isEqualTo(6);
 
         // LocalZonedTimestampType -> TimeType
         DataType s3 = new LocalZonedTimestampType();
         DataType t3 = new TimeType(6);
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s3, t3, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s3, t3, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // LocalZonedTimestampType -> TimestampType
         DataType s4 = new LocalZonedTimestampType();
         DataType t4 = new TimestampType();
-        TimestampType r4 = (TimestampType) SchemaMergingUtils.merge(s4, t4, 
highestFieldId, false);
+        TimestampType r4 =
+                (TimestampType) SchemaMergingUtils.merge(s4, t4, 
highestFieldId, false, true);
         
assertThat(r4.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
 
         // TimestampType.
         DataType s5 = new TimestampType();
         DataType t5 = new TimestampType();
-        TimestampType r5 = (TimestampType) SchemaMergingUtils.merge(s5, t5, 
highestFieldId, false);
+        TimestampType r5 =
+                (TimestampType) SchemaMergingUtils.merge(s5, t5, 
highestFieldId, false, true);
         assertThat(r5.isNullable()).isTrue();
         
assertThat(r5.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
 
@@ -672,65 +695,70 @@ public class SchemaMergingUtilsTest {
         assertThatThrownBy(
                         () ->
                                 SchemaMergingUtils.merge(
-                                        s5, new TimestampType(3), 
highestFieldId, false))
+                                        s5, new TimestampType(3), 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // higher precision
         DataType t6 = new TimestampType(9);
-        TimestampType r6 = (TimestampType) SchemaMergingUtils.merge(s5, t6, 
highestFieldId, false);
+        TimestampType r6 =
+                (TimestampType) SchemaMergingUtils.merge(s5, t6, 
highestFieldId, false, true);
         assertThat(r6.getPrecision()).isEqualTo(9);
 
         // TimestampType -> LocalZonedTimestampType
         DataType s7 = new TimestampType();
         DataType t7 = new LocalZonedTimestampType();
         LocalZonedTimestampType r7 =
-                (LocalZonedTimestampType) SchemaMergingUtils.merge(s7, t7, 
highestFieldId, false);
+                (LocalZonedTimestampType)
+                        SchemaMergingUtils.merge(s7, t7, highestFieldId, 
false, true);
         
assertThat(r7.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
 
         // TimestampType -> TimestampType
         DataType s8 = new TimestampType();
         DataType t8 = new TimeType(6);
-        TimeType r8 = (TimeType) SchemaMergingUtils.merge(s8, t8, 
highestFieldId, false);
+        TimeType r8 = (TimeType) SchemaMergingUtils.merge(s8, t8, 
highestFieldId, false, true);
         
assertThat(r8.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
 
         // TimeType.
         DataType s9 = new TimeType();
         DataType t9 = new TimeType();
-        TimeType r9 = (TimeType) SchemaMergingUtils.merge(s9, t9, 
highestFieldId, false);
+        TimeType r9 = (TimeType) SchemaMergingUtils.merge(s9, t9, 
highestFieldId, false, true);
         assertThat(r9.isNullable()).isTrue();
         assertThat(r9.getPrecision()).isEqualTo(TimeType.DEFAULT_PRECISION);
 
         // lower precision
         DataType s10 = new TimeType(6);
         assertThatThrownBy(
-                        () -> SchemaMergingUtils.merge(s10, new TimeType(3), 
highestFieldId, false))
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        s10, new TimeType(3), highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // higher precision
         DataType t10 = new TimeType(9);
-        TimeType r10 = (TimeType) SchemaMergingUtils.merge(s9, t10, 
highestFieldId, false);
+        TimeType r10 = (TimeType) SchemaMergingUtils.merge(s9, t10, 
highestFieldId, false, true);
         assertThat(r10.getPrecision()).isEqualTo(9);
 
         // TimeType -> LocalZonedTimestampType
         DataType s11 = new TimeType();
         DataType t11 = new LocalZonedTimestampType();
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s11, t11, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s11, t11, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // TimeType -> LocalZonedTimestampType with allowExplicitCast = true
         LocalZonedTimestampType r11 =
-                (LocalZonedTimestampType) SchemaMergingUtils.merge(s11, t11, 
highestFieldId, true);
+                (LocalZonedTimestampType)
+                        SchemaMergingUtils.merge(s11, t11, highestFieldId, 
true, true);
         
assertThat(r11.getPrecision()).isEqualTo(LocalZonedTimestampType.DEFAULT_PRECISION);
 
         // TimeType -> TimestampType
         DataType s12 = new TimeType();
         DataType t12 = new TimestampType();
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(s12, t12, 
highestFieldId, false))
+        assertThatThrownBy(() -> SchemaMergingUtils.merge(s12, t12, 
highestFieldId, false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // TimeType -> TimestampType with allowExplicitCast = true
         TimestampType r12 =
-                (TimestampType) SchemaMergingUtils.merge(s12, t12, 
highestFieldId, true);
+                (TimestampType) SchemaMergingUtils.merge(s12, t12, 
highestFieldId, true, true);
         
assertThat(r12.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
     }
 
@@ -756,186 +784,237 @@ public class SchemaMergingUtilsTest {
         DataType dcmTarget = new DecimalType();
 
         // BooleanType
-        DataType btRes1 = SchemaMergingUtils.merge(bSource, bTarget, 
highestFieldId, false);
+        DataType btRes1 = SchemaMergingUtils.merge(bSource, bTarget, 
highestFieldId, false, true);
         assertThat(btRes1 instanceof BooleanType).isTrue();
         // BooleanType -> Numeric Type
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(bSource, tiTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        bSource, tiTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // BooleanType -> Numeric Type with allowExplicitCast = true
-        DataType btRes2 = SchemaMergingUtils.merge(bSource, tiTarget, 
highestFieldId, true);
+        DataType btRes2 = SchemaMergingUtils.merge(bSource, tiTarget, 
highestFieldId, true, true);
         assertThat(btRes2 instanceof TinyIntType).isTrue();
 
         // TinyIntType
-        DataType tiRes1 = SchemaMergingUtils.merge(tiSource, tiTarget, 
highestFieldId, false);
+        DataType tiRes1 = SchemaMergingUtils.merge(tiSource, tiTarget, 
highestFieldId, false, true);
         assertThat(tiRes1 instanceof TinyIntType).isTrue();
         // TinyIntType -> SmallIntType
-        DataType tiRes2 = SchemaMergingUtils.merge(tiSource, siTarget, 
highestFieldId, false);
+        DataType tiRes2 = SchemaMergingUtils.merge(tiSource, siTarget, 
highestFieldId, false, true);
         assertThat(tiRes2 instanceof SmallIntType).isTrue();
         // TinyIntType -> IntType
-        DataType tiRes3 = SchemaMergingUtils.merge(tiSource, iTarget, 
highestFieldId, false);
+        DataType tiRes3 = SchemaMergingUtils.merge(tiSource, iTarget, 
highestFieldId, false, true);
         assertThat(tiRes3 instanceof IntType).isTrue();
         // TinyIntType -> BigIntType
-        DataType tiRes4 = SchemaMergingUtils.merge(tiSource, biTarget, 
highestFieldId, false);
+        DataType tiRes4 = SchemaMergingUtils.merge(tiSource, biTarget, 
highestFieldId, false, true);
         assertThat(tiRes4 instanceof BigIntType).isTrue();
         // TinyIntType -> FloatType
-        DataType tiRes5 = SchemaMergingUtils.merge(tiSource, fTarget, 
highestFieldId, false);
+        DataType tiRes5 = SchemaMergingUtils.merge(tiSource, fTarget, 
highestFieldId, false, true);
         assertThat(tiRes5 instanceof FloatType).isTrue();
         // TinyIntType -> DoubleType
-        DataType tiRes6 = SchemaMergingUtils.merge(tiSource, dTarget, 
highestFieldId, false);
+        DataType tiRes6 = SchemaMergingUtils.merge(tiSource, dTarget, 
highestFieldId, false, true);
         assertThat(tiRes6 instanceof DoubleType).isTrue();
         // TinyIntType -> DecimalType
-        DataType tiRes7 = SchemaMergingUtils.merge(tiSource, dcmTarget, 
highestFieldId, false);
+        DataType tiRes7 =
+                SchemaMergingUtils.merge(tiSource, dcmTarget, highestFieldId, 
false, true);
         assertThat(tiRes7 instanceof DecimalType).isTrue();
         // TinyIntType -> BooleanType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(tiSource, bTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        tiSource, bTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
 
         // TinyIntType -> BooleanType with allowExplicitCast = true
-        DataType tiRes8 = SchemaMergingUtils.merge(tiSource, bTarget, 
highestFieldId, true);
+        DataType tiRes8 = SchemaMergingUtils.merge(tiSource, bTarget, 
highestFieldId, true, true);
         assertThat(tiRes8 instanceof BooleanType).isTrue();
 
         // SmallIntType
-        DataType siRes1 = SchemaMergingUtils.merge(siSource, siTarget, 
highestFieldId, false);
+        DataType siRes1 = SchemaMergingUtils.merge(siSource, siTarget, 
highestFieldId, false, true);
         assertThat(siRes1 instanceof SmallIntType).isTrue();
         // SmallIntType -> TinyIntType
         assertThatThrownBy(
-                        () -> SchemaMergingUtils.merge(siSource, tiTarget, 
highestFieldId, false))
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        siSource, tiTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // SmallIntType -> TinyIntType with allowExplicitCast = true
-        DataType siRes2 = SchemaMergingUtils.merge(siSource, tiTarget, 
highestFieldId, true);
+        DataType siRes2 = SchemaMergingUtils.merge(siSource, tiTarget, 
highestFieldId, true, true);
         assertThat(siRes2 instanceof TinyIntType).isTrue();
         // SmallIntType -> IntType
-        DataType siRes3 = SchemaMergingUtils.merge(siSource, iTarget, 
highestFieldId, false);
+        DataType siRes3 = SchemaMergingUtils.merge(siSource, iTarget, 
highestFieldId, false, true);
         assertThat(siRes3 instanceof IntType).isTrue();
         // SmallIntType -> BigIntType
-        DataType siRes4 = SchemaMergingUtils.merge(siSource, biTarget, 
highestFieldId, false);
+        DataType siRes4 = SchemaMergingUtils.merge(siSource, biTarget, 
highestFieldId, false, true);
         assertThat(siRes4 instanceof BigIntType).isTrue();
         // SmallIntType -> FloatType
-        DataType siRes5 = SchemaMergingUtils.merge(siSource, fTarget, 
highestFieldId, false);
+        DataType siRes5 = SchemaMergingUtils.merge(siSource, fTarget, 
highestFieldId, false, true);
         assertThat(siRes5 instanceof FloatType).isTrue();
         // SmallIntType -> DoubleType
-        DataType siRes6 = SchemaMergingUtils.merge(siSource, dTarget, 
highestFieldId, false);
+        DataType siRes6 = SchemaMergingUtils.merge(siSource, dTarget, 
highestFieldId, false, true);
         assertThat(siRes6 instanceof DoubleType).isTrue();
         // SmallIntType -> DecimalType
-        DataType siRes7 = SchemaMergingUtils.merge(siSource, dcmTarget, 
highestFieldId, false);
+        DataType siRes7 =
+                SchemaMergingUtils.merge(siSource, dcmTarget, highestFieldId, 
false, true);
         assertThat(siRes7 instanceof DecimalType).isTrue();
 
         // IntType
-        DataType iRes1 = SchemaMergingUtils.merge(iSource, iTarget, 
highestFieldId, false);
+        DataType iRes1 = SchemaMergingUtils.merge(iSource, iTarget, 
highestFieldId, false, true);
         assertThat(iRes1 instanceof IntType).isTrue();
         // IntType -> TinyIntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(iSource, tiTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        iSource, tiTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // IntType -> TinyIntType with allowExplicitCast = true
-        DataType iRes2 = SchemaMergingUtils.merge(iSource, tiTarget, 
highestFieldId, true);
+        DataType iRes2 = SchemaMergingUtils.merge(iSource, tiTarget, 
highestFieldId, true, true);
         assertThat(iRes2 instanceof TinyIntType).isTrue();
         // IntType -> SmallIntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(iSource, siTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        iSource, siTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // IntType -> SmallIntType with allowExplicitCast = true
-        DataType iRes3 = SchemaMergingUtils.merge(iSource, siTarget, 
highestFieldId, true);
+        DataType iRes3 = SchemaMergingUtils.merge(iSource, siTarget, 
highestFieldId, true, true);
         assertThat(iRes3 instanceof SmallIntType).isTrue();
         // IntType -> BigIntType
-        DataType iRes4 = SchemaMergingUtils.merge(iSource, biTarget, 
highestFieldId, false);
+        DataType iRes4 = SchemaMergingUtils.merge(iSource, biTarget, 
highestFieldId, false, true);
         assertThat(iRes4 instanceof BigIntType).isTrue();
         // IntType -> FloatType
-        DataType iRes5 = SchemaMergingUtils.merge(iSource, fTarget, 
highestFieldId, false);
+        DataType iRes5 = SchemaMergingUtils.merge(iSource, fTarget, 
highestFieldId, false, true);
         assertThat(iRes5 instanceof FloatType).isTrue();
         // IntType -> DoubleType
-        DataType iRes6 = SchemaMergingUtils.merge(iSource, dTarget, 
highestFieldId, false);
+        DataType iRes6 = SchemaMergingUtils.merge(iSource, dTarget, 
highestFieldId, false, true);
         assertThat(iRes6 instanceof DoubleType).isTrue();
         // IntType -> DecimalType
-        DataType iRes7 = SchemaMergingUtils.merge(iSource, dcmTarget, 
highestFieldId, false);
+        DataType iRes7 = SchemaMergingUtils.merge(iSource, dcmTarget, 
highestFieldId, false, true);
         assertThat(iRes7 instanceof DecimalType).isTrue();
 
         // BigIntType
-        DataType biRes1 = SchemaMergingUtils.merge(biSource, biTarget, 
highestFieldId, false);
+        DataType biRes1 = SchemaMergingUtils.merge(biSource, biTarget, 
highestFieldId, false, true);
         assertThat(biRes1 instanceof BigIntType).isTrue();
         // BigIntType -> TinyIntType
         assertThatThrownBy(
-                        () -> SchemaMergingUtils.merge(biSource, tiTarget, 
highestFieldId, false))
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        biSource, tiTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // BigIntType -> TinyIntType with allowExplicitCast = true
-        DataType biRes2 = SchemaMergingUtils.merge(biSource, tiTarget, 
highestFieldId, true);
+        DataType biRes2 = SchemaMergingUtils.merge(biSource, tiTarget, 
highestFieldId, true, true);
         assertThat(biRes2 instanceof TinyIntType).isTrue();
         // BigIntType -> SmallIntType
         assertThatThrownBy(
-                        () -> SchemaMergingUtils.merge(biSource, siTarget, 
highestFieldId, false))
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        biSource, siTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // BigIntType -> SmallIntType with allowExplicitCast = true
-        DataType biRes3 = SchemaMergingUtils.merge(biSource, siTarget, 
highestFieldId, true);
+        DataType biRes3 = SchemaMergingUtils.merge(biSource, siTarget, 
highestFieldId, true, true);
         assertThat(biRes3 instanceof SmallIntType).isTrue();
         // BigIntType -> IntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(biSource, iTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        biSource, iTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // BigIntType -> IntType with allowExplicitCast = true
-        DataType biRes4 = SchemaMergingUtils.merge(biSource, iTarget, 
highestFieldId, true);
+        DataType biRes4 = SchemaMergingUtils.merge(biSource, iTarget, 
highestFieldId, true, true);
         assertThat(biRes4 instanceof IntType).isTrue();
         // BigIntType -> FloatType
-        DataType biRes5 = SchemaMergingUtils.merge(biSource, fTarget, 
highestFieldId, false);
+        DataType biRes5 = SchemaMergingUtils.merge(biSource, fTarget, 
highestFieldId, false, true);
         assertThat(biRes5 instanceof FloatType).isTrue();
         // BigIntType -> DoubleType
-        DataType biRes6 = SchemaMergingUtils.merge(biSource, dTarget, 
highestFieldId, false);
+        DataType biRes6 = SchemaMergingUtils.merge(biSource, dTarget, 
highestFieldId, false, true);
         assertThat(biRes6 instanceof DoubleType).isTrue();
         // BigIntType -> DecimalType
-        DataType biRes7 = SchemaMergingUtils.merge(biSource, dcmTarget, 
highestFieldId, false);
+        DataType biRes7 =
+                SchemaMergingUtils.merge(biSource, dcmTarget, highestFieldId, 
false, true);
         assertThat(biRes7 instanceof DecimalType).isTrue();
 
         // FloatType
-        DataType fRes1 = SchemaMergingUtils.merge(fSource, fTarget, 
highestFieldId, false);
+        DataType fRes1 = SchemaMergingUtils.merge(fSource, fTarget, 
highestFieldId, false, true);
         assertThat(fRes1 instanceof FloatType).isTrue();
         // FloatType -> TinyIntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(fSource, tiTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        fSource, tiTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // FloatType -> TinyIntType with allowExplicitCast = true
-        DataType fRes2 = SchemaMergingUtils.merge(fSource, tiTarget, 
highestFieldId, true);
+        DataType fRes2 = SchemaMergingUtils.merge(fSource, tiTarget, 
highestFieldId, true, true);
         assertThat(fRes2 instanceof TinyIntType).isTrue();
         // FloatType -> SmallIntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(fSource, siTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        fSource, siTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // FloatType -> IntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(fSource, iTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        fSource, iTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // FloatType -> IntType with allowExplicitCast = true
-        DataType fRes4 = SchemaMergingUtils.merge(fSource, iTarget, 
highestFieldId, true);
+        DataType fRes4 = SchemaMergingUtils.merge(fSource, iTarget, 
highestFieldId, true, true);
         assertThat(fRes4 instanceof IntType).isTrue();
         // FloatType -> BigIntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(fSource, biTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        fSource, biTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // FloatType -> DoubleType
-        DataType fRes6 = SchemaMergingUtils.merge(fSource, dTarget, 
highestFieldId, false);
+        DataType fRes6 = SchemaMergingUtils.merge(fSource, dTarget, 
highestFieldId, false, true);
         assertThat(fRes6 instanceof DoubleType).isTrue();
         // FloatType -> DecimalType
-        DataType fRes7 = SchemaMergingUtils.merge(fSource, dcmTarget, 
highestFieldId, false);
+        DataType fRes7 = SchemaMergingUtils.merge(fSource, dcmTarget, 
highestFieldId, false, true);
         assertThat(fRes7 instanceof DecimalType).isTrue();
 
         // DoubleType
-        DataType dRes1 = SchemaMergingUtils.merge(dSource, dTarget, 
highestFieldId, false);
+        DataType dRes1 = SchemaMergingUtils.merge(dSource, dTarget, 
highestFieldId, false, true);
         assertThat(dRes1 instanceof DoubleType).isTrue();
         // DoubleType -> TinyIntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, tiTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        dSource, tiTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // DoubleType -> SmallIntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, siTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        dSource, siTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // DoubleType -> SmallIntType with allowExplicitCast = true
-        DataType dRes3 = SchemaMergingUtils.merge(dSource, siTarget, 
highestFieldId, true);
+        DataType dRes3 = SchemaMergingUtils.merge(dSource, siTarget, 
highestFieldId, true, true);
         assertThat(dRes3 instanceof SmallIntType).isTrue();
         // DoubleType -> IntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, iTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        dSource, iTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // DoubleType -> BigIntType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, biTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        dSource, biTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // DoubleType -> BigIntType with allowExplicitCast = true
-        DataType dRes5 = SchemaMergingUtils.merge(dSource, biTarget, 
highestFieldId, true);
+        DataType dRes5 = SchemaMergingUtils.merge(dSource, biTarget, 
highestFieldId, true, true);
         assertThat(dRes5 instanceof BigIntType).isTrue();
         // DoubleType -> FloatType
-        assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, fTarget, 
highestFieldId, false))
+        assertThatThrownBy(
+                        () ->
+                                SchemaMergingUtils.merge(
+                                        dSource, fTarget, highestFieldId, 
false, true))
                 .isInstanceOf(UnsupportedOperationException.class);
         // DoubleType -> DecimalType
-        DataType dRes7 = SchemaMergingUtils.merge(dSource, dcmTarget, 
highestFieldId, false);
+        DataType dRes7 = SchemaMergingUtils.merge(dSource, dcmTarget, 
highestFieldId, false, true);
         assertThat(dRes7 instanceof DecimalType).isTrue();
     }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
index e94621c2de..f4dbbdac96 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
@@ -83,8 +83,9 @@ trait MergeSchemaEvolutionHelper extends ExpressionHelper {
         .map(a => StructField(a.name, a.dataType, a.nullable)))
     val filteredSourceSchema = 
SparkSystemColumns.filterSparkSystemColumns(sourceSchema)
     val allowExplicitCast = OptionUtils.writeMergeSchemaExplicitCastEnabled()
+    val caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
     val updatedFileStoreTable = SchemaHelper
-      .mergeAndCommitSchema(fileStoreTable, filteredSourceSchema, 
allowExplicitCast)
+      .mergeAndCommitSchema(fileStoreTable, filteredSourceSchema, 
allowExplicitCast, caseSensitive)
       .getOrElse(return None)
 
     // Invalidate Spark catalog cache so subsequent queries see the new schema.
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
index 1416602a5f..928d6211d1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -41,7 +41,7 @@ private[spark] trait SchemaHelper extends WithFileStoreTable {
 
   def mergeSchema(sparkSession: SparkSession, input: DataFrame, options: 
Options): DataFrame = {
     val dataSchema = SparkSystemColumns.filterSparkSystemColumns(input.schema)
-    val writeSchema = mergeSchema(dataSchema, options)
+    val writeSchema = mergeSchema(sparkSession, dataSchema, options)
     if (!PaimonUtils.sameType(writeSchema, dataSchema)) {
       val resolve = sparkSession.sessionState.conf.resolver
       val cols = SchemaHelper.alignColumns(writeSchema, dataSchema, resolve)
@@ -52,6 +52,13 @@ private[spark] trait SchemaHelper extends WithFileStoreTable 
{
   }
 
   def mergeSchema(dataSchema: StructType, options: Options): StructType = {
+    mergeSchema(SparkSession.active, dataSchema, options)
+  }
+
+  def mergeSchema(
+      sparkSession: SparkSession,
+      dataSchema: StructType,
+      options: Options): StructType = {
     val mergeSchemaEnabled =
       options.get(SparkConnectorOptions.MERGE_SCHEMA) || 
OptionUtils.writeMergeSchemaEnabled()
     if (!mergeSchemaEnabled) {
@@ -61,9 +68,10 @@ private[spark] trait SchemaHelper extends WithFileStoreTable 
{
     val filteredDataSchema = 
SparkSystemColumns.filterSparkSystemColumns(dataSchema)
     val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST) 
|| OptionUtils
       .writeMergeSchemaExplicitCastEnabled()
-    SchemaHelper.mergeAndCommitSchema(table, filteredDataSchema, 
allowExplicitCast).foreach {
-      updatedTable => newTable = Some(updatedTable)
-    }
+    val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+    SchemaHelper
+      .mergeAndCommitSchema(table, filteredDataSchema, allowExplicitCast, 
caseSensitive)
+      .foreach { updatedTable => newTable = Some(updatedTable) }
 
     val writeSchema = 
SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
     if (!PaimonUtils.sameType(writeSchema, filteredDataSchema)) {
@@ -87,9 +95,10 @@ private[spark] object SchemaHelper {
   def mergeAndCommitSchema(
       table: FileStoreTable,
       dataSchema: StructType,
-      allowExplicitCast: Boolean): Option[FileStoreTable] = {
+      allowExplicitCast: Boolean,
+      caseSensitive: Boolean = true): Option[FileStoreTable] = {
     val dataRowType = 
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
-    if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
+    if (table.store().mergeSchema(dataRowType, allowExplicitCast, 
caseSensitive)) {
       Some(table.copyWithLatestSchema())
     } else {
       None
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
index 83b423438a..ae830bcd97 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
@@ -499,6 +499,308 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
     }
   }
 
+  test("Write merge schema: case-insensitive column matching with new column") 
{
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, name STRING)")
+      sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
+
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("INSERT INTO t BY NAME SELECT 3 AS ID, 'c' AS NAME, 100 AS extra")
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames
+      assert(
+        columnNames.length == 3,
+        s"Expected 3 columns (id, name, extra) but got ${columnNames.length}: 
${columnNames.mkString(", ")}")
+
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(1, "a", null), Row(2, "b", null), Row(3, "c", 100)))
+    }
+  }
+
+  test("Write merge schema: case-insensitive dataframe write") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, name STRING)")
+      Seq((1, "a"), (2, "b"))
+        .toDF("id", "name")
+        .write
+        .format("paimon")
+        .mode("append")
+        .saveAsTable("t")
+
+      Seq((3, "c", 100))
+        .toDF("ID", "NAME", "extra")
+        .write
+        .format("paimon")
+        .mode("append")
+        .option("write.merge-schema", "true")
+        .saveAsTable("t")
+
+      val columnNames = spark.table("t").schema.fieldNames
+      assert(
+        columnNames.length == 3,
+        s"Expected 3 columns but got ${columnNames.length}: 
${columnNames.mkString(", ")}")
+
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(1, "a", null), Row(2, "b", null), Row(3, "c", 100)))
+    }
+  }
+
+  test("Write merge schema: only case differs, no schema change") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, name STRING)")
+      sql("INSERT INTO t VALUES (1, 'a')")
+
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("INSERT INTO t BY NAME SELECT 2 AS ID, 'b' AS NAME")
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames
+      assert(
+        columnNames.toSeq == Seq("id", "name"),
+        s"Schema changed unexpectedly: ${columnNames.mkString(", ")}")
+
+      checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, "a"), Row(2, 
"b")))
+    }
+  }
+
+  test("Write merge schema: repeated writes with alternating case") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, name STRING)")
+      sql("INSERT INTO t VALUES (1, 'a')")
+
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("INSERT INTO t BY NAME SELECT 2 AS ID, 'b' AS NAME")
+        sql("INSERT INTO t BY NAME SELECT 3 AS Id, 'c' AS NaMe")
+        sql("INSERT INTO t BY NAME SELECT 4 AS iD, 'd' AS nAmE")
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames
+      assert(columnNames.length == 2, s"Expected 2 columns but got: 
${columnNames.mkString(", ")}")
+
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY id"),
+        Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(4, "d")))
+    }
+  }
+
+  test("Write merge schema: nested struct case mismatch with new sub-field") {
+    withTable("t") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("CREATE TABLE t (id INT, info STRUCT<key1: STRING, key2: STRING>)")
+        sql("INSERT INTO t VALUES (1, named_struct('key1', 'a', 'key2', 'b'))")
+
+        sql(
+          "INSERT INTO t BY NAME SELECT 2 AS id, " +
+            "named_struct('KEY1', 'A', 'KEY2', 'B', 'key3', 'C') AS info")
+
+        val infoFields = spark
+          .table("t")
+          .schema("info")
+          .dataType
+          .asInstanceOf[org.apache.spark.sql.types.StructType]
+          .fieldNames
+        assert(
+          infoFields.length == 3,
+          s"Expected 3 sub-fields but got ${infoFields.length}: 
${infoFields.mkString(", ")}")
+
+        checkAnswer(
+          sql("SELECT id, info.key1, info.key2, info.key3 FROM t ORDER BY id"),
+          Seq(Row(1, "a", "b", null), Row(2, "A", "B", "C")))
+      }
+    }
+  }
+
+  test("Merge into with merge-schema: source uppercase should not create 
duplicate columns") {
+    withTable("t") {
+      sql("""CREATE TABLE t (id INT, name STRING, value INT)
+            | USING paimon
+            | TBLPROPERTIES ('primary-key' = 'id', 'bucket' = 
'1')""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'a', 10), (2, 'b', 20)")
+
+      spark
+        .sql("SELECT 1 AS ID, 'A' AS NAME, 100 AS VALUE UNION ALL SELECT 3 AS 
ID, 'c' AS NAME, 30 AS VALUE")
+        .createOrReplaceTempView("s")
+
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("""MERGE INTO t USING s ON t.id = s.ID
+              | WHEN MATCHED THEN UPDATE SET *
+              | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames.toSeq
+      assert(
+        columnNames.size == 3,
+        s"Expected 3 columns but got ${columnNames.size}: 
${columnNames.mkString(", ")}")
+
+      checkAnswer(
+        sql("SELECT id, name, value FROM t ORDER BY id"),
+        Seq(Row(1, "A", 100), Row(2, "b", 20), Row(3, "c", 30)))
+    }
+  }
+
+  test("Merge into with merge-schema: extra column with case-mismatched 
existing columns") {
+    withTable("t") {
+      sql("""CREATE TABLE t (id INT, name STRING)
+            | USING paimon
+            | TBLPROPERTIES ('primary-key' = 'id', 'bucket' = 
'1')""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
+
+      spark
+        .sql("SELECT 1 AS ID, 'A' AS NAME, 100 AS extra_col UNION ALL SELECT 3 
AS ID, 'c' AS NAME, 30 AS extra_col")
+        .createOrReplaceTempView("s")
+
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("""MERGE INTO t USING s ON t.id = s.ID
+              | WHEN MATCHED THEN UPDATE SET *
+              | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames.toSeq
+      assert(
+        columnNames.size == 3,
+        s"Expected 3 columns (id, name, extra_col) but got 
${columnNames.size}: ${columnNames.mkString(", ")}")
+
+      checkAnswer(
+        sql("SELECT id, name, extra_col FROM t ORDER BY id"),
+        Seq(Row(1, "A", 100), Row(2, "b", null), Row(3, "c", 30)))
+    }
+  }
+
+  test("Merge into with merge-schema: only case differs, schema should not 
change") {
+    withTable("t") {
+      sql("""CREATE TABLE t (id INT, name STRING)
+            | USING paimon
+            | TBLPROPERTIES ('primary-key' = 'id', 'bucket' = 
'1')""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'a')")
+
+      spark.sql("SELECT 1 AS ID, 'A' AS NAME").createOrReplaceTempView("s")
+
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("""MERGE INTO t USING s ON t.id = s.ID
+              | WHEN MATCHED THEN UPDATE SET *""".stripMargin)
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames
+      assert(
+        columnNames.toSeq == Seq("id", "name"),
+        s"Schema changed unexpectedly: ${columnNames.mkString(", ")}")
+
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, "A")))
+    }
+  }
+
+  test("Merge into with merge-schema: nested struct fields case mismatch") {
+    withTable("t") {
+      sql("""CREATE TABLE t (id INT, info STRUCT<key1: STRING, key2: STRING>)
+            | USING paimon
+            | TBLPROPERTIES ('primary-key' = 'id', 'bucket' = 
'1')""".stripMargin)
+      sql("INSERT INTO t VALUES (1, named_struct('key1', 'a', 'key2', 'b'))")
+
+      spark
+        .sql("SELECT 1 AS id, named_struct('KEY1', 'A', 'KEY2', 'B', 'key3', 
'C') AS info")
+        .createOrReplaceTempView("s")
+
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("""MERGE INTO t USING s ON t.id = s.id
+              | WHEN MATCHED THEN UPDATE SET *
+              | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+      }
+
+      val infoFields = spark
+        .table("t")
+        .schema("info")
+        .dataType
+        .asInstanceOf[org.apache.spark.sql.types.StructType]
+        .fieldNames
+      assert(
+        infoFields.length == 3,
+        s"Expected 3 sub-fields but got ${infoFields.length}: 
${infoFields.mkString(", ")}")
+
+      checkAnswer(
+        sql("SELECT id, info.key1, info.key2, info.key3 FROM t"),
+        Seq(Row(1, "A", "B", "C")))
+    }
+  }
+
+  test("Merge into with merge-schema: repeated writes with alternating case") {
+    withTable("t") {
+      sql("""CREATE TABLE t (id INT, name STRING)
+            | USING paimon
+            | TBLPROPERTIES ('primary-key' = 'id', 'bucket' = 
'1')""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'a')")
+
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        spark.sql("SELECT 2 AS ID, 'b' AS NAME").createOrReplaceTempView("s1")
+        sql("""MERGE INTO t USING s1 ON t.id = s1.ID
+              | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+
+        spark.sql("SELECT 3 AS Id, 'c' AS NaMe").createOrReplaceTempView("s2")
+        sql("""MERGE INTO t USING s2 ON t.id = s2.Id
+              | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames
+      assert(columnNames.length == 2, s"Expected 2 columns but got: 
${columnNames.mkString(", ")}")
+
+      checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, "a"), Row(2, 
"b"), Row(3, "c")))
+    }
+  }
+
+  test("Merge into without merge-schema: case-insensitive matching works 
normally") {
+    withTable("t") {
+      sql("""CREATE TABLE t (id INT, name STRING)
+            | USING paimon
+            | TBLPROPERTIES ('primary-key' = 'id', 'bucket' = 
'1')""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
+
+      spark
+        .sql("SELECT 1 AS ID, 'A' AS NAME UNION ALL SELECT 3 AS ID, 'c' AS 
NAME")
+        .createOrReplaceTempView("s")
+
+      sql("""MERGE INTO t USING s ON t.id = s.ID
+            | WHEN MATCHED THEN UPDATE SET *
+            | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+
+      val schema = spark.table("t").schema
+      assert(
+        schema.fieldNames.length == 2,
+        s"Expected 2 columns but got: ${schema.fieldNames.mkString(", ")}")
+
+      checkAnswer(
+        sql("SELECT id, name FROM t ORDER BY id"),
+        Seq(Row(1, "A"), Row(2, "b"), Row(3, "c")))
+    }
+  }
+
+  test("Merge into with merge-schema: case-sensitive mode treats different 
case as new columns") {
+    withTable("t") {
+      sql("""CREATE TABLE t (id INT, name STRING)
+            | USING paimon
+            | TBLPROPERTIES ('primary-key' = 'id', 'bucket' = 
'1')""".stripMargin)
+      sql("INSERT INTO t VALUES (1, 'a')")
+
+      spark
+        .sql("SELECT 1 AS ID, 'A' AS NAME, 100 AS extra")
+        .createOrReplaceTempView("s")
+
+      withSparkSQLConf(
+        "spark.paimon.write.merge-schema" -> "true",
+        "spark.sql.caseSensitive" -> "true") {
+        sql("""MERGE INTO t USING s ON t.id = s.ID
+              | WHEN MATCHED THEN UPDATE SET *
+              | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames
+      assert(
+        columnNames.length == 5,
+        s"Expected 5 columns (id, name, ID, NAME, extra) but got 
${columnNames.length}: ${columnNames.mkString(", ")}")
+    }
+  }
+
   test("Write merge schema: array of struct missing nested field by 
dataframe") {
     withTable("t") {
       sql("""
@@ -546,4 +848,22 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
         Seq(Row(Seq(1L, 2L), 1L, 2L, Seq(Row(10, "v2", "v3", "v4", "v5", 
null)))))
     }
   }
+
+  test("Write merge schema: case-sensitive mode treats different case as new 
columns") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, name STRING)")
+      sql("INSERT INTO t VALUES (1, 'a')")
+
+      withSparkSQLConf(
+        "spark.paimon.write.merge-schema" -> "true",
+        "spark.sql.caseSensitive" -> "true") {
+        sql("INSERT INTO t BY NAME SELECT 2 AS ID, 'b' AS NAME, 100 AS extra")
+      }
+
+      val columnNames = spark.table("t").schema.fieldNames
+      assert(
+        columnNames.length == 5,
+        s"Expected 5 columns (id, name, ID, NAME, extra) but got 
${columnNames.length}: ${columnNames.mkString(", ")}")
+    }
+  }
 }


Reply via email to