This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c1561ef4 [spark] supports schema merging when write (#1678)
6c1561ef4 is described below

commit 6c1561ef4f9047daeb08fe06bea1fb26c4d9f4e4
Author: Yann Byron <[email protected]>
AuthorDate: Tue Aug 1 12:22:45 2023 +0800

    [spark] supports schema merging when write (#1678)
---
 docs/content/maintenance/configurations.md         |   6 +
 .../generated/spark_connector_configuration.html   |  42 ++
 .../java/org/apache/paimon/types/DataField.java    |   4 +
 .../java/org/apache/paimon/AbstractFileStore.java  |   5 +
 .../src/main/java/org/apache/paimon/FileStore.java |   2 +
 .../org/apache/paimon/schema/SchemaManager.java    |  18 +
 .../apache/paimon/schema/SchemaMergingUtils.java   | 225 +++++++
 .../paimon/table/AbstractFileStoreTable.java       |   4 +
 .../paimon/schema/SchemaMergingUtilsTest.java      | 740 +++++++++++++++++++++
 paimon-docs/pom.xml                                |   7 +
 .../configuration/ConfigOptionsDocGenerator.java   |   4 +-
 .../apache/paimon/spark/SparkConnectorOptions.java |  40 ++
 .../java/org/apache/paimon/spark/SparkTable.java   |   3 +-
 .../org/apache/paimon/spark/SparkSource.scala      |   3 +-
 .../scala/org/apache/paimon/spark/SparkWrite.scala |   7 +-
 .../apache/paimon/spark/SparkWriteBuilder.scala    |   9 +-
 .../paimon/spark/commands/PaimonCommand.scala      |  10 +-
 .../paimon/spark/commands/SchemaHelper.scala       |  50 ++
 .../spark/commands/WriteIntoPaimonTable.scala      |  42 +-
 .../paimon/spark/sql/DataFrameWriteTest.scala      | 199 ++++++
 20 files changed, 1389 insertions(+), 31 deletions(-)

diff --git a/docs/content/maintenance/configurations.md 
b/docs/content/maintenance/configurations.md
index 5d46dbfaa..c88676345 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -55,3 +55,9 @@ Flink catalog options for paimon.
 Flink connector options for paimon.
 
 {{< generated/flink_connector_configuration >}}
+
+### SparkConnectorOptions
+
+Spark connector options for paimon.
+
+{{< generated/spark_connector_configuration >}}
\ No newline at end of file
diff --git 
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html 
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
new file mode 100644
index 000000000..c38a5cc85
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -0,0 +1,42 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 20%">Key</th>
+        <th class="text-left" style="width: 15%">Default</th>
+        <th class="text-left" style="width: 10%">Type</th>
+        <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+        <td><h5>write.merge-schema</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>If true, merge the data schema and the table schema automatically 
before write data.</td>
+    </tr>
+    <tr>
+        <td><h5>write.merge-schema.explicit-cast</h5></td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>If true, allow to merge data types if the two types meet the rules 
for explicit casting.</td>
+    </tr>
+    </tbody>
+</table>
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java 
b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
index 28728f1d1..4e51372c2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
@@ -76,6 +76,10 @@ public final class DataField implements Serializable {
         return type;
     }
 
+    public DataField newId(int newid) {
+        return new DataField(newid, name, type, description);
+    }
+
     public DataField newName(String newName) {
         return new DataField(id, newName, type, description);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index b31dfebb4..48cb6e231 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -142,6 +142,11 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
         return options;
     }
 
+    @Override
+    public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+        return schemaManager.mergeSchema(rowType, allowExplicitCast);
+    }
+
     @Override
     public FileStoreCommitImpl newCommit(String commitUser) {
         return new FileStoreCommitImpl(
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 918e665bc..bf927861b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -78,4 +78,6 @@ public interface FileStore<T> extends Serializable {
 
     @Nullable
     TagAutoCreation newTagCreationManager();
+
+    boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 15db61062..2adee62ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -388,6 +388,24 @@ public class SchemaManager implements Serializable {
         }
     }
 
+    public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+        TableSchema current =
+                latest().orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "It requires that the current 
schema to exist when calling 'mergeSchema'"));
+        TableSchema update = SchemaMergingUtils.mergeSchemas(current, rowType, 
allowExplicitCast);
+        if (current.equals(update)) {
+            return false;
+        } else {
+            try {
+                return commit(update);
+            } catch (Exception e) {
+                throw new RuntimeException("Failed to commit the schema.", e);
+            }
+        }
+    }
+
     private static void checkMoveIndexEqual(SchemaChange.Move move, int 
fieldIndex, int refIndex) {
         if (refIndex == fieldIndex) {
             throw new UnsupportedOperationException(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
new file mode 100644
index 000000000..a60e6128f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.ReassignFieldId;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** The util class for merging the schemas. */
+public class SchemaMergingUtils {
+
+    public static TableSchema mergeSchemas(
+            TableSchema currentTableSchema, RowType dataFields, boolean 
allowExplicitCast) {
+        if (currentTableSchema.logicalRowType().equals(dataFields)) {
+            return currentTableSchema;
+        }
+
+        AtomicInteger highestFieldId = new 
AtomicInteger(currentTableSchema.highestFieldId());
+        RowType newRowType =
+                mergeSchemas(
+                        currentTableSchema.logicalRowType(),
+                        dataFields,
+                        highestFieldId,
+                        allowExplicitCast);
+        return new TableSchema(
+                currentTableSchema.id() + 1,
+                newRowType.getFields(),
+                highestFieldId.get(),
+                currentTableSchema.partitionKeys(),
+                currentTableSchema.primaryKeys(),
+                currentTableSchema.options(),
+                currentTableSchema.comment());
+    }
+
+    public static RowType mergeSchemas(
+            RowType tableSchema,
+            RowType dataSchema,
+            AtomicInteger highestFieldId,
+            boolean allowExplicitCast) {
+        return (RowType) merge(tableSchema, dataSchema, highestFieldId, 
allowExplicitCast);
+    }
+
+    /**
+     * Merge the base data type and the update data type if possible.
+     *
+     * <p>For RowType, find the fields which exists in both the base schema 
and the update schema,
+     * and try to merge them by calling the method iteratively; remain those 
fields that are only in
+     * the base schema and append those fields that are only in the update 
schema.
+     *
+     * <p>For other complex type, try to merge the element types.
+     *
+     * <p>For primitive data type, we treat that's compatible if the original 
type can be safely
+     * cast to the new type.
+     */
+    public static DataType merge(
+            DataType base0,
+            DataType update0,
+            AtomicInteger highestFieldId,
+            boolean allowExplicitCast) {
+        // Here we try t0 merge the base0 and update0 without regard to the 
nullability,
+        // and set the base0's nullability to the return's.
+        DataType base = base0.copy(true);
+        DataType update = update0.copy(true);
+
+        if (base.equals(update)) {
+            return base0;
+        } else if (base instanceof RowType && update instanceof RowType) {
+            List<DataField> baseFields = ((RowType) base).getFields();
+            List<DataField> updateFields = ((RowType) update).getFields();
+            Map<String, DataField> updateFieldMap =
+                    updateFields.stream()
+                            .collect(Collectors.toMap(DataField::name, 
Function.identity()));
+            List<DataField> updatedFields =
+                    baseFields.stream()
+                            .map(
+                                    baseField -> {
+                                        if 
(updateFieldMap.containsKey(baseField.name())) {
+                                            DataField updateField =
+                                                    
updateFieldMap.get(baseField.name());
+                                            DataType updatedDataType =
+                                                    merge(
+                                                            baseField.type(),
+                                                            updateField.type(),
+                                                            highestFieldId,
+                                                            allowExplicitCast);
+                                            return new DataField(
+                                                    baseField.id(),
+                                                    baseField.name(),
+                                                    updatedDataType,
+                                                    baseField.description());
+                                        } else {
+                                            return baseField;
+                                        }
+                                    })
+                            .collect(Collectors.toList());
+
+            Map<String, DataField> baseFieldMap =
+                    baseFields.stream()
+                            .collect(Collectors.toMap(DataField::name, 
Function.identity()));
+            List<DataField> newFields =
+                    updateFields.stream()
+                            .filter(field -> 
!baseFieldMap.containsKey(field.name()))
+                            .map(field -> assignIdForNewField(field, 
highestFieldId))
+                            .collect(Collectors.toList());
+
+            updatedFields.addAll(newFields);
+            return new RowType(base.isNullable(), updatedFields);
+        } else if (base instanceof MapType && update instanceof MapType) {
+            return new MapType(
+                    base.isNullable(),
+                    merge(
+                            ((MapType) base).getKeyType(),
+                            ((MapType) update).getKeyType(),
+                            highestFieldId,
+                            allowExplicitCast),
+                    merge(
+                            ((MapType) base).getValueType(),
+                            ((MapType) update).getValueType(),
+                            highestFieldId,
+                            allowExplicitCast));
+        } else if (base instanceof ArrayType && update instanceof ArrayType) {
+            return new ArrayType(
+                    base.isNullable(),
+                    merge(
+                            ((ArrayType) base).getElementType(),
+                            ((ArrayType) update).getElementType(),
+                            highestFieldId,
+                            allowExplicitCast));
+        } else if (base instanceof MultisetType && update instanceof 
MultisetType) {
+            return new MultisetType(
+                    base.isNullable(),
+                    merge(
+                            ((MultisetType) base).getElementType(),
+                            ((MultisetType) update).getElementType(),
+                            highestFieldId,
+                            allowExplicitCast));
+        } else if (base instanceof DecimalType && update instanceof 
DecimalType) {
+            if (base.equals(update)) {
+                return base0;
+            } else {
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Failed to merge decimal types with different 
precision or scale: %s and %s",
+                                base, update));
+            }
+        } else if (supportsDataTypesCast(base, update, allowExplicitCast)) {
+            if (DataTypes.getLength(base).isPresent() && 
DataTypes.getLength(update).isPresent()) {
+                // this will check and merge types which has a `length` 
attribute, like BinaryType,
+                // CharType, VarBinaryType, VarCharType.
+                if (allowExplicitCast
+                        || DataTypes.getLength(base).getAsInt()
+                                <= DataTypes.getLength(update).getAsInt()) {
+                    return update.copy(base0.isNullable());
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Failed to merge the target type that has 
a smaller length: %s and %s",
+                                    base, update));
+                }
+            } else if (DataTypes.getPrecision(base).isPresent()
+                    && DataTypes.getPrecision(update).isPresent()) {
+                // this will check and merge types which has a `precision` 
attribute, like
+                // LocalZonedTimestampType, TimeType, TimestampType.
+                if (allowExplicitCast
+                        || DataTypes.getPrecision(base).getAsInt()
+                                <= DataTypes.getPrecision(update).getAsInt()) {
+                    return update.copy(base0.isNullable());
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Failed to merge the target type that has 
a lower precision: %s and %s",
+                                    base, update));
+                }
+            } else {
+                return update.copy(base0.isNullable());
+            }
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Failed to merge data types %s and %s", 
base, update));
+        }
+    }
+
+    private static boolean supportsDataTypesCast(
+            DataType sourceType, DataType targetType, boolean 
allowExplicitCast) {
+        boolean canImplicitCast = 
DataTypeCasts.supportsImplicitCast(sourceType, targetType);
+        boolean canExplicitCast =
+                allowExplicitCast && 
DataTypeCasts.supportsExplicitCast(sourceType, targetType);
+        return canImplicitCast || canExplicitCast;
+    }
+
+    private static DataField assignIdForNewField(DataField field, 
AtomicInteger highestFieldId) {
+        DataType dataType = ReassignFieldId.reassign(field.type(), 
highestFieldId);
+        return new DataField(
+                highestFieldId.incrementAndGet(), field.name(), dataType, 
field.description());
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 4d9ba1115..15bb81e61 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -161,6 +161,10 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
+        if (dynamicOptions == null || dynamicOptions.isEmpty()) {
+            return this;
+        }
+
         Map<String, String> options = tableSchema.options();
         // check option is not immutable
         dynamicOptions.forEach(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
new file mode 100644
index 000000000..ac3e33d10
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** UT for SchemaMergingUtils. */
+public class SchemaMergingUtilsTest {
+
+    @Test
+    public void testMergeTableSchemas() {
+        // Init the table schema
+        DataField a = new DataField(0, "a", new IntType());
+        DataField b = new DataField(1, "b", new DoubleType());
+        DataField c =
+                new DataField(
+                        2,
+                        "c",
+                        new MapType(new VarCharType(VarCharType.MAX_LENGTH), 
new BooleanType()));
+        DataField d = new DataField(3, "d", new 
VarCharType(VarCharType.MAX_LENGTH));
+        TableSchema current =
+                new TableSchema(
+                        0,
+                        Lists.newArrayList(a, b, c, d),
+                        3,
+                        Lists.newArrayList("d"),
+                        Lists.newArrayList("a", "d"),
+                        new HashMap<>(),
+                        "");
+
+        // fake the RowType of data
+        DataField f1 = new DataField(-1, "f1", new CharType(10));
+        DataField f2 = new DataField(-1, "f2", new DateType());
+        RowType fDataType = new RowType(Lists.newArrayList(f1, f2));
+        DataField f = new DataField(-1, "f", fDataType);
+        RowType t = new RowType(Lists.newArrayList(a, b, d, f));
+
+        TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t, 
false);
+        Assertions.assertEquals(merged.id(), 1);
+        Assertions.assertEquals(merged.highestFieldId(), 6);
+        Assertions.assertArrayEquals(
+                merged.primaryKeys().toArray(new String[0]), new String[] 
{"a", "d"});
+        Assertions.assertArrayEquals(
+                merged.partitionKeys().toArray(new String[0]), new String[] 
{"d"});
+        List<DataField> fields = merged.fields();
+        Assertions.assertEquals(fields.size(), 5);
+        Assertions.assertTrue(fields.get(4).type() instanceof RowType);
+    }
+
+    @Test
+    public void testMergeSchemas() {
+        // This will test both `mergeSchemas` and `merge` methods.
+        // Init the source schema
+        DataField a = new DataField(0, "a", new IntType());
+        DataField b = new DataField(1, "b", new DoubleType());
+        DataField c =
+                new DataField(
+                        2,
+                        "c",
+                        new MapType(new VarCharType(VarCharType.MAX_LENGTH), 
new BooleanType()));
+        DataField d = new DataField(3, "d", new 
VarCharType(VarCharType.MAX_LENGTH));
+        RowType source = new RowType(Lists.newArrayList(a, b, c, d));
+        AtomicInteger highestFieldId = new AtomicInteger(3);
+
+        // Case 1: an additional field.
+        DataField e = new DataField(-1, "e", new DateType());
+        RowType t1 = new RowType(Lists.newArrayList(a, b, c, d, e));
+        RowType r1 = (RowType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false);
+        Assertions.assertEquals(highestFieldId.get(), 4);
+        Assertions.assertTrue(r1.isNullable());
+        Assertions.assertEquals(r1.getFieldCount(), 5);
+        Assertions.assertEquals(r1.getTypeAt(2), c.type());
+        // the id of DataField has assigned to an incremental num.
+        DataField expectedE = e.newId(4);
+        Assertions.assertEquals(r1.getFields().get(highestFieldId.get()), 
expectedE);
+
+        // Case 2: two missing fields.
+        RowType t2 = new RowType(Lists.newArrayList(a, c, e));
+        RowType r2 = SchemaMergingUtils.mergeSchemas(r1, t2, highestFieldId, 
false);
+        Assertions.assertEquals(highestFieldId.get(), 4);
+        Assertions.assertEquals(r2.getFieldCount(), 5);
+        Assertions.assertEquals(r2.getTypeAt(3), d.type());
+        Assertions.assertEquals(r2.getFields().get(highestFieldId.get()), 
expectedE);
+
+        // Case 3: an additional nested field and a missing field.
+        DataField f1 = new DataField(-1, "f1", new CharType(10));
+        DataField f2 = new DataField(-1, "f2", new IntType());
+        RowType fDataType = new RowType(Lists.newArrayList(f1, f2));
+        DataField f = new DataField(-1, "f", fDataType);
+        RowType t3 = new RowType(Lists.newArrayList(a, b, c, d, f));
+        RowType r3 = (RowType) SchemaMergingUtils.merge(r2, t3, 
highestFieldId, false);
+        Assertions.assertEquals(highestFieldId.get(), 7);
+        Assertions.assertEquals(r3.getFieldCount(), 6);
+        RowType expectedFDataType = new 
RowType(Lists.newArrayList(f1.newId(5), f2.newId(6)));
+        Assertions.assertEquals(r3.getTypeAt(5), expectedFDataType);
+        DataField expectedF = new DataField(7, "f", expectedFDataType);
+        Assertions.assertEquals(r3.getFields().get(5), expectedF);
+
+        // Case 4: an additional sub-field in a nested field.
+        DataField f3 = new DataField(-1, "f3", new TimestampType());
+        RowType newFDataType = new RowType(Lists.newArrayList(f1, f2, f3));
+        DataField newF = new DataField(-1, "f", newFDataType);
+        RowType t4 = new RowType(Lists.newArrayList(a, b, c, d, e, newF));
+        RowType r4 = SchemaMergingUtils.mergeSchemas(r3, t4, highestFieldId, 
false);
+        Assertions.assertEquals(highestFieldId.get(), 8);
+        Assertions.assertEquals(r4.getFieldCount(), 6);
+        RowType newExpectedFDataType =
+                new RowType(Lists.newArrayList(f1.newId(5), f2.newId(6), 
f3.newId(8)));
+        Assertions.assertEquals(r4.getTypeAt(5), newExpectedFDataType);
+
+        // Case 5: a field that isn't compatible with the existing one.
+        DataField newA = new DataField(-1, "a", new SmallIntType());
+        RowType t5 = new RowType(Lists.newArrayList(newA, b, c, d, e, newF));
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(r4, t5, highestFieldId, false));
+
+        // Case 6: all new-coming fields
+        DataField g = new DataField(-1, "g", new TimeType());
+        DataField h = new DataField(-1, "h", new TimeType());
+        RowType t6 = new RowType(Lists.newArrayList(g, h));
+        RowType r6 = SchemaMergingUtils.mergeSchemas(r4, t6, highestFieldId, 
false);
+        Assertions.assertEquals(highestFieldId.get(), 10);
+        Assertions.assertEquals(r6.getFieldCount(), 8);
+    }
+
+    @Test
+    public void testMergeArrayTypes() {
+        AtomicInteger highestFieldId = new AtomicInteger(1);
+
+        DataType source = new ArrayType(false, new IntType());
+
+        // the element types are same.
+        DataType t1 = new ArrayType(true, new IntType());
+        ArrayType r1 = (ArrayType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false);
+        Assertions.assertFalse(r1.isNullable());
+        Assertions.assertTrue(r1.getElementType() instanceof IntType);
+
+        // the element types aren't same, but can be evolved safety.
+        DataType t2 = new ArrayType(true, new BigIntType());
+        ArrayType r2 = (ArrayType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false);
+        Assertions.assertTrue(r2.getElementType() instanceof BigIntType);
+
+        // the element types aren't same, and can't be evolved safety.
+        DataType t3 = new ArrayType(true, new SmallIntType());
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(source, t3, highestFieldId, 
false));
+        // the value type of target's isn't same to the source's, but the 
source type can be cast to
+        // the target type explicitly.
+        ArrayType r3 = (ArrayType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true);
+        Assertions.assertTrue(r3.getElementType() instanceof SmallIntType);
+    }
+
+    @Test
+    public void testMergeMapTypes() {
+        AtomicInteger highestFieldId = new AtomicInteger(1);
+
+        DataType source = new MapType(new VarCharType(VarCharType.MAX_LENGTH), 
new IntType());
+
+        // both the key and value types are same to the source's.
+        DataType t1 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new 
IntType());
+        MapType r1 = (MapType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false);
+        Assertions.assertTrue(r1.isNullable());
+        Assertions.assertTrue(r1.getKeyType() instanceof VarCharType);
+        Assertions.assertTrue(r1.getValueType() instanceof IntType);
+
+        // the value type of target's isn't same to the source's, but can be 
evolved safety.
+        DataType t2 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new 
DoubleType());
+        MapType r2 = (MapType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false);
+        Assertions.assertTrue(r2.getKeyType() instanceof VarCharType);
+        Assertions.assertTrue(r2.getValueType() instanceof DoubleType);
+
+        // the value type of target's isn't same to the source's, and can't be 
evolved safety.
+        DataType t3 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new 
SmallIntType());
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(source, t3, highestFieldId, 
false));
+        // the value type of target's isn't same to the source's, but the 
source type can be cast to
+        // the target type explicitly.
+        MapType r3 = (MapType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true);
+        Assertions.assertTrue(r3.getKeyType() instanceof VarCharType);
+        Assertions.assertTrue(r3.getValueType() instanceof SmallIntType);
+    }
+
+    @Test
+    public void testMergeMultisetTypes() {
+        AtomicInteger highestFieldId = new AtomicInteger(1);
+
+        DataType source = new MultisetType(false, new IntType());
+
+        // the element types are same.
+        DataType t1 = new MultisetType(true, new IntType());
+        MultisetType r1 =
+                (MultisetType) SchemaMergingUtils.merge(source, t1, 
highestFieldId, false);
+        Assertions.assertFalse(r1.isNullable());
+        Assertions.assertTrue(r1.getElementType() instanceof IntType);
+
+        // the element types aren't same, but can be evolved safety.
+        DataType t2 = new MultisetType(true, new BigIntType());
+        MultisetType r2 =
+                (MultisetType) SchemaMergingUtils.merge(source, t2, 
highestFieldId, false);
+        Assertions.assertTrue(r2.getElementType() instanceof BigIntType);
+
+        // the element types aren't same, and can't be evolved safety.
+        DataType t3 = new MultisetType(true, new SmallIntType());
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(source, t3, highestFieldId, 
false));
+        // the value type of target's isn't same to the source's, but the 
source type can be cast to
+        // the target type explicitly.
+        MultisetType r3 = (MultisetType) SchemaMergingUtils.merge(source, t3, 
highestFieldId, true);
+        Assertions.assertTrue(r3.getElementType() instanceof SmallIntType);
+    }
+
+    @Test
+    public void testMergeDecimalTypes() {
+        AtomicInteger highestFieldId = new AtomicInteger(1);
+
+        DataType s1 = new DecimalType();
+        DataType t1 = new DecimalType(10, 0);
+        DecimalType r1 = (DecimalType) SchemaMergingUtils.merge(s1, t1, 
highestFieldId, false);
+        Assertions.assertTrue(r1.isNullable());
+        Assertions.assertEquals(r1.getPrecision(), 
DecimalType.DEFAULT_PRECISION);
+        Assertions.assertEquals(r1.getScale(), DecimalType.DEFAULT_SCALE);
+
+        DataType s2 = new DecimalType(5, 2);
+        DataType t2 = new DecimalType(7, 2);
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s2, t2, highestFieldId, false));
+
+        // DecimalType -> Other Numeric Type
+        DataType dcmSource = new DecimalType();
+        DataType iTarget = new IntType();
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(dcmSource, iTarget, 
highestFieldId, false));
+        // DecimalType -> Other Numeric Type with allowExplicitCast = true
+        DataType res = SchemaMergingUtils.merge(dcmSource, iTarget, 
highestFieldId, true);
+        Assertions.assertTrue(res instanceof IntType);
+    }
+
+    @Test
+    public void testMergeTypesWithLength() {
+        AtomicInteger highestFieldId = new AtomicInteger(1);
+
+        // BinaryType
+        DataType s1 = new BinaryType(10);
+        DataType t1 = new BinaryType(10);
+        BinaryType r1 = (BinaryType) SchemaMergingUtils.merge(s1, t1, 
highestFieldId, false);
+        Assertions.assertEquals(r1.getLength(), 10);
+
+        DataType s2 = new BinaryType(2);
+        DataType t2 = new BinaryType();
+        // smaller length
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s2, t2, highestFieldId, false));
+        // smaller length  with allowExplicitCast = true
+        BinaryType r2 = (BinaryType) SchemaMergingUtils.merge(s2, t2, 
highestFieldId, true);
+        Assertions.assertEquals(r2.getLength(), BinaryType.DEFAULT_LENGTH);
+        // bigger length
+        DataType t3 = new BinaryType(5);
+        BinaryType r3 = (BinaryType) SchemaMergingUtils.merge(s2, t3, 
highestFieldId, false);
+        Assertions.assertEquals(r3.getLength(), 5);
+
+        // VarCharType
+        DataType s4 = new VarCharType();
+        DataType t4 = new VarCharType(1);
+        VarCharType r4 = (VarCharType) SchemaMergingUtils.merge(s4, t4, 
highestFieldId, false);
+        Assertions.assertEquals(r4.getLength(), VarCharType.DEFAULT_LENGTH);
+
+        DataType s5 = new VarCharType(2);
+        DataType t5 = new VarCharType();
+        // smaller length
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s5, t5, highestFieldId, false));
+        // smaller length  with allowExplicitCast = true
+        VarCharType r5 = (VarCharType) SchemaMergingUtils.merge(s5, t5, 
highestFieldId, true);
+        Assertions.assertEquals(r5.getLength(), VarCharType.DEFAULT_LENGTH);
+        // bigger length
+        DataType t6 = new VarCharType(5);
+        VarCharType r6 = (VarCharType) SchemaMergingUtils.merge(s5, t6, 
highestFieldId, false);
+        Assertions.assertEquals(r6.getLength(), 5);
+
+        // CharType
+        DataType s7 = new CharType();
+        DataType t7 = new CharType(1);
+        CharType r7 = (CharType) SchemaMergingUtils.merge(s7, t7, 
highestFieldId, false);
+        Assertions.assertEquals(r7.getLength(), CharType.DEFAULT_LENGTH);
+
+        DataType s8 = new CharType(2);
+        DataType t8 = new CharType();
+        // smaller length
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s8, t8, highestFieldId, false));
+        // smaller length  with allowExplicitCast = true
+        CharType r8 = (CharType) SchemaMergingUtils.merge(s8, t8, 
highestFieldId, true);
+        Assertions.assertEquals(r8.getLength(), CharType.DEFAULT_LENGTH);
+        // bigger length
+        DataType t9 = new CharType(5);
+        CharType r9 = (CharType) SchemaMergingUtils.merge(s8, t9, 
highestFieldId, false);
+        Assertions.assertEquals(r9.getLength(), 5);
+
+        // VarBinaryType
+        DataType s10 = new VarBinaryType();
+        DataType t10 = new VarBinaryType(1);
+        VarBinaryType r10 =
+                (VarBinaryType) SchemaMergingUtils.merge(s10, t10, 
highestFieldId, false);
+        Assertions.assertEquals(r10.getLength(), VarBinaryType.DEFAULT_LENGTH);
+
+        DataType s11 = new VarBinaryType(2);
+        DataType t11 = new VarBinaryType();
+        // smaller length
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s11, t11, highestFieldId, 
false));
+        // smaller length  with allowExplicitCast = true
+        VarBinaryType r11 =
+                (VarBinaryType) SchemaMergingUtils.merge(s11, t11, 
highestFieldId, true);
+        Assertions.assertEquals(r11.getLength(), VarBinaryType.DEFAULT_LENGTH);
+        // bigger length
+        DataType t12 = new VarBinaryType(5);
+        VarBinaryType r12 =
+                (VarBinaryType) SchemaMergingUtils.merge(s11, t12, 
highestFieldId, false);
+        Assertions.assertEquals(r12.getLength(), 5);
+
+        // CharType -> VarCharType
+        DataType s13 = new CharType();
+        DataType t13 = new VarCharType(10);
+        VarCharType r13 = (VarCharType) SchemaMergingUtils.merge(s13, t13, 
highestFieldId, false);
+        Assertions.assertEquals(r13.getLength(), 10);
+
+        // VarCharType ->CharType
+        DataType s14 = new VarCharType(10);
+        DataType t14 = new CharType();
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s14, t14, highestFieldId, 
false));
+        CharType r14 = (CharType) SchemaMergingUtils.merge(s14, t14, 
highestFieldId, true);
+        Assertions.assertEquals(r14.getLength(), CharType.DEFAULT_LENGTH);
+
+        // BinaryType -> VarBinaryType
+        DataType s15 = new BinaryType();
+        DataType t15 = new VarBinaryType(10);
+        VarBinaryType r15 =
+                (VarBinaryType) SchemaMergingUtils.merge(s15, t15, 
highestFieldId, false);
+        Assertions.assertEquals(r15.getLength(), 10);
+
+        // VarBinaryType -> BinaryType
+        DataType s16 = new VarBinaryType(10);
+        DataType t16 = new BinaryType();
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s16, t16, highestFieldId, 
false));
+        BinaryType r16 = (BinaryType) SchemaMergingUtils.merge(s16, t16, 
highestFieldId, true);
+        Assertions.assertEquals(r16.getLength(), BinaryType.DEFAULT_LENGTH);
+
+        // VarCharType -> VarBinaryType
+        DataType s17 = new VarCharType(10);
+        DataType t17 = new VarBinaryType();
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s17, t17, highestFieldId, 
false));
+        VarBinaryType r17 =
+                (VarBinaryType) SchemaMergingUtils.merge(s17, t17, 
highestFieldId, true);
+        Assertions.assertEquals(r17.getLength(), VarBinaryType.DEFAULT_LENGTH);
+    }
+
+    @Test
+    public void testMergeTypesWithPrecision() {
+        AtomicInteger highestFieldId = new AtomicInteger(1);
+
+        // LocalZonedTimestampType
+        DataType s1 = new LocalZonedTimestampType();
+        DataType t1 = new LocalZonedTimestampType();
+        LocalZonedTimestampType r1 =
+                (LocalZonedTimestampType) SchemaMergingUtils.merge(s1, t1, 
highestFieldId, false);
+        Assertions.assertTrue(r1.isNullable());
+        Assertions.assertEquals(r1.getPrecision(), 
LocalZonedTimestampType.DEFAULT_PRECISION);
+
+        // lower precision
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () ->
+                        SchemaMergingUtils.merge(
+                                s1, new LocalZonedTimestampType(3), 
highestFieldId, false));
+        // higher precision
+        DataType t2 = new LocalZonedTimestampType(6);
+        LocalZonedTimestampType r2 =
+                (LocalZonedTimestampType) SchemaMergingUtils.merge(s1, t2, 
highestFieldId, false);
+        Assertions.assertEquals(r2.getPrecision(), 6);
+
+        // LocalZonedTimestampType -> TimeType
+        DataType s3 = new LocalZonedTimestampType();
+        DataType t3 = new TimeType(6);
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s3, t3, highestFieldId, false));
+
+        // LocalZonedTimestampType -> TimestampType
+        DataType s4 = new LocalZonedTimestampType();
+        DataType t4 = new TimestampType();
+        TimestampType r4 = (TimestampType) SchemaMergingUtils.merge(s4, t4, 
highestFieldId, false);
+        Assertions.assertEquals(r4.getPrecision(), 
TimestampType.DEFAULT_PRECISION);
+
+        // TimestampType.
+        DataType s5 = new TimestampType();
+        DataType t5 = new TimestampType();
+        TimestampType r5 = (TimestampType) SchemaMergingUtils.merge(s5, t5, 
highestFieldId, false);
+        Assertions.assertTrue(r5.isNullable());
+        Assertions.assertEquals(r5.getPrecision(), 
TimestampType.DEFAULT_PRECISION);
+
+        // lower precision
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s5, new TimestampType(3), 
highestFieldId, false));
+        // higher precision
+        DataType t6 = new TimestampType(9);
+        TimestampType r6 = (TimestampType) SchemaMergingUtils.merge(s5, t6, 
highestFieldId, false);
+        Assertions.assertEquals(r6.getPrecision(), 9);
+
+        // TimestampType -> LocalZonedTimestampType
+        DataType s7 = new TimestampType();
+        DataType t7 = new LocalZonedTimestampType();
+        LocalZonedTimestampType r7 =
+                (LocalZonedTimestampType) SchemaMergingUtils.merge(s7, t7, 
highestFieldId, false);
+        Assertions.assertEquals(r7.getPrecision(), 
TimestampType.DEFAULT_PRECISION);
+
+        // TimestampType -> TimestampType
+        DataType s8 = new TimestampType();
+        DataType t8 = new TimeType(6);
+        TimeType r8 = (TimeType) SchemaMergingUtils.merge(s8, t8, 
highestFieldId, false);
+        Assertions.assertEquals(r8.getPrecision(), 
TimestampType.DEFAULT_PRECISION);
+
+        // TimeType.
+        DataType s9 = new TimeType();
+        DataType t9 = new TimeType();
+        TimeType r9 = (TimeType) SchemaMergingUtils.merge(s9, t9, 
highestFieldId, false);
+        Assertions.assertTrue(r9.isNullable());
+        Assertions.assertEquals(r9.getPrecision(), TimeType.DEFAULT_PRECISION);
+
+        // lower precision
+        DataType s10 = new TimeType(6);
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s10, new TimeType(3), 
highestFieldId, false));
+        // higher precision
+        DataType t10 = new TimeType(9);
+        TimeType r10 = (TimeType) SchemaMergingUtils.merge(s9, t10, 
highestFieldId, false);
+        Assertions.assertEquals(r10.getPrecision(), 9);
+
+        // TimeType -> LocalZonedTimestampType
+        DataType s11 = new TimeType();
+        DataType t11 = new LocalZonedTimestampType();
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s11, t11, highestFieldId, 
false));
+        // TimeType -> LocalZonedTimestampType with allowExplicitCast = true
+        LocalZonedTimestampType r11 =
+                (LocalZonedTimestampType) SchemaMergingUtils.merge(s11, t11, 
highestFieldId, true);
+        Assertions.assertEquals(r11.getPrecision(), 
LocalZonedTimestampType.DEFAULT_PRECISION);
+
+        // TimeType -> TimestampType
+        DataType s12 = new TimeType();
+        DataType t12 = new TimestampType();
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(s12, t12, highestFieldId, 
false));
+        // TimeType -> TimestampType with allowExplicitCast = true
+        TimestampType r12 =
+                (TimestampType) SchemaMergingUtils.merge(s12, t12, 
highestFieldId, true);
+        Assertions.assertEquals(r12.getPrecision(), 
TimestampType.DEFAULT_PRECISION);
+    }
+
+    @Test
+    public void testMergePrimitiveTypes() {
+        AtomicInteger highestFieldId = new AtomicInteger(1);
+
+        // declare the primitive source and target types
+        DataType bSource = new BooleanType();
+        DataType bTarget = new BooleanType();
+        DataType tiSource = new TinyIntType();
+        DataType tiTarget = new TinyIntType();
+        DataType siSource = new SmallIntType();
+        DataType siTarget = new SmallIntType();
+        DataType iSource = new IntType();
+        DataType iTarget = new IntType();
+        DataType biSource = new BigIntType();
+        DataType biTarget = new BigIntType();
+        DataType fSource = new FloatType();
+        DataType fTarget = new FloatType();
+        DataType dSource = new DoubleType();
+        DataType dTarget = new DoubleType();
+        DataType dcmTarget = new DecimalType();
+
+        // BooleanType
+        DataType btRes1 = SchemaMergingUtils.merge(bSource, bTarget, 
highestFieldId, false);
+        Assertions.assertTrue(btRes1 instanceof BooleanType);
+        // BooleanType -> Numeric Type
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(bSource, tiTarget, 
highestFieldId, false));
+        // BooleanType -> Numeric Type with allowExplicitCast = true
+        DataType btRes2 = SchemaMergingUtils.merge(bSource, tiTarget, 
highestFieldId, true);
+        Assertions.assertTrue(btRes2 instanceof TinyIntType);
+
+        // TinyIntType
+        DataType tiRes1 = SchemaMergingUtils.merge(tiSource, tiTarget, 
highestFieldId, false);
+        Assertions.assertTrue(tiRes1 instanceof TinyIntType);
+        // TinyIntType -> SmallIntType
+        DataType tiRes2 = SchemaMergingUtils.merge(tiSource, siTarget, 
highestFieldId, false);
+        Assertions.assertTrue(tiRes2 instanceof SmallIntType);
+        // TinyIntType -> IntType
+        DataType tiRes3 = SchemaMergingUtils.merge(tiSource, iTarget, 
highestFieldId, false);
+        Assertions.assertTrue(tiRes3 instanceof IntType);
+        // TinyIntType -> BigIntType
+        DataType tiRes4 = SchemaMergingUtils.merge(tiSource, biTarget, 
highestFieldId, false);
+        Assertions.assertTrue(tiRes4 instanceof BigIntType);
+        // TinyIntType -> FloatType
+        DataType tiRes5 = SchemaMergingUtils.merge(tiSource, fTarget, 
highestFieldId, false);
+        Assertions.assertTrue(tiRes5 instanceof FloatType);
+        // TinyIntType -> DoubleType
+        DataType tiRes6 = SchemaMergingUtils.merge(tiSource, dTarget, 
highestFieldId, false);
+        Assertions.assertTrue(tiRes6 instanceof DoubleType);
+        // TinyIntType -> DecimalType
+        DataType tiRes7 = SchemaMergingUtils.merge(tiSource, dcmTarget, 
highestFieldId, false);
+        Assertions.assertTrue(tiRes7 instanceof DecimalType);
+        // TinyIntType -> BooleanType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(tiSource, bTarget, 
highestFieldId, false));
+        // TinyIntType -> BooleanType with allowExplicitCast = true
+        DataType tiRes8 = SchemaMergingUtils.merge(tiSource, bTarget, 
highestFieldId, true);
+        Assertions.assertTrue(tiRes8 instanceof BooleanType);
+
+        // SmallIntType
+        DataType siRes1 = SchemaMergingUtils.merge(siSource, siTarget, 
highestFieldId, false);
+        Assertions.assertTrue(siRes1 instanceof SmallIntType);
+        // SmallIntType -> TinyIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(siSource, tiTarget, 
highestFieldId, false));
+        // SmallIntType -> TinyIntType with allowExplicitCast = true
+        DataType siRes2 = SchemaMergingUtils.merge(siSource, tiTarget, 
highestFieldId, true);
+        Assertions.assertTrue(siRes2 instanceof TinyIntType);
+        // SmallIntType -> IntType
+        DataType siRes3 = SchemaMergingUtils.merge(siSource, iTarget, 
highestFieldId, false);
+        Assertions.assertTrue(siRes3 instanceof IntType);
+        // SmallIntType -> BigIntType
+        DataType siRes4 = SchemaMergingUtils.merge(siSource, biTarget, 
highestFieldId, false);
+        Assertions.assertTrue(siRes4 instanceof BigIntType);
+        // SmallIntType -> FloatType
+        DataType siRes5 = SchemaMergingUtils.merge(siSource, fTarget, 
highestFieldId, false);
+        Assertions.assertTrue(siRes5 instanceof FloatType);
+        // SmallIntType -> DoubleType
+        DataType siRes6 = SchemaMergingUtils.merge(siSource, dTarget, 
highestFieldId, false);
+        Assertions.assertTrue(siRes6 instanceof DoubleType);
+        // SmallIntType -> DecimalType
+        DataType siRes7 = SchemaMergingUtils.merge(siSource, dcmTarget, 
highestFieldId, false);
+        Assertions.assertTrue(siRes7 instanceof DecimalType);
+
+        // IntType
+        DataType iRes1 = SchemaMergingUtils.merge(iSource, iTarget, 
highestFieldId, false);
+        Assertions.assertTrue(iRes1 instanceof IntType);
+        // IntType -> TinyIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(iSource, tiTarget, 
highestFieldId, false));
+        // IntType -> TinyIntType with allowExplicitCast = true
+        DataType iRes2 = SchemaMergingUtils.merge(iSource, tiTarget, 
highestFieldId, true);
+        Assertions.assertTrue(iRes2 instanceof TinyIntType);
+        // IntType -> SmallIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(iSource, siTarget, 
highestFieldId, false));
+        // IntType -> SmallIntType with allowExplicitCast = true
+        DataType iRes3 = SchemaMergingUtils.merge(iSource, siTarget, 
highestFieldId, true);
+        Assertions.assertTrue(iRes3 instanceof SmallIntType);
+        // IntType -> BigIntType
+        DataType iRes4 = SchemaMergingUtils.merge(iSource, biTarget, 
highestFieldId, false);
+        Assertions.assertTrue(iRes4 instanceof BigIntType);
+        // IntType -> FloatType
+        DataType iRes5 = SchemaMergingUtils.merge(iSource, fTarget, 
highestFieldId, false);
+        Assertions.assertTrue(iRes5 instanceof FloatType);
+        // IntType -> DoubleType
+        DataType iRes6 = SchemaMergingUtils.merge(iSource, dTarget, 
highestFieldId, false);
+        Assertions.assertTrue(iRes6 instanceof DoubleType);
+        // IntType -> DecimalType
+        DataType iRes7 = SchemaMergingUtils.merge(iSource, dcmTarget, 
highestFieldId, false);
+        Assertions.assertTrue(iRes7 instanceof DecimalType);
+
+        // BigIntType
+        DataType biRes1 = SchemaMergingUtils.merge(biSource, biTarget, 
highestFieldId, false);
+        Assertions.assertTrue(biRes1 instanceof BigIntType);
+        // BigIntType -> TinyIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(biSource, tiTarget, 
highestFieldId, false));
+        // BigIntType -> TinyIntType with allowExplicitCast = true
+        DataType biRes2 = SchemaMergingUtils.merge(biSource, tiTarget, 
highestFieldId, true);
+        Assertions.assertTrue(biRes2 instanceof TinyIntType);
+        // BigIntType -> SmallIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(biSource, siTarget, 
highestFieldId, false));
+        // BigIntType -> SmallIntType with allowExplicitCast = true
+        DataType biRes3 = SchemaMergingUtils.merge(biSource, siTarget, 
highestFieldId, true);
+        Assertions.assertTrue(biRes3 instanceof SmallIntType);
+        // BigIntType -> IntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(biSource, iTarget, 
highestFieldId, false));
+        // BigIntType -> IntType with allowExplicitCast = true
+        DataType biRes4 = SchemaMergingUtils.merge(biSource, iTarget, 
highestFieldId, true);
+        Assertions.assertTrue(biRes4 instanceof IntType);
+        // BigIntType -> FloatType
+        DataType biRes5 = SchemaMergingUtils.merge(biSource, fTarget, 
highestFieldId, false);
+        Assertions.assertTrue(biRes5 instanceof FloatType);
+        // BigIntType -> DoubleType
+        DataType biRes6 = SchemaMergingUtils.merge(biSource, dTarget, 
highestFieldId, false);
+        Assertions.assertTrue(biRes6 instanceof DoubleType);
+        // BigIntType -> DecimalType
+        DataType biRes7 = SchemaMergingUtils.merge(biSource, dcmTarget, 
highestFieldId, false);
+        Assertions.assertTrue(biRes7 instanceof DecimalType);
+
+        // FloatType
+        DataType fRes1 = SchemaMergingUtils.merge(fSource, fTarget, 
highestFieldId, false);
+        Assertions.assertTrue(fRes1 instanceof FloatType);
+        // FloatType -> TinyIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(fSource, tiTarget, 
highestFieldId, false));
+        // FloatType -> TinyIntType with allowExplicitCast = true
+        DataType fRes2 = SchemaMergingUtils.merge(fSource, tiTarget, 
highestFieldId, true);
+        Assertions.assertTrue(fRes2 instanceof TinyIntType);
+        // FloatType -> SmallIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(fSource, siTarget, 
highestFieldId, false));
+        // FloatType -> IntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(fSource, iTarget, 
highestFieldId, false));
+        // FloatType -> IntType with allowExplicitCast = true
+        DataType fRes4 = SchemaMergingUtils.merge(fSource, iTarget, 
highestFieldId, true);
+        Assertions.assertTrue(fRes4 instanceof IntType);
+        // FloatType -> BigIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(fSource, biTarget, 
highestFieldId, false));
+        // FloatType -> DoubleType
+        DataType fRes6 = SchemaMergingUtils.merge(fSource, dTarget, 
highestFieldId, false);
+        Assertions.assertTrue(fRes6 instanceof DoubleType);
+        // FloatType -> DecimalType
+        DataType fRes7 = SchemaMergingUtils.merge(fSource, dcmTarget, 
highestFieldId, false);
+        Assertions.assertTrue(fRes7 instanceof DecimalType);
+
+        // DoubleType
+        DataType dRes1 = SchemaMergingUtils.merge(dSource, dTarget, 
highestFieldId, false);
+        Assertions.assertTrue(dRes1 instanceof DoubleType);
+        // DoubleType -> TinyIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(dSource, tiTarget, 
highestFieldId, false));
+        // DoubleType -> SmallIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(dSource, siTarget, 
highestFieldId, false));
+        // DoubleType -> SmallIntType with allowExplicitCast = true
+        DataType dRes3 = SchemaMergingUtils.merge(dSource, siTarget, 
highestFieldId, true);
+        Assertions.assertTrue(dRes3 instanceof SmallIntType);
+        // DoubleType -> IntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(dSource, iTarget, 
highestFieldId, false));
+        // DoubleType -> BigIntType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(dSource, biTarget, 
highestFieldId, false));
+        // DoubleType -> BigIntType with allowExplicitCast = true
+        DataType dRes5 = SchemaMergingUtils.merge(dSource, biTarget, 
highestFieldId, true);
+        Assertions.assertTrue(dRes5 instanceof BigIntType);
+        // DoubleType -> FloatType
+        Assertions.assertThrows(
+                UnsupportedOperationException.class,
+                () -> SchemaMergingUtils.merge(dSource, fTarget, 
highestFieldId, false));
+        // DoubleType -> DecimalType
+        DataType dRes7 = SchemaMergingUtils.merge(dSource, dcmTarget, 
highestFieldId, false);
+        Assertions.assertTrue(dRes7 instanceof DecimalType);
+    }
+}
diff --git a/paimon-docs/pom.xml b/paimon-docs/pom.xml
index 771ff1a3e..10965dbe2 100644
--- a/paimon-docs/pom.xml
+++ b/paimon-docs/pom.xml
@@ -50,6 +50,13 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-spark-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.paimon</groupId>
             <artifactId>paimon-test-utils</artifactId>
diff --git 
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
 
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index b1c53b538..83c883f9c 100644
--- 
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++ 
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -79,7 +79,9 @@ public class ConfigOptionsDocGenerator {
                 new OptionsClassLocation(
                         "paimon-flink/paimon-flink-common", 
"org.apache.paimon.flink.kafka"),
                 new OptionsClassLocation(
-                        "paimon-hive/paimon-hive-catalog", 
"org.apache.paimon.hive")
+                        "paimon-hive/paimon-hive-catalog", 
"org.apache.paimon.hive"),
+                new OptionsClassLocation(
+                        "paimon-spark/paimon-spark-common", 
"org.apache.paimon.spark")
             };
     static final String DEFAULT_PATH_PREFIX = "src/main/java";
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
new file mode 100644
index 000000000..54950fdd9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.options.ConfigOption;
+
+import static org.apache.paimon.options.ConfigOptions.key;
+
+/** Options for spark connector. */
+public class SparkConnectorOptions {
+    public static final ConfigOption<Boolean> MERGE_SCHEMA =
+            key("write.merge-schema")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, merge the data schema and the table 
schema automatically before write data.");
+
+    public static final ConfigOption<Boolean> EXPLICIT_CAST =
+            key("write.merge-schema.explicit-cast")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, allow to merge data types if the two 
types meet the rules for explicit casting.");
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index be13803f0..2cd64c4bc 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
@@ -96,7 +97,7 @@ public class SparkTable
     @Override
     public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
         try {
-            return new SparkWriteBuilder((FileStoreTable) table);
+            return new SparkWriteBuilder((FileStoreTable) table, 
Options.fromMap(info.options()));
         } catch (Exception e) {
             throw new RuntimeException("Only FileStoreTable can be written.");
         }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index ae940c469..d73bb78df 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -67,7 +67,8 @@ class SparkSource
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
     val table = loadTable(parameters.asJava)
-    WriteIntoPaimonTable(table, SaveMode.transform(mode), 
data).run(sqlContext.sparkSession)
+    WriteIntoPaimonTable(table, SaveMode.transform(mode), data, 
Options.fromMap(parameters.asJava))
+      .run(sqlContext.sparkSession)
     SparkSource.toBaseRelation(table, sqlContext)
   }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
index 44b017414..60549b073 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
@@ -17,6 +17,7 @@
  */
 package org.apache.paimon.spark
 
+import org.apache.paimon.options.Options
 import org.apache.paimon.spark.commands.WriteIntoPaimonTable
 import org.apache.paimon.table.FileStoreTable
 
@@ -24,13 +25,13 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.sources.InsertableRelation
 
-/** Spark {@link V1Write}, it is required to use v1 write for grouping by 
bucket. */
-class SparkWrite(val table: FileStoreTable, saveMode: SaveMode) extends 
V1Write {
+/** Spark [[V1Write]], it is required to use v1 write for grouping by bucket. 
*/
+class SparkWrite(val table: FileStoreTable, saveMode: SaveMode, options: 
Options) extends V1Write {
 
   override def toInsertableRelation: InsertableRelation = {
     (data: DataFrame, overwrite: Boolean) =>
       {
-        WriteIntoPaimonTable(table, saveMode, data).run(data.sparkSession)
+        WriteIntoPaimonTable(table, saveMode, data, 
options).run(data.sparkSession)
       }
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
index 5cf429af1..f515cb33e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
@@ -17,16 +17,19 @@
  */
 package org.apache.paimon.spark
 
+import org.apache.paimon.options.Options
 import org.apache.paimon.table.FileStoreTable
 
-import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, 
SupportsOverwrite, WriteBuilder}
+import org.apache.spark.sql.connector.write.{SupportsOverwrite, WriteBuilder}
 import org.apache.spark.sql.sources.{And, Filter}
 
-private class SparkWriteBuilder(table: FileStoreTable) extends WriteBuilder 
with SupportsOverwrite {
+private class SparkWriteBuilder(table: FileStoreTable, options: Options)
+  extends WriteBuilder
+  with SupportsOverwrite {
 
   private var saveMode: SaveMode = InsertInto
 
-  override def build = new SparkWrite(table, saveMode)
+  override def build = new SparkWrite(table, saveMode, options)
 
   override def overwrite(filters: Array[Filter]): WriteBuilder = {
     val conjunctiveFilters = if (filters.nonEmpty) {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 9e8c964b1..9738c5c83 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -17,14 +17,12 @@
  */
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.predicate.PredicateBuilder
 import org.apache.paimon.spark.SparkFilterConverter
-import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageSerializer}
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter, 
Not, Or}
+import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
 
 import java.io.IOException
 
@@ -33,9 +31,9 @@ trait PaimonCommand {
 
   val BUCKET_COL = "_bucket_"
 
-  def getTable: Table
+  def table: FileStoreTable
 
-  lazy val bucketMode: BucketMode = getTable match {
+  lazy val bucketMode: BucketMode = table match {
     case fileStoreTable: FileStoreTable =>
       fileStoreTable.bucketMode
     case _ =>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
new file mode 100644
index 000000000..8a8415d64
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.schema.{SchemaMergingUtils, TableSchema}
+import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+trait SchemaHelper {
+
+  val originTable: FileStoreTable
+
+  protected var newTable: Option[FileStoreTable] = None
+
+  def table: FileStoreTable = newTable.getOrElse(originTable)
+
+  def tableSchema: TableSchema = table.schema
+
+  def mergeAndCommitSchema(dataSchema: StructType, allowExplicitCast: 
Boolean): Unit = {
+    val dataRowType = 
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
+    if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
+      newTable = Some(table.copyWithLatestSchema())
+    }
+  }
+
+  def updateTableWithOptions(options: Map[String, String]): Unit = {
+    newTable = Some(table.copy(options.asJava))
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 6ec4987d4..4324a797f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -20,13 +20,15 @@ package org.apache.paimon.spark.commands
 import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
 import org.apache.paimon.data.BinaryRow
 import org.apache.paimon.index.PartitionIndex
-import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, 
SaveMode, SparkRow}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite, 
SaveMode, SparkConnectorOptions, SparkRow}
 import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
 import org.apache.paimon.table.sink.{BatchWriteBuilder, 
CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -39,29 +41,35 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 /** Used to write a [[DataFrame]] into a paimon table. */
-case class WriteIntoPaimonTable(_table: FileStoreTable, saveMode: SaveMode, 
data: DataFrame)
+case class WriteIntoPaimonTable(
+    override val originTable: FileStoreTable,
+    saveMode: SaveMode,
+    data: DataFrame,
+    options: Options)
   extends RunnableCommand
-  with PaimonCommand {
+  with PaimonCommand
+  with SchemaHelper
+  with Logging {
 
   import WriteIntoPaimonTable._
 
-  private var table = _table
-
-  private lazy val tableSchema = table.schema()
-
-  private lazy val rowType = table.rowType()
-
-  private lazy val writeBuilder: BatchWriteBuilder = 
table.newBatchWriteBuilder()
-
   private lazy val serializer = new CommitMessageSerializer
 
+  private lazy val mergeSchema = 
options.get(SparkConnectorOptions.MERGE_SCHEMA)
+
+  /** \1. 2. */
   override def run(sparkSession: SparkSession): Seq[Row] = {
     import sparkSession.implicits._
 
+    if (mergeSchema) {
+      val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
+      mergeAndCommitSchema(data.schema, allowExplicitCast)
+    }
+
     val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
     // use the extra options to rebuild the table object
-    table = table.copy(
-      Map(DYNAMIC_PARTITION_OVERWRITE.key() -> 
dynamicPartitionOverwriteMode.toString).asJava)
+    updateTableWithOptions(
+      Map(DYNAMIC_PARTITION_OVERWRITE.key -> 
dynamicPartitionOverwriteMode.toString))
 
     val primaryKeyCols = tableSchema.trimmedPrimaryKeys().asScala.map(col)
     val partitionCols = tableSchema.partitionKeys().asScala.map(col)
@@ -80,6 +88,9 @@ case class WriteIntoPaimonTable(_table: FileStoreTable, 
saveMode: SaveMode, data
       ds.toDF().repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
     }
 
+    val rowType = table.rowType()
+    val writeBuilder = table.newBatchWriteBuilder()
+
     val df =
       bucketMode match {
         case BucketMode.DYNAMIC =>
@@ -153,7 +164,7 @@ case class WriteIntoPaimonTable(_table: FileStoreTable, 
saveMode: SaveMode, data
         } else if (isTruncate(filter.get)) {
           Map.empty[String, String]
         } else {
-          convertFilterToMap(filter.get, tableSchema.logicalPartitionType())
+          convertFilterToMap(filter.get, table.schema.logicalPartitionType())
         }
       case DynamicOverWrite =>
         dynamicPartitionOverwriteMode = true
@@ -167,7 +178,6 @@ case class WriteIntoPaimonTable(_table: FileStoreTable, 
saveMode: SaveMode, data
   override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]): 
LogicalPlan =
     this.asInstanceOf[WriteIntoPaimonTable]
 
-  override def getTable: Table = table
 }
 
 object WriteIntoPaimonTable {
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index ae95e505d..1bf21a8f3 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -22,6 +22,8 @@ import org.apache.paimon.WriteMode._
 import org.apache.spark.sql.Row
 import org.scalactic.source.Position
 
+import java.sql.Date
+
 class DataFrameWriteTest extends PaimonSparkTestBase {
 
   writeModes.foreach {
@@ -71,4 +73,201 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
           }
       }
   }
+
+  writeModes.foreach {
+    writeMode =>
+      bucketModes.foreach {
+        bucket =>
+          test(s"Schema evolution: write data into Paimon: $writeMode, bucket: 
$bucket") {
+            val _spark = spark
+            import _spark.implicits._
+
+            val primaryKeysProp = if (writeMode == CHANGE_LOG) {
+              "'primary-key'='a',"
+            } else {
+              ""
+            }
+
+            spark.sql(
+              s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES ($primaryKeysProp 
'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+                 |""".stripMargin)
+
+            val paimonTable = loadTable("T")
+            val location = paimonTable.location().getPath
+
+            val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
+            df1.write.format("paimon").mode("append").save(location)
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a, b"),
+              Row(1, "a") :: Row(2, "b") :: Nil)
+
+            // Case 1: two additional fields
+            val df2 = Seq((1, "a2", 123L, Map("k" -> 11.1)), (3, "c", 345L, 
Map("k" -> 33.3)))
+              .toDF("a", "b", "c", "d")
+            df2.write
+              .format("paimon")
+              .mode("append")
+              .option("write.merge-schema", "true")
+              .save(location)
+            val expected2 = if (writeMode == CHANGE_LOG) {
+              Row(1, "a2", 123L, Map("k" -> 11.1)) ::
+                Row(2, "b", null, null) :: Row(3, "c", 345L, Map("k" -> 33.3)) 
:: Nil
+            } else {
+              Row(1, "a", null, null) :: Row(1, "a2", 123L, Map("k" -> 11.1)) 
:: Row(
+                2,
+                "b",
+                null,
+                null) :: Row(3, "c", 345L, Map("k" -> 33.3)) :: Nil
+            }
+            checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected2)
+
+            // Case 2: two fields with the evolved types: Int -> Long, Long -> 
Decimal
+            val df3 = Seq(
+              (2L, "b2", BigDecimal.decimal(234), Map("k" -> 22.2)),
+              (4L, "d", BigDecimal.decimal(456), Map("k" -> 44.4))).toDF("a", 
"b", "c", "d")
+            df3.write
+              .format("paimon")
+              .mode("append")
+              .option("write.merge-schema", "true")
+              .save(location)
+            val expected3 = if (writeMode == CHANGE_LOG) {
+              Row(1L, "a2", BigDecimal.decimal(123), Map("k" -> 11.1)) :: Row(
+                2L,
+                "b2",
+                BigDecimal.decimal(234),
+                Map("k" -> 22.2)) :: Row(3L, "c", BigDecimal.decimal(345), 
Map("k" -> 33.3)) :: Row(
+                4L,
+                "d",
+                BigDecimal.decimal(456),
+                Map("k" -> 44.4)) :: Nil
+            } else {
+              Row(1L, "a", null, null) :: Row(
+                1L,
+                "a2",
+                BigDecimal.decimal(123),
+                Map("k" -> 11.1)) :: Row(2L, "b", null, null) :: Row(
+                2L,
+                "b2",
+                BigDecimal.decimal(234),
+                Map("k" -> 22.2)) :: Row(3L, "c", BigDecimal.decimal(345), 
Map("k" -> 33.3)) :: Row(
+                4L,
+                "d",
+                BigDecimal.decimal(456),
+                Map("k" -> 44.4)) :: Nil
+            }
+            checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3)
+
+          }
+      }
+  }
+
+  writeModes.foreach {
+    writeMode =>
+      bucketModes.foreach {
+        bucket =>
+          test(
+            s"Schema evolution: write data into Paimon with allowExplicitCast 
= true: $writeMode, bucket: $bucket") {
+            val _spark = spark
+            import _spark.implicits._
+
+            val primaryKeysProp = if (writeMode == CHANGE_LOG) {
+              "'primary-key'='a',"
+            } else {
+              ""
+            }
+
+            spark.sql(
+              s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |TBLPROPERTIES ($primaryKeysProp 
'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+                 |""".stripMargin)
+
+            val paimonTable = loadTable("T")
+            val location = paimonTable.location().getPath
+
+            val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b")
+            df1.write.format("paimon").mode("append").save(location)
+            checkAnswer(
+              spark.sql("SELECT * FROM T ORDER BY a, b"),
+              Row(1, "2023-08-01") :: Row(2, "2023-08-02") :: Nil)
+
+            // Case 1: two additional fields: DoubleType and TimestampType
+            val ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")
+            val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 
34.5d, ts))
+              .toDF("a", "b", "c", "d")
+            df2.write
+              .format("paimon")
+              .mode("append")
+              .option("write.merge-schema", "true")
+              .save(location)
+            val expected2 = if (writeMode == CHANGE_LOG) {
+              Row(1, "2023-08-01", 12.3d, ts) ::
+                Row(2, "2023-08-02", null, null) :: Row(3, "2023-08-03", 
34.5d, ts) :: Nil
+            } else {
+              Row(1, "2023-08-01", null, null) :: Row(1, "2023-08-01", 12.3d, 
ts) :: Row(
+                2,
+                "2023-08-02",
+                null,
+                null) :: Row(3, "2023-08-03", 34.5d, ts) :: Nil
+            }
+            checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected2)
+
+            // Case 2: a: Int -> Long, b: String -> Date, c: Long -> Int, d: 
Map -> String
+            val date = java.sql.Date.valueOf("2023-07-31")
+            val df3 = Seq((2L, date, 234, null), (4L, date, 456, "2023-08-01 
11:00:00.0")).toDF(
+              "a",
+              "b",
+              "c",
+              "d")
+
+            // throw UnsupportedOperationException if 
write.merge-schema.explicit-cast = false
+            assertThrows[UnsupportedOperationException] {
+              df3.write
+                .format("paimon")
+                .mode("append")
+                .option("write.merge-schema", "true")
+                .save(location)
+            }
+            // merge schema and write data when 
write.merge-schema.explicit-cast = true
+            df3.write
+              .format("paimon")
+              .mode("append")
+              .option("write.merge-schema", "true")
+              .option("write.merge-schema.explicit-cast", "true")
+              .save(location)
+            val expected3 = if (writeMode == CHANGE_LOG) {
+              Row(1L, Date.valueOf("2023-08-01"), 12, ts.toString) :: Row(
+                2L,
+                date,
+                234,
+                null) :: Row(3L, Date.valueOf("2023-08-03"), 34, ts.toString) 
:: Row(
+                4L,
+                date,
+                456,
+                "2023-08-01 11:00:00.0") :: Nil
+            } else {
+              Row(1L, Date.valueOf("2023-08-01"), null, null) :: Row(
+                1L,
+                Date.valueOf("2023-08-01"),
+                12,
+                ts.toString) :: Row(2L, date, 234, null) :: Row(
+                2L,
+                Date.valueOf("2023-08-02"),
+                null,
+                null) :: Row(3L, Date.valueOf("2023-08-03"), 34, ts.toString) 
:: Row(
+                4L,
+                date,
+                456,
+                "2023-08-01 11:00:00.0") :: Nil
+            }
+            checkAnswer(
+              spark.sql("SELECT a, b, c, substring(d, 0, 21) FROM T ORDER BY 
a, b"),
+              expected3)
+
+          }
+      }
+  }
+
 }

Reply via email to