This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 22f3fd344c [spark] Fix schema merge creating duplicate columns with
case-mismatched names (#8034)
22f3fd344c is described below
commit 22f3fd344cb86bb39d8149b764109cdb5c24a70c
Author: Zouxxyy <[email protected]>
AuthorDate: Fri May 29 21:45:20 2026 +0800
[spark] Fix schema merge creating duplicate columns with case-mismatched
names (#8034)
When `merge-schema` is enabled and source column names differ only in
case from target columns (e.g. source `ID` vs target `id`),
`SchemaMergingUtils` treats them as new columns due to case-sensitive
`HashMap` lookups. This causes duplicate columns in the schema and makes
the table unreadable (`Field names must be unique`).
This PR threads a `caseSensitive` parameter through the schema merge chain
(`SchemaMergingUtils` → `SchemaManager` → `FileStore` → Spark
`SchemaHelper`), using a `TreeMap` ordered by
`String.CASE_INSENSITIVE_ORDER` for field matching when
`caseSensitive=false`. Spark callers pass the value of the
`spark.sql.caseSensitive` config (default `false`).
This affects both the `INSERT ... merge-schema=true` and the `MERGE INTO ...
merge-schema=true` paths.
---
.../java/org/apache/paimon/AbstractFileStore.java | 4 +-
.../src/main/java/org/apache/paimon/FileStore.java | 2 +-
.../paimon/privilege/PrivilegedFileStore.java | 4 +-
.../org/apache/paimon/schema/SchemaManager.java | 7 +-
.../apache/paimon/schema/SchemaMergingUtils.java | 154 ++++++----
.../paimon/operation/FileStoreCommitTest.java | 2 +-
.../paimon/schema/SchemaMergingUtilsTest.java | 333 +++++++++++++--------
.../analysis/MergeSchemaEvolutionHelper.scala | 3 +-
.../paimon/spark/commands/SchemaHelper.scala | 21 +-
.../paimon/spark/sql/WriteMergeSchemaTest.scala | 320 ++++++++++++++++++++
10 files changed, 644 insertions(+), 206 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index cb3bae1dbe..e8153aee52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -271,9 +271,9 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
}
@Override
- public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast,
boolean caseSensitive) {
return schemaManager.mergeSchema(
- rowType, allowExplicitCast,
catalogEnvironment.schemaModification());
+ rowType, allowExplicitCast, caseSensitive,
catalogEnvironment.schemaModification());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 98905b47e5..7a917adde8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -117,7 +117,7 @@ public interface FileStore<T> {
ServiceManager newServiceManager();
- boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
+ boolean mergeSchema(RowType rowType, boolean allowExplicitCast, boolean
caseSensitive);
List<TagCallback> createTagCallbacks(FileStoreTable table);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index 6ced52bbd1..f4e6922b98 100644
---
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -219,9 +219,9 @@ public class PrivilegedFileStore<T> implements FileStore<T>
{
}
@Override
- public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast,
boolean caseSensitive) {
privilegeChecker.assertCanInsert(identifier);
- return wrapped.mergeSchema(rowType, allowExplicitCast);
+ return wrapped.mergeSchema(rowType, allowExplicitCast, caseSensitive);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index bde20d66ab..8a4128e58d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -753,19 +753,22 @@ public class SchemaManager implements Serializable {
public boolean mergeSchema(
RowType rowType,
boolean allowExplicitCast,
+ boolean caseSensitive,
@Nullable SchemaModification schemaModification) {
TableSchema current =
latest().orElseThrow(
() ->
new RuntimeException(
"It requires that the current
schema to exist when calling 'mergeSchema'"));
- TableSchema update = SchemaMergingUtils.mergeSchemas(current, rowType,
allowExplicitCast);
+ TableSchema update =
+ SchemaMergingUtils.mergeSchemas(current, rowType,
allowExplicitCast, caseSensitive);
if (current.equals(update)) {
return false;
}
try {
if (schemaModification != null) {
- List<SchemaChange> changes =
SchemaMergingUtils.diffSchemaChanges(current, update);
+ List<SchemaChange> changes =
+ SchemaMergingUtils.diffSchemaChanges(current, update,
caseSensitive);
schemaModification.alterSchema(changes);
return true;
} else {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
index 990087d792..d007b96db0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -30,11 +30,11 @@ import org.apache.paimon.types.ReassignFieldId;
import org.apache.paimon.types.RowType;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Function;
-import java.util.stream.Collectors;
/** The util class for merging the schemas. */
public class SchemaMergingUtils {
@@ -43,7 +43,10 @@ public class SchemaMergingUtils {
public static final String MAP_VALUE_FIELD_NAME = "value";
public static TableSchema mergeSchemas(
- TableSchema currentTableSchema, RowType targetType, boolean
allowExplicitCast) {
+ TableSchema currentTableSchema,
+ RowType targetType,
+ boolean allowExplicitCast,
+ boolean caseSensitive) {
RowType currentType = currentTableSchema.logicalRowType();
if (currentType.equals(targetType)) {
return currentTableSchema;
@@ -51,7 +54,8 @@ public class SchemaMergingUtils {
AtomicInteger highestFieldId = new
AtomicInteger(currentTableSchema.highestFieldId());
RowType newRowType =
- mergeSchemas(currentType, targetType, highestFieldId,
allowExplicitCast);
+ mergeSchemas(
+ currentType, targetType, highestFieldId,
allowExplicitCast, caseSensitive);
if (newRowType.equals(currentType)) {
// It happens if the `targetType` only changes `nullability` but
we always respect the
// current's.
@@ -72,8 +76,10 @@ public class SchemaMergingUtils {
RowType tableSchema,
RowType dataSchema,
AtomicInteger highestFieldId,
- boolean allowExplicitCast) {
- return (RowType) merge(tableSchema, dataSchema, highestFieldId,
allowExplicitCast);
+ boolean allowExplicitCast,
+ boolean caseSensitive) {
+ return (RowType)
+ merge(tableSchema, dataSchema, highestFieldId,
allowExplicitCast, caseSensitive);
}
/**
@@ -92,7 +98,8 @@ public class SchemaMergingUtils {
DataType base0,
DataType update0,
AtomicInteger highestFieldId,
- boolean allowExplicitCast) {
+ boolean allowExplicitCast,
+ boolean caseSensitive) {
// Here we try to merge the base0 and update0 without regard to the
nullability,
// and set the base0's nullability to the return's.
DataType base = base0.copy(true);
@@ -103,45 +110,37 @@ public class SchemaMergingUtils {
} else if (base instanceof RowType && update instanceof RowType) {
List<DataField> baseFields = ((RowType) base).getFields();
List<DataField> updateFields = ((RowType) update).getFields();
- Map<String, DataField> updateFieldMap =
- updateFields.stream()
- .collect(Collectors.toMap(DataField::name,
Function.identity()));
- List<DataField> updatedFields =
- baseFields.stream()
- .map(
- baseField -> {
- if
(updateFieldMap.containsKey(baseField.name())) {
- DataField updateField =
-
updateFieldMap.get(baseField.name());
- DataType updatedDataType =
- merge(
- baseField.type(),
- updateField.type(),
- highestFieldId,
- allowExplicitCast);
- return new DataField(
- baseField.id(),
- baseField.name(),
- updatedDataType,
- baseField.description(),
- baseField.defaultValue());
- } else {
- return baseField;
- }
- })
- .collect(Collectors.toList());
+ Map<String, DataField> updateFieldMap =
buildFieldMap(updateFields, caseSensitive);
+ List<DataField> updatedFields = new ArrayList<>();
+ for (DataField baseField : baseFields) {
+ if (updateFieldMap.containsKey(baseField.name())) {
+ DataField updateField =
updateFieldMap.get(baseField.name());
+ DataType updatedDataType =
+ merge(
+ baseField.type(),
+ updateField.type(),
+ highestFieldId,
+ allowExplicitCast,
+ caseSensitive);
+ updatedFields.add(
+ new DataField(
+ baseField.id(),
+ baseField.name(),
+ updatedDataType,
+ baseField.description(),
+ baseField.defaultValue()));
+ } else {
+ updatedFields.add(baseField);
+ }
+ }
- Map<String, DataField> baseFieldMap =
- baseFields.stream()
- .collect(Collectors.toMap(DataField::name,
Function.identity()));
- List<DataField> newFields =
- updateFields.stream()
- .filter(field ->
!baseFieldMap.containsKey(field.name()))
- .map(field -> assignIdForNewField(field,
highestFieldId))
- .map(field -> field.copy(true))
- .collect(Collectors.toList());
+ Map<String, DataField> baseFieldMap = buildFieldMap(baseFields,
caseSensitive);
+ for (DataField field : updateFields) {
+ if (!baseFieldMap.containsKey(field.name())) {
+ updatedFields.add(assignIdForNewField(field,
highestFieldId).copy(true));
+ }
+ }
- updatedFields.addAll(newFields);
return new RowType(base0.isNullable(), updatedFields);
} else if (base instanceof MapType && update instanceof MapType) {
return new MapType(
@@ -150,12 +149,14 @@ public class SchemaMergingUtils {
((MapType) base).getKeyType(),
((MapType) update).getKeyType(),
highestFieldId,
- allowExplicitCast),
+ allowExplicitCast,
+ caseSensitive),
merge(
((MapType) base).getValueType(),
((MapType) update).getValueType(),
highestFieldId,
- allowExplicitCast));
+ allowExplicitCast,
+ caseSensitive));
} else if (base instanceof ArrayType && update instanceof ArrayType) {
return new ArrayType(
base0.isNullable(),
@@ -163,7 +164,8 @@ public class SchemaMergingUtils {
((ArrayType) base).getElementType(),
((ArrayType) update).getElementType(),
highestFieldId,
- allowExplicitCast));
+ allowExplicitCast,
+ caseSensitive));
} else if (base instanceof MultisetType && update instanceof
MultisetType) {
return new MultisetType(
base0.isNullable(),
@@ -171,7 +173,8 @@ public class SchemaMergingUtils {
((MultisetType) base).getElementType(),
((MultisetType) update).getElementType(),
highestFieldId,
- allowExplicitCast));
+ allowExplicitCast,
+ caseSensitive));
} else if (base instanceof DecimalType && update instanceof
DecimalType) {
if (((DecimalType) base).getScale() == ((DecimalType)
update).getScale()) {
return new DecimalType(
@@ -243,13 +246,14 @@ public class SchemaMergingUtils {
* This supports detecting added columns and type changes (including
nested structs).
*/
public static List<SchemaChange> diffSchemaChanges(
- TableSchema oldSchema, TableSchema newSchema) {
+ TableSchema oldSchema, TableSchema newSchema, boolean
caseSensitive) {
List<SchemaChange> changes = new ArrayList<>();
diffFields(
oldSchema.logicalRowType().getFields(),
newSchema.logicalRowType().getFields(),
new String[0],
- changes);
+ changes,
+ caseSensitive);
return changes;
}
@@ -257,9 +261,9 @@ public class SchemaMergingUtils {
List<DataField> oldFields,
List<DataField> newFields,
String[] parentNames,
- List<SchemaChange> changes) {
- Map<String, DataField> oldFieldMap =
- oldFields.stream().collect(Collectors.toMap(DataField::name,
Function.identity()));
+ List<SchemaChange> changes,
+ boolean caseSensitive) {
+ Map<String, DataField> oldFieldMap = buildFieldMap(oldFields,
caseSensitive);
for (DataField newField : newFields) {
String[] fieldNames = appendFieldName(parentNames,
newField.name());
@@ -271,7 +275,7 @@ public class SchemaMergingUtils {
fieldNames, newField.type(),
newField.description(), null));
} else if (!oldField.type().equals(newField.type())
&& !diffNestedTypeChanges(
- oldField.type(), newField.type(), fieldNames,
changes)) {
+ oldField.type(), newField.type(), fieldNames,
changes, caseSensitive)) {
changes.add(SchemaChange.updateColumnType(fieldNames,
newField.type(), true));
}
}
@@ -282,9 +286,15 @@ public class SchemaMergingUtils {
* changes. Returns false to let the caller fall back to {@link
SchemaChange.UpdateColumnType}.
*/
private static boolean diffNestedTypeChanges(
- DataType oldType, DataType newType, String[] fieldNames,
List<SchemaChange> changes) {
+ DataType oldType,
+ DataType newType,
+ String[] fieldNames,
+ List<SchemaChange> changes,
+ boolean caseSensitive) {
List<SchemaChange> stagedChanges = new ArrayList<>();
- boolean handled = diffNestedTypeChangesInner(oldType, newType,
fieldNames, stagedChanges);
+ boolean handled =
+ diffNestedTypeChangesInner(
+ oldType, newType, fieldNames, stagedChanges,
caseSensitive);
if (handled) {
changes.addAll(stagedChanges);
}
@@ -292,21 +302,26 @@ public class SchemaMergingUtils {
}
private static boolean diffNestedTypeChangesInner(
- DataType oldType, DataType newType, String[] fieldNames,
List<SchemaChange> changes) {
+ DataType oldType,
+ DataType newType,
+ String[] fieldNames,
+ List<SchemaChange> changes,
+ boolean caseSensitive) {
if (oldType instanceof RowType && newType instanceof RowType) {
List<DataField> oldFields = ((RowType) oldType).getFields();
List<DataField> newFields = ((RowType) newType).getFields();
- if (hasRemovedFields(oldFields, newFields)) {
+ if (hasRemovedFields(oldFields, newFields, caseSensitive)) {
return false;
}
- diffFields(oldFields, newFields, fieldNames, changes);
+ diffFields(oldFields, newFields, fieldNames, changes,
caseSensitive);
return true;
} else if (oldType instanceof ArrayType && newType instanceof
ArrayType) {
return diffNestedTypeChanges(
((ArrayType) oldType).getElementType(),
((ArrayType) newType).getElementType(),
appendFieldName(fieldNames, ARRAY_ELEMENT_FIELD_NAME),
- changes);
+ changes,
+ caseSensitive);
} else if (oldType instanceof MapType && newType instanceof MapType) {
MapType oldMapType = (MapType) oldType;
MapType newMapType = (MapType) newType;
@@ -317,14 +332,15 @@ public class SchemaMergingUtils {
oldMapType.getValueType(),
newMapType.getValueType(),
appendFieldName(fieldNames, MAP_VALUE_FIELD_NAME),
- changes);
+ changes,
+ caseSensitive);
}
return false;
}
- private static boolean hasRemovedFields(List<DataField> oldFields,
List<DataField> newFields) {
- Map<String, DataField> newFieldMap =
- newFields.stream().collect(Collectors.toMap(DataField::name,
Function.identity()));
+ private static boolean hasRemovedFields(
+ List<DataField> oldFields, List<DataField> newFields, boolean
caseSensitive) {
+ Map<String, DataField> newFieldMap = buildFieldMap(newFields,
caseSensitive);
for (DataField oldField : oldFields) {
if (!newFieldMap.containsKey(oldField.name())) {
return true;
@@ -333,6 +349,16 @@ public class SchemaMergingUtils {
return false;
}
+ private static Map<String, DataField> buildFieldMap(
+ List<DataField> fields, boolean caseSensitive) {
+ Map<String, DataField> map =
+ caseSensitive ? new HashMap<>() : new
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+ for (DataField field : fields) {
+ map.put(field.name(), field);
+ }
+ return map;
+ }
+
private static String[] appendFieldName(String[] parentNames, String
fieldName) {
String[] result = new String[parentNames.length + 1];
System.arraycopy(parentNames, 0, result, 0, parentNames.length);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 71eb081de8..496242db70 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -807,7 +807,7 @@ public class FileStoreCommitTest {
ArrayList<DataField> newFields =
new
ArrayList<>(TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields());
newFields.add(new DataField(-1, "newField", DataTypes.INT()));
- store.mergeSchema(new RowType(false, newFields), true);
+ store.mergeSchema(new RowType(false, newFields), true, true);
store.commitData(generateDataList(10), gen::getPartition, kv -> 0);
readStats = statsFileHandler.readStats();
assertThat(readStats).isEmpty();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
index ff04a167a7..050a4f12d7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
@@ -83,7 +83,7 @@ public class SchemaMergingUtilsTest {
DataField f = new DataField(-1, "f", fDataType);
RowType t = new RowType(Lists.newArrayList(a, b, d, f));
- TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t,
false);
+ TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t,
false, true);
assertThat(merged.id()).isEqualTo(1);
assertThat(merged.highestFieldId()).isEqualTo(6);
assertThat(merged.primaryKeys()).containsExactlyInAnyOrder("a", "d");
@@ -110,7 +110,7 @@ public class SchemaMergingUtilsTest {
// fake the RowType of data with different field sequences
RowType t = new RowType(Lists.newArrayList(b, a));
- TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t,
false);
+ TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t,
false, true);
assertThat(merged.id()).isEqualTo(0);
}
@@ -132,7 +132,7 @@ public class SchemaMergingUtilsTest {
// Case 1: an additional field.
DataField e = new DataField(-1, "e", new DateType());
RowType t1 = new RowType(Lists.newArrayList(a, b, c, d, e));
- RowType r1 = (RowType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false);
+ RowType r1 = (RowType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false, true);
assertThat(highestFieldId.get()).isEqualTo(4);
assertThat(r1.isNullable()).isTrue();
assertThat(r1.getFieldCount()).isEqualTo(5);
@@ -143,7 +143,7 @@ public class SchemaMergingUtilsTest {
// Case 2: two missing fields.
RowType t2 = new RowType(Lists.newArrayList(a, c, e));
- RowType r2 = SchemaMergingUtils.mergeSchemas(r1, t2, highestFieldId,
false);
+ RowType r2 = SchemaMergingUtils.mergeSchemas(r1, t2, highestFieldId,
false, true);
assertThat(highestFieldId.get()).isEqualTo(4);
assertThat(r2.getFieldCount()).isEqualTo(5);
assertThat(r2.getTypeAt(3)).isEqualTo(d.type());
@@ -155,7 +155,7 @@ public class SchemaMergingUtilsTest {
RowType fDataType = new RowType(Lists.newArrayList(f1, f2));
DataField f = new DataField(-1, "f", fDataType);
RowType t3 = new RowType(Lists.newArrayList(a, b, c, d, f));
- RowType r3 = (RowType) SchemaMergingUtils.merge(r2, t3,
highestFieldId, false);
+ RowType r3 = (RowType) SchemaMergingUtils.merge(r2, t3,
highestFieldId, false, true);
assertThat(highestFieldId.get()).isEqualTo(7);
assertThat(r3.getFieldCount()).isEqualTo(6);
RowType expectedFDataType = new
RowType(Lists.newArrayList(f1.newId(5), f2.newId(6)));
@@ -168,7 +168,7 @@ public class SchemaMergingUtilsTest {
RowType newFDataType = new RowType(Lists.newArrayList(f1, f2, f3));
DataField newF = new DataField(-1, "f", newFDataType);
RowType t4 = new RowType(Lists.newArrayList(a, b, c, d, e, newF));
- RowType r4 = SchemaMergingUtils.mergeSchemas(r3, t4, highestFieldId,
false);
+ RowType r4 = SchemaMergingUtils.mergeSchemas(r3, t4, highestFieldId,
false, true);
assertThat(highestFieldId.get()).isEqualTo(8);
assertThat(r4.getFieldCount()).isEqualTo(6);
RowType newExpectedFDataType =
@@ -178,14 +178,14 @@ public class SchemaMergingUtilsTest {
// Case 5: a field that isn't compatible with the existing one.
DataField newA = new DataField(-1, "a", new SmallIntType());
RowType t5 = new RowType(Lists.newArrayList(newA, b, c, d, e, newF));
- assertThatThrownBy(() -> SchemaMergingUtils.merge(r4, t5,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(r4, t5,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// Case 6: all new-coming fields
DataField g = new DataField(-1, "g", new TimeType());
DataField h = new DataField(-1, "h", new TimeType());
RowType t6 = new RowType(Lists.newArrayList(g, h));
- RowType r6 = SchemaMergingUtils.mergeSchemas(r4, t6, highestFieldId,
false);
+ RowType r6 = SchemaMergingUtils.mergeSchemas(r4, t6, highestFieldId,
false, true);
assertThat(highestFieldId.get()).isEqualTo(10);
assertThat(r6.getFieldCount()).isEqualTo(8);
}
@@ -261,7 +261,8 @@ public class SchemaMergingUtilsTest {
new HashMap<>(),
"");
- List<SchemaChange> changes =
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+ List<SchemaChange> changes =
+ SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema,
true);
assertThat(changes).hasSize(2);
SchemaChange.AddColumn addArrayNestedField = (SchemaChange.AddColumn)
changes.get(0);
@@ -317,7 +318,8 @@ public class SchemaMergingUtilsTest {
new HashMap<>(),
"");
- List<SchemaChange> changes =
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+ List<SchemaChange> changes =
+ SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema,
true);
assertThat(changes).hasSize(1);
SchemaChange.UpdateColumnType updateMapType =
@@ -360,7 +362,8 @@ public class SchemaMergingUtilsTest {
new HashMap<>(),
"");
- List<SchemaChange> changes =
SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema);
+ List<SchemaChange> changes =
+ SchemaMergingUtils.diffSchemaChanges(oldSchema, newSchema,
true);
assertThat(changes).hasSize(1);
SchemaChange.UpdateColumnType updateItemsType =
@@ -377,23 +380,25 @@ public class SchemaMergingUtilsTest {
// the element types are same.
DataType t1 = new ArrayType(true, new IntType());
- ArrayType r1 = (ArrayType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false);
+ ArrayType r1 =
+ (ArrayType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false, true);
assertThat(r1.isNullable()).isFalse();
assertThat(r1.getElementType() instanceof IntType).isTrue();
// the element types aren't same, but can be evolved safety.
DataType t2 = new ArrayType(true, new BigIntType());
- ArrayType r2 = (ArrayType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false);
+ ArrayType r2 =
+ (ArrayType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false, true);
assertThat(r2.isNullable()).isFalse();
assertThat(r2.getElementType() instanceof BigIntType).isTrue();
// the element types aren't same, and can't be evolved safety.
DataType t3 = new ArrayType(true, new SmallIntType());
- assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// the value type of target's isn't same to the source's, but the
source type can be cast to
// the target type explicitly.
- ArrayType r3 = (ArrayType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true);
+ ArrayType r3 = (ArrayType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true, true);
assertThat(r3.isNullable()).isFalse();
assertThat(r3.getElementType() instanceof SmallIntType).isTrue();
}
@@ -406,25 +411,25 @@ public class SchemaMergingUtilsTest {
// both the key and value types are same to the source's.
DataType t1 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new
IntType());
- MapType r1 = (MapType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false);
+ MapType r1 = (MapType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false, true);
assertThat(r1.isNullable()).isTrue();
assertThat(r1.getKeyType() instanceof VarCharType).isTrue();
assertThat(r1.getValueType() instanceof IntType).isTrue();
// the value type of target's isn't same to the source's, but can be
evolved safety.
DataType t2 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new
DoubleType());
- MapType r2 = (MapType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false);
+ MapType r2 = (MapType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false, true);
assertThat(r2.isNullable()).isTrue();
assertThat(r2.getKeyType() instanceof VarCharType).isTrue();
assertThat(r2.getValueType() instanceof DoubleType).isTrue();
// the value type of target's isn't same to the source's, and can't be
evolved safety.
DataType t3 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new
SmallIntType());
- assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// the value type of target's isn't same to the source's, but the
source type can be cast to
// the target type explicitly.
- MapType r3 = (MapType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true);
+ MapType r3 = (MapType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true, true);
assertThat(r3.isNullable()).isTrue();
assertThat(r3.getKeyType() instanceof VarCharType).isTrue();
assertThat(r3.getValueType() instanceof SmallIntType).isTrue();
@@ -439,24 +444,25 @@ public class SchemaMergingUtilsTest {
// the element types are same.
DataType t1 = new MultisetType(true, new IntType());
MultisetType r1 =
- (MultisetType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false);
+ (MultisetType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false, true);
assertThat(r1.isNullable()).isFalse();
assertThat(r1.getElementType() instanceof IntType).isTrue();
// the element types aren't same, but can be evolved safety.
DataType t2 = new MultisetType(true, new BigIntType());
MultisetType r2 =
- (MultisetType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false);
+ (MultisetType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false, true);
assertThat(r2.isNullable()).isFalse();
assertThat(r2.getElementType() instanceof BigIntType).isTrue();
// the element types aren't same, and can't be evolved safety.
DataType t3 = new MultisetType(true, new SmallIntType());
- assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(source, t3,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// the value type of target's isn't same to the source's, but the
source type can be cast to
// the target type explicitly.
- MultisetType r3 = (MultisetType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true);
+ MultisetType r3 =
+ (MultisetType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true, true);
assertThat(r3.isNullable()).isFalse();
assertThat(r3.getElementType() instanceof SmallIntType).isTrue();
}
@@ -467,26 +473,29 @@ public class SchemaMergingUtilsTest {
DataType s1 = new DecimalType();
DataType t1 = new DecimalType(10, 0);
- DecimalType r1 = (DecimalType) SchemaMergingUtils.merge(s1, t1,
highestFieldId, false);
+ DecimalType r1 =
+ (DecimalType) SchemaMergingUtils.merge(s1, t1, highestFieldId,
false, true);
assertThat(r1.isNullable()).isTrue();
assertThat(r1.getPrecision()).isEqualTo(DecimalType.DEFAULT_PRECISION);
assertThat(r1.getScale()).isEqualTo(DecimalType.DEFAULT_SCALE);
DataType s2 = new DecimalType(5, 2);
DataType t2 = new DecimalType(7, 3);
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s2, t2,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s2, t2,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
DataType s3 = new DecimalType(false, 5, 2);
DataType t3 = new DecimalType(7, 2);
- DecimalType r3 = (DecimalType) SchemaMergingUtils.merge(s3, t3,
highestFieldId, false);
+ DecimalType r3 =
+ (DecimalType) SchemaMergingUtils.merge(s3, t3, highestFieldId,
false, true);
assertThat(r3.isNullable()).isFalse();
assertThat(r3.getPrecision()).isEqualTo(7);
assertThat(r3.getScale()).isEqualTo(2);
DataType s4 = new DecimalType(7, 2);
DataType t4 = new DecimalType(5, 2);
- DecimalType r4 = (DecimalType) SchemaMergingUtils.merge(s4, t4,
highestFieldId, false);
+ DecimalType r4 =
+ (DecimalType) SchemaMergingUtils.merge(s4, t4, highestFieldId,
false, true);
assertThat(r4.isNullable()).isTrue();
assertThat(r4.getPrecision()).isEqualTo(7);
assertThat(r4.getScale()).isEqualTo(2);
@@ -495,10 +504,12 @@ public class SchemaMergingUtilsTest {
DataType dcmSource = new DecimalType();
DataType iTarget = new IntType();
assertThatThrownBy(
- () -> SchemaMergingUtils.merge(dcmSource, iTarget,
highestFieldId, false))
+ () ->
+ SchemaMergingUtils.merge(
+ dcmSource, iTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// DecimalType -> Other Numeric Type with allowExplicitCast = true
- DataType res = SchemaMergingUtils.merge(dcmSource, iTarget,
highestFieldId, true);
+ DataType res = SchemaMergingUtils.merge(dcmSource, iTarget,
highestFieldId, true, true);
assertThat(res instanceof IntType).isTrue();
}
@@ -509,118 +520,122 @@ public class SchemaMergingUtilsTest {
// BinaryType
DataType s1 = new BinaryType(10);
DataType t1 = new BinaryType(10);
- BinaryType r1 = (BinaryType) SchemaMergingUtils.merge(s1, t1,
highestFieldId, false);
+ BinaryType r1 = (BinaryType) SchemaMergingUtils.merge(s1, t1,
highestFieldId, false, true);
assertThat(r1.getLength()).isEqualTo(10);
DataType s2 = new BinaryType(2);
DataType t2 = new BinaryType();
// smaller length
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s2, t2,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s2, t2,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// smaller length with allowExplicitCast = true
- BinaryType r2 = (BinaryType) SchemaMergingUtils.merge(s2, t2,
highestFieldId, true);
+ BinaryType r2 = (BinaryType) SchemaMergingUtils.merge(s2, t2,
highestFieldId, true, true);
assertThat(r2.getLength()).isEqualTo(BinaryType.DEFAULT_LENGTH);
// bigger length
DataType t3 = new BinaryType(5);
- BinaryType r3 = (BinaryType) SchemaMergingUtils.merge(s2, t3,
highestFieldId, false);
+ BinaryType r3 = (BinaryType) SchemaMergingUtils.merge(s2, t3,
highestFieldId, false, true);
assertThat(r3.getLength()).isEqualTo(5);
// VarCharType
DataType s4 = new VarCharType();
DataType t4 = new VarCharType(1);
- VarCharType r4 = (VarCharType) SchemaMergingUtils.merge(s4, t4,
highestFieldId, false);
+ VarCharType r4 =
+ (VarCharType) SchemaMergingUtils.merge(s4, t4, highestFieldId,
false, true);
assertThat(r4.getLength()).isEqualTo(VarCharType.DEFAULT_LENGTH);
DataType s5 = new VarCharType(2);
DataType t5 = new VarCharType();
// smaller length
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s5, t5,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s5, t5,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// smaller length with allowExplicitCast = true
- VarCharType r5 = (VarCharType) SchemaMergingUtils.merge(s5, t5,
highestFieldId, true);
+ VarCharType r5 = (VarCharType) SchemaMergingUtils.merge(s5, t5,
highestFieldId, true, true);
assertThat(r5.getLength()).isEqualTo(VarCharType.DEFAULT_LENGTH);
// bigger length
DataType t6 = new VarCharType(5);
- VarCharType r6 = (VarCharType) SchemaMergingUtils.merge(s5, t6,
highestFieldId, false);
+ VarCharType r6 =
+ (VarCharType) SchemaMergingUtils.merge(s5, t6, highestFieldId,
false, true);
assertThat(r6.getLength()).isEqualTo(5);
// CharType
DataType s7 = new CharType();
DataType t7 = new CharType(1);
- CharType r7 = (CharType) SchemaMergingUtils.merge(s7, t7,
highestFieldId, false);
+ CharType r7 = (CharType) SchemaMergingUtils.merge(s7, t7,
highestFieldId, false, true);
assertThat(r7.getLength()).isEqualTo(CharType.DEFAULT_LENGTH);
DataType s8 = new CharType(2);
DataType t8 = new CharType();
// smaller length
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s8, t8,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s8, t8,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// smaller length with allowExplicitCast = true
- CharType r8 = (CharType) SchemaMergingUtils.merge(s8, t8,
highestFieldId, true);
+ CharType r8 = (CharType) SchemaMergingUtils.merge(s8, t8,
highestFieldId, true, true);
assertThat(r8.getLength()).isEqualTo(CharType.DEFAULT_LENGTH);
// bigger length
DataType t9 = new CharType(5);
- CharType r9 = (CharType) SchemaMergingUtils.merge(s8, t9,
highestFieldId, false);
+ CharType r9 = (CharType) SchemaMergingUtils.merge(s8, t9,
highestFieldId, false, true);
assertThat(r9.getLength()).isEqualTo(5);
// VarBinaryType
DataType s10 = new VarBinaryType();
DataType t10 = new VarBinaryType(1);
VarBinaryType r10 =
- (VarBinaryType) SchemaMergingUtils.merge(s10, t10,
highestFieldId, false);
+ (VarBinaryType) SchemaMergingUtils.merge(s10, t10,
highestFieldId, false, true);
assertThat(r10.getLength()).isEqualTo(VarBinaryType.DEFAULT_LENGTH);
DataType s11 = new VarBinaryType(2);
DataType t11 = new VarBinaryType();
// smaller length
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s11, t11,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s11, t11,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// smaller length with allowExplicitCast = true
VarBinaryType r11 =
- (VarBinaryType) SchemaMergingUtils.merge(s11, t11,
highestFieldId, true);
+ (VarBinaryType) SchemaMergingUtils.merge(s11, t11,
highestFieldId, true, true);
assertThat(r11.getLength()).isEqualTo(VarBinaryType.DEFAULT_LENGTH);
// bigger length
DataType t12 = new VarBinaryType(5);
VarBinaryType r12 =
- (VarBinaryType) SchemaMergingUtils.merge(s11, t12,
highestFieldId, false);
+ (VarBinaryType) SchemaMergingUtils.merge(s11, t12,
highestFieldId, false, true);
assertThat(r12.getLength()).isEqualTo(5);
// CharType -> VarCharType
DataType s13 = new CharType();
DataType t13 = new VarCharType(10);
- VarCharType r13 = (VarCharType) SchemaMergingUtils.merge(s13, t13,
highestFieldId, false);
+ VarCharType r13 =
+ (VarCharType) SchemaMergingUtils.merge(s13, t13,
highestFieldId, false, true);
assertThat(r13.getLength()).isEqualTo(10);
// VarCharType -> CharType
DataType s14 = new VarCharType(10);
DataType t14 = new CharType();
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s14, t14,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s14, t14,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
- CharType r14 = (CharType) SchemaMergingUtils.merge(s14, t14,
highestFieldId, true);
+ CharType r14 = (CharType) SchemaMergingUtils.merge(s14, t14,
highestFieldId, true, true);
assertThat(r14.getLength()).isEqualTo(CharType.DEFAULT_LENGTH);
// BinaryType -> VarBinaryType
DataType s15 = new BinaryType();
DataType t15 = new VarBinaryType(10);
VarBinaryType r15 =
- (VarBinaryType) SchemaMergingUtils.merge(s15, t15,
highestFieldId, false);
+ (VarBinaryType) SchemaMergingUtils.merge(s15, t15,
highestFieldId, false, true);
assertThat(r15.getLength()).isEqualTo(10);
// VarBinaryType -> BinaryType
DataType s16 = new VarBinaryType(10);
DataType t16 = new BinaryType();
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s16, t16,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s16, t16,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
- BinaryType r16 = (BinaryType) SchemaMergingUtils.merge(s16, t16,
highestFieldId, true);
+ BinaryType r16 =
+ (BinaryType) SchemaMergingUtils.merge(s16, t16,
highestFieldId, true, true);
assertThat(r16.getLength()).isEqualTo(BinaryType.DEFAULT_LENGTH);
// VarCharType -> VarBinaryType
DataType s17 = new VarCharType(10);
DataType t17 = new VarBinaryType();
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s17, t17,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s17, t17,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
VarBinaryType r17 =
- (VarBinaryType) SchemaMergingUtils.merge(s17, t17,
highestFieldId, true);
+ (VarBinaryType) SchemaMergingUtils.merge(s17, t17,
highestFieldId, true, true);
assertThat(r17.getLength()).isEqualTo(VarBinaryType.DEFAULT_LENGTH);
}
@@ -632,7 +647,8 @@ public class SchemaMergingUtilsTest {
DataType s1 = new LocalZonedTimestampType();
DataType t1 = new LocalZonedTimestampType();
LocalZonedTimestampType r1 =
- (LocalZonedTimestampType) SchemaMergingUtils.merge(s1, t1,
highestFieldId, false);
+ (LocalZonedTimestampType)
+ SchemaMergingUtils.merge(s1, t1, highestFieldId,
false, true);
assertThat(r1.isNullable()).isTrue();
assertThat(r1.getPrecision()).isEqualTo(LocalZonedTimestampType.DEFAULT_PRECISION);
@@ -640,31 +656,38 @@ public class SchemaMergingUtilsTest {
assertThatThrownBy(
() ->
SchemaMergingUtils.merge(
- s1, new LocalZonedTimestampType(3),
highestFieldId, false))
+ s1,
+ new LocalZonedTimestampType(3),
+ highestFieldId,
+ false,
+ true))
.isInstanceOf(UnsupportedOperationException.class);
// higher precision
DataType t2 = new LocalZonedTimestampType(6);
LocalZonedTimestampType r2 =
- (LocalZonedTimestampType) SchemaMergingUtils.merge(s1, t2,
highestFieldId, false);
+ (LocalZonedTimestampType)
+ SchemaMergingUtils.merge(s1, t2, highestFieldId,
false, true);
assertThat(r2.getPrecision()).isEqualTo(6);
// LocalZonedTimestampType -> TimeType
DataType s3 = new LocalZonedTimestampType();
DataType t3 = new TimeType(6);
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s3, t3,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s3, t3,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// LocalZonedTimestampType -> TimestampType
DataType s4 = new LocalZonedTimestampType();
DataType t4 = new TimestampType();
- TimestampType r4 = (TimestampType) SchemaMergingUtils.merge(s4, t4,
highestFieldId, false);
+ TimestampType r4 =
+ (TimestampType) SchemaMergingUtils.merge(s4, t4,
highestFieldId, false, true);
assertThat(r4.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
// TimestampType.
DataType s5 = new TimestampType();
DataType t5 = new TimestampType();
- TimestampType r5 = (TimestampType) SchemaMergingUtils.merge(s5, t5,
highestFieldId, false);
+ TimestampType r5 =
+ (TimestampType) SchemaMergingUtils.merge(s5, t5,
highestFieldId, false, true);
assertThat(r5.isNullable()).isTrue();
assertThat(r5.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
@@ -672,65 +695,70 @@ public class SchemaMergingUtilsTest {
assertThatThrownBy(
() ->
SchemaMergingUtils.merge(
- s5, new TimestampType(3),
highestFieldId, false))
+ s5, new TimestampType(3),
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// higher precision
DataType t6 = new TimestampType(9);
- TimestampType r6 = (TimestampType) SchemaMergingUtils.merge(s5, t6,
highestFieldId, false);
+ TimestampType r6 =
+ (TimestampType) SchemaMergingUtils.merge(s5, t6,
highestFieldId, false, true);
assertThat(r6.getPrecision()).isEqualTo(9);
// TimestampType -> LocalZonedTimestampType
DataType s7 = new TimestampType();
DataType t7 = new LocalZonedTimestampType();
LocalZonedTimestampType r7 =
- (LocalZonedTimestampType) SchemaMergingUtils.merge(s7, t7,
highestFieldId, false);
+ (LocalZonedTimestampType)
+ SchemaMergingUtils.merge(s7, t7, highestFieldId,
false, true);
assertThat(r7.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
// TimestampType -> TimeType
DataType s8 = new TimestampType();
DataType t8 = new TimeType(6);
- TimeType r8 = (TimeType) SchemaMergingUtils.merge(s8, t8,
highestFieldId, false);
+ TimeType r8 = (TimeType) SchemaMergingUtils.merge(s8, t8,
highestFieldId, false, true);
assertThat(r8.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
// TimeType.
DataType s9 = new TimeType();
DataType t9 = new TimeType();
- TimeType r9 = (TimeType) SchemaMergingUtils.merge(s9, t9,
highestFieldId, false);
+ TimeType r9 = (TimeType) SchemaMergingUtils.merge(s9, t9,
highestFieldId, false, true);
assertThat(r9.isNullable()).isTrue();
assertThat(r9.getPrecision()).isEqualTo(TimeType.DEFAULT_PRECISION);
// lower precision
DataType s10 = new TimeType(6);
assertThatThrownBy(
- () -> SchemaMergingUtils.merge(s10, new TimeType(3),
highestFieldId, false))
+ () ->
+ SchemaMergingUtils.merge(
+ s10, new TimeType(3), highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// higher precision
DataType t10 = new TimeType(9);
- TimeType r10 = (TimeType) SchemaMergingUtils.merge(s9, t10,
highestFieldId, false);
+ TimeType r10 = (TimeType) SchemaMergingUtils.merge(s9, t10,
highestFieldId, false, true);
assertThat(r10.getPrecision()).isEqualTo(9);
// TimeType -> LocalZonedTimestampType
DataType s11 = new TimeType();
DataType t11 = new LocalZonedTimestampType();
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s11, t11,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s11, t11,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// TimeType -> LocalZonedTimestampType with allowExplicitCast = true
LocalZonedTimestampType r11 =
- (LocalZonedTimestampType) SchemaMergingUtils.merge(s11, t11,
highestFieldId, true);
+ (LocalZonedTimestampType)
+ SchemaMergingUtils.merge(s11, t11, highestFieldId,
true, true);
assertThat(r11.getPrecision()).isEqualTo(LocalZonedTimestampType.DEFAULT_PRECISION);
// TimeType -> TimestampType
DataType s12 = new TimeType();
DataType t12 = new TimestampType();
- assertThatThrownBy(() -> SchemaMergingUtils.merge(s12, t12,
highestFieldId, false))
+ assertThatThrownBy(() -> SchemaMergingUtils.merge(s12, t12,
highestFieldId, false, true))
.isInstanceOf(UnsupportedOperationException.class);
// TimeType -> TimestampType with allowExplicitCast = true
TimestampType r12 =
- (TimestampType) SchemaMergingUtils.merge(s12, t12,
highestFieldId, true);
+ (TimestampType) SchemaMergingUtils.merge(s12, t12,
highestFieldId, true, true);
assertThat(r12.getPrecision()).isEqualTo(TimestampType.DEFAULT_PRECISION);
}
@@ -756,186 +784,237 @@ public class SchemaMergingUtilsTest {
DataType dcmTarget = new DecimalType();
// BooleanType
- DataType btRes1 = SchemaMergingUtils.merge(bSource, bTarget,
highestFieldId, false);
+ DataType btRes1 = SchemaMergingUtils.merge(bSource, bTarget,
highestFieldId, false, true);
assertThat(btRes1 instanceof BooleanType).isTrue();
// BooleanType -> Numeric Type
- assertThatThrownBy(() -> SchemaMergingUtils.merge(bSource, tiTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ bSource, tiTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// BooleanType -> Numeric Type with allowExplicitCast = true
- DataType btRes2 = SchemaMergingUtils.merge(bSource, tiTarget,
highestFieldId, true);
+ DataType btRes2 = SchemaMergingUtils.merge(bSource, tiTarget,
highestFieldId, true, true);
assertThat(btRes2 instanceof TinyIntType).isTrue();
// TinyIntType
- DataType tiRes1 = SchemaMergingUtils.merge(tiSource, tiTarget,
highestFieldId, false);
+ DataType tiRes1 = SchemaMergingUtils.merge(tiSource, tiTarget,
highestFieldId, false, true);
assertThat(tiRes1 instanceof TinyIntType).isTrue();
// TinyIntType -> SmallIntType
- DataType tiRes2 = SchemaMergingUtils.merge(tiSource, siTarget,
highestFieldId, false);
+ DataType tiRes2 = SchemaMergingUtils.merge(tiSource, siTarget,
highestFieldId, false, true);
assertThat(tiRes2 instanceof SmallIntType).isTrue();
// TinyIntType -> IntType
- DataType tiRes3 = SchemaMergingUtils.merge(tiSource, iTarget,
highestFieldId, false);
+ DataType tiRes3 = SchemaMergingUtils.merge(tiSource, iTarget,
highestFieldId, false, true);
assertThat(tiRes3 instanceof IntType).isTrue();
// TinyIntType -> BigIntType
- DataType tiRes4 = SchemaMergingUtils.merge(tiSource, biTarget,
highestFieldId, false);
+ DataType tiRes4 = SchemaMergingUtils.merge(tiSource, biTarget,
highestFieldId, false, true);
assertThat(tiRes4 instanceof BigIntType).isTrue();
// TinyIntType -> FloatType
- DataType tiRes5 = SchemaMergingUtils.merge(tiSource, fTarget,
highestFieldId, false);
+ DataType tiRes5 = SchemaMergingUtils.merge(tiSource, fTarget,
highestFieldId, false, true);
assertThat(tiRes5 instanceof FloatType).isTrue();
// TinyIntType -> DoubleType
- DataType tiRes6 = SchemaMergingUtils.merge(tiSource, dTarget,
highestFieldId, false);
+ DataType tiRes6 = SchemaMergingUtils.merge(tiSource, dTarget,
highestFieldId, false, true);
assertThat(tiRes6 instanceof DoubleType).isTrue();
// TinyIntType -> DecimalType
- DataType tiRes7 = SchemaMergingUtils.merge(tiSource, dcmTarget,
highestFieldId, false);
+ DataType tiRes7 =
+ SchemaMergingUtils.merge(tiSource, dcmTarget, highestFieldId,
false, true);
assertThat(tiRes7 instanceof DecimalType).isTrue();
// TinyIntType -> BooleanType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(tiSource, bTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ tiSource, bTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// TinyIntType -> BooleanType with allowExplicitCast = true
- DataType tiRes8 = SchemaMergingUtils.merge(tiSource, bTarget,
highestFieldId, true);
+ DataType tiRes8 = SchemaMergingUtils.merge(tiSource, bTarget,
highestFieldId, true, true);
assertThat(tiRes8 instanceof BooleanType).isTrue();
// SmallIntType
- DataType siRes1 = SchemaMergingUtils.merge(siSource, siTarget,
highestFieldId, false);
+ DataType siRes1 = SchemaMergingUtils.merge(siSource, siTarget,
highestFieldId, false, true);
assertThat(siRes1 instanceof SmallIntType).isTrue();
// SmallIntType -> TinyIntType
assertThatThrownBy(
- () -> SchemaMergingUtils.merge(siSource, tiTarget,
highestFieldId, false))
+ () ->
+ SchemaMergingUtils.merge(
+ siSource, tiTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// SmallIntType -> TinyIntType with allowExplicitCast = true
- DataType siRes2 = SchemaMergingUtils.merge(siSource, tiTarget,
highestFieldId, true);
+ DataType siRes2 = SchemaMergingUtils.merge(siSource, tiTarget,
highestFieldId, true, true);
assertThat(siRes2 instanceof TinyIntType).isTrue();
// SmallIntType -> IntType
- DataType siRes3 = SchemaMergingUtils.merge(siSource, iTarget,
highestFieldId, false);
+ DataType siRes3 = SchemaMergingUtils.merge(siSource, iTarget,
highestFieldId, false, true);
assertThat(siRes3 instanceof IntType).isTrue();
// SmallIntType -> BigIntType
- DataType siRes4 = SchemaMergingUtils.merge(siSource, biTarget,
highestFieldId, false);
+ DataType siRes4 = SchemaMergingUtils.merge(siSource, biTarget,
highestFieldId, false, true);
assertThat(siRes4 instanceof BigIntType).isTrue();
// SmallIntType -> FloatType
- DataType siRes5 = SchemaMergingUtils.merge(siSource, fTarget,
highestFieldId, false);
+ DataType siRes5 = SchemaMergingUtils.merge(siSource, fTarget,
highestFieldId, false, true);
assertThat(siRes5 instanceof FloatType).isTrue();
// SmallIntType -> DoubleType
- DataType siRes6 = SchemaMergingUtils.merge(siSource, dTarget,
highestFieldId, false);
+ DataType siRes6 = SchemaMergingUtils.merge(siSource, dTarget,
highestFieldId, false, true);
assertThat(siRes6 instanceof DoubleType).isTrue();
// SmallIntType -> DecimalType
- DataType siRes7 = SchemaMergingUtils.merge(siSource, dcmTarget,
highestFieldId, false);
+ DataType siRes7 =
+ SchemaMergingUtils.merge(siSource, dcmTarget, highestFieldId,
false, true);
assertThat(siRes7 instanceof DecimalType).isTrue();
// IntType
- DataType iRes1 = SchemaMergingUtils.merge(iSource, iTarget,
highestFieldId, false);
+ DataType iRes1 = SchemaMergingUtils.merge(iSource, iTarget,
highestFieldId, false, true);
assertThat(iRes1 instanceof IntType).isTrue();
// IntType -> TinyIntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(iSource, tiTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ iSource, tiTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// IntType -> TinyIntType with allowExplicitCast = true
- DataType iRes2 = SchemaMergingUtils.merge(iSource, tiTarget,
highestFieldId, true);
+ DataType iRes2 = SchemaMergingUtils.merge(iSource, tiTarget,
highestFieldId, true, true);
assertThat(iRes2 instanceof TinyIntType).isTrue();
// IntType -> SmallIntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(iSource, siTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ iSource, siTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// IntType -> SmallIntType with allowExplicitCast = true
- DataType iRes3 = SchemaMergingUtils.merge(iSource, siTarget,
highestFieldId, true);
+ DataType iRes3 = SchemaMergingUtils.merge(iSource, siTarget,
highestFieldId, true, true);
assertThat(iRes3 instanceof SmallIntType).isTrue();
// IntType -> BigIntType
- DataType iRes4 = SchemaMergingUtils.merge(iSource, biTarget,
highestFieldId, false);
+ DataType iRes4 = SchemaMergingUtils.merge(iSource, biTarget,
highestFieldId, false, true);
assertThat(iRes4 instanceof BigIntType).isTrue();
// IntType -> FloatType
- DataType iRes5 = SchemaMergingUtils.merge(iSource, fTarget,
highestFieldId, false);
+ DataType iRes5 = SchemaMergingUtils.merge(iSource, fTarget,
highestFieldId, false, true);
assertThat(iRes5 instanceof FloatType).isTrue();
// IntType -> DoubleType
- DataType iRes6 = SchemaMergingUtils.merge(iSource, dTarget,
highestFieldId, false);
+ DataType iRes6 = SchemaMergingUtils.merge(iSource, dTarget,
highestFieldId, false, true);
assertThat(iRes6 instanceof DoubleType).isTrue();
// IntType -> DecimalType
- DataType iRes7 = SchemaMergingUtils.merge(iSource, dcmTarget,
highestFieldId, false);
+ DataType iRes7 = SchemaMergingUtils.merge(iSource, dcmTarget,
highestFieldId, false, true);
assertThat(iRes7 instanceof DecimalType).isTrue();
// BigIntType
- DataType biRes1 = SchemaMergingUtils.merge(biSource, biTarget,
highestFieldId, false);
+ DataType biRes1 = SchemaMergingUtils.merge(biSource, biTarget,
highestFieldId, false, true);
assertThat(biRes1 instanceof BigIntType).isTrue();
// BigIntType -> TinyIntType
assertThatThrownBy(
- () -> SchemaMergingUtils.merge(biSource, tiTarget,
highestFieldId, false))
+ () ->
+ SchemaMergingUtils.merge(
+ biSource, tiTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// BigIntType -> TinyIntType with allowExplicitCast = true
- DataType biRes2 = SchemaMergingUtils.merge(biSource, tiTarget,
highestFieldId, true);
+ DataType biRes2 = SchemaMergingUtils.merge(biSource, tiTarget,
highestFieldId, true, true);
assertThat(biRes2 instanceof TinyIntType).isTrue();
// BigIntType -> SmallIntType
assertThatThrownBy(
- () -> SchemaMergingUtils.merge(biSource, siTarget,
highestFieldId, false))
+ () ->
+ SchemaMergingUtils.merge(
+ biSource, siTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// BigIntType -> SmallIntType with allowExplicitCast = true
- DataType biRes3 = SchemaMergingUtils.merge(biSource, siTarget,
highestFieldId, true);
+ DataType biRes3 = SchemaMergingUtils.merge(biSource, siTarget,
highestFieldId, true, true);
assertThat(biRes3 instanceof SmallIntType).isTrue();
// BigIntType -> IntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(biSource, iTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ biSource, iTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// BigIntType -> IntType with allowExplicitCast = true
- DataType biRes4 = SchemaMergingUtils.merge(biSource, iTarget,
highestFieldId, true);
+ DataType biRes4 = SchemaMergingUtils.merge(biSource, iTarget,
highestFieldId, true, true);
assertThat(biRes4 instanceof IntType).isTrue();
// BigIntType -> FloatType
- DataType biRes5 = SchemaMergingUtils.merge(biSource, fTarget,
highestFieldId, false);
+ DataType biRes5 = SchemaMergingUtils.merge(biSource, fTarget,
highestFieldId, false, true);
assertThat(biRes5 instanceof FloatType).isTrue();
// BigIntType -> DoubleType
- DataType biRes6 = SchemaMergingUtils.merge(biSource, dTarget,
highestFieldId, false);
+ DataType biRes6 = SchemaMergingUtils.merge(biSource, dTarget,
highestFieldId, false, true);
assertThat(biRes6 instanceof DoubleType).isTrue();
// BigIntType -> DecimalType
- DataType biRes7 = SchemaMergingUtils.merge(biSource, dcmTarget,
highestFieldId, false);
+ DataType biRes7 =
+ SchemaMergingUtils.merge(biSource, dcmTarget, highestFieldId,
false, true);
assertThat(biRes7 instanceof DecimalType).isTrue();
// FloatType
- DataType fRes1 = SchemaMergingUtils.merge(fSource, fTarget,
highestFieldId, false);
+ DataType fRes1 = SchemaMergingUtils.merge(fSource, fTarget,
highestFieldId, false, true);
assertThat(fRes1 instanceof FloatType).isTrue();
// FloatType -> TinyIntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(fSource, tiTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ fSource, tiTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// FloatType -> TinyIntType with allowExplicitCast = true
- DataType fRes2 = SchemaMergingUtils.merge(fSource, tiTarget,
highestFieldId, true);
+ DataType fRes2 = SchemaMergingUtils.merge(fSource, tiTarget,
highestFieldId, true, true);
assertThat(fRes2 instanceof TinyIntType).isTrue();
// FloatType -> SmallIntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(fSource, siTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ fSource, siTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// FloatType -> IntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(fSource, iTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ fSource, iTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// FloatType -> IntType with allowExplicitCast = true
- DataType fRes4 = SchemaMergingUtils.merge(fSource, iTarget,
highestFieldId, true);
+ DataType fRes4 = SchemaMergingUtils.merge(fSource, iTarget,
highestFieldId, true, true);
assertThat(fRes4 instanceof IntType).isTrue();
// FloatType -> BigIntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(fSource, biTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ fSource, biTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// FloatType -> DoubleType
- DataType fRes6 = SchemaMergingUtils.merge(fSource, dTarget,
highestFieldId, false);
+ DataType fRes6 = SchemaMergingUtils.merge(fSource, dTarget,
highestFieldId, false, true);
assertThat(fRes6 instanceof DoubleType).isTrue();
// FloatType -> DecimalType
- DataType fRes7 = SchemaMergingUtils.merge(fSource, dcmTarget,
highestFieldId, false);
+ DataType fRes7 = SchemaMergingUtils.merge(fSource, dcmTarget,
highestFieldId, false, true);
assertThat(fRes7 instanceof DecimalType).isTrue();
// DoubleType
- DataType dRes1 = SchemaMergingUtils.merge(dSource, dTarget,
highestFieldId, false);
+ DataType dRes1 = SchemaMergingUtils.merge(dSource, dTarget,
highestFieldId, false, true);
assertThat(dRes1 instanceof DoubleType).isTrue();
// DoubleType -> TinyIntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, tiTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ dSource, tiTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// DoubleType -> SmallIntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, siTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ dSource, siTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// DoubleType -> SmallIntType with allowExplicitCast = true
- DataType dRes3 = SchemaMergingUtils.merge(dSource, siTarget,
highestFieldId, true);
+ DataType dRes3 = SchemaMergingUtils.merge(dSource, siTarget,
highestFieldId, true, true);
assertThat(dRes3 instanceof SmallIntType).isTrue();
// DoubleType -> IntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, iTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ dSource, iTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// DoubleType -> BigIntType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, biTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ dSource, biTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// DoubleType -> BigIntType with allowExplicitCast = true
- DataType dRes5 = SchemaMergingUtils.merge(dSource, biTarget,
highestFieldId, true);
+ DataType dRes5 = SchemaMergingUtils.merge(dSource, biTarget,
highestFieldId, true, true);
assertThat(dRes5 instanceof BigIntType).isTrue();
// DoubleType -> FloatType
- assertThatThrownBy(() -> SchemaMergingUtils.merge(dSource, fTarget,
highestFieldId, false))
+ assertThatThrownBy(
+ () ->
+ SchemaMergingUtils.merge(
+ dSource, fTarget, highestFieldId,
false, true))
.isInstanceOf(UnsupportedOperationException.class);
// DoubleType -> DecimalType
- DataType dRes7 = SchemaMergingUtils.merge(dSource, dcmTarget,
highestFieldId, false);
+ DataType dRes7 = SchemaMergingUtils.merge(dSource, dcmTarget,
highestFieldId, false, true);
assertThat(dRes7 instanceof DecimalType).isTrue();
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
index e94621c2de..f4dbbdac96 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/MergeSchemaEvolutionHelper.scala
@@ -83,8 +83,9 @@ trait MergeSchemaEvolutionHelper extends ExpressionHelper {
.map(a => StructField(a.name, a.dataType, a.nullable)))
val filteredSourceSchema =
SparkSystemColumns.filterSparkSystemColumns(sourceSchema)
val allowExplicitCast = OptionUtils.writeMergeSchemaExplicitCastEnabled()
+ val caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
val updatedFileStoreTable = SchemaHelper
- .mergeAndCommitSchema(fileStoreTable, filteredSourceSchema,
allowExplicitCast)
+ .mergeAndCommitSchema(fileStoreTable, filteredSourceSchema,
allowExplicitCast, caseSensitive)
.getOrElse(return None)
// Invalidate Spark catalog cache so subsequent queries see the new schema.
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
index 1416602a5f..928d6211d1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -41,7 +41,7 @@ private[spark] trait SchemaHelper extends WithFileStoreTable {
def mergeSchema(sparkSession: SparkSession, input: DataFrame, options:
Options): DataFrame = {
val dataSchema = SparkSystemColumns.filterSparkSystemColumns(input.schema)
- val writeSchema = mergeSchema(dataSchema, options)
+ val writeSchema = mergeSchema(sparkSession, dataSchema, options)
if (!PaimonUtils.sameType(writeSchema, dataSchema)) {
val resolve = sparkSession.sessionState.conf.resolver
val cols = SchemaHelper.alignColumns(writeSchema, dataSchema, resolve)
@@ -52,6 +52,13 @@ private[spark] trait SchemaHelper extends WithFileStoreTable
{
}
def mergeSchema(dataSchema: StructType, options: Options): StructType = {
+ mergeSchema(SparkSession.active, dataSchema, options) // two-argument overload kept for existing callers; delegates using SparkSession.active
+ }
+
+ def mergeSchema(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ options: Options): StructType = {
val mergeSchemaEnabled =
options.get(SparkConnectorOptions.MERGE_SCHEMA) ||
OptionUtils.writeMergeSchemaEnabled()
if (!mergeSchemaEnabled) {
@@ -61,9 +68,10 @@ private[spark] trait SchemaHelper extends WithFileStoreTable
{
val filteredDataSchema =
SparkSystemColumns.filterSparkSystemColumns(dataSchema)
val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
|| OptionUtils
.writeMergeSchemaExplicitCastEnabled()
- SchemaHelper.mergeAndCommitSchema(table, filteredDataSchema,
allowExplicitCast).foreach {
- updatedTable => newTable = Some(updatedTable)
- }
+ val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+ SchemaHelper
+ .mergeAndCommitSchema(table, filteredDataSchema, allowExplicitCast,
caseSensitive)
+ .foreach { updatedTable => newTable = Some(updatedTable) }
val writeSchema =
SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
if (!PaimonUtils.sameType(writeSchema, filteredDataSchema)) {
@@ -87,9 +95,10 @@ private[spark] object SchemaHelper {
def mergeAndCommitSchema(
table: FileStoreTable,
dataSchema: StructType,
- allowExplicitCast: Boolean): Option[FileStoreTable] = {
+ allowExplicitCast: Boolean,
+ caseSensitive: Boolean = true): Option[FileStoreTable] = {
val dataRowType =
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
- if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
+ if (table.store().mergeSchema(dataRowType, allowExplicitCast,
caseSensitive)) {
Some(table.copyWithLatestSchema())
} else {
None
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
index 83b423438a..ae830bcd97 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
@@ -499,6 +499,308 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
}
}
+ test("Write merge schema: case-insensitive column matching with new column")
{
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, name STRING)")
+ sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
+
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ sql("INSERT INTO t BY NAME SELECT 3 AS ID, 'c' AS NAME, 100 AS extra")
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames
+ assert(
+ columnNames.length == 3,
+ s"Expected 3 columns (id, name, extra) but got ${columnNames.length}:
${columnNames.mkString(", ")}")
+
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(Row(1, "a", null), Row(2, "b", null), Row(3, "c", 100)))
+ }
+ }
+
+ test("Write merge schema: case-insensitive dataframe write") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, name STRING)")
+ Seq((1, "a"), (2, "b"))
+ .toDF("id", "name")
+ .write
+ .format("paimon")
+ .mode("append")
+ .saveAsTable("t")
+
+ Seq((3, "c", 100))
+ .toDF("ID", "NAME", "extra")
+ .write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .saveAsTable("t")
+
+ val columnNames = spark.table("t").schema.fieldNames
+ assert(
+ columnNames.length == 3,
+ s"Expected 3 columns but got ${columnNames.length}:
${columnNames.mkString(", ")}")
+
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(Row(1, "a", null), Row(2, "b", null), Row(3, "c", 100)))
+ }
+ }
+
+ test("Write merge schema: only case differs, no schema change") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, name STRING)")
+ sql("INSERT INTO t VALUES (1, 'a')")
+
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ sql("INSERT INTO t BY NAME SELECT 2 AS ID, 'b' AS NAME")
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames
+ assert(
+ columnNames.toSeq == Seq("id", "name"),
+ s"Schema changed unexpectedly: ${columnNames.mkString(", ")}")
+
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, "a"), Row(2,
"b")))
+ }
+ }
+
+ test("Write merge schema: repeated writes with alternating case") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, name STRING)")
+ sql("INSERT INTO t VALUES (1, 'a')")
+
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ sql("INSERT INTO t BY NAME SELECT 2 AS ID, 'b' AS NAME")
+ sql("INSERT INTO t BY NAME SELECT 3 AS Id, 'c' AS NaMe")
+ sql("INSERT INTO t BY NAME SELECT 4 AS iD, 'd' AS nAmE")
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames
+ assert(columnNames.length == 2, s"Expected 2 columns but got:
${columnNames.mkString(", ")}")
+
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Seq(Row(1, "a"), Row(2, "b"), Row(3, "c"), Row(4, "d")))
+ }
+ }
+
+ test("Write merge schema: nested struct case mismatch with new sub-field") {
+ withTable("t") {
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ sql("CREATE TABLE t (id INT, info STRUCT<key1: STRING, key2: STRING>)")
+ sql("INSERT INTO t VALUES (1, named_struct('key1', 'a', 'key2', 'b'))")
+
+ sql(
+ "INSERT INTO t BY NAME SELECT 2 AS id, " +
+ "named_struct('KEY1', 'A', 'KEY2', 'B', 'key3', 'C') AS info")
+
+ val infoFields = spark
+ .table("t")
+ .schema("info")
+ .dataType
+ .asInstanceOf[org.apache.spark.sql.types.StructType]
+ .fieldNames
+ assert(
+ infoFields.length == 3,
+ s"Expected 3 sub-fields but got ${infoFields.length}:
${infoFields.mkString(", ")}")
+
+ checkAnswer(
+ sql("SELECT id, info.key1, info.key2, info.key3 FROM t ORDER BY id"),
+ Seq(Row(1, "a", "b", null), Row(2, "A", "B", "C")))
+ }
+ }
+ }
+
+ test("Merge into with merge-schema: source uppercase should not create
duplicate columns") {
+ withTable("t") {
+ sql("""CREATE TABLE t (id INT, name STRING, value INT)
+ | USING paimon
+ | TBLPROPERTIES ('primary-key' = 'id', 'bucket' =
'1')""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'a', 10), (2, 'b', 20)")
+
+ spark
+ .sql("SELECT 1 AS ID, 'A' AS NAME, 100 AS VALUE UNION ALL SELECT 3 AS
ID, 'c' AS NAME, 30 AS VALUE")
+ .createOrReplaceTempView("s")
+
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ sql("""MERGE INTO t USING s ON t.id = s.ID
+ | WHEN MATCHED THEN UPDATE SET *
+ | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames.toSeq
+ assert(
+ columnNames.size == 3,
+ s"Expected 3 columns but got ${columnNames.size}:
${columnNames.mkString(", ")}")
+
+ checkAnswer(
+ sql("SELECT id, name, value FROM t ORDER BY id"),
+ Seq(Row(1, "A", 100), Row(2, "b", 20), Row(3, "c", 30)))
+ }
+ }
+
+ test("Merge into with merge-schema: extra column with case-mismatched
existing columns") {
+ withTable("t") {
+ sql("""CREATE TABLE t (id INT, name STRING)
+ | USING paimon
+ | TBLPROPERTIES ('primary-key' = 'id', 'bucket' =
'1')""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
+
+ spark
+ .sql("SELECT 1 AS ID, 'A' AS NAME, 100 AS extra_col UNION ALL SELECT 3
AS ID, 'c' AS NAME, 30 AS extra_col")
+ .createOrReplaceTempView("s")
+
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ sql("""MERGE INTO t USING s ON t.id = s.ID
+ | WHEN MATCHED THEN UPDATE SET *
+ | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames.toSeq
+ assert(
+ columnNames.size == 3,
+ s"Expected 3 columns (id, name, extra_col) but got
${columnNames.size}: ${columnNames.mkString(", ")}")
+
+ checkAnswer(
+ sql("SELECT id, name, extra_col FROM t ORDER BY id"),
+ Seq(Row(1, "A", 100), Row(2, "b", null), Row(3, "c", 30)))
+ }
+ }
+
+ test("Merge into with merge-schema: only case differs, schema should not
change") {
+ withTable("t") {
+ sql("""CREATE TABLE t (id INT, name STRING)
+ | USING paimon
+ | TBLPROPERTIES ('primary-key' = 'id', 'bucket' =
'1')""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'a')")
+
+ spark.sql("SELECT 1 AS ID, 'A' AS NAME").createOrReplaceTempView("s")
+
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ sql("""MERGE INTO t USING s ON t.id = s.ID
+ | WHEN MATCHED THEN UPDATE SET *""".stripMargin)
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames
+ assert(
+ columnNames.toSeq == Seq("id", "name"),
+ s"Schema changed unexpectedly: ${columnNames.mkString(", ")}")
+
+ checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, "A")))
+ }
+ }
+
+ test("Merge into with merge-schema: nested struct fields case mismatch") {
+ withTable("t") {
+ sql("""CREATE TABLE t (id INT, info STRUCT<key1: STRING, key2: STRING>)
+ | USING paimon
+ | TBLPROPERTIES ('primary-key' = 'id', 'bucket' =
'1')""".stripMargin)
+ sql("INSERT INTO t VALUES (1, named_struct('key1', 'a', 'key2', 'b'))")
+
+ spark
+ .sql("SELECT 1 AS id, named_struct('KEY1', 'A', 'KEY2', 'B', 'key3',
'C') AS info")
+ .createOrReplaceTempView("s")
+
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ sql("""MERGE INTO t USING s ON t.id = s.id
+ | WHEN MATCHED THEN UPDATE SET *
+ | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+ }
+
+ val infoFields = spark
+ .table("t")
+ .schema("info")
+ .dataType
+ .asInstanceOf[org.apache.spark.sql.types.StructType]
+ .fieldNames
+ assert(
+ infoFields.length == 3,
+ s"Expected 3 sub-fields but got ${infoFields.length}:
${infoFields.mkString(", ")}")
+
+ checkAnswer(
+ sql("SELECT id, info.key1, info.key2, info.key3 FROM t"),
+ Seq(Row(1, "A", "B", "C")))
+ }
+ }
+
+ test("Merge into with merge-schema: repeated writes with alternating case") {
+ withTable("t") {
+ sql("""CREATE TABLE t (id INT, name STRING)
+ | USING paimon
+ | TBLPROPERTIES ('primary-key' = 'id', 'bucket' =
'1')""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'a')")
+
+ withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+ spark.sql("SELECT 2 AS ID, 'b' AS NAME").createOrReplaceTempView("s1")
+ sql("""MERGE INTO t USING s1 ON t.id = s1.ID
+ | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+
+ spark.sql("SELECT 3 AS Id, 'c' AS NaMe").createOrReplaceTempView("s2")
+ sql("""MERGE INTO t USING s2 ON t.id = s2.Id
+ | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames
+ assert(columnNames.length == 2, s"Expected 2 columns but got:
${columnNames.mkString(", ")}")
+
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, "a"), Row(2,
"b"), Row(3, "c")))
+ }
+ }
+
+ test("Merge into without merge-schema: case-insensitive matching works
normally") {
+ withTable("t") {
+ sql("""CREATE TABLE t (id INT, name STRING)
+ | USING paimon
+ | TBLPROPERTIES ('primary-key' = 'id', 'bucket' =
'1')""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'a'), (2, 'b')")
+
+ spark
+ .sql("SELECT 1 AS ID, 'A' AS NAME UNION ALL SELECT 3 AS ID, 'c' AS
NAME")
+ .createOrReplaceTempView("s")
+
+ sql("""MERGE INTO t USING s ON t.id = s.ID
+ | WHEN MATCHED THEN UPDATE SET *
+ | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+
+ val schema = spark.table("t").schema
+ assert(
+ schema.fieldNames.length == 2,
+ s"Expected 2 columns but got: ${schema.fieldNames.mkString(", ")}")
+
+ checkAnswer(
+ sql("SELECT id, name FROM t ORDER BY id"),
+ Seq(Row(1, "A"), Row(2, "b"), Row(3, "c")))
+ }
+ }
+
+ test("Merge into with merge-schema: case-sensitive mode treats different
case as new columns") {
+ withTable("t") {
+ sql("""CREATE TABLE t (id INT, name STRING)
+ | USING paimon
+ | TBLPROPERTIES ('primary-key' = 'id', 'bucket' =
'1')""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'a')")
+
+ spark
+ .sql("SELECT 1 AS ID, 'A' AS NAME, 100 AS extra")
+ .createOrReplaceTempView("s")
+
+ withSparkSQLConf(
+ "spark.paimon.write.merge-schema" -> "true",
+ "spark.sql.caseSensitive" -> "true") {
+ sql("""MERGE INTO t USING s ON t.id = s.ID
+ | WHEN MATCHED THEN UPDATE SET *
+ | WHEN NOT MATCHED THEN INSERT *""".stripMargin)
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames
+ assert(
+ columnNames.length == 5,
+ s"Expected 5 columns (id, name, ID, NAME, extra) but got
${columnNames.length}: ${columnNames.mkString(", ")}")
+ }
+ }
+
test("Write merge schema: array of struct missing nested field by
dataframe") {
withTable("t") {
sql("""
@@ -546,4 +848,22 @@ class WriteMergeSchemaTest extends PaimonSparkTestBase {
Seq(Row(Seq(1L, 2L), 1L, 2L, Seq(Row(10, "v2", "v3", "v4", "v5",
null)))))
}
}
+
+ test("Write merge schema: case-sensitive mode treats different case as new
columns") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, name STRING)")
+ sql("INSERT INTO t VALUES (1, 'a')")
+
+ withSparkSQLConf(
+ "spark.paimon.write.merge-schema" -> "true",
+ "spark.sql.caseSensitive" -> "true") {
+ sql("INSERT INTO t BY NAME SELECT 2 AS ID, 'b' AS NAME, 100 AS extra")
+ }
+
+ val columnNames = spark.table("t").schema.fieldNames
+ assert(
+ columnNames.length == 5,
+ s"Expected 5 columns (id, name, ID, NAME, extra) but got
${columnNames.length}: ${columnNames.mkString(", ")}")
+ }
+ }
}