This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6c1561ef4 [spark] supports schema merging when write (#1678)
6c1561ef4 is described below
commit 6c1561ef4f9047daeb08fe06bea1fb26c4d9f4e4
Author: Yann Byron <[email protected]>
AuthorDate: Tue Aug 1 12:22:45 2023 +0800
[spark] supports schema merging when write (#1678)
---
docs/content/maintenance/configurations.md | 6 +
.../generated/spark_connector_configuration.html | 42 ++
.../java/org/apache/paimon/types/DataField.java | 4 +
.../java/org/apache/paimon/AbstractFileStore.java | 5 +
.../src/main/java/org/apache/paimon/FileStore.java | 2 +
.../org/apache/paimon/schema/SchemaManager.java | 18 +
.../apache/paimon/schema/SchemaMergingUtils.java | 225 +++++++
.../paimon/table/AbstractFileStoreTable.java | 4 +
.../paimon/schema/SchemaMergingUtilsTest.java | 740 +++++++++++++++++++++
paimon-docs/pom.xml | 7 +
.../configuration/ConfigOptionsDocGenerator.java | 4 +-
.../apache/paimon/spark/SparkConnectorOptions.java | 40 ++
.../java/org/apache/paimon/spark/SparkTable.java | 3 +-
.../org/apache/paimon/spark/SparkSource.scala | 3 +-
.../scala/org/apache/paimon/spark/SparkWrite.scala | 7 +-
.../apache/paimon/spark/SparkWriteBuilder.scala | 9 +-
.../paimon/spark/commands/PaimonCommand.scala | 10 +-
.../paimon/spark/commands/SchemaHelper.scala | 50 ++
.../spark/commands/WriteIntoPaimonTable.scala | 42 +-
.../paimon/spark/sql/DataFrameWriteTest.scala | 199 ++++++
20 files changed, 1389 insertions(+), 31 deletions(-)
diff --git a/docs/content/maintenance/configurations.md
b/docs/content/maintenance/configurations.md
index 5d46dbfaa..c88676345 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -55,3 +55,9 @@ Flink catalog options for paimon.
Flink connector options for paimon.
{{< generated/flink_connector_configuration >}}
+
+### SparkConnectorOptions
+
+Spark connector options for paimon.
+
+{{< generated/spark_connector_configuration >}}
\ No newline at end of file
diff --git
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
new file mode 100644
index 000000000..c38a5cc85
--- /dev/null
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -0,0 +1,42 @@
+{{/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/}}
+<table class="configuration table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Key</th>
+ <th class="text-left" style="width: 15%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 55%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>write.merge-schema</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, merge the data schema and the table schema automatically
before write data.</td>
+ </tr>
+ <tr>
+ <td><h5>write.merge-schema.explicit-cast</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, allow to merge data types if the two types meet the rules
for explicit casting.</td>
+ </tr>
+ </tbody>
+</table>
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
index 28728f1d1..4e51372c2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/DataField.java
@@ -76,6 +76,10 @@ public final class DataField implements Serializable {
return type;
}
+ public DataField newId(int newid) {
+ return new DataField(newid, name, type, description);
+ }
+
public DataField newName(String newName) {
return new DataField(id, newName, type, description);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index b31dfebb4..48cb6e231 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -142,6 +142,11 @@ public abstract class AbstractFileStore<T> implements
FileStore<T> {
return options;
}
+ @Override
+ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+ return schemaManager.mergeSchema(rowType, allowExplicitCast);
+ }
+
@Override
public FileStoreCommitImpl newCommit(String commitUser) {
return new FileStoreCommitImpl(
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 918e665bc..bf927861b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -78,4 +78,6 @@ public interface FileStore<T> extends Serializable {
@Nullable
TagAutoCreation newTagCreationManager();
+
+ boolean mergeSchema(RowType rowType, boolean allowExplicitCast);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 15db61062..2adee62ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -388,6 +388,24 @@ public class SchemaManager implements Serializable {
}
}
+ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
+ TableSchema current =
+ latest().orElseThrow(
+ () ->
+ new RuntimeException(
+ "It requires that the current
schema to exist when calling 'mergeSchema'"));
+ TableSchema update = SchemaMergingUtils.mergeSchemas(current, rowType,
allowExplicitCast);
+ if (current.equals(update)) {
+ return false;
+ } else {
+ try {
+ return commit(update);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to commit the schema.", e);
+ }
+ }
+ }
+
private static void checkMoveIndexEqual(SchemaChange.Move move, int
fieldIndex, int refIndex) {
if (refIndex == fieldIndex) {
throw new UnsupportedOperationException(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
new file mode 100644
index 000000000..a60e6128f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeCasts;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.ReassignFieldId;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** The util class for merging the schemas. */
+public class SchemaMergingUtils {
+
+ public static TableSchema mergeSchemas(
+ TableSchema currentTableSchema, RowType dataFields, boolean
allowExplicitCast) {
+ if (currentTableSchema.logicalRowType().equals(dataFields)) {
+ return currentTableSchema;
+ }
+
+ AtomicInteger highestFieldId = new
AtomicInteger(currentTableSchema.highestFieldId());
+ RowType newRowType =
+ mergeSchemas(
+ currentTableSchema.logicalRowType(),
+ dataFields,
+ highestFieldId,
+ allowExplicitCast);
+ return new TableSchema(
+ currentTableSchema.id() + 1,
+ newRowType.getFields(),
+ highestFieldId.get(),
+ currentTableSchema.partitionKeys(),
+ currentTableSchema.primaryKeys(),
+ currentTableSchema.options(),
+ currentTableSchema.comment());
+ }
+
+ public static RowType mergeSchemas(
+ RowType tableSchema,
+ RowType dataSchema,
+ AtomicInteger highestFieldId,
+ boolean allowExplicitCast) {
+ return (RowType) merge(tableSchema, dataSchema, highestFieldId,
allowExplicitCast);
+ }
+
+ /**
+ * Merge the base data type and the update data type if possible.
+ *
+ * <p>For RowType, find the fields which exist in both the base schema
and the update schema,
+ * and try to merge them by calling this method recursively; keep those
fields that are only in
+ * the base schema and append those fields that are only in the update
schema.
+ *
+ * <p>For other complex types, try to merge the element types.
+ *
+ * <p>For primitive data types, we treat them as compatible if the original
type can be safely
+ * cast to the new type.
+ */
+ public static DataType merge(
+ DataType base0,
+ DataType update0,
+ AtomicInteger highestFieldId,
+ boolean allowExplicitCast) {
+ // Here we try to merge base0 and update0 without regard to the
nullability,
+ // and set base0's nullability on the returned type.
+ DataType base = base0.copy(true);
+ DataType update = update0.copy(true);
+
+ if (base.equals(update)) {
+ return base0;
+ } else if (base instanceof RowType && update instanceof RowType) {
+ List<DataField> baseFields = ((RowType) base).getFields();
+ List<DataField> updateFields = ((RowType) update).getFields();
+ Map<String, DataField> updateFieldMap =
+ updateFields.stream()
+ .collect(Collectors.toMap(DataField::name,
Function.identity()));
+ List<DataField> updatedFields =
+ baseFields.stream()
+ .map(
+ baseField -> {
+ if
(updateFieldMap.containsKey(baseField.name())) {
+ DataField updateField =
+
updateFieldMap.get(baseField.name());
+ DataType updatedDataType =
+ merge(
+ baseField.type(),
+ updateField.type(),
+ highestFieldId,
+ allowExplicitCast);
+ return new DataField(
+ baseField.id(),
+ baseField.name(),
+ updatedDataType,
+ baseField.description());
+ } else {
+ return baseField;
+ }
+ })
+ .collect(Collectors.toList());
+
+ Map<String, DataField> baseFieldMap =
+ baseFields.stream()
+ .collect(Collectors.toMap(DataField::name,
Function.identity()));
+ List<DataField> newFields =
+ updateFields.stream()
+ .filter(field ->
!baseFieldMap.containsKey(field.name()))
+ .map(field -> assignIdForNewField(field,
highestFieldId))
+ .collect(Collectors.toList());
+
+ updatedFields.addAll(newFields);
+ return new RowType(base.isNullable(), updatedFields);
+ } else if (base instanceof MapType && update instanceof MapType) {
+ return new MapType(
+ base.isNullable(),
+ merge(
+ ((MapType) base).getKeyType(),
+ ((MapType) update).getKeyType(),
+ highestFieldId,
+ allowExplicitCast),
+ merge(
+ ((MapType) base).getValueType(),
+ ((MapType) update).getValueType(),
+ highestFieldId,
+ allowExplicitCast));
+ } else if (base instanceof ArrayType && update instanceof ArrayType) {
+ return new ArrayType(
+ base.isNullable(),
+ merge(
+ ((ArrayType) base).getElementType(),
+ ((ArrayType) update).getElementType(),
+ highestFieldId,
+ allowExplicitCast));
+ } else if (base instanceof MultisetType && update instanceof
MultisetType) {
+ return new MultisetType(
+ base.isNullable(),
+ merge(
+ ((MultisetType) base).getElementType(),
+ ((MultisetType) update).getElementType(),
+ highestFieldId,
+ allowExplicitCast));
+ } else if (base instanceof DecimalType && update instanceof
DecimalType) {
+ if (base.equals(update)) {
+ return base0;
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Failed to merge decimal types with different
precision or scale: %s and %s",
+ base, update));
+ }
+ } else if (supportsDataTypesCast(base, update, allowExplicitCast)) {
+ if (DataTypes.getLength(base).isPresent() &&
DataTypes.getLength(update).isPresent()) {
+ // this will check and merge types which have a `length`
attribute, like BinaryType,
+ // CharType, VarBinaryType, VarCharType.
+ if (allowExplicitCast
+ || DataTypes.getLength(base).getAsInt()
+ <= DataTypes.getLength(update).getAsInt()) {
+ return update.copy(base0.isNullable());
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Failed to merge the target type that has
a smaller length: %s and %s",
+ base, update));
+ }
+ } else if (DataTypes.getPrecision(base).isPresent()
+ && DataTypes.getPrecision(update).isPresent()) {
+ // this will check and merge types which have a `precision`
attribute, like
+ // LocalZonedTimestampType, TimeType, TimestampType.
+ if (allowExplicitCast
+ || DataTypes.getPrecision(base).getAsInt()
+ <= DataTypes.getPrecision(update).getAsInt()) {
+ return update.copy(base0.isNullable());
+ } else {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Failed to merge the target type that has
a lower precision: %s and %s",
+ base, update));
+ }
+ } else {
+ return update.copy(base0.isNullable());
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Failed to merge data types %s and %s",
base, update));
+ }
+ }
+
+ private static boolean supportsDataTypesCast(
+ DataType sourceType, DataType targetType, boolean
allowExplicitCast) {
+ boolean canImplicitCast =
DataTypeCasts.supportsImplicitCast(sourceType, targetType);
+ boolean canExplicitCast =
+ allowExplicitCast &&
DataTypeCasts.supportsExplicitCast(sourceType, targetType);
+ return canImplicitCast || canExplicitCast;
+ }
+
+ private static DataField assignIdForNewField(DataField field,
AtomicInteger highestFieldId) {
+ DataType dataType = ReassignFieldId.reassign(field.type(),
highestFieldId);
+ return new DataField(
+ highestFieldId.incrementAndGet(), field.name(), dataType,
field.description());
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 4d9ba1115..15bb81e61 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -161,6 +161,10 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
+ if (dynamicOptions == null || dynamicOptions.isEmpty()) {
+ return this;
+ }
+
Map<String, String> options = tableSchema.options();
// check option is not immutable
dynamicOptions.forEach(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
new file mode 100644
index 000000000..ac3e33d10
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaMergingUtilsTest.java
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.schema;
+
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimeType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** UT for SchemaMergingUtils. */
+public class SchemaMergingUtilsTest {
+
+ @Test
+ public void testMergeTableSchemas() {
+ // Init the table schema
+ DataField a = new DataField(0, "a", new IntType());
+ DataField b = new DataField(1, "b", new DoubleType());
+ DataField c =
+ new DataField(
+ 2,
+ "c",
+ new MapType(new VarCharType(VarCharType.MAX_LENGTH),
new BooleanType()));
+ DataField d = new DataField(3, "d", new
VarCharType(VarCharType.MAX_LENGTH));
+ TableSchema current =
+ new TableSchema(
+ 0,
+ Lists.newArrayList(a, b, c, d),
+ 3,
+ Lists.newArrayList("d"),
+ Lists.newArrayList("a", "d"),
+ new HashMap<>(),
+ "");
+
+ // fake the RowType of data
+ DataField f1 = new DataField(-1, "f1", new CharType(10));
+ DataField f2 = new DataField(-1, "f2", new DateType());
+ RowType fDataType = new RowType(Lists.newArrayList(f1, f2));
+ DataField f = new DataField(-1, "f", fDataType);
+ RowType t = new RowType(Lists.newArrayList(a, b, d, f));
+
+ TableSchema merged = SchemaMergingUtils.mergeSchemas(current, t,
false);
+ Assertions.assertEquals(merged.id(), 1);
+ Assertions.assertEquals(merged.highestFieldId(), 6);
+ Assertions.assertArrayEquals(
+ merged.primaryKeys().toArray(new String[0]), new String[]
{"a", "d"});
+ Assertions.assertArrayEquals(
+ merged.partitionKeys().toArray(new String[0]), new String[]
{"d"});
+ List<DataField> fields = merged.fields();
+ Assertions.assertEquals(fields.size(), 5);
+ Assertions.assertTrue(fields.get(4).type() instanceof RowType);
+ }
+
+ @Test
+ public void testMergeSchemas() {
+ // This will test both `mergeSchemas` and `merge` methods.
+ // Init the source schema
+ DataField a = new DataField(0, "a", new IntType());
+ DataField b = new DataField(1, "b", new DoubleType());
+ DataField c =
+ new DataField(
+ 2,
+ "c",
+ new MapType(new VarCharType(VarCharType.MAX_LENGTH),
new BooleanType()));
+ DataField d = new DataField(3, "d", new
VarCharType(VarCharType.MAX_LENGTH));
+ RowType source = new RowType(Lists.newArrayList(a, b, c, d));
+ AtomicInteger highestFieldId = new AtomicInteger(3);
+
+ // Case 1: an additional field.
+ DataField e = new DataField(-1, "e", new DateType());
+ RowType t1 = new RowType(Lists.newArrayList(a, b, c, d, e));
+ RowType r1 = (RowType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false);
+ Assertions.assertEquals(highestFieldId.get(), 4);
+ Assertions.assertTrue(r1.isNullable());
+ Assertions.assertEquals(r1.getFieldCount(), 5);
+ Assertions.assertEquals(r1.getTypeAt(2), c.type());
+ // the id of the new DataField has been assigned the next incremental number.
+ DataField expectedE = e.newId(4);
+ Assertions.assertEquals(r1.getFields().get(highestFieldId.get()),
expectedE);
+
+ // Case 2: two missing fields.
+ RowType t2 = new RowType(Lists.newArrayList(a, c, e));
+ RowType r2 = SchemaMergingUtils.mergeSchemas(r1, t2, highestFieldId,
false);
+ Assertions.assertEquals(highestFieldId.get(), 4);
+ Assertions.assertEquals(r2.getFieldCount(), 5);
+ Assertions.assertEquals(r2.getTypeAt(3), d.type());
+ Assertions.assertEquals(r2.getFields().get(highestFieldId.get()),
expectedE);
+
+ // Case 3: an additional nested field and a missing field.
+ DataField f1 = new DataField(-1, "f1", new CharType(10));
+ DataField f2 = new DataField(-1, "f2", new IntType());
+ RowType fDataType = new RowType(Lists.newArrayList(f1, f2));
+ DataField f = new DataField(-1, "f", fDataType);
+ RowType t3 = new RowType(Lists.newArrayList(a, b, c, d, f));
+ RowType r3 = (RowType) SchemaMergingUtils.merge(r2, t3,
highestFieldId, false);
+ Assertions.assertEquals(highestFieldId.get(), 7);
+ Assertions.assertEquals(r3.getFieldCount(), 6);
+ RowType expectedFDataType = new
RowType(Lists.newArrayList(f1.newId(5), f2.newId(6)));
+ Assertions.assertEquals(r3.getTypeAt(5), expectedFDataType);
+ DataField expectedF = new DataField(7, "f", expectedFDataType);
+ Assertions.assertEquals(r3.getFields().get(5), expectedF);
+
+ // Case 4: an additional sub-field in a nested field.
+ DataField f3 = new DataField(-1, "f3", new TimestampType());
+ RowType newFDataType = new RowType(Lists.newArrayList(f1, f2, f3));
+ DataField newF = new DataField(-1, "f", newFDataType);
+ RowType t4 = new RowType(Lists.newArrayList(a, b, c, d, e, newF));
+ RowType r4 = SchemaMergingUtils.mergeSchemas(r3, t4, highestFieldId,
false);
+ Assertions.assertEquals(highestFieldId.get(), 8);
+ Assertions.assertEquals(r4.getFieldCount(), 6);
+ RowType newExpectedFDataType =
+ new RowType(Lists.newArrayList(f1.newId(5), f2.newId(6),
f3.newId(8)));
+ Assertions.assertEquals(r4.getTypeAt(5), newExpectedFDataType);
+
+ // Case 5: a field that isn't compatible with the existing one.
+ DataField newA = new DataField(-1, "a", new SmallIntType());
+ RowType t5 = new RowType(Lists.newArrayList(newA, b, c, d, e, newF));
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(r4, t5, highestFieldId, false));
+
+ // Case 6: all new-coming fields
+ DataField g = new DataField(-1, "g", new TimeType());
+ DataField h = new DataField(-1, "h", new TimeType());
+ RowType t6 = new RowType(Lists.newArrayList(g, h));
+ RowType r6 = SchemaMergingUtils.mergeSchemas(r4, t6, highestFieldId,
false);
+ Assertions.assertEquals(highestFieldId.get(), 10);
+ Assertions.assertEquals(r6.getFieldCount(), 8);
+ }
+
+ @Test
+ public void testMergeArrayTypes() {
+ AtomicInteger highestFieldId = new AtomicInteger(1);
+
+ DataType source = new ArrayType(false, new IntType());
+
+ // the element types are same.
+ DataType t1 = new ArrayType(true, new IntType());
+ ArrayType r1 = (ArrayType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false);
+ Assertions.assertFalse(r1.isNullable());
+ Assertions.assertTrue(r1.getElementType() instanceof IntType);
+
+ // the element types aren't the same, but can be evolved safely.
+ DataType t2 = new ArrayType(true, new BigIntType());
+ ArrayType r2 = (ArrayType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false);
+ Assertions.assertTrue(r2.getElementType() instanceof BigIntType);
+
+ // the element types aren't the same, and can't be evolved safely.
+ DataType t3 = new ArrayType(true, new SmallIntType());
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(source, t3, highestFieldId,
false));
+ // the element type of the target differs from the source's, but the
source type can be cast to
+ // the target type explicitly.
+ ArrayType r3 = (ArrayType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true);
+ Assertions.assertTrue(r3.getElementType() instanceof SmallIntType);
+ }
+
+ @Test
+ public void testMergeMapTypes() {
+ AtomicInteger highestFieldId = new AtomicInteger(1);
+
+ DataType source = new MapType(new VarCharType(VarCharType.MAX_LENGTH),
new IntType());
+
+ // both the key and value types are same to the source's.
+ DataType t1 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new
IntType());
+ MapType r1 = (MapType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false);
+ Assertions.assertTrue(r1.isNullable());
+ Assertions.assertTrue(r1.getKeyType() instanceof VarCharType);
+ Assertions.assertTrue(r1.getValueType() instanceof IntType);
+
+ // the value type of the target differs from the source's, but can be
evolved safely.
+ DataType t2 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new
DoubleType());
+ MapType r2 = (MapType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false);
+ Assertions.assertTrue(r2.getKeyType() instanceof VarCharType);
+ Assertions.assertTrue(r2.getValueType() instanceof DoubleType);
+
+ // the value type of the target differs from the source's, and can't be
evolved safely.
+ DataType t3 = new MapType(new VarCharType(VarCharType.MAX_LENGTH), new
SmallIntType());
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(source, t3, highestFieldId,
false));
+ // the value type of the target differs from the source's, but the
source type can be cast to
+ // the target type explicitly.
+ MapType r3 = (MapType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true);
+ Assertions.assertTrue(r3.getKeyType() instanceof VarCharType);
+ Assertions.assertTrue(r3.getValueType() instanceof SmallIntType);
+ }
+
+ @Test
+ public void testMergeMultisetTypes() {
+ AtomicInteger highestFieldId = new AtomicInteger(1);
+
+ DataType source = new MultisetType(false, new IntType());
+
+ // the element types are same.
+ DataType t1 = new MultisetType(true, new IntType());
+ MultisetType r1 =
+ (MultisetType) SchemaMergingUtils.merge(source, t1,
highestFieldId, false);
+ Assertions.assertFalse(r1.isNullable());
+ Assertions.assertTrue(r1.getElementType() instanceof IntType);
+
+ // the element types aren't the same, but can be evolved safely.
+ DataType t2 = new MultisetType(true, new BigIntType());
+ MultisetType r2 =
+ (MultisetType) SchemaMergingUtils.merge(source, t2,
highestFieldId, false);
+ Assertions.assertTrue(r2.getElementType() instanceof BigIntType);
+
+ // the element types aren't the same, and can't be evolved safely.
+ DataType t3 = new MultisetType(true, new SmallIntType());
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(source, t3, highestFieldId,
false));
+ // the element type of the target differs from the source's, but the
source type can be cast to
+ // the target type explicitly.
+ MultisetType r3 = (MultisetType) SchemaMergingUtils.merge(source, t3,
highestFieldId, true);
+ Assertions.assertTrue(r3.getElementType() instanceof SmallIntType);
+ }
+
+ @Test
+ public void testMergeDecimalTypes() {
+ AtomicInteger highestFieldId = new AtomicInteger(1);
+
+ DataType s1 = new DecimalType();
+ DataType t1 = new DecimalType(10, 0);
+ DecimalType r1 = (DecimalType) SchemaMergingUtils.merge(s1, t1,
highestFieldId, false);
+ Assertions.assertTrue(r1.isNullable());
+ Assertions.assertEquals(r1.getPrecision(),
DecimalType.DEFAULT_PRECISION);
+ Assertions.assertEquals(r1.getScale(), DecimalType.DEFAULT_SCALE);
+
+ DataType s2 = new DecimalType(5, 2);
+ DataType t2 = new DecimalType(7, 2);
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s2, t2, highestFieldId, false));
+
+ // DecimalType -> Other Numeric Type
+ DataType dcmSource = new DecimalType();
+ DataType iTarget = new IntType();
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(dcmSource, iTarget,
highestFieldId, false));
+ // DecimalType -> Other Numeric Type with allowExplicitCast = true
+ DataType res = SchemaMergingUtils.merge(dcmSource, iTarget,
highestFieldId, true);
+ Assertions.assertTrue(res instanceof IntType);
+ }
+
+ @Test
+ public void testMergeTypesWithLength() {
+ AtomicInteger highestFieldId = new AtomicInteger(1);
+
+ // BinaryType
+ DataType s1 = new BinaryType(10);
+ DataType t1 = new BinaryType(10);
+ BinaryType r1 = (BinaryType) SchemaMergingUtils.merge(s1, t1,
highestFieldId, false);
+ Assertions.assertEquals(r1.getLength(), 10);
+
+ DataType s2 = new BinaryType(2);
+ DataType t2 = new BinaryType();
+ // smaller length
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s2, t2, highestFieldId, false));
+ // smaller length with allowExplicitCast = true
+ BinaryType r2 = (BinaryType) SchemaMergingUtils.merge(s2, t2,
highestFieldId, true);
+ Assertions.assertEquals(r2.getLength(), BinaryType.DEFAULT_LENGTH);
+ // bigger length
+ DataType t3 = new BinaryType(5);
+ BinaryType r3 = (BinaryType) SchemaMergingUtils.merge(s2, t3,
highestFieldId, false);
+ Assertions.assertEquals(r3.getLength(), 5);
+
+ // VarCharType
+ DataType s4 = new VarCharType();
+ DataType t4 = new VarCharType(1);
+ VarCharType r4 = (VarCharType) SchemaMergingUtils.merge(s4, t4,
highestFieldId, false);
+ Assertions.assertEquals(r4.getLength(), VarCharType.DEFAULT_LENGTH);
+
+ DataType s5 = new VarCharType(2);
+ DataType t5 = new VarCharType();
+ // smaller length
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s5, t5, highestFieldId, false));
+ // smaller length with allowExplicitCast = true
+ VarCharType r5 = (VarCharType) SchemaMergingUtils.merge(s5, t5,
highestFieldId, true);
+ Assertions.assertEquals(r5.getLength(), VarCharType.DEFAULT_LENGTH);
+ // bigger length
+ DataType t6 = new VarCharType(5);
+ VarCharType r6 = (VarCharType) SchemaMergingUtils.merge(s5, t6,
highestFieldId, false);
+ Assertions.assertEquals(r6.getLength(), 5);
+
+ // CharType
+ DataType s7 = new CharType();
+ DataType t7 = new CharType(1);
+ CharType r7 = (CharType) SchemaMergingUtils.merge(s7, t7,
highestFieldId, false);
+ Assertions.assertEquals(r7.getLength(), CharType.DEFAULT_LENGTH);
+
+ DataType s8 = new CharType(2);
+ DataType t8 = new CharType();
+ // smaller length
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s8, t8, highestFieldId, false));
+ // smaller length with allowExplicitCast = true
+ CharType r8 = (CharType) SchemaMergingUtils.merge(s8, t8,
highestFieldId, true);
+ Assertions.assertEquals(r8.getLength(), CharType.DEFAULT_LENGTH);
+ // bigger length
+ DataType t9 = new CharType(5);
+ CharType r9 = (CharType) SchemaMergingUtils.merge(s8, t9,
highestFieldId, false);
+ Assertions.assertEquals(r9.getLength(), 5);
+
+ // VarBinaryType
+ DataType s10 = new VarBinaryType();
+ DataType t10 = new VarBinaryType(1);
+ VarBinaryType r10 =
+ (VarBinaryType) SchemaMergingUtils.merge(s10, t10,
highestFieldId, false);
+ Assertions.assertEquals(r10.getLength(), VarBinaryType.DEFAULT_LENGTH);
+
+ DataType s11 = new VarBinaryType(2);
+ DataType t11 = new VarBinaryType();
+ // smaller length
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s11, t11, highestFieldId,
false));
+ // smaller length with allowExplicitCast = true
+ VarBinaryType r11 =
+ (VarBinaryType) SchemaMergingUtils.merge(s11, t11,
highestFieldId, true);
+ Assertions.assertEquals(r11.getLength(), VarBinaryType.DEFAULT_LENGTH);
+ // bigger length
+ DataType t12 = new VarBinaryType(5);
+ VarBinaryType r12 =
+ (VarBinaryType) SchemaMergingUtils.merge(s11, t12,
highestFieldId, false);
+ Assertions.assertEquals(r12.getLength(), 5);
+
+ // CharType -> VarCharType
+ DataType s13 = new CharType();
+ DataType t13 = new VarCharType(10);
+ VarCharType r13 = (VarCharType) SchemaMergingUtils.merge(s13, t13,
highestFieldId, false);
+ Assertions.assertEquals(r13.getLength(), 10);
+
+ // VarCharType -> CharType
+ DataType s14 = new VarCharType(10);
+ DataType t14 = new CharType();
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s14, t14, highestFieldId,
false));
+ CharType r14 = (CharType) SchemaMergingUtils.merge(s14, t14,
highestFieldId, true);
+ Assertions.assertEquals(r14.getLength(), CharType.DEFAULT_LENGTH);
+
+ // BinaryType -> VarBinaryType
+ DataType s15 = new BinaryType();
+ DataType t15 = new VarBinaryType(10);
+ VarBinaryType r15 =
+ (VarBinaryType) SchemaMergingUtils.merge(s15, t15,
highestFieldId, false);
+ Assertions.assertEquals(r15.getLength(), 10);
+
+ // VarBinaryType -> BinaryType
+ DataType s16 = new VarBinaryType(10);
+ DataType t16 = new BinaryType();
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s16, t16, highestFieldId,
false));
+ BinaryType r16 = (BinaryType) SchemaMergingUtils.merge(s16, t16,
highestFieldId, true);
+ Assertions.assertEquals(r16.getLength(), BinaryType.DEFAULT_LENGTH);
+
+ // VarCharType -> VarBinaryType
+ DataType s17 = new VarCharType(10);
+ DataType t17 = new VarBinaryType();
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s17, t17, highestFieldId,
false));
+ VarBinaryType r17 =
+ (VarBinaryType) SchemaMergingUtils.merge(s17, t17,
highestFieldId, true);
+ Assertions.assertEquals(r17.getLength(), VarBinaryType.DEFAULT_LENGTH);
+ }
+
+ @Test
+ public void testMergeTypesWithPrecision() {
+ AtomicInteger highestFieldId = new AtomicInteger(1);
+
+ // LocalZonedTimestampType
+ DataType s1 = new LocalZonedTimestampType();
+ DataType t1 = new LocalZonedTimestampType();
+ LocalZonedTimestampType r1 =
+ (LocalZonedTimestampType) SchemaMergingUtils.merge(s1, t1,
highestFieldId, false);
+ Assertions.assertTrue(r1.isNullable());
+ Assertions.assertEquals(r1.getPrecision(),
LocalZonedTimestampType.DEFAULT_PRECISION);
+
+ // lower precision
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () ->
+ SchemaMergingUtils.merge(
+ s1, new LocalZonedTimestampType(3),
highestFieldId, false));
+ // higher precision
+ DataType t2 = new LocalZonedTimestampType(6);
+ LocalZonedTimestampType r2 =
+ (LocalZonedTimestampType) SchemaMergingUtils.merge(s1, t2,
highestFieldId, false);
+ Assertions.assertEquals(r2.getPrecision(), 6);
+
+ // LocalZonedTimestampType -> TimeType
+ DataType s3 = new LocalZonedTimestampType();
+ DataType t3 = new TimeType(6);
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s3, t3, highestFieldId, false));
+
+ // LocalZonedTimestampType -> TimestampType
+ DataType s4 = new LocalZonedTimestampType();
+ DataType t4 = new TimestampType();
+ TimestampType r4 = (TimestampType) SchemaMergingUtils.merge(s4, t4,
highestFieldId, false);
+ Assertions.assertEquals(r4.getPrecision(),
TimestampType.DEFAULT_PRECISION);
+
+ // TimestampType.
+ DataType s5 = new TimestampType();
+ DataType t5 = new TimestampType();
+ TimestampType r5 = (TimestampType) SchemaMergingUtils.merge(s5, t5,
highestFieldId, false);
+ Assertions.assertTrue(r5.isNullable());
+ Assertions.assertEquals(r5.getPrecision(),
TimestampType.DEFAULT_PRECISION);
+
+ // lower precision
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s5, new TimestampType(3),
highestFieldId, false));
+ // higher precision
+ DataType t6 = new TimestampType(9);
+ TimestampType r6 = (TimestampType) SchemaMergingUtils.merge(s5, t6,
highestFieldId, false);
+ Assertions.assertEquals(r6.getPrecision(), 9);
+
+ // TimestampType -> LocalZonedTimestampType
+ DataType s7 = new TimestampType();
+ DataType t7 = new LocalZonedTimestampType();
+ LocalZonedTimestampType r7 =
+ (LocalZonedTimestampType) SchemaMergingUtils.merge(s7, t7,
highestFieldId, false);
+ Assertions.assertEquals(r7.getPrecision(),
TimestampType.DEFAULT_PRECISION);
+
+ // TimestampType -> TimeType
+ DataType s8 = new TimestampType();
+ DataType t8 = new TimeType(6);
+ TimeType r8 = (TimeType) SchemaMergingUtils.merge(s8, t8,
highestFieldId, false);
+ Assertions.assertEquals(r8.getPrecision(),
TimestampType.DEFAULT_PRECISION);
+
+ // TimeType.
+ DataType s9 = new TimeType();
+ DataType t9 = new TimeType();
+ TimeType r9 = (TimeType) SchemaMergingUtils.merge(s9, t9,
highestFieldId, false);
+ Assertions.assertTrue(r9.isNullable());
+ Assertions.assertEquals(r9.getPrecision(), TimeType.DEFAULT_PRECISION);
+
+ // lower precision
+ DataType s10 = new TimeType(6);
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s10, new TimeType(3),
highestFieldId, false));
+ // higher precision
+ DataType t10 = new TimeType(9);
+ TimeType r10 = (TimeType) SchemaMergingUtils.merge(s9, t10,
highestFieldId, false);
+ Assertions.assertEquals(r10.getPrecision(), 9);
+
+ // TimeType -> LocalZonedTimestampType
+ DataType s11 = new TimeType();
+ DataType t11 = new LocalZonedTimestampType();
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s11, t11, highestFieldId,
false));
+ // TimeType -> LocalZonedTimestampType with allowExplicitCast = true
+ LocalZonedTimestampType r11 =
+ (LocalZonedTimestampType) SchemaMergingUtils.merge(s11, t11,
highestFieldId, true);
+ Assertions.assertEquals(r11.getPrecision(),
LocalZonedTimestampType.DEFAULT_PRECISION);
+
+ // TimeType -> TimestampType
+ DataType s12 = new TimeType();
+ DataType t12 = new TimestampType();
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(s12, t12, highestFieldId,
false));
+ // TimeType -> TimestampType with allowExplicitCast = true
+ TimestampType r12 =
+ (TimestampType) SchemaMergingUtils.merge(s12, t12,
highestFieldId, true);
+ Assertions.assertEquals(r12.getPrecision(),
TimestampType.DEFAULT_PRECISION);
+ }
+
+ @Test
+ public void testMergePrimitiveTypes() {
+ AtomicInteger highestFieldId = new AtomicInteger(1);
+
+ // declare the primitive source and target types
+ DataType bSource = new BooleanType();
+ DataType bTarget = new BooleanType();
+ DataType tiSource = new TinyIntType();
+ DataType tiTarget = new TinyIntType();
+ DataType siSource = new SmallIntType();
+ DataType siTarget = new SmallIntType();
+ DataType iSource = new IntType();
+ DataType iTarget = new IntType();
+ DataType biSource = new BigIntType();
+ DataType biTarget = new BigIntType();
+ DataType fSource = new FloatType();
+ DataType fTarget = new FloatType();
+ DataType dSource = new DoubleType();
+ DataType dTarget = new DoubleType();
+ DataType dcmTarget = new DecimalType();
+
+ // BooleanType
+ DataType btRes1 = SchemaMergingUtils.merge(bSource, bTarget,
highestFieldId, false);
+ Assertions.assertTrue(btRes1 instanceof BooleanType);
+ // BooleanType -> Numeric Type
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(bSource, tiTarget,
highestFieldId, false));
+ // BooleanType -> Numeric Type with allowExplicitCast = true
+ DataType btRes2 = SchemaMergingUtils.merge(bSource, tiTarget,
highestFieldId, true);
+ Assertions.assertTrue(btRes2 instanceof TinyIntType);
+
+ // TinyIntType
+ DataType tiRes1 = SchemaMergingUtils.merge(tiSource, tiTarget,
highestFieldId, false);
+ Assertions.assertTrue(tiRes1 instanceof TinyIntType);
+ // TinyIntType -> SmallIntType
+ DataType tiRes2 = SchemaMergingUtils.merge(tiSource, siTarget,
highestFieldId, false);
+ Assertions.assertTrue(tiRes2 instanceof SmallIntType);
+ // TinyIntType -> IntType
+ DataType tiRes3 = SchemaMergingUtils.merge(tiSource, iTarget,
highestFieldId, false);
+ Assertions.assertTrue(tiRes3 instanceof IntType);
+ // TinyIntType -> BigIntType
+ DataType tiRes4 = SchemaMergingUtils.merge(tiSource, biTarget,
highestFieldId, false);
+ Assertions.assertTrue(tiRes4 instanceof BigIntType);
+ // TinyIntType -> FloatType
+ DataType tiRes5 = SchemaMergingUtils.merge(tiSource, fTarget,
highestFieldId, false);
+ Assertions.assertTrue(tiRes5 instanceof FloatType);
+ // TinyIntType -> DoubleType
+ DataType tiRes6 = SchemaMergingUtils.merge(tiSource, dTarget,
highestFieldId, false);
+ Assertions.assertTrue(tiRes6 instanceof DoubleType);
+ // TinyIntType -> DecimalType
+ DataType tiRes7 = SchemaMergingUtils.merge(tiSource, dcmTarget,
highestFieldId, false);
+ Assertions.assertTrue(tiRes7 instanceof DecimalType);
+ // TinyIntType -> BooleanType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(tiSource, bTarget,
highestFieldId, false));
+ // TinyIntType -> BooleanType with allowExplicitCast = true
+ DataType tiRes8 = SchemaMergingUtils.merge(tiSource, bTarget,
highestFieldId, true);
+ Assertions.assertTrue(tiRes8 instanceof BooleanType);
+
+ // SmallIntType
+ DataType siRes1 = SchemaMergingUtils.merge(siSource, siTarget,
highestFieldId, false);
+ Assertions.assertTrue(siRes1 instanceof SmallIntType);
+ // SmallIntType -> TinyIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(siSource, tiTarget,
highestFieldId, false));
+ // SmallIntType -> TinyIntType with allowExplicitCast = true
+ DataType siRes2 = SchemaMergingUtils.merge(siSource, tiTarget,
highestFieldId, true);
+ Assertions.assertTrue(siRes2 instanceof TinyIntType);
+ // SmallIntType -> IntType
+ DataType siRes3 = SchemaMergingUtils.merge(siSource, iTarget,
highestFieldId, false);
+ Assertions.assertTrue(siRes3 instanceof IntType);
+ // SmallIntType -> BigIntType
+ DataType siRes4 = SchemaMergingUtils.merge(siSource, biTarget,
highestFieldId, false);
+ Assertions.assertTrue(siRes4 instanceof BigIntType);
+ // SmallIntType -> FloatType
+ DataType siRes5 = SchemaMergingUtils.merge(siSource, fTarget,
highestFieldId, false);
+ Assertions.assertTrue(siRes5 instanceof FloatType);
+ // SmallIntType -> DoubleType
+ DataType siRes6 = SchemaMergingUtils.merge(siSource, dTarget,
highestFieldId, false);
+ Assertions.assertTrue(siRes6 instanceof DoubleType);
+ // SmallIntType -> DecimalType
+ DataType siRes7 = SchemaMergingUtils.merge(siSource, dcmTarget,
highestFieldId, false);
+ Assertions.assertTrue(siRes7 instanceof DecimalType);
+
+ // IntType
+ DataType iRes1 = SchemaMergingUtils.merge(iSource, iTarget,
highestFieldId, false);
+ Assertions.assertTrue(iRes1 instanceof IntType);
+ // IntType -> TinyIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(iSource, tiTarget,
highestFieldId, false));
+ // IntType -> TinyIntType with allowExplicitCast = true
+ DataType iRes2 = SchemaMergingUtils.merge(iSource, tiTarget,
highestFieldId, true);
+ Assertions.assertTrue(iRes2 instanceof TinyIntType);
+ // IntType -> SmallIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(iSource, siTarget,
highestFieldId, false));
+ // IntType -> SmallIntType with allowExplicitCast = true
+ DataType iRes3 = SchemaMergingUtils.merge(iSource, siTarget,
highestFieldId, true);
+ Assertions.assertTrue(iRes3 instanceof SmallIntType);
+ // IntType -> BigIntType
+ DataType iRes4 = SchemaMergingUtils.merge(iSource, biTarget,
highestFieldId, false);
+ Assertions.assertTrue(iRes4 instanceof BigIntType);
+ // IntType -> FloatType
+ DataType iRes5 = SchemaMergingUtils.merge(iSource, fTarget,
highestFieldId, false);
+ Assertions.assertTrue(iRes5 instanceof FloatType);
+ // IntType -> DoubleType
+ DataType iRes6 = SchemaMergingUtils.merge(iSource, dTarget,
highestFieldId, false);
+ Assertions.assertTrue(iRes6 instanceof DoubleType);
+ // IntType -> DecimalType
+ DataType iRes7 = SchemaMergingUtils.merge(iSource, dcmTarget,
highestFieldId, false);
+ Assertions.assertTrue(iRes7 instanceof DecimalType);
+
+ // BigIntType
+ DataType biRes1 = SchemaMergingUtils.merge(biSource, biTarget,
highestFieldId, false);
+ Assertions.assertTrue(biRes1 instanceof BigIntType);
+ // BigIntType -> TinyIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(biSource, tiTarget,
highestFieldId, false));
+ // BigIntType -> TinyIntType with allowExplicitCast = true
+ DataType biRes2 = SchemaMergingUtils.merge(biSource, tiTarget,
highestFieldId, true);
+ Assertions.assertTrue(biRes2 instanceof TinyIntType);
+ // BigIntType -> SmallIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(biSource, siTarget,
highestFieldId, false));
+ // BigIntType -> SmallIntType with allowExplicitCast = true
+ DataType biRes3 = SchemaMergingUtils.merge(biSource, siTarget,
highestFieldId, true);
+ Assertions.assertTrue(biRes3 instanceof SmallIntType);
+ // BigIntType -> IntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(biSource, iTarget,
highestFieldId, false));
+ // BigIntType -> IntType with allowExplicitCast = true
+ DataType biRes4 = SchemaMergingUtils.merge(biSource, iTarget,
highestFieldId, true);
+ Assertions.assertTrue(biRes4 instanceof IntType);
+ // BigIntType -> FloatType
+ DataType biRes5 = SchemaMergingUtils.merge(biSource, fTarget,
highestFieldId, false);
+ Assertions.assertTrue(biRes5 instanceof FloatType);
+ // BigIntType -> DoubleType
+ DataType biRes6 = SchemaMergingUtils.merge(biSource, dTarget,
highestFieldId, false);
+ Assertions.assertTrue(biRes6 instanceof DoubleType);
+ // BigIntType -> DecimalType
+ DataType biRes7 = SchemaMergingUtils.merge(biSource, dcmTarget,
highestFieldId, false);
+ Assertions.assertTrue(biRes7 instanceof DecimalType);
+
+ // FloatType
+ DataType fRes1 = SchemaMergingUtils.merge(fSource, fTarget,
highestFieldId, false);
+ Assertions.assertTrue(fRes1 instanceof FloatType);
+ // FloatType -> TinyIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(fSource, tiTarget,
highestFieldId, false));
+ // FloatType -> TinyIntType with allowExplicitCast = true
+ DataType fRes2 = SchemaMergingUtils.merge(fSource, tiTarget,
highestFieldId, true);
+ Assertions.assertTrue(fRes2 instanceof TinyIntType);
+ // FloatType -> SmallIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(fSource, siTarget,
highestFieldId, false));
+ // FloatType -> IntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(fSource, iTarget,
highestFieldId, false));
+ // FloatType -> IntType with allowExplicitCast = true
+ DataType fRes4 = SchemaMergingUtils.merge(fSource, iTarget,
highestFieldId, true);
+ Assertions.assertTrue(fRes4 instanceof IntType);
+ // FloatType -> BigIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(fSource, biTarget,
highestFieldId, false));
+ // FloatType -> DoubleType
+ DataType fRes6 = SchemaMergingUtils.merge(fSource, dTarget,
highestFieldId, false);
+ Assertions.assertTrue(fRes6 instanceof DoubleType);
+ // FloatType -> DecimalType
+ DataType fRes7 = SchemaMergingUtils.merge(fSource, dcmTarget,
highestFieldId, false);
+ Assertions.assertTrue(fRes7 instanceof DecimalType);
+
+ // DoubleType
+ DataType dRes1 = SchemaMergingUtils.merge(dSource, dTarget,
highestFieldId, false);
+ Assertions.assertTrue(dRes1 instanceof DoubleType);
+ // DoubleType -> TinyIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(dSource, tiTarget,
highestFieldId, false));
+ // DoubleType -> SmallIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(dSource, siTarget,
highestFieldId, false));
+ // DoubleType -> SmallIntType with allowExplicitCast = true
+ DataType dRes3 = SchemaMergingUtils.merge(dSource, siTarget,
highestFieldId, true);
+ Assertions.assertTrue(dRes3 instanceof SmallIntType);
+ // DoubleType -> IntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(dSource, iTarget,
highestFieldId, false));
+ // DoubleType -> BigIntType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(dSource, biTarget,
highestFieldId, false));
+ // DoubleType -> BigIntType with allowExplicitCast = true
+ DataType dRes5 = SchemaMergingUtils.merge(dSource, biTarget,
highestFieldId, true);
+ Assertions.assertTrue(dRes5 instanceof BigIntType);
+ // DoubleType -> FloatType
+ Assertions.assertThrows(
+ UnsupportedOperationException.class,
+ () -> SchemaMergingUtils.merge(dSource, fTarget,
highestFieldId, false));
+ // DoubleType -> DecimalType
+ DataType dRes7 = SchemaMergingUtils.merge(dSource, dcmTarget,
highestFieldId, false);
+ Assertions.assertTrue(dRes7 instanceof DecimalType);
+ }
+}
diff --git a/paimon-docs/pom.xml b/paimon-docs/pom.xml
index 771ff1a3e..10965dbe2 100644
--- a/paimon-docs/pom.xml
+++ b/paimon-docs/pom.xml
@@ -50,6 +50,13 @@ under the License.
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-spark-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-test-utils</artifactId>
diff --git
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
index b1c53b538..83c883f9c 100644
---
a/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
+++
b/paimon-docs/src/main/java/org/apache/paimon/docs/configuration/ConfigOptionsDocGenerator.java
@@ -79,7 +79,9 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation(
"paimon-flink/paimon-flink-common",
"org.apache.paimon.flink.kafka"),
new OptionsClassLocation(
- "paimon-hive/paimon-hive-catalog",
"org.apache.paimon.hive")
+ "paimon-hive/paimon-hive-catalog",
"org.apache.paimon.hive"),
+ new OptionsClassLocation(
+ "paimon-spark/paimon-spark-common",
"org.apache.paimon.spark")
};
static final String DEFAULT_PATH_PREFIX = "src/main/java";
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
new file mode 100644
index 000000000..54950fdd9
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.options.ConfigOption;
+
+import static org.apache.paimon.options.ConfigOptions.key;
+
+/** Options for spark connector. */
+public class SparkConnectorOptions {
+ public static final ConfigOption<Boolean> MERGE_SCHEMA =
+ key("write.merge-schema")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, merge the data schema and the table
schema automatically before write data.");
+
+ public static final ConfigOption<Boolean> EXPLICIT_CAST =
+ key("write.merge-schema.explicit-cast")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, allow to merge data types if the two
types meet the rules for explicit casting.");
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index be13803f0..2cd64c4bc 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.spark;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
@@ -96,7 +97,7 @@ public class SparkTable
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
try {
- return new SparkWriteBuilder((FileStoreTable) table);
+ return new SparkWriteBuilder((FileStoreTable) table,
Options.fromMap(info.options()));
} catch (Exception e) {
throw new RuntimeException("Only FileStoreTable can be written.");
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
index ae940c469..d73bb78df 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkSource.scala
@@ -67,7 +67,8 @@ class SparkSource
parameters: Map[String, String],
data: DataFrame): BaseRelation = {
val table = loadTable(parameters.asJava)
- WriteIntoPaimonTable(table, SaveMode.transform(mode),
data).run(sqlContext.sparkSession)
+ WriteIntoPaimonTable(table, SaveMode.transform(mode), data,
Options.fromMap(parameters.asJava))
+ .run(sqlContext.sparkSession)
SparkSource.toBaseRelation(table, sqlContext)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
index 44b017414..60549b073 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWrite.scala
@@ -17,6 +17,7 @@
*/
package org.apache.paimon.spark
+import org.apache.paimon.options.Options
import org.apache.paimon.spark.commands.WriteIntoPaimonTable
import org.apache.paimon.table.FileStoreTable
@@ -24,13 +25,13 @@ import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.sources.InsertableRelation
-/** Spark {@link V1Write}, it is required to use v1 write for grouping by
bucket. */
-class SparkWrite(val table: FileStoreTable, saveMode: SaveMode) extends
V1Write {
+/** Spark [[V1Write]], it is required to use v1 write for grouping by bucket.
*/
+class SparkWrite(val table: FileStoreTable, saveMode: SaveMode, options:
Options) extends V1Write {
override def toInsertableRelation: InsertableRelation = {
(data: DataFrame, overwrite: Boolean) =>
{
- WriteIntoPaimonTable(table, saveMode, data).run(data.sparkSession)
+ WriteIntoPaimonTable(table, saveMode, data,
options).run(data.sparkSession)
}
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
index 5cf429af1..f515cb33e 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkWriteBuilder.scala
@@ -17,16 +17,19 @@
*/
package org.apache.paimon.spark
+import org.apache.paimon.options.Options
import org.apache.paimon.table.FileStoreTable
-import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite,
SupportsOverwrite, WriteBuilder}
+import org.apache.spark.sql.connector.write.{SupportsOverwrite, WriteBuilder}
import org.apache.spark.sql.sources.{And, Filter}
-private class SparkWriteBuilder(table: FileStoreTable) extends WriteBuilder
with SupportsOverwrite {
+private class SparkWriteBuilder(table: FileStoreTable, options: Options)
+ extends WriteBuilder
+ with SupportsOverwrite {
private var saveMode: SaveMode = InsertInto
- override def build = new SparkWrite(table, saveMode)
+ override def build = new SparkWrite(table, saveMode, options)
override def overwrite(filters: Array[Filter]): WriteBuilder = {
val conjunctiveFilters = if (filters.nonEmpty) {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 9e8c964b1..9738c5c83 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -17,14 +17,12 @@
*/
package org.apache.paimon.spark.commands
-import org.apache.paimon.predicate.PredicateBuilder
import org.apache.paimon.spark.SparkFilterConverter
-import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageSerializer}
import org.apache.paimon.types.RowType
-import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter,
Not, Or}
+import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
import java.io.IOException
@@ -33,9 +31,9 @@ trait PaimonCommand {
val BUCKET_COL = "_bucket_"
- def getTable: Table
+ def table: FileStoreTable
- lazy val bucketMode: BucketMode = getTable match {
+ lazy val bucketMode: BucketMode = table match {
case fileStoreTable: FileStoreTable =>
fileStoreTable.bucketMode
case _ =>
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
new file mode 100644
index 000000000..8a8415d64
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark.commands
+
+import org.apache.paimon.schema.{SchemaMergingUtils, TableSchema}
+import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.RowType
+
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+trait SchemaHelper {
+
+ val originTable: FileStoreTable
+
+ protected var newTable: Option[FileStoreTable] = None
+
+ def table: FileStoreTable = newTable.getOrElse(originTable)
+
+ def tableSchema: TableSchema = table.schema
+
+ def mergeAndCommitSchema(dataSchema: StructType, allowExplicitCast:
Boolean): Unit = {
+ val dataRowType =
SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
+ if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
+ newTable = Some(table.copyWithLatestSchema())
+ }
+ }
+
+ def updateTableWithOptions(options: Map[String, String]): Unit = {
+ newTable = Some(table.copy(options.asJava))
+ }
+
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 6ec4987d4..4324a797f 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -20,13 +20,15 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.index.PartitionIndex
-import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite,
SaveMode, SparkRow}
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{DynamicOverWrite, InsertInto, Overwrite,
SaveMode, SparkConnectorOptions, SparkRow}
import org.apache.paimon.spark.SparkUtils.createIOManager
-import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder,
CommitMessageSerializer, DynamicBucketRow, RowPartitionKeyExtractor}
import org.apache.paimon.types.RowType
import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -39,29 +41,35 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
/** Used to write a [[DataFrame]] into a paimon table. */
-case class WriteIntoPaimonTable(_table: FileStoreTable, saveMode: SaveMode,
data: DataFrame)
+case class WriteIntoPaimonTable(
+ override val originTable: FileStoreTable,
+ saveMode: SaveMode,
+ data: DataFrame,
+ options: Options)
extends RunnableCommand
- with PaimonCommand {
+ with PaimonCommand
+ with SchemaHelper
+ with Logging {
import WriteIntoPaimonTable._
- private var table = _table
-
- private lazy val tableSchema = table.schema()
-
- private lazy val rowType = table.rowType()
-
- private lazy val writeBuilder: BatchWriteBuilder =
table.newBatchWriteBuilder()
-
private lazy val serializer = new CommitMessageSerializer
+ private lazy val mergeSchema =
options.get(SparkConnectorOptions.MERGE_SCHEMA)
+
+ /** Merges the data schema into the table schema if enabled, then writes the data. */
override def run(sparkSession: SparkSession): Seq[Row] = {
import sparkSession.implicits._
+ if (mergeSchema) {
+ val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
+ mergeAndCommitSchema(data.schema, allowExplicitCast)
+ }
+
val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
// use the extra options to rebuild the table object
- table = table.copy(
- Map(DYNAMIC_PARTITION_OVERWRITE.key() ->
dynamicPartitionOverwriteMode.toString).asJava)
+ updateTableWithOptions(
+ Map(DYNAMIC_PARTITION_OVERWRITE.key ->
dynamicPartitionOverwriteMode.toString))
val primaryKeyCols = tableSchema.trimmedPrimaryKeys().asScala.map(col)
val partitionCols = tableSchema.partitionKeys().asScala.map(col)
@@ -80,6 +88,9 @@ case class WriteIntoPaimonTable(_table: FileStoreTable,
saveMode: SaveMode, data
ds.toDF().repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
}
+ val rowType = table.rowType()
+ val writeBuilder = table.newBatchWriteBuilder()
+
val df =
bucketMode match {
case BucketMode.DYNAMIC =>
@@ -153,7 +164,7 @@ case class WriteIntoPaimonTable(_table: FileStoreTable,
saveMode: SaveMode, data
} else if (isTruncate(filter.get)) {
Map.empty[String, String]
} else {
- convertFilterToMap(filter.get, tableSchema.logicalPartitionType())
+ convertFilterToMap(filter.get, table.schema.logicalPartitionType())
}
case DynamicOverWrite =>
dynamicPartitionOverwriteMode = true
@@ -167,7 +178,6 @@ case class WriteIntoPaimonTable(_table: FileStoreTable,
saveMode: SaveMode, data
override def withNewChildrenInternal(newChildren: IndexedSeq[LogicalPlan]):
LogicalPlan =
this.asInstanceOf[WriteIntoPaimonTable]
- override def getTable: Table = table
}
object WriteIntoPaimonTable {
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index ae95e505d..1bf21a8f3 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -22,6 +22,8 @@ import org.apache.paimon.WriteMode._
import org.apache.spark.sql.Row
import org.scalactic.source.Position
+import java.sql.Date
+
class DataFrameWriteTest extends PaimonSparkTestBase {
writeModes.foreach {
@@ -71,4 +73,201 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
}
}
}
+
+ writeModes.foreach {
+ writeMode =>
+ bucketModes.foreach {
+ bucket =>
+ test(s"Schema evolution: write data into Paimon: $writeMode, bucket:
$bucket") {
+ val _spark = spark
+ import _spark.implicits._
+
+ val primaryKeysProp = if (writeMode == CHANGE_LOG) {
+ "'primary-key'='a',"
+ } else {
+ ""
+ }
+
+ spark.sql(
+ s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ($primaryKeysProp
'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+ |""".stripMargin)
+
+ val paimonTable = loadTable("T")
+ val location = paimonTable.location().getPath
+
+ val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
+ df1.write.format("paimon").mode("append").save(location)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a, b"),
+ Row(1, "a") :: Row(2, "b") :: Nil)
+
+ // Case 1: two additional fields
+ val df2 = Seq((1, "a2", 123L, Map("k" -> 11.1)), (3, "c", 345L,
Map("k" -> 33.3)))
+ .toDF("a", "b", "c", "d")
+ df2.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .save(location)
+ val expected2 = if (writeMode == CHANGE_LOG) {
+ Row(1, "a2", 123L, Map("k" -> 11.1)) ::
+ Row(2, "b", null, null) :: Row(3, "c", 345L, Map("k" -> 33.3))
:: Nil
+ } else {
+ Row(1, "a", null, null) :: Row(1, "a2", 123L, Map("k" -> 11.1))
:: Row(
+ 2,
+ "b",
+ null,
+ null) :: Row(3, "c", 345L, Map("k" -> 33.3)) :: Nil
+ }
+ checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected2)
+
+ // Case 2: two fields with the evolved types: Int -> Long, Long ->
Decimal
+ val df3 = Seq(
+ (2L, "b2", BigDecimal.decimal(234), Map("k" -> 22.2)),
+ (4L, "d", BigDecimal.decimal(456), Map("k" -> 44.4))).toDF("a",
"b", "c", "d")
+ df3.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .save(location)
+ val expected3 = if (writeMode == CHANGE_LOG) {
+ Row(1L, "a2", BigDecimal.decimal(123), Map("k" -> 11.1)) :: Row(
+ 2L,
+ "b2",
+ BigDecimal.decimal(234),
+ Map("k" -> 22.2)) :: Row(3L, "c", BigDecimal.decimal(345),
Map("k" -> 33.3)) :: Row(
+ 4L,
+ "d",
+ BigDecimal.decimal(456),
+ Map("k" -> 44.4)) :: Nil
+ } else {
+ Row(1L, "a", null, null) :: Row(
+ 1L,
+ "a2",
+ BigDecimal.decimal(123),
+ Map("k" -> 11.1)) :: Row(2L, "b", null, null) :: Row(
+ 2L,
+ "b2",
+ BigDecimal.decimal(234),
+ Map("k" -> 22.2)) :: Row(3L, "c", BigDecimal.decimal(345),
Map("k" -> 33.3)) :: Row(
+ 4L,
+ "d",
+ BigDecimal.decimal(456),
+ Map("k" -> 44.4)) :: Nil
+ }
+ checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3)
+
+ }
+ }
+ }
+
+ writeModes.foreach {
+ writeMode =>
+ bucketModes.foreach {
+ bucket =>
+ test(
+ s"Schema evolution: write data into Paimon with allowExplicitCast
= true: $writeMode, bucket: $bucket") {
+ val _spark = spark
+ import _spark.implicits._
+
+ val primaryKeysProp = if (writeMode == CHANGE_LOG) {
+ "'primary-key'='a',"
+ } else {
+ ""
+ }
+
+ spark.sql(
+ s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |TBLPROPERTIES ($primaryKeysProp
'write-mode'='${writeMode.toString}', 'bucket'='$bucket')
+ |""".stripMargin)
+
+ val paimonTable = loadTable("T")
+ val location = paimonTable.location().getPath
+
+ val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b")
+ df1.write.format("paimon").mode("append").save(location)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a, b"),
+ Row(1, "2023-08-01") :: Row(2, "2023-08-02") :: Nil)
+
+ // Case 1: two additional fields: DoubleType and TimestampType
+ val ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")
+ val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03",
34.5d, ts))
+ .toDF("a", "b", "c", "d")
+ df2.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .save(location)
+ val expected2 = if (writeMode == CHANGE_LOG) {
+ Row(1, "2023-08-01", 12.3d, ts) ::
+ Row(2, "2023-08-02", null, null) :: Row(3, "2023-08-03",
34.5d, ts) :: Nil
+ } else {
+ Row(1, "2023-08-01", null, null) :: Row(1, "2023-08-01", 12.3d,
ts) :: Row(
+ 2,
+ "2023-08-02",
+ null,
+ null) :: Row(3, "2023-08-03", 34.5d, ts) :: Nil
+ }
+ checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected2)
+
+ // Case 2: a: Int -> Long, b: String -> Date, c: Double -> Int, d:
Timestamp -> String
+ val date = java.sql.Date.valueOf("2023-07-31")
+ val df3 = Seq((2L, date, 234, null), (4L, date, 456, "2023-08-01
11:00:00.0")).toDF(
+ "a",
+ "b",
+ "c",
+ "d")
+
+ // throw UnsupportedOperationException if
write.merge-schema.explicit-cast = false
+ assertThrows[UnsupportedOperationException] {
+ df3.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .save(location)
+ }
+ // merge schema and write data when
write.merge-schema.explicit-cast = true
+ df3.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .option("write.merge-schema.explicit-cast", "true")
+ .save(location)
+ val expected3 = if (writeMode == CHANGE_LOG) {
+ Row(1L, Date.valueOf("2023-08-01"), 12, ts.toString) :: Row(
+ 2L,
+ date,
+ 234,
+ null) :: Row(3L, Date.valueOf("2023-08-03"), 34, ts.toString)
:: Row(
+ 4L,
+ date,
+ 456,
+ "2023-08-01 11:00:00.0") :: Nil
+ } else {
+ Row(1L, Date.valueOf("2023-08-01"), null, null) :: Row(
+ 1L,
+ Date.valueOf("2023-08-01"),
+ 12,
+ ts.toString) :: Row(2L, date, 234, null) :: Row(
+ 2L,
+ Date.valueOf("2023-08-02"),
+ null,
+ null) :: Row(3L, Date.valueOf("2023-08-03"), 34, ts.toString)
:: Row(
+ 4L,
+ date,
+ 456,
+ "2023-08-01 11:00:00.0") :: Nil
+ }
+ checkAnswer(
+ spark.sql("SELECT a, b, c, substring(d, 0, 21) FROM T ORDER BY
a, b"),
+ expected3)
+
+ }
+ }
+ }
+
}