This is an automated email from the ASF dual-hosted git repository. yuzelin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new b7d289873c [core][spark][flink] Introduce default value when writing (#5754) b7d289873c is described below commit b7d289873c02374a7c1dcde16f4ee11205201e17 Author: Jingsong Lee <jingsongl...@gmail.com> AuthorDate: Thu Jun 19 15:11:41 2025 +0800 [core][spark][flink] Introduce default value when writing (#5754) --- docs/content/flink/default-value.md | 64 +++++++++++ docs/content/spark/default-value.md | 71 ++++++++++++ docs/static/rest-catalog-open-api.yaml | 15 +++ .../main/java/org/apache/paimon/CoreOptions.java | 16 --- .../main/java/org/apache/paimon/schema/Schema.java | 21 +++- .../org/apache/paimon/schema/SchemaChange.java | 60 ++++++++++ .../java/org/apache/paimon/types/DataField.java | 83 ++++++++------ .../apache/paimon/types/DataTypeJsonParser.java | 7 +- .../org/apache/paimon/types/ReassignFieldId.java | 8 +- .../main/java/org/apache/paimon/types/RowType.java | 15 ++- .../org/apache/paimon/casting/DefaultValueRow.java | 46 ++++++++ .../org/apache/paimon/types/DataTypesTest.java | 2 +- .../paimon/operation/DefaultValueAssigner.java | 30 +++-- .../org/apache/paimon/schema/SchemaManager.java | 28 ++++- .../apache/paimon/schema/SchemaMergingUtils.java | 9 +- .../org/apache/paimon/schema/SchemaValidation.java | 64 ----------- .../apache/paimon/table/sink/TableWriteImpl.java | 8 ++ .../paimon/operation/DefaultValueAssignerTest.java | 8 +- .../paimon/schema/DataTypeJsonParserTest.java | 11 ++ .../paimon/table/PrimaryKeySimpleTableTest.java | 3 +- .../apache/paimon/table/SchemaEvolutionTest.java | 121 --------------------- .../source/snapshot/DefaultValueScannerTest.java | 3 +- .../cdc/mysql/MySqlSyncTableActionITCase.java | 4 +- .../AlterColumnDefaultValueProcedure.java | 63 +++++++++++ .../services/org.apache.paimon.factories.Factory | 1 + .../org/apache/paimon/flink/BranchSqlITCase.java | 9 ++ .../apache/paimon/flink/ReadWriteTableITCase.java | 7 +- .../apache/paimon/format/orc/OrcFileFormat.java | 3 +- .../connector/catalog/TableCatalogCapability.java | 52 +++++++++ .../connector/catalog/TableCatalogCapability.java | 51 +++++++++ .../java/org/apache/paimon/spark/SparkCatalog.java | 29 ++++- .../apache/paimon/spark/utils/CatalogUtils.java | 30 +++++ .../org/apache/paimon/spark/SparkTypeUtils.java | 23 +++- .../org/apache/paimon/spark/SparkWriteITCase.java | 45 +++++++- 34 files changed, 740 insertions(+), 270 deletions(-) diff --git a/docs/content/flink/default-value.md b/docs/content/flink/default-value.md new file mode 100644 index 0000000000..9030d6f14b --- /dev/null +++ b/docs/content/flink/default-value.md @@ -0,0 +1,64 @@ +--- +title: "Default Value" +weight: 8 +type: docs +aliases: +- /flink/default-value.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# Default Value + +Paimon allows specifying default values for columns. When users write to these tables without explicitly providing +values for certain columns, Paimon automatically generates default values for these columns. + +## Create Table + +Flink SQL does not have native support for default values, so we can only create a table without default values: + +```sql +CREATE TABLE my_table ( + a BIGINT, + b STRING, + c INT +); +``` + +Paimon provides a procedure for modifying column default values in Flink. You can add default value definitions after +creating the table: + +```sql +CALL sys.alter_column_default_value('default.my_table', 'b', 'my_value'); +CALL sys.alter_column_default_value('default.my_table', 'c', '5'); +``` + +## Insert Table + +For SQL commands that execute table writes, such as the `INSERT`, `UPDATE`, and `MERGE` commands, a `NULL` value is +resolved to the default value specified for the corresponding column. + +For example: + +```sql +INSERT INTO my_table (a) VALUES (1), (2); + +SELECT * FROM my_table; +-- result: [[1, my_value, 5], [2, my_value, 5]] +``` diff --git a/docs/content/spark/default-value.md b/docs/content/spark/default-value.md new file mode 100644 index 0000000000..ba5423133e --- /dev/null +++ b/docs/content/spark/default-value.md @@ -0,0 +1,71 @@ +--- +title: "Default Value" +weight: 8 +type: docs +aliases: +- /spark/default-value.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Default Value + +Paimon allows specifying default values for columns. When users write to these tables without explicitly providing +values for certain columns, Paimon automatically generates default values for these columns. + +## Create Table + +You can create a table whose columns have default values using the following SQL: + +```sql +CREATE TABLE my_table ( + a BIGINT, + b STRING DEFAULT 'my_value', + c INT DEFAULT 5 +); +``` + +## Insert Table + +For SQL commands that execute table writes, such as the `INSERT`, `UPDATE`, and `MERGE` commands, the `DEFAULT` keyword +or a `NULL` value is resolved to the default value specified for the corresponding column. + +## Alter Default Value + +Paimon supports altering a column's default value. + +For example: + +```sql +CREATE TABLE T (a INT, b INT DEFAULT 2); + +INSERT INTO T (a) VALUES (1); +-- result: [[1, 2]] + +ALTER TABLE T ALTER COLUMN b SET DEFAULT 3; + +INSERT INTO T (a) VALUES (2); +-- result: [[1, 2], [2, 3]] +``` + +The default value of column `b` has been changed from 2 to 3. + +## Limitation + +Adding a column with a default value is not supported, for example: `ALTER TABLE T ADD COLUMN d INT DEFAULT 5;`.
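To make the `DEFAULT` keyword behavior described under Insert Table concrete, here is a minimal, hypothetical Spark SQL session against the `my_table` defined above; the expected results simply follow from the declared defaults (`b STRING DEFAULT 'my_value'`, `c INT DEFAULT 5`) and are not captured output from a real run:

```sql
-- Explicitly request the declared defaults.
INSERT INTO my_table VALUES (1, DEFAULT, DEFAULT);

-- Omitted columns are written as NULL and resolved to their defaults as well.
INSERT INTO my_table (a) VALUES (2);

SELECT * FROM my_table;
-- expected result: [[1, my_value, 5], [2, my_value, 5]]
```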
diff --git a/docs/static/rest-catalog-open-api.yaml b/docs/static/rest-catalog-open-api.yaml index 4246da1f8e..402a4553b3 100644 --- a/docs/static/rest-catalog-open-api.yaml +++ b/docs/static/rest-catalog-open-api.yaml @@ -2233,6 +2233,7 @@ components: - $ref: '#/components/schemas/RenameColumn' - $ref: '#/components/schemas/DropColumn' - $ref: '#/components/schemas/UpdateColumnComment' + - $ref: '#/components/schemas/UpdateColumnDefaultValue' - $ref: '#/components/schemas/UpdateColumnType' - $ref: '#/components/schemas/UpdateColumnPosition' - $ref: '#/components/schemas/UpdateColumnNullability' @@ -2247,6 +2248,7 @@ components: renameColumn: '#/components/schemas/RenameColumn' dropColumn: '#/components/schemas/DropColumn' updateColumnComment: '#/components/schemas/UpdateColumnComment' + updateColumnDefaultValue: '#/components/schemas/UpdateColumnDefaultValue' updateColumnType: '#/components/schemas/UpdateColumnType' updateColumnPosition: '#/components/schemas/UpdateColumnPosition' updateColumnNullability: '#/components/schemas/UpdateColumnNullability' @@ -2339,6 +2341,19 @@ components: type: string newComment: type: string + UpdateColumnDefaultValue: + allOf: + - $ref: '#/components/schemas/BaseSchemaChange' + properties: + action: + type: string + const: "updateColumnDefaultValue" + fieldNames: + type: array + items: + type: string + newDefaultValue: + type: string UpdateColumnType: allOf: - $ref: '#/components/schemas/BaseSchemaChange' diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 4c446dc4e5..3ff05d9a7c 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -66,8 +66,6 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; /** Core options for paimon. */ public class CoreOptions implements Serializable { - public static final String DEFAULT_VALUE_SUFFIX = "default-value"; - public static final String FIELDS_PREFIX = "fields"; public static final String FIELDS_SEPARATOR = ","; @@ -2589,20 +2587,6 @@ public class CoreOptions implements Serializable { return options.get(COMMIT_FORCE_CREATE_SNAPSHOT); } - public Map<String, String> getFieldDefaultValues() { - Map<String, String> defaultValues = new HashMap<>(); - String fieldPrefix = FIELDS_PREFIX + "."; - String defaultValueSuffix = "." 
+ DEFAULT_VALUE_SUFFIX; - for (Map.Entry<String, String> option : options.toMap().entrySet()) { - String key = option.getKey(); - if (key != null && key.startsWith(fieldPrefix) && key.endsWith(defaultValueSuffix)) { - String fieldName = key.replace(fieldPrefix, "").replace(defaultValueSuffix, ""); - defaultValues.put(fieldName, option.getValue()); - } - } - return defaultValues; - } - public Map<String, String> commitCallbacks() { return callbacks(COMMIT_CALLBACKS, COMMIT_CALLBACK_PARAM); } diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java b/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java index 8a679665fb..d4dbbb0117 100644 --- a/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java +++ b/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java @@ -175,7 +175,8 @@ public class Schema { field.id(), field.name(), field.type().copy(false), - field.description())); + field.description(), + field.defaultValue())); } else { newFields.add(field); } @@ -302,12 +303,28 @@ public class Schema { * @param description description of the column */ public Builder column(String columnName, DataType dataType, @Nullable String description) { + return column(columnName, dataType, description, null); + } + + /** + * Declares a column that is appended to this schema. + * + * @param columnName column name + * @param dataType data type of the column + * @param description description of the column + * @param defaultValue default value of the column + */ + public Builder column( + String columnName, + DataType dataType, + @Nullable String description, + @Nullable String defaultValue) { Preconditions.checkNotNull(columnName, "Column name must not be null."); Preconditions.checkNotNull(dataType, "Data type must not be null."); int id = highestFieldId.incrementAndGet(); DataType reassignDataType = ReassignFieldId.reassign(dataType, highestFieldId); - columns.add(new DataField(id, columnName, reassignDataType, description)); + columns.add(new DataField(id, columnName, reassignDataType, description, defaultValue)); return this; } diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java index 90bc9abc0a..4b68ad105a 100644 --- a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java +++ b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java @@ -72,6 +72,9 @@ import java.util.Objects; @JsonSubTypes.Type( value = SchemaChange.UpdateColumnComment.class, name = SchemaChange.Actions.UPDATE_COLUMN_COMMENT_ACTION), + @JsonSubTypes.Type( + value = SchemaChange.UpdateColumnDefaultValue.class, + name = SchemaChange.Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION), @JsonSubTypes.Type( value = SchemaChange.UpdateColumnPosition.class, name = SchemaChange.Actions.UPDATE_COLUMN_POSITION_ACTION), @@ -153,6 +156,10 @@ public interface SchemaChange extends Serializable { return new UpdateColumnComment(fieldNames, comment); } + static SchemaChange updateColumnDefaultValue(String[] fieldNames, String defaultValue) { + return new UpdateColumnDefaultValue(fieldNames, defaultValue); + } + static SchemaChange updateColumnPosition(Move move) { return new UpdateColumnPosition(move); } @@ -751,6 +758,58 @@ public interface SchemaChange extends Serializable { } } + /** A SchemaChange to update the default value. 
*/ + final class UpdateColumnDefaultValue implements SchemaChange { + + private static final long serialVersionUID = 1L; + private static final String FIELD_FILED_NAMES = "fieldNames"; + private static final String FIELD_NEW_DEFAULT_VALUE = "newDefaultValue"; + + @JsonProperty(FIELD_FILED_NAMES) + private final String[] fieldNames; + + @JsonProperty(FIELD_NEW_DEFAULT_VALUE) + private final String newDefaultValue; + + @JsonCreator + private UpdateColumnDefaultValue( + @JsonProperty(FIELD_FILED_NAMES) String[] fieldNames, + @JsonProperty(FIELD_NEW_DEFAULT_VALUE) String newDefaultValue) { + this.fieldNames = fieldNames; + this.newDefaultValue = newDefaultValue; + } + + @JsonGetter(FIELD_FILED_NAMES) + public String[] fieldNames() { + return fieldNames; + } + + @JsonGetter(FIELD_NEW_DEFAULT_VALUE) + public String newDefaultValue() { + return newDefaultValue; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + UpdateColumnDefaultValue that = (UpdateColumnDefaultValue) o; + return Arrays.equals(fieldNames, that.fieldNames) + && newDefaultValue.equals(that.newDefaultValue); + } + + @Override + public int hashCode() { + int result = Objects.hash(newDefaultValue); + result = 31 * result + Objects.hashCode(fieldNames); + return result; + } + } + /** Actions for schema changes: identify for schema change. */ class Actions { public static final String FIELD_ACTION = "action"; @@ -763,6 +822,7 @@ public interface SchemaChange extends Serializable { public static final String UPDATE_COLUMN_TYPE_ACTION = "updateColumnType"; public static final String UPDATE_COLUMN_NULLABILITY_ACTION = "updateColumnNullability"; public static final String UPDATE_COLUMN_COMMENT_ACTION = "updateColumnComment"; + public static final String UPDATE_COLUMN_DEFAULT_VALUE_ACTION = "updateColumnDefaultValue"; public static final String UPDATE_COLUMN_POSITION_ACTION = "updateColumnPosition"; private Actions() {} diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataField.java b/paimon-api/src/main/java/org/apache/paimon/types/DataField.java index 209118023b..dcca909d52 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataField.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataField.java @@ -19,6 +19,7 @@ package org.apache.paimon.types; import org.apache.paimon.annotation.Public; +import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator; @@ -41,27 +42,31 @@ public final class DataField implements Serializable { private static final long serialVersionUID = 1L; - public static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'"; - - public static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s"; - private final int id; - private final String name; - private final DataType type; - private final @Nullable String description; + private final @Nullable String defaultValue; public DataField(int id, String name, DataType dataType) { - this(id, name, dataType, null); + this(id, name, dataType, null, null); } - public DataField(int id, String name, DataType type, @Nullable String description) { + public DataField(int id, String name, DataType dataType, @Nullable String description) { + this(id, name, dataType, description, null); + } + + public DataField( + int id, + String name, + DataType type, + @Nullable String description, + @Nullable String defaultValue) { this.id = id; this.name = name; this.type = type; 
this.description = description; + this.defaultValue = defaultValue; } public int id() { @@ -76,20 +81,24 @@ public final class DataField implements Serializable { return type; } - public DataField newId(int newid) { - return new DataField(newid, name, type, description); + public DataField newId(int newId) { + return new DataField(newId, name, type, description, defaultValue); } public DataField newName(String newName) { - return new DataField(id, newName, type, description); + return new DataField(id, newName, type, description, defaultValue); } public DataField newType(DataType newType) { - return new DataField(id, name, newType, description); + return new DataField(id, name, newType, description, defaultValue); } public DataField newDescription(String newDescription) { - return new DataField(id, name, type, newDescription); + return new DataField(id, name, type, newDescription, defaultValue); + } + + public DataField newDefaultValue(String newDefaultValue) { + return new DataField(id, name, type, description, newDefaultValue); } @Nullable @@ -97,28 +106,29 @@ public final class DataField implements Serializable { return description; } + @Nullable + public String defaultValue() { + return defaultValue; + } + public DataField copy() { - return new DataField(id, name, type.copy(), description); + return new DataField(id, name, type.copy(), description, defaultValue); } public DataField copy(boolean isNullable) { - return new DataField(id, name, type.copy(isNullable), description); + return new DataField(id, name, type.copy(isNullable), description, defaultValue); } public String asSQLString() { - return formatString(type.asSQLString()); - } - - private String formatString(String typeString) { - if (description == null) { - return String.format(FIELD_FORMAT_NO_DESCRIPTION, escapeIdentifier(name), typeString); - } else { - return String.format( - FIELD_FORMAT_WITH_DESCRIPTION, - escapeIdentifier(name), - typeString, - escapeSingleQuotes(description)); + StringBuilder sb = new StringBuilder(); + sb.append(escapeIdentifier(name)).append(" ").append(type.asSQLString()); + if (StringUtils.isNotEmpty(description)) { + sb.append(" COMMENT '").append(escapeSingleQuotes(description)).append("'"); } + if (defaultValue != null) { + sb.append(" DEFAULT ").append(defaultValue); + } + return sb.toString(); } public void serializeJson(JsonGenerator generator) throws IOException { @@ -130,6 +140,9 @@ if (description() != null) { generator.writeStringField("description", description()); } + if (defaultValue() != null) { + generator.writeStringField("defaultValue", defaultValue()); + } generator.writeEndObject(); } @@ -145,7 +158,8 @@ return Objects.equals(id, field.id) && Objects.equals(name, field.name) && Objects.equals(type, field.type) - && Objects.equals(description, field.description); + && Objects.equals(description, field.description) + && Objects.equals(defaultValue, field.defaultValue); } public boolean equalsIgnoreFieldId(DataField other) { @@ -157,7 +171,8 @@ } return Objects.equals(name, other.name) && type.equalsIgnoreFieldId(other.type) - && Objects.equals(description, other.description); + && Objects.equals(description, other.description) + && Objects.equals(defaultValue, other.defaultValue); } public boolean isPrunedFrom(DataField other) { @@ -170,12 +185,13 @@ return
Objects.equals(id, other.id) && Objects.equals(name, other.name) && type.isPrunedFrom(other.type) - && Objects.equals(description, other.description); + && Objects.equals(description, other.description) + && Objects.equals(defaultValue, other.defaultValue); } @Override public int hashCode() { - return Objects.hash(id, name, type, description); + return Objects.hash(id, name, type, description, defaultValue); } @Override @@ -193,7 +209,8 @@ public final class DataField implements Serializable { } else if (dataField1 != null && dataField2 != null) { return Objects.equals(dataField1.name(), dataField2.name()) && Objects.equals(dataField1.type(), dataField2.type()) - && Objects.equals(dataField1.description(), dataField2.description()); + && Objects.equals(dataField1.description(), dataField2.description()) + && Objects.equals(dataField1.defaultValue(), dataField2.defaultValue()); } else { return false; } diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java index 40790f06fb..d8e6063dc1 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java @@ -58,7 +58,12 @@ public final class DataTypeJsonParser { if (descriptionNode != null) { description = descriptionNode.asText(); } - return new DataField(id, name, type, description); + JsonNode defaultValueNode = json.get("defaultValue"); + String defaultValue = null; + if (defaultValueNode != null) { + defaultValue = defaultValueNode.asText(); + } + return new DataField(id, name, type, description, defaultValue); } public static DataType parseDataType(JsonNode json) { diff --git a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java index 36e4d86070..2aacfeaf88 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java @@ -56,7 +56,13 @@ public class ReassignFieldId extends DataTypeDefaultVisitor<DataType> { public DataType visit(RowType rowType) { RowType.Builder builder = RowType.builder(rowType.isNullable(), fieldId); rowType.getFields() - .forEach(f -> builder.field(f.name(), f.type().accept(this), f.description())); + .forEach( + f -> + builder.field( + f.name(), + f.type().accept(this), + f.description(), + f.defaultValue())); return builder.build(); } diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java index 5aa58fb375..e3fbf0bd8b 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java @@ -27,6 +27,8 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCre import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -387,11 +389,22 @@ public final class RowType extends DataType { return this; } - public Builder field(String name, DataType type, String description) { + public Builder field(String name, DataType type, @Nullable String description) { fields.add(new DataField(fieldId.incrementAndGet(), name, type, description)); 
return this; } + public Builder field( + String name, + DataType type, + @Nullable String description, + @Nullable String defaultValue) { + fields.add( + new DataField( + fieldId.incrementAndGet(), name, type, description, defaultValue)); + return this; + } + public Builder fields(List<DataType> types) { for (int i = 0; i < types.size(); i++) { field("f" + i, types.get(i)); diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java index 99199c0a79..4e62bf47ed 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java @@ -20,12 +20,20 @@ package org.apache.paimon.casting; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VarCharType; + +import javax.annotation.Nullable; + +import java.util.List; /** * An implementation of {@link InternalRow} which provides a default value for the underlying {@link @@ -193,4 +201,42 @@ public class DefaultValueRow implements InternalRow { public static DefaultValueRow from(InternalRow defaultValueRow) { return new DefaultValueRow(defaultValueRow); } + + @Nullable + public static DefaultValueRow create(RowType rowType) { + List<DataField> fields = rowType.getFields(); + GenericRow row = new GenericRow(fields.size()); + boolean containsDefaultValue = false; + for (int i = 0; i < fields.size(); i++) { + DataField dataField = fields.get(i); + String defaultValueStr = dataField.defaultValue(); + if (defaultValueStr == null) { + continue; + } + + containsDefaultValue = true; + @SuppressWarnings("unchecked") + CastExecutor<Object, Object> resolve = + (CastExecutor<Object, Object>) + CastExecutors.resolve(VarCharType.STRING_TYPE, dataField.type()); + + if (resolve == null) { + throw new RuntimeException( + "Default value do not support the type of " + dataField.type()); + } + + if (defaultValueStr.startsWith("'") && defaultValueStr.endsWith("'")) { + defaultValueStr = defaultValueStr.substring(1, defaultValueStr.length() - 1); + } + + Object defaultValue = resolve.cast(BinaryString.fromString(defaultValueStr)); + row.setField(i, defaultValue); + } + + if (!containsDefaultValue) { + return null; + } + + return DefaultValueRow.from(row); + } } diff --git a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java index 9da6289d29..df4cf0679b 100644 --- a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java @@ -178,7 +178,7 @@ public class DataTypesTest { new DataField(1, "b`", new TimestampType())))) .satisfies( baseAssertions( - "ROW<`a` VARCHAR(1) 'Someone''s desc.', `b``` TIMESTAMP(6)>", + "ROW<`a` VARCHAR(1) COMMENT 'Someone''s desc.', `b``` TIMESTAMP(6)>", new RowType( Arrays.asList( new DataField( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java index ccb65eb9da..416237b60b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java @@ -18,7 +18,6 @@ package org.apache.paimon.operation; -import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.casting.CastExecutor; import org.apache.paimon.casting.CastExecutors; @@ -38,16 +37,24 @@ import org.apache.paimon.types.VarCharType; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.paimon.CoreOptions.FIELDS_PREFIX; + /** * The field Default value assigner. note that invoke of assigning should be after merge and schema * evolution. + * + * @deprecated default value in reading is not recommended */ +@Deprecated public class DefaultValueAssigner { + public static final String DEFAULT_VALUE_SUFFIX = "default-value"; + private final RowType rowType; private final Map<String, String> defaultValues; @@ -69,10 +76,6 @@ public class DefaultValueAssigner { return this; } - public boolean needToAssign() { - return needToAssign; - } - /** assign default value for column which value is null. */ public RecordReader<InternalRow> assignFieldsDefaultValue(RecordReader<InternalRow> reader) { if (!needToAssign) { @@ -152,8 +155,21 @@ public class DefaultValueAssigner { } public static DefaultValueAssigner create(TableSchema schema) { - CoreOptions coreOptions = new CoreOptions(schema.options()); - Map<String, String> defaultValues = coreOptions.getFieldDefaultValues(); + Map<String, String> defaultValues = getFieldDefaultValues(schema.options()); return new DefaultValueAssigner(defaultValues, schema.logicalRowType()); } + + private static Map<String, String> getFieldDefaultValues(Map<String, String> options) { + Map<String, String> defaultValues = new HashMap<>(); + String fieldPrefix = FIELDS_PREFIX + "."; + String defaultValueSuffix = "." 
+ DEFAULT_VALUE_SUFFIX; + for (Map.Entry<String, String> option : options.entrySet()) { + String key = option.getKey(); + if (key != null && key.startsWith(fieldPrefix) && key.endsWith(defaultValueSuffix)) { + String fieldName = key.replace(fieldPrefix, "").replace(defaultValueSuffix, ""); + defaultValues.put(fieldName, option.getValue()); + } + } + return defaultValues; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 36a18e26a8..a33d4e4f1f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -31,6 +31,7 @@ import org.apache.paimon.schema.SchemaChange.RemoveOption; import org.apache.paimon.schema.SchemaChange.RenameColumn; import org.apache.paimon.schema.SchemaChange.SetOption; import org.apache.paimon.schema.SchemaChange.UpdateColumnComment; +import org.apache.paimon.schema.SchemaChange.UpdateColumnDefaultValue; import org.apache.paimon.schema.SchemaChange.UpdateColumnNullability; import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition; import org.apache.paimon.schema.SchemaChange.UpdateColumnType; @@ -404,7 +405,8 @@ public class SchemaManager implements Serializable { field.id(), rename.newName(), field.type(), - field.description()); + field.description(), + field.defaultValue()); newFields.set(i, newField); return; } @@ -463,7 +465,8 @@ public class SchemaManager implements Serializable { targetRootType, depth, update.fieldNames().length), - field.description()); + field.description(), + field.defaultValue()); }); } else if (change instanceof UpdateColumnNullability) { UpdateColumnNullability update = (UpdateColumnNullability) change; @@ -494,7 +497,8 @@ public class SchemaManager implements Serializable { sourceRootType, depth, update.fieldNames().length), - field.description()); + field.description(), + field.defaultValue()); }); } else if (change instanceof UpdateColumnComment) { UpdateColumnComment update = (UpdateColumnComment) change; @@ -506,11 +510,24 @@ public class SchemaManager implements Serializable { field.id(), field.name(), field.type(), - update.newDescription())); + update.newDescription(), + field.defaultValue())); } else if (change instanceof UpdateColumnPosition) { UpdateColumnPosition update = (UpdateColumnPosition) change; SchemaChange.Move move = update.move(); applyMove(newFields, move); + } else if (change instanceof UpdateColumnDefaultValue) { + UpdateColumnDefaultValue update = (UpdateColumnDefaultValue) change; + updateNestedColumn( + newFields, + update.fieldNames(), + (field, depth) -> + new DataField( + field.id(), + field.name(), + field.type(), + field.description(), + update.newDefaultValue())); } else { throw new UnsupportedOperationException("Unsupported change: " + change.getClass()); } @@ -793,7 +810,8 @@ public class SchemaManager implements Serializable { field.id(), field.name(), wrapNewRowType(field.type(), nestedFields), - field.description())); + field.description(), + field.defaultValue())); return; } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java index 0fff27dce4..82c61adf6c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java @@ -119,7 +119,8 @@ public class 
SchemaMergingUtils { baseField.id(), baseField.name(), updatedDataType, - baseField.description()); + baseField.description(), + baseField.defaultValue()); } else { return baseField; } @@ -226,6 +227,10 @@ public class SchemaMergingUtils { private static DataField assignIdForNewField(DataField field, AtomicInteger highestFieldId) { DataType dataType = ReassignFieldId.reassign(field.type(), highestFieldId); return new DataField( - highestFieldId.incrementAndGet(), field.name(), dataType, field.description()); + highestFieldId.incrementAndGet(), + field.name(), + dataType, + field.description(), + field.defaultValue()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 66b648f890..cf6b919499 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -21,9 +21,6 @@ package org.apache.paimon.schema; import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.MergeEngine; -import org.apache.paimon.casting.CastExecutor; -import org.apache.paimon.casting.CastExecutors; -import org.apache.paimon.data.BinaryString; import org.apache.paimon.format.FileFormat; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.Options; @@ -38,7 +35,6 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimestampType; -import org.apache.paimon.types.VarCharType; import org.apache.paimon.utils.StringUtils; import java.util.ArrayList; @@ -105,8 +101,6 @@ public class SchemaValidation { validateBucket(schema, options); - validateDefaultValues(schema); - validateStartupMode(options); validateFieldsPrefix(schema, options); @@ -486,64 +480,6 @@ public class SchemaValidation { } } - private static void validateDefaultValues(TableSchema schema) { - CoreOptions coreOptions = new CoreOptions(schema.options()); - Map<String, String> defaultValues = coreOptions.getFieldDefaultValues(); - - if (!defaultValues.isEmpty()) { - - List<String> partitionKeys = schema.partitionKeys(); - for (String partitionKey : partitionKeys) { - if (defaultValues.containsKey(partitionKey)) { - throw new IllegalArgumentException( - String.format( - "Partition key %s should not be assign default column.", - partitionKey)); - } - } - - List<String> primaryKeys = schema.primaryKeys(); - for (String primaryKey : primaryKeys) { - if (defaultValues.containsKey(primaryKey)) { - throw new IllegalArgumentException( - String.format( - "Primary key %s should not be assign default column.", - primaryKey)); - } - } - - List<DataField> fields = schema.fields(); - - for (DataField field : fields) { - String defaultValueStr = defaultValues.get(field.name()); - if (defaultValueStr == null) { - continue; - } - - @SuppressWarnings("unchecked") - CastExecutor<Object, Object> resolve = - (CastExecutor<Object, Object>) - CastExecutors.resolve(VarCharType.STRING_TYPE, field.type()); - if (resolve == null) { - throw new IllegalArgumentException( - String.format( - "The column %s with datatype %s is currently not supported for default value.", - field.name(), field.type().asSQLString())); - } - - try { - resolve.cast(BinaryString.fromString(defaultValueStr)); - } catch (Exception e) { - throw new IllegalArgumentException( - String.format( - "The 
default value %s of the column %s can not be cast to datatype: %s", - defaultValueStr, field.name(), field.type()), - e); - } - } - } - } - private static void validateForDeletionVectors(CoreOptions options) { checkArgument( options.changelogProducer() == ChangelogProducer.NONE diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index f72cd7151f..cf3f8d6657 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -20,6 +20,7 @@ package org.apache.paimon.table.sink; import org.apache.paimon.FileStore; import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.casting.DefaultValueRow; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; @@ -63,6 +64,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State private BucketMode bucketMode; private final int[] notNullFieldIndex; + private final @Nullable DefaultValueRow defaultValueRow; public TableWriteImpl( RowType rowType, @@ -84,6 +86,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State .map(DataField::name) .collect(Collectors.toList()); this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames); + this.defaultValueRow = DefaultValueRow.create(rowType); } @Override @@ -162,6 +165,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State @Nullable public SinkRecord writeAndReturn(InternalRow row) throws Exception { checkNullability(row); + row = wrapDefaultValue(row); RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); if (ignoreDelete && rowKind.isRetract()) { return null; @@ -193,6 +197,10 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State } } + private InternalRow wrapDefaultValue(InternalRow row) { + return defaultValueRow == null ? 
row : defaultValueRow.replaceRow(row); + } + private SinkRecord toSinkRecord(InternalRow row) { keyAndBucketExtractor.setRecord(row); return new SinkRecord( diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java index 0554cc0134..7cb31fba27 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java @@ -57,12 +57,16 @@ class DefaultValueAssignerTest { options.put( String.format( "%s.%s.%s", - CoreOptions.FIELDS_PREFIX, "col4", CoreOptions.DEFAULT_VALUE_SUFFIX), + CoreOptions.FIELDS_PREFIX, + "col4", + DefaultValueAssigner.DEFAULT_VALUE_SUFFIX), "0"); options.put( String.format( "%s.%s.%s", - CoreOptions.FIELDS_PREFIX, "col5", CoreOptions.DEFAULT_VALUE_SUFFIX), + CoreOptions.FIELDS_PREFIX, + "col5", + DefaultValueAssigner.DEFAULT_VALUE_SUFFIX), "1"); Schema schema = new Schema( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java index 2397af83aa..808ce70aee 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java @@ -168,6 +168,17 @@ public class DataTypeJsonParserTest { "f1", new BooleanType(), "This as well.")))), + TestSpec.forString( + "{\"type\":\"ROW\",\"fields\":[{\"id\":0,\"name\":\"f0\",\"type\":\"INT NOT NULL\",\"description\":\"my_comment\",\"defaultValue\":\"55\"}]}") + .expectType( + new RowType( + Collections.singletonList( + new DataField( + 0, + "f0", + new IntType(false), + "my_comment", + "55")))), // error message testing diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java index 35928e2335..693c378a21 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java @@ -36,6 +36,7 @@ import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.operation.AbstractFileStoreWrite; +import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -1291,7 +1292,7 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase { "%s.%s.%s", CoreOptions.FIELDS_PREFIX, "b", - CoreOptions.DEFAULT_VALUE_SUFFIX), + DefaultValueAssigner.DEFAULT_VALUE_SUFFIX), "0"); }); StreamTableWrite write = table.newWrite(commitUser); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 76a63eb06e..5d20e46ec2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -56,7 +56,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.function.Consumer; @@ -84,126 +83,6 @@ public class 
SchemaEvolutionTest { commitUser = UUID.randomUUID().toString(); } - @Test - public void testDefaultValue() throws Exception { - { - Map<String, String> option = new HashMap<>(); - option.put( - String.format( - "%s.%s.%s", - CoreOptions.FIELDS_PREFIX, "a", CoreOptions.DEFAULT_VALUE_SUFFIX), - "1"); - Schema schema = - new Schema( - RowType.of( - new DataType[] { - DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), - DataTypes.BIGINT() - }, - new String[] {"a", "b"}) - .getFields(), - Collections.emptyList(), - Collections.emptyList(), - option, - ""); - - assertThatThrownBy(() -> schemaManager.createTable(schema)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining( - "The column %s with datatype %s is currently not supported for default value.", - "a", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()).asSQLString()); - } - - { - Map<String, String> option = new HashMap<>(); - option.put( - String.format( - "%s.%s.%s", - CoreOptions.FIELDS_PREFIX, "a", CoreOptions.DEFAULT_VALUE_SUFFIX), - "abcxxxx"); - Schema schema = - new Schema( - RowType.of( - new DataType[] {DataTypes.BIGINT(), DataTypes.BIGINT()}, - new String[] {"a", "b"}) - .getFields(), - Collections.emptyList(), - Collections.emptyList(), - option, - ""); - assertThatThrownBy(() -> schemaManager.createTable(schema)) - .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining( - "The default value %s of the column a can not be cast to datatype: %s", - "abcxxxx", DataTypes.BIGINT().asSQLString()); - } - - { - Schema schema = - new Schema( - RowType.of( - new DataType[] { - DataTypes.BIGINT(), - DataTypes.BIGINT(), - DataTypes.BIGINT() - }, - new String[] {"a", "b", "c"}) - .getFields(), - Lists.newArrayList("c"), - Lists.newArrayList("a", "c"), - new HashMap<>(), - ""); - - schemaManager.createTable(schema); - - assertThatThrownBy( - () -> - schemaManager.commitChanges( - Collections.singletonList( - SchemaChange.setOption( - String.format( - "%s.%s.%s", - CoreOptions.FIELDS_PREFIX, - "b", - CoreOptions - .DEFAULT_VALUE_SUFFIX), - "abcxxxx")))) - .hasCauseInstanceOf(IllegalArgumentException.class) - .hasMessageContaining( - "The default value %s of the column b can not be cast to datatype: %s", - "abcxxxx", DataTypes.BIGINT().asSQLString()); - assertThatThrownBy( - () -> - schemaManager.commitChanges( - Collections.singletonList( - SchemaChange.setOption( - String.format( - "%s.%s.%s", - CoreOptions.FIELDS_PREFIX, - "a", - CoreOptions - .DEFAULT_VALUE_SUFFIX), - "abc")))) - .hasCauseInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Primary key a should not be assign default column."); - - assertThatThrownBy( - () -> - schemaManager.commitChanges( - Collections.singletonList( - SchemaChange.setOption( - String.format( - "%s.%s.%s", - CoreOptions.FIELDS_PREFIX, - "c", - CoreOptions - .DEFAULT_VALUE_SUFFIX), - "abc")))) - .hasCauseInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("Partition key c should not be assign default column."); - } - } - @Test public void testAddField() throws Exception { Schema schema = diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java index a99d6b234e..ec253143a7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java @@ -19,6 +19,7 @@ 
package org.apache.paimon.table.source.snapshot; import org.apache.paimon.CoreOptions; +import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; @@ -84,7 +85,7 @@ public class DefaultValueScannerTest extends ScannerTestBase { options.set( String.format( "%s.%s.%s", - CoreOptions.FIELDS_PREFIX, "b", CoreOptions.DEFAULT_VALUE_SUFFIX), + CoreOptions.FIELDS_PREFIX, "b", DefaultValueAssigner.DEFAULT_VALUE_SUFFIX), "100"); return createFileStoreTable(options); } diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 6f16609372..171b489d42 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -750,8 +750,8 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { anyCauseMatches( IllegalArgumentException.class, "Column v1 have different types when merging schemas.\n" - + "Current table '{paimon_sync_table.incompatible_field_1}' field: `v1` TIMESTAMP(0) ''\n" - + "To be merged table 'paimon_sync_table.incompatible_field_2' field: `v1` INT ''")); + + "Current table '{paimon_sync_table.incompatible_field_1}' field: `v1` TIMESTAMP(0)\n" + + "To be merged table 'paimon_sync_table.incompatible_field_2' field: `v1` INT")); } @Test diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java new file mode 100644 index 0000000000..b85dfd28e5 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +/** + * Alter column default value procedure. Usage: + * + * <pre><code> + * CALL sys.alter_column_default_value('table_identifier', 'column', 'default_value') + * </code></pre> + */ +public class AlterColumnDefaultValueProcedure extends ProcedureBase { + + @Override + public String identifier() { + return "alter_column_default_value"; + } + + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "column", type = @DataTypeHint("STRING")), + @ArgumentHint(name = "default_value", type = @DataTypeHint("STRING")) + }) + public String[] call( + ProcedureContext procedureContext, String table, String column, String defaultValue) + throws Catalog.ColumnAlreadyExistException, Catalog.TableNotExistException, + Catalog.ColumnNotExistException { + Identifier identifier = Identifier.fromString(table); + String[] fieldNames = StringUtils.split(column, "."); + SchemaChange schemaChange = SchemaChange.updateColumnDefaultValue(fieldNames, defaultValue); + catalog.alterTable(identifier, ImmutableList.of(schemaChange), false); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index df615478c5..df8b368c0a 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -95,3 +95,4 @@ org.apache.paimon.flink.procedure.AlterViewDialectProcedure org.apache.paimon.flink.procedure.CreateFunctionProcedure org.apache.paimon.flink.procedure.DropFunctionProcedure org.apache.paimon.flink.procedure.AlterFunctionProcedure +org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java index 57a6746fea..41313e0e9c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java @@ -40,6 +40,15 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT cases for table with branches using SQL. 
*/ public class BranchSqlITCase extends CatalogITCaseBase { + @Test + public void testDefaultValue() throws Exception { + sql("CREATE TABLE T (a INT, b INT)"); + sql("CALL sys.alter_column_default_value('default.T', 'b', '5')"); + sql("INSERT INTO T (a) VALUES (1), (2)"); + assertThat(collectResult("SELECT * FROM T")) + .containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]"); + } + @Test public void testAlterBranchTable() throws Exception { sql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index 285b245c4a..676836d15c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -23,6 +23,7 @@ import org.apache.paimon.flink.sink.FlinkTableSink; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.operation.DefaultValueAssigner; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -1589,7 +1590,8 @@ public class ReadWriteTableITCase extends AbstractTestBase { public void testDefaultValueWithoutPrimaryKey() throws Exception { Map<String, String> options = new HashMap<>(); options.put( - CoreOptions.FIELDS_PREFIX + ".rate." + CoreOptions.DEFAULT_VALUE_SUFFIX, "1000"); + CoreOptions.FIELDS_PREFIX + ".rate." + DefaultValueAssigner.DEFAULT_VALUE_SUFFIX, + "1000"); String table = createTable( @@ -1625,7 +1627,8 @@ public class ReadWriteTableITCase extends AbstractTestBase { throws Exception { Map<String, String> options = new HashMap<>(); options.put( - CoreOptions.FIELDS_PREFIX + ".rate." + CoreOptions.DEFAULT_VALUE_SUFFIX, "1000"); + CoreOptions.FIELDS_PREFIX + ".rate." + DefaultValueAssigner.DEFAULT_VALUE_SUFFIX, + "1000"); options.put(MERGE_ENGINE.key(), mergeEngine.toString()); if (mergeEngine == FIRST_ROW) { options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString()); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java index be257aef4b..1cff47d7d0 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java @@ -203,7 +203,8 @@ public class OrcFileFormat extends FileFormat { f.id(), f.name(), refineDataType(f.type()), - f.description())) + f.description(), + f.defaultValue())) .collect(Collectors.toList())); default: return type; diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java new file mode 100644 index 0000000000..ce375ff56d --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +/** + * Capabilities that can be provided by a {@link TableCatalog} implementation. + */ +public enum TableCatalogCapability { + + /** + * Signals that the TableCatalog supports defining generated columns upon table creation in SQL. + * <p> + * Without this capability, any create/replace table statements with a generated column defined + * in the table schema will throw an exception during analysis. + * <p> + * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)} + * <p> + * Generation expression are included in the column definition for APIs like + * {@link TableCatalog#createTable}. + */ + SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS, + + /** + * Signals that the TableCatalog supports defining column default value as expression in + * CREATE/REPLACE/ALTER TABLE. + * <p> + * Without this capability, any CREATE/REPLACE/ALTER TABLE statement with a column default value + * defined in the table schema will throw an exception during analysis. + * <p> + * A column default value is defined with syntax: {@code colName colType DEFAULT expr} + * <p> + * Column default value expression is included in the column definition for APIs like + * {@link TableCatalog#createTable}. + */ + SUPPORT_COLUMN_DEFAULT_VALUE +} diff --git a/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java new file mode 100644 index 0000000000..94fde80128 --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog; + +/** Capabilities that can be provided by a {@link TableCatalog} implementation. */ +public enum TableCatalogCapability { + + /** + * Signals that the TableCatalog supports defining generated columns upon table creation in SQL. 
diff --git a/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
new file mode 100644
index 0000000000..94fde80128
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+/** Capabilities that can be provided by a {@link TableCatalog} implementation. */
+public enum TableCatalogCapability {
+
+    /**
+     * Signals that the TableCatalog supports defining generated columns upon table creation in SQL.
+     *
+     * <p>Without this capability, any create/replace table statements with a generated column
+     * defined in the table schema will throw an exception during analysis.
+     *
+     * <p>A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS
+     * (expr)}
+     *
+     * <p>Generation expressions are included in the column definition for APIs like {@link
+     * TableCatalog#createTable}.
+     */
+    SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+
+    /**
+     * Signals that the TableCatalog supports defining a column default value as an expression in
+     * CREATE/REPLACE/ALTER TABLE.
+     *
+     * <p>Without this capability, any CREATE/REPLACE/ALTER TABLE statement with a column default
+     * value defined in the table schema will throw an exception during analysis.
+     *
+     * <p>A column default value is defined with syntax: {@code colName colType DEFAULT expr}
+     *
+     * <p>Column default value expressions are included in the column definition for APIs like
+     * {@link TableCatalog#createTable}.
+     */
+    SUPPORT_COLUMN_DEFAULT_VALUE
+}
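The two new files above mirror an enum that newer Spark releases provide natively, so that the common catalog code can reference SUPPORT_COLUMN_DEFAULT_VALUE while still compiling against Spark 3.2 and 3.3. Once a catalog advertises the capability, statements like the ones below pass Spark's analysis instead of being rejected. This is only a hedged sketch (class name hypothetical, session and catalog wiring assumed, not shown); the new SparkWriteITCase further down exercises the same statements for real:

    import org.apache.spark.sql.SparkSession;

    public class DefaultValueSqlTour {
        public static void main(String[] args) {
            // Assumes the session catalog is configured to be Paimon's SparkCatalog.
            SparkSession spark = SparkSession.builder().master("local[1]").getOrCreate();
            spark.sql("CREATE TABLE T (a INT, b INT DEFAULT 2)");
            spark.sql("INSERT INTO T (a) VALUES (1)"); // b is back-filled with 2
            spark.sql("INSERT INTO T VALUES (2, DEFAULT)"); // explicit DEFAULT keyword
            spark.sql("ALTER TABLE T ALTER COLUMN b SET DEFAULT 3");
        }
    }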
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 695f03d9ca..d910ade8fa 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -36,6 +36,7 @@ import org.apache.paimon.spark.utils.CatalogUtils;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.FormatTableOptions;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.spark.sql.PaimonSparkSession$;
@@ -50,6 +51,7 @@ import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableCatalogCapability;
 import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.FieldReference;
@@ -77,18 +79,24 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
 import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
+import static org.apache.paimon.spark.utils.CatalogUtils.checkNoDefaultValue;
+import static org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultValue;
 import static org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
 import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
+import static org.apache.paimon.spark.utils.CatalogUtils.toUpdateColumnDefaultValue;
+import static org.apache.spark.sql.connector.catalog.TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE;
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog extends SparkBaseCatalog
@@ -103,6 +111,10 @@ public class SparkCatalog extends SparkBaseCatalog
 
     private String defaultDatabase;
 
+    public Set<TableCatalogCapability> capabilities() {
+        return Collections.singleton(SUPPORT_COLUMN_DEFAULT_VALUE);
+    }
+
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
         checkRequiredConfigurations();
@@ -353,6 +365,7 @@
         } else if (change instanceof TableChange.AddColumn) {
             TableChange.AddColumn add = (TableChange.AddColumn) change;
             SchemaChange.Move move = getMove(add.position(), add.fieldNames());
+            checkNoDefaultValue(add);
             return SchemaChange.addColumn(
                     add.fieldNames(),
                     toPaimonType(add.dataType()).copy(add.isNullable()),
@@ -379,6 +392,8 @@
             TableChange.UpdateColumnPosition update = (TableChange.UpdateColumnPosition) change;
             SchemaChange.Move move = getMove(update.position(), update.fieldNames());
             return SchemaChange.updateColumnPosition(move);
+        } else if (isUpdateColumnDefaultValue(change)) {
+            return toUpdateColumnDefaultValue(change);
         } else {
             throw new UnsupportedOperationException(
                     "Change is not supported: " + change.getClass());
@@ -428,10 +443,16 @@
                         .comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, null));
 
         for (StructField field : schema.fields()) {
-            schemaBuilder.column(
-                    field.name(),
-                    toPaimonType(field.dataType()).copy(field.nullable()),
-                    field.getComment().getOrElse(() -> null));
+            String name = field.name();
+            DataType type = toPaimonType(field.dataType()).copy(field.nullable());
+            String comment = field.getComment().getOrElse(() -> null);
+            if (field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+                String defaultValue =
+                        field.metadata().getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY);
+                schemaBuilder.column(name, type, comment, defaultValue);
+            } else {
+                schemaBuilder.column(name, type, comment);
+            }
         }
         return schemaBuilder.build();
     }
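In toSchema above, a Spark StructField that carries the CURRENT_DEFAULT metadata key is forwarded to a four-argument Schema.Builder column overload. A standalone sketch of just that builder call, assuming from these hunks that the overload takes (name, type, comment, defaultValue); the class name is hypothetical:

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    public class SchemaBuilderDefaultExample {
        public static void main(String[] args) {
            Schema schema =
                    Schema.newBuilder()
                            .column("a", DataTypes.INT())
                            // comment and serialized default value for column b
                            .column("b", DataTypes.INT(), "a comment", "2")
                            .build();
            // Each resulting DataField should report its default value.
            schema.fields()
                    .forEach(f -> System.out.println(f.name() + " -> " + f.defaultValue()));
        }
    }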
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
index ddda188d18..25882f5e86 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.utils;
 
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DecimalType;
@@ -27,7 +28,9 @@ import org.apache.paimon.types.MultisetType;
 
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
 import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.connector.catalog.ColumnDefaultValue;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.types.BooleanType;
 import org.apache.spark.sql.types.ByteType;
 import org.apache.spark.sql.types.DataTypes;
@@ -248,4 +251,31 @@ public class CatalogUtils {
 
         throw new IllegalArgumentException("Unsupported Spark data type: " + sparkType);
     }
+
+    public static void checkNoDefaultValue(TableChange.AddColumn addColumn) {
+        try {
+            ColumnDefaultValue defaultValue = addColumn.defaultValue();
+            if (defaultValue != null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cannot add column %s with default value %s.",
+                                Arrays.toString(addColumn.fieldNames()), defaultValue));
+            }
+        } catch (NoClassDefFoundError | NoSuchMethodError ignored) {
+        }
+    }
+
+    public static boolean isUpdateColumnDefaultValue(TableChange tableChange) {
+        try {
+            return tableChange instanceof TableChange.UpdateColumnDefaultValue;
+        } catch (NoClassDefFoundError ignored) {
+            return false;
+        }
+    }
+
+    public static SchemaChange toUpdateColumnDefaultValue(TableChange tableChange) {
+        TableChange.UpdateColumnDefaultValue update =
+                (TableChange.UpdateColumnDefaultValue) tableChange;
+        return SchemaChange.updateColumnDefaultValue(update.fieldNames(), update.newDefaultValue());
+    }
 }
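The three helpers above deliberately catch NoClassDefFoundError / NoSuchMethodError: ColumnDefaultValue and TableChange.UpdateColumnDefaultValue only exist on newer Spark classpaths, and this common module must still run against older ones. An equivalent way to express the same version tolerance, sketched here with a Class.forName probe instead of catching linkage errors (illustration only, not what the patch uses; class name hypothetical):

    public class SparkApiProbe {
        public static void main(String[] args) {
            boolean hasDefaultValueApi;
            try {
                // Loading by name avoids a hard link-time dependency on the class.
                Class.forName("org.apache.spark.sql.connector.catalog.ColumnDefaultValue");
                hasDefaultValueApi = true;
            } catch (ClassNotFoundException e) {
                // An older Spark is on the classpath; fall back gracefully.
                hasDefaultValueApi = false;
            }
            System.out.println("ColumnDefaultValue present: " + hasDefaultValueApi);
        }
    }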
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index ae95881621..de14ef4316 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -48,6 +48,7 @@ import org.apache.spark.sql.paimon.shims.SparkShimLoader;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.UserDefinedType;
@@ -62,6 +63,11 @@ public class SparkTypeUtils {
 
     private SparkTypeUtils() {}
 
+    /**
+     * Copied here from Spark ResolveDefaultColumnsUtils for old Spark versions.
+     */
+    public static final String CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT";
+
     public static RowType toPartitionType(Table table) {
         int[] projections = table.rowType().getFieldIndices(table.partitionKeys());
         List<DataField> partitionTypes = new ArrayList<>();
@@ -252,9 +258,17 @@
         public DataType visit(RowType rowType) {
             List<StructField> fields = new ArrayList<>(rowType.getFieldCount());
             for (DataField field : rowType.getFields()) {
+                MetadataBuilder metadataBuilder = new MetadataBuilder();
+                if (field.defaultValue() != null) {
+                    metadataBuilder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, field.defaultValue());
+                }
                 StructField structField =
                         DataTypes.createStructField(
-                                field.name(), field.type().accept(this), field.type().isNullable());
+                                field.name(),
+                                field.type().accept(this),
+                                field.type().isNullable(),
+                                metadataBuilder.build());
                 structField =
                         Optional.ofNullable(field.description())
                                 .map(structField::withComment)
@@ -329,9 +343,14 @@
             org.apache.paimon.types.DataType fieldType =
                     fieldResults.get(i).copy(field.nullable());
             String comment = field.getComment().getOrElse(() -> null);
+            String defaultValue = null;
+            if (field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+                defaultValue =
+                        field.metadata().getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY);
+            }
             newFields.add(
                     new DataField(
-                            atomicInteger.incrementAndGet(), field.name(), fieldType, comment));
+                            atomicInteger.incrementAndGet(), field.name(), fieldType, comment, defaultValue));
         }
 
         return new RowType(newFields);
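The conversion above round-trips a Paimon default through Spark's column metadata: Paimon to Spark stores the string under CURRENT_DEFAULT, and Spark to Paimon reads it back into the new five-argument DataField. A self-contained sketch of just that metadata round trip (class name hypothetical; the key literal matches the constant defined above):

    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.MetadataBuilder;

    public class CurrentDefaultRoundTrip {
        public static void main(String[] args) {
            // Paimon -> Spark: stash the serialized default in field metadata.
            Metadata metadata =
                    new MetadataBuilder().putString("CURRENT_DEFAULT", "2").build();
            // Spark -> Paimon: read it back only when present.
            String defaultValue =
                    metadata.contains("CURRENT_DEFAULT")
                            ? metadata.getString("CURRENT_DEFAULT")
                            : null;
            System.out.println(defaultValue); // prints 2
        }
    }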
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index fff94ce037..b39e44fc6f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for spark writer. */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -78,7 +79,49 @@ public class SparkWriteITCase {
 
     @AfterEach
     public void afterEach() {
-        spark.sql("DROP TABLE T");
+        spark.sql("DROP TABLE IF EXISTS T");
+    }
+
+    @Test
+    public void testWriteWithDefaultValue() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT DEFAULT 2, c STRING DEFAULT 'my_value') TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        // test show create table
+        List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+        assertThat(show.toString())
+                .contains("a INT,\n" + " b INT DEFAULT 2,\n" + " c STRING DEFAULT 'my_value'");
+
+        // test partial write
+        spark.sql("INSERT INTO T (a) VALUES (1), (2)").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,2,my_value], [2,2,my_value]]");
+
+        // test write with DEFAULT
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT, DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,2,my_value], [2,2,my_value], [3,2,my_value]]");
+
+        // test that adding a column with DEFAULT is not supported
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE T ADD COLUMN d INT DEFAULT 5"))
+                .hasMessageContaining(
+                        "Unsupported table change: Cannot add column [d] with default value");
+
+        // test altering the type of a column that has a default value
+        spark.sql("ALTER TABLE T ALTER COLUMN b TYPE STRING").collectAsList();
+        spark.sql("INSERT INTO T (a) VALUES (4)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .isEqualTo("[[1,2,my_value], [2,2,my_value], [3,2,my_value], [4,2,my_value]]");
+
+        // test altering a column's default value
+        spark.sql("ALTER TABLE T ALTER COLUMN b SET DEFAULT '3'");
+        spark.sql("INSERT INTO T (a) VALUES (5)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .isEqualTo(
+                        "[[1,2,my_value], [2,2,my_value], [3,2,my_value], [4,2,my_value], [5,3,my_value]]");
     }
 
     @Test