This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b7d289873c [core][spark][flink] Introduce default value when writing 
(#5754)
b7d289873c is described below

commit b7d289873c02374a7c1dcde16f4ee11205201e17
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Thu Jun 19 15:11:41 2025 +0800

    [core][spark][flink] Introduce default value when writing (#5754)
---
 docs/content/flink/default-value.md                |  64 +++++++++++
 docs/content/spark/default-value.md                |  71 ++++++++++++
 docs/static/rest-catalog-open-api.yaml             |  15 +++
 .../main/java/org/apache/paimon/CoreOptions.java   |  16 ---
 .../main/java/org/apache/paimon/schema/Schema.java |  21 +++-
 .../org/apache/paimon/schema/SchemaChange.java     |  60 ++++++++++
 .../java/org/apache/paimon/types/DataField.java    |  83 ++++++++------
 .../apache/paimon/types/DataTypeJsonParser.java    |   7 +-
 .../org/apache/paimon/types/ReassignFieldId.java   |   8 +-
 .../main/java/org/apache/paimon/types/RowType.java |  15 ++-
 .../org/apache/paimon/casting/DefaultValueRow.java |  46 ++++++++
 .../org/apache/paimon/types/DataTypesTest.java     |   2 +-
 .../paimon/operation/DefaultValueAssigner.java     |  30 +++--
 .../org/apache/paimon/schema/SchemaManager.java    |  28 ++++-
 .../apache/paimon/schema/SchemaMergingUtils.java   |   9 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  64 -----------
 .../apache/paimon/table/sink/TableWriteImpl.java   |   8 ++
 .../paimon/operation/DefaultValueAssignerTest.java |   8 +-
 .../paimon/schema/DataTypeJsonParserTest.java      |  11 ++
 .../paimon/table/PrimaryKeySimpleTableTest.java    |   3 +-
 .../apache/paimon/table/SchemaEvolutionTest.java   | 121 ---------------------
 .../source/snapshot/DefaultValueScannerTest.java   |   3 +-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      |   4 +-
 .../AlterColumnDefaultValueProcedure.java          |  63 +++++++++++
 .../services/org.apache.paimon.factories.Factory   |   1 +
 .../org/apache/paimon/flink/BranchSqlITCase.java   |   9 ++
 .../apache/paimon/flink/ReadWriteTableITCase.java  |   7 +-
 .../apache/paimon/format/orc/OrcFileFormat.java    |   3 +-
 .../connector/catalog/TableCatalogCapability.java  |  52 +++++++++
 .../connector/catalog/TableCatalogCapability.java  |  51 +++++++++
 .../java/org/apache/paimon/spark/SparkCatalog.java |  29 ++++-
 .../apache/paimon/spark/utils/CatalogUtils.java    |  30 +++++
 .../org/apache/paimon/spark/SparkTypeUtils.java    |  23 +++-
 .../org/apache/paimon/spark/SparkWriteITCase.java  |  45 +++++++-
 34 files changed, 740 insertions(+), 270 deletions(-)

diff --git a/docs/content/flink/default-value.md 
b/docs/content/flink/default-value.md
new file mode 100644
index 0000000000..9030d6f14b
--- /dev/null
+++ b/docs/content/flink/default-value.md
@@ -0,0 +1,64 @@
+---
+title: "Default Value"
+weight: 8
+type: docs
+aliases:
+- /flink/default-value.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Default Value
+
+Paimon allows specifying default values for columns. When users write to these 
tables without explicitly providing
+values for certain columns, Paimon automatically generates default values for 
these columns.
+
+## Create Table
+
+Flink SQL does not have native support for default values, so we can only 
create a table without default values:
+
+```sql
+CREATE TABLE my_table (
+    a BIGINT,
+    b STRING,
+    c INT
+);
+```
+
+We support the procedure of modifying column default values in Flink. You can 
add default value definitions after
+creating the table:
+
+```sql
+CALL sys.alter_column_default_value('default.my_table', 'b', 'my_value');
+CALL sys.alter_column_default_value('default.my_table', 'c', '5');
+```
+
+## Insert Table
+
+For SQL commands that execute table writes, such as the `INSERT`, `UPDATE`, 
and `MERGE` commands, `NULL` value is
+parsed into the default value specified for the corresponding column.
+
+For example:
+
+```sql
+INSERT INTO my_table (a) VALUES (1), (2);
+
+SELECT * FROM my_table;
+-- result: [[1, 5, my_value], [2, 5, my_value]]
+```
diff --git a/docs/content/spark/default-value.md 
b/docs/content/spark/default-value.md
new file mode 100644
index 0000000000..ba5423133e
--- /dev/null
+++ b/docs/content/spark/default-value.md
@@ -0,0 +1,71 @@
+---
+title: "Default Value"
+weight: 8
+type: docs
+aliases:
+- /spark/default-value.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Default Value
+
+Paimon allows specifying default values for columns. When users write to these 
tables without explicitly providing
+values for certain columns, Paimon automatically generates default values for 
these columns.
+
+## Create Table
+
+You can create a table with columns with default values using the following 
SQL:
+
+```sql
+CREATE TABLE my_table (
+    a BIGINT,
+    b STRING DEFAULT 'my_value',
+    c INT DEFAULT 5 
+);
+```
+
+## Insert Table
+
+For SQL commands that execute table writes, such as the `INSERT`, `UPDATE`, 
and `MERGE` commands, the `DEFAULT` keyword
+or `NULL` value is parsed into the default value specified for the 
corresponding column.
+
+## Alter Default Value
+
+Paimon supports alter column default value.
+
+For example:
+
+```sql
+CREATE TABLE T (a INT, b INT DEFAULT 2);
+
+INSERT INTO T (a) VALUES (1);
+-- result: [[1, 2]]
+
+ALTER TABLE T ALTER COLUMN b SET DEFAULT 3;
+
+INSERT INTO T (a) VALUES (2);
+-- result: [[1, 2], [2, 3]]
+```
+
+The default value of `'b'` column has been changed to 3 from 2.
+
+## Limitation
+
+Not support alter table add column with default value, for example: `ALTER 
TABLE T ADD COLUMN d INT DEFAULT 5;`.
diff --git a/docs/static/rest-catalog-open-api.yaml 
b/docs/static/rest-catalog-open-api.yaml
index 4246da1f8e..402a4553b3 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -2233,6 +2233,7 @@ components:
         - $ref: '#/components/schemas/RenameColumn'
         - $ref: '#/components/schemas/DropColumn'
         - $ref: '#/components/schemas/UpdateColumnComment'
+        - $ref: '#/components/schemas/UpdateColumnDefaultValue'
         - $ref: '#/components/schemas/UpdateColumnType'
         - $ref: '#/components/schemas/UpdateColumnPosition'
         - $ref: '#/components/schemas/UpdateColumnNullability'
@@ -2247,6 +2248,7 @@ components:
           renameColumn: '#/components/schemas/RenameColumn'
           dropColumn: '#/components/schemas/DropColumn'
           updateColumnComment: '#/components/schemas/UpdateColumnComment'
+          updateColumnDefaultValue: 
'#/components/schemas/UpdateColumnDefaultValue'
           updateColumnType: '#/components/schemas/UpdateColumnType'
           updateColumnPosition: '#/components/schemas/UpdateColumnPosition'
           updateColumnNullability: 
'#/components/schemas/UpdateColumnNullability'
@@ -2339,6 +2341,19 @@ components:
             type: string
         newComment:
           type: string
+    UpdateColumnDefaultValue:
+      allOf:
+        - $ref: '#/components/schemas/BaseSchemaChange'
+      properties:
+        action:
+          type: string
+          const: "updateColumnDefaultValue"
+        fieldNames:
+          type: array
+          items:
+            type: string
+        newDefaultValue:
+          type: string
     UpdateColumnType:
       allOf:
         - $ref: '#/components/schemas/BaseSchemaChange'
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 4c446dc4e5..3ff05d9a7c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -66,8 +66,6 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
 /** Core options for paimon. */
 public class CoreOptions implements Serializable {
 
-    public static final String DEFAULT_VALUE_SUFFIX = "default-value";
-
     public static final String FIELDS_PREFIX = "fields";
 
     public static final String FIELDS_SEPARATOR = ",";
@@ -2589,20 +2587,6 @@ public class CoreOptions implements Serializable {
         return options.get(COMMIT_FORCE_CREATE_SNAPSHOT);
     }
 
-    public Map<String, String> getFieldDefaultValues() {
-        Map<String, String> defaultValues = new HashMap<>();
-        String fieldPrefix = FIELDS_PREFIX + ".";
-        String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
-        for (Map.Entry<String, String> option : options.toMap().entrySet()) {
-            String key = option.getKey();
-            if (key != null && key.startsWith(fieldPrefix) && 
key.endsWith(defaultValueSuffix)) {
-                String fieldName = key.replace(fieldPrefix, 
"").replace(defaultValueSuffix, "");
-                defaultValues.put(fieldName, option.getValue());
-            }
-        }
-        return defaultValues;
-    }
-
     public Map<String, String> commitCallbacks() {
         return callbacks(COMMIT_CALLBACKS, COMMIT_CALLBACK_PARAM);
     }
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java 
b/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
index 8a679665fb..d4dbbb0117 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
@@ -175,7 +175,8 @@ public class Schema {
                                 field.id(),
                                 field.name(),
                                 field.type().copy(false),
-                                field.description()));
+                                field.description(),
+                                field.defaultValue()));
             } else {
                 newFields.add(field);
             }
@@ -302,12 +303,28 @@ public class Schema {
          * @param description description of the column
          */
         public Builder column(String columnName, DataType dataType, @Nullable 
String description) {
+            return column(columnName, dataType, description, null);
+        }
+
+        /**
+         * Declares a column that is appended to this schema.
+         *
+         * @param columnName column name
+         * @param dataType data type of the column
+         * @param description description of the column
+         * @param defaultValue default value of the column
+         */
+        public Builder column(
+                String columnName,
+                DataType dataType,
+                @Nullable String description,
+                @Nullable String defaultValue) {
             Preconditions.checkNotNull(columnName, "Column name must not be 
null.");
             Preconditions.checkNotNull(dataType, "Data type must not be 
null.");
 
             int id = highestFieldId.incrementAndGet();
             DataType reassignDataType = ReassignFieldId.reassign(dataType, 
highestFieldId);
-            columns.add(new DataField(id, columnName, reassignDataType, 
description));
+            columns.add(new DataField(id, columnName, reassignDataType, 
description, defaultValue));
             return this;
         }
 
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java 
b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 90bc9abc0a..4b68ad105a 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -72,6 +72,9 @@ import java.util.Objects;
     @JsonSubTypes.Type(
             value = SchemaChange.UpdateColumnComment.class,
             name = SchemaChange.Actions.UPDATE_COLUMN_COMMENT_ACTION),
+    @JsonSubTypes.Type(
+            value = SchemaChange.UpdateColumnDefaultValue.class,
+            name = SchemaChange.Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION),
     @JsonSubTypes.Type(
             value = SchemaChange.UpdateColumnPosition.class,
             name = SchemaChange.Actions.UPDATE_COLUMN_POSITION_ACTION),
@@ -153,6 +156,10 @@ public interface SchemaChange extends Serializable {
         return new UpdateColumnComment(fieldNames, comment);
     }
 
+    static SchemaChange updateColumnDefaultValue(String[] fieldNames, String 
defaultValue) {
+        return new UpdateColumnDefaultValue(fieldNames, defaultValue);
+    }
+
     static SchemaChange updateColumnPosition(Move move) {
         return new UpdateColumnPosition(move);
     }
@@ -751,6 +758,58 @@ public interface SchemaChange extends Serializable {
         }
     }
 
+    /** A SchemaChange to update the default value. */
+    final class UpdateColumnDefaultValue implements SchemaChange {
+
+        private static final long serialVersionUID = 1L;
+        private static final String FIELD_FILED_NAMES = "fieldNames";
+        private static final String FIELD_NEW_DEFAULT_VALUE = 
"newDefaultValue";
+
+        @JsonProperty(FIELD_FILED_NAMES)
+        private final String[] fieldNames;
+
+        @JsonProperty(FIELD_NEW_DEFAULT_VALUE)
+        private final String newDefaultValue;
+
+        @JsonCreator
+        private UpdateColumnDefaultValue(
+                @JsonProperty(FIELD_FILED_NAMES) String[] fieldNames,
+                @JsonProperty(FIELD_NEW_DEFAULT_VALUE) String newDefaultValue) 
{
+            this.fieldNames = fieldNames;
+            this.newDefaultValue = newDefaultValue;
+        }
+
+        @JsonGetter(FIELD_FILED_NAMES)
+        public String[] fieldNames() {
+            return fieldNames;
+        }
+
+        @JsonGetter(FIELD_NEW_DEFAULT_VALUE)
+        public String newDefaultValue() {
+            return newDefaultValue;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            UpdateColumnDefaultValue that = (UpdateColumnDefaultValue) o;
+            return Arrays.equals(fieldNames, that.fieldNames)
+                    && newDefaultValue.equals(that.newDefaultValue);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = Objects.hash(newDefaultValue);
+            result = 31 * result + Objects.hashCode(fieldNames);
+            return result;
+        }
+    }
+
     /** Actions for schema changes: identify for schema change. */
     class Actions {
         public static final String FIELD_ACTION = "action";
@@ -763,6 +822,7 @@ public interface SchemaChange extends Serializable {
         public static final String UPDATE_COLUMN_TYPE_ACTION = 
"updateColumnType";
         public static final String UPDATE_COLUMN_NULLABILITY_ACTION = 
"updateColumnNullability";
         public static final String UPDATE_COLUMN_COMMENT_ACTION = 
"updateColumnComment";
+        public static final String UPDATE_COLUMN_DEFAULT_VALUE_ACTION = 
"updateColumnDefaultValue";
         public static final String UPDATE_COLUMN_POSITION_ACTION = 
"updateColumnPosition";
 
         private Actions() {}
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataField.java 
b/paimon-api/src/main/java/org/apache/paimon/types/DataField.java
index 209118023b..dcca909d52 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataField.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataField.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.types;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.utils.StringUtils;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 
@@ -41,27 +42,31 @@ public final class DataField implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    public static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'";
-
-    public static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s";
-
     private final int id;
-
     private final String name;
-
     private final DataType type;
-
     private final @Nullable String description;
+    private final @Nullable String defaultValue;
 
     public DataField(int id, String name, DataType dataType) {
-        this(id, name, dataType, null);
+        this(id, name, dataType, null, null);
     }
 
-    public DataField(int id, String name, DataType type, @Nullable String 
description) {
+    public DataField(int id, String name, DataType dataType, @Nullable String 
description) {
+        this(id, name, dataType, description, null);
+    }
+
+    public DataField(
+            int id,
+            String name,
+            DataType type,
+            @Nullable String description,
+            @Nullable String defaultValue) {
         this.id = id;
         this.name = name;
         this.type = type;
         this.description = description;
+        this.defaultValue = defaultValue;
     }
 
     public int id() {
@@ -76,20 +81,24 @@ public final class DataField implements Serializable {
         return type;
     }
 
-    public DataField newId(int newid) {
-        return new DataField(newid, name, type, description);
+    public DataField newId(int newId) {
+        return new DataField(newId, name, type, description, defaultValue);
     }
 
     public DataField newName(String newName) {
-        return new DataField(id, newName, type, description);
+        return new DataField(id, newName, type, description, defaultValue);
     }
 
     public DataField newType(DataType newType) {
-        return new DataField(id, name, newType, description);
+        return new DataField(id, name, newType, description, defaultValue);
     }
 
     public DataField newDescription(String newDescription) {
-        return new DataField(id, name, type, newDescription);
+        return new DataField(id, name, type, defaultValue, newDescription);
+    }
+
+    public DataField newDefaultValue(String newDefaultValue) {
+        return new DataField(id, name, type, newDefaultValue, description);
     }
 
     @Nullable
@@ -97,28 +106,29 @@ public final class DataField implements Serializable {
         return description;
     }
 
+    @Nullable
+    public String defaultValue() {
+        return defaultValue;
+    }
+
     public DataField copy() {
-        return new DataField(id, name, type.copy(), description);
+        return new DataField(id, name, type.copy(), description, defaultValue);
     }
 
     public DataField copy(boolean isNullable) {
-        return new DataField(id, name, type.copy(isNullable), description);
+        return new DataField(id, name, type.copy(isNullable), description, 
defaultValue);
     }
 
     public String asSQLString() {
-        return formatString(type.asSQLString());
-    }
-
-    private String formatString(String typeString) {
-        if (description == null) {
-            return String.format(FIELD_FORMAT_NO_DESCRIPTION, 
escapeIdentifier(name), typeString);
-        } else {
-            return String.format(
-                    FIELD_FORMAT_WITH_DESCRIPTION,
-                    escapeIdentifier(name),
-                    typeString,
-                    escapeSingleQuotes(description));
+        StringBuilder sb = new StringBuilder();
+        sb.append(escapeIdentifier(name)).append(" 
").append(type.asSQLString());
+        if (StringUtils.isNotEmpty(description)) {
+            sb.append(" COMMENT 
'").append(escapeSingleQuotes(description)).append("'");
         }
+        if (defaultValue != null) {
+            sb.append(" DEFAULT ").append(defaultValue);
+        }
+        return sb.toString();
     }
 
     public void serializeJson(JsonGenerator generator) throws IOException {
@@ -130,6 +140,9 @@ public final class DataField implements Serializable {
         if (description() != null) {
             generator.writeStringField("description", description());
         }
+        if (defaultValue() != null) {
+            generator.writeStringField("defaultValue", defaultValue());
+        }
         generator.writeEndObject();
     }
 
@@ -145,7 +158,8 @@ public final class DataField implements Serializable {
         return Objects.equals(id, field.id)
                 && Objects.equals(name, field.name)
                 && Objects.equals(type, field.type)
-                && Objects.equals(description, field.description);
+                && Objects.equals(description, field.description)
+                && Objects.equals(defaultValue, field.defaultValue);
     }
 
     public boolean equalsIgnoreFieldId(DataField other) {
@@ -157,7 +171,8 @@ public final class DataField implements Serializable {
         }
         return Objects.equals(name, other.name)
                 && type.equalsIgnoreFieldId(other.type)
-                && Objects.equals(description, other.description);
+                && Objects.equals(description, other.description)
+                && Objects.equals(defaultValue, other.defaultValue);
     }
 
     public boolean isPrunedFrom(DataField other) {
@@ -170,12 +185,13 @@ public final class DataField implements Serializable {
         return Objects.equals(id, other.id)
                 && Objects.equals(name, other.name)
                 && type.isPrunedFrom(other.type)
-                && Objects.equals(description, other.description);
+                && Objects.equals(description, other.description)
+                && Objects.equals(defaultValue, other.defaultValue);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(id, name, type, description);
+        return Objects.hash(id, name, type, description, defaultValue);
     }
 
     @Override
@@ -193,7 +209,8 @@ public final class DataField implements Serializable {
         } else if (dataField1 != null && dataField2 != null) {
             return Objects.equals(dataField1.name(), dataField2.name())
                     && Objects.equals(dataField1.type(), dataField2.type())
-                    && Objects.equals(dataField1.description(), 
dataField2.description());
+                    && Objects.equals(dataField1.description(), 
dataField2.description())
+                    && Objects.equals(dataField1.defaultValue(), 
dataField2.defaultValue());
         } else {
             return false;
         }
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java 
b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
index 40790f06fb..d8e6063dc1 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
@@ -58,7 +58,12 @@ public final class DataTypeJsonParser {
         if (descriptionNode != null) {
             description = descriptionNode.asText();
         }
-        return new DataField(id, name, type, description);
+        JsonNode defaultValueNode = json.get("defaultValue");
+        String defaultValue = null;
+        if (defaultValueNode != null) {
+            defaultValue = defaultValueNode.asText();
+        }
+        return new DataField(id, name, type, description, defaultValue);
     }
 
     public static DataType parseDataType(JsonNode json) {
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java 
b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
index 36e4d86070..2aacfeaf88 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
@@ -56,7 +56,13 @@ public class ReassignFieldId extends 
DataTypeDefaultVisitor<DataType> {
     public DataType visit(RowType rowType) {
         RowType.Builder builder = RowType.builder(rowType.isNullable(), 
fieldId);
         rowType.getFields()
-                .forEach(f -> builder.field(f.name(), f.type().accept(this), 
f.description()));
+                .forEach(
+                        f ->
+                                builder.field(
+                                        f.name(),
+                                        f.type().accept(this),
+                                        f.description(),
+                                        f.defaultValue()));
         return builder.build();
     }
 
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java 
b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index 5aa58fb375..e3fbf0bd8b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -27,6 +27,8 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -387,11 +389,22 @@ public final class RowType extends DataType {
             return this;
         }
 
-        public Builder field(String name, DataType type, String description) {
+        public Builder field(String name, DataType type, @Nullable String 
description) {
             fields.add(new DataField(fieldId.incrementAndGet(), name, type, 
description));
             return this;
         }
 
+        public Builder field(
+                String name,
+                DataType type,
+                @Nullable String description,
+                @Nullable String defaultValue) {
+            fields.add(
+                    new DataField(
+                            fieldId.incrementAndGet(), name, type, 
description, defaultValue));
+            return this;
+        }
+
         public Builder fields(List<DataType> types) {
             for (int i = 0; i < types.size(); i++) {
                 field("f" + i, types.get(i));
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java 
b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
index 99199c0a79..4e62bf47ed 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
@@ -20,12 +20,20 @@ package org.apache.paimon.casting;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
 
 /**
  * An implementation of {@link InternalRow} which provides a default value for 
the underlying {@link
@@ -193,4 +201,42 @@ public class DefaultValueRow implements InternalRow {
     public static DefaultValueRow from(InternalRow defaultValueRow) {
         return new DefaultValueRow(defaultValueRow);
     }
+
+    @Nullable
+    public static DefaultValueRow create(RowType rowType) {
+        List<DataField> fields = rowType.getFields();
+        GenericRow row = new GenericRow(fields.size());
+        boolean containsDefaultValue = false;
+        for (int i = 0; i < fields.size(); i++) {
+            DataField dataField = fields.get(i);
+            String defaultValueStr = dataField.defaultValue();
+            if (defaultValueStr == null) {
+                continue;
+            }
+
+            containsDefaultValue = true;
+            @SuppressWarnings("unchecked")
+            CastExecutor<Object, Object> resolve =
+                    (CastExecutor<Object, Object>)
+                            CastExecutors.resolve(VarCharType.STRING_TYPE, 
dataField.type());
+
+            if (resolve == null) {
+                throw new RuntimeException(
+                        "Default value do not support the type of " + 
dataField.type());
+            }
+
+            if (defaultValueStr.startsWith("'") && 
defaultValueStr.endsWith("'")) {
+                defaultValueStr = defaultValueStr.substring(1, 
defaultValueStr.length() - 1);
+            }
+
+            Object defaultValue = 
resolve.cast(BinaryString.fromString(defaultValueStr));
+            row.setField(i, defaultValue);
+        }
+
+        if (!containsDefaultValue) {
+            return null;
+        }
+
+        return DefaultValueRow.from(row);
+    }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java 
b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java
index 9da6289d29..df4cf0679b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java
@@ -178,7 +178,7 @@ public class DataTypesTest {
                                         new DataField(1, "b`", new 
TimestampType()))))
                 .satisfies(
                         baseAssertions(
-                                "ROW<`a` VARCHAR(1) 'Someone''s desc.', `b``` 
TIMESTAMP(6)>",
+                                "ROW<`a` VARCHAR(1) COMMENT 'Someone''s 
desc.', `b``` TIMESTAMP(6)>",
                                 new RowType(
                                         Arrays.asList(
                                                 new DataField(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
index ccb65eb9da..416237b60b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.operation;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.casting.CastExecutor;
 import org.apache.paimon.casting.CastExecutors;
@@ -38,16 +37,24 @@ import org.apache.paimon.types.VarCharType;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+
 /**
  * The field Default value assigner. note that invoke of assigning should be 
after merge and schema
  * evolution.
+ *
+ * @deprecated default value in reading is not recommended
  */
+@Deprecated
 public class DefaultValueAssigner {
 
+    public static final String DEFAULT_VALUE_SUFFIX = "default-value";
+
     private final RowType rowType;
     private final Map<String, String> defaultValues;
 
@@ -69,10 +76,6 @@ public class DefaultValueAssigner {
         return this;
     }
 
-    public boolean needToAssign() {
-        return needToAssign;
-    }
-
     /** assign default value for column which value is null. */
     public RecordReader<InternalRow> 
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
         if (!needToAssign) {
@@ -152,8 +155,21 @@ public class DefaultValueAssigner {
     }
 
     public static DefaultValueAssigner create(TableSchema schema) {
-        CoreOptions coreOptions = new CoreOptions(schema.options());
-        Map<String, String> defaultValues = 
coreOptions.getFieldDefaultValues();
+        Map<String, String> defaultValues = 
getFieldDefaultValues(schema.options());
         return new DefaultValueAssigner(defaultValues, 
schema.logicalRowType());
     }
+
+    private static Map<String, String> getFieldDefaultValues(Map<String, 
String> options) {
+        Map<String, String> defaultValues = new HashMap<>();
+        String fieldPrefix = FIELDS_PREFIX + ".";
+        String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
+        for (Map.Entry<String, String> option : options.entrySet()) {
+            String key = option.getKey();
+            if (key != null && key.startsWith(fieldPrefix) && 
key.endsWith(defaultValueSuffix)) {
+                String fieldName = key.replace(fieldPrefix, 
"").replace(defaultValueSuffix, "");
+                defaultValues.put(fieldName, option.getValue());
+            }
+        }
+        return defaultValues;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 36a18e26a8..a33d4e4f1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -31,6 +31,7 @@ import org.apache.paimon.schema.SchemaChange.RemoveOption;
 import org.apache.paimon.schema.SchemaChange.RenameColumn;
 import org.apache.paimon.schema.SchemaChange.SetOption;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnComment;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnDefaultValue;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnNullability;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
 import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
@@ -404,7 +405,8 @@ public class SchemaManager implements Serializable {
                                             field.id(),
                                             rename.newName(),
                                             field.type(),
-                                            field.description());
+                                            field.description(),
+                                            field.defaultValue());
                             newFields.set(i, newField);
                             return;
                         }
@@ -463,7 +465,8 @@ public class SchemaManager implements Serializable {
                                             targetRootType,
                                             depth,
                                             update.fieldNames().length),
-                                    field.description());
+                                    field.description(),
+                                    field.defaultValue());
                         });
             } else if (change instanceof UpdateColumnNullability) {
                 UpdateColumnNullability update = (UpdateColumnNullability) 
change;
@@ -494,7 +497,8 @@ public class SchemaManager implements Serializable {
                                             sourceRootType,
                                             depth,
                                             update.fieldNames().length),
-                                    field.description());
+                                    field.description(),
+                                    field.defaultValue());
                         });
             } else if (change instanceof UpdateColumnComment) {
                 UpdateColumnComment update = (UpdateColumnComment) change;
@@ -506,11 +510,24 @@ public class SchemaManager implements Serializable {
                                         field.id(),
                                         field.name(),
                                         field.type(),
-                                        update.newDescription()));
+                                        update.newDescription(),
+                                        field.defaultValue()));
             } else if (change instanceof UpdateColumnPosition) {
                 UpdateColumnPosition update = (UpdateColumnPosition) change;
                 SchemaChange.Move move = update.move();
                 applyMove(newFields, move);
+            } else if (change instanceof UpdateColumnDefaultValue) {
+                UpdateColumnDefaultValue update = (UpdateColumnDefaultValue) 
change;
+                updateNestedColumn(
+                        newFields,
+                        update.fieldNames(),
+                        (field, depth) ->
+                                new DataField(
+                                        field.id(),
+                                        field.name(),
+                                        field.type(),
+                                        field.description(),
+                                        update.newDefaultValue()));
             } else {
                 throw new UnsupportedOperationException("Unsupported change: " 
+ change.getClass());
             }
@@ -793,7 +810,8 @@ public class SchemaManager implements Serializable {
                                 field.id(),
                                 field.name(),
                                 wrapNewRowType(field.type(), nestedFields),
-                                field.description()));
+                                field.description(),
+                                field.defaultValue()));
                 return;
             }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
index 0fff27dce4..82c61adf6c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -119,7 +119,8 @@ public class SchemaMergingUtils {
                                                     baseField.id(),
                                                     baseField.name(),
                                                     updatedDataType,
-                                                    baseField.description());
+                                                    baseField.description(),
+                                                    baseField.defaultValue());
                                         } else {
                                             return baseField;
                                         }
@@ -226,6 +227,10 @@ public class SchemaMergingUtils {
     private static DataField assignIdForNewField(DataField field, 
AtomicInteger highestFieldId) {
         DataType dataType = ReassignFieldId.reassign(field.type(), 
highestFieldId);
         return new DataField(
-                highestFieldId.incrementAndGet(), field.name(), dataType, 
field.description());
+                highestFieldId.incrementAndGet(),
+                field.name(),
+                dataType,
+                field.description(),
+                field.defaultValue());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 66b648f890..cf6b919499 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -21,9 +21,6 @@ package org.apache.paimon.schema;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
-import org.apache.paimon.casting.CastExecutor;
-import org.apache.paimon.casting.CastExecutors;
-import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
@@ -38,7 +35,6 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.StringUtils;
 
 import java.util.ArrayList;
@@ -105,8 +101,6 @@ public class SchemaValidation {
 
         validateBucket(schema, options);
 
-        validateDefaultValues(schema);
-
         validateStartupMode(options);
 
         validateFieldsPrefix(schema, options);
@@ -486,64 +480,6 @@ public class SchemaValidation {
         }
     }
 
-    private static void validateDefaultValues(TableSchema schema) {
-        CoreOptions coreOptions = new CoreOptions(schema.options());
-        Map<String, String> defaultValues = 
coreOptions.getFieldDefaultValues();
-
-        if (!defaultValues.isEmpty()) {
-
-            List<String> partitionKeys = schema.partitionKeys();
-            for (String partitionKey : partitionKeys) {
-                if (defaultValues.containsKey(partitionKey)) {
-                    throw new IllegalArgumentException(
-                            String.format(
-                                    "Partition key %s should not be assign 
default column.",
-                                    partitionKey));
-                }
-            }
-
-            List<String> primaryKeys = schema.primaryKeys();
-            for (String primaryKey : primaryKeys) {
-                if (defaultValues.containsKey(primaryKey)) {
-                    throw new IllegalArgumentException(
-                            String.format(
-                                    "Primary key %s should not be assign 
default column.",
-                                    primaryKey));
-                }
-            }
-
-            List<DataField> fields = schema.fields();
-
-            for (DataField field : fields) {
-                String defaultValueStr = defaultValues.get(field.name());
-                if (defaultValueStr == null) {
-                    continue;
-                }
-
-                @SuppressWarnings("unchecked")
-                CastExecutor<Object, Object> resolve =
-                        (CastExecutor<Object, Object>)
-                                CastExecutors.resolve(VarCharType.STRING_TYPE, 
field.type());
-                if (resolve == null) {
-                    throw new IllegalArgumentException(
-                            String.format(
-                                    "The column %s with datatype %s is 
currently not supported for default value.",
-                                    field.name(), field.type().asSQLString()));
-                }
-
-                try {
-                    resolve.cast(BinaryString.fromString(defaultValueStr));
-                } catch (Exception e) {
-                    throw new IllegalArgumentException(
-                            String.format(
-                                    "The default value %s of the column %s can 
not be cast to datatype: %s",
-                                    defaultValueStr, field.name(), 
field.type()),
-                            e);
-                }
-            }
-        }
-    }
-
     private static void validateForDeletionVectors(CoreOptions options) {
         checkArgument(
                 options.changelogProducer() == ChangelogProducer.NONE
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index f72cd7151f..cf3f8d6657 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
 
 import org.apache.paimon.FileStore;
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.casting.DefaultValueRow;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
@@ -63,6 +64,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
     private BucketMode bucketMode;
 
     private final int[] notNullFieldIndex;
+    private final @Nullable DefaultValueRow defaultValueRow;
 
     public TableWriteImpl(
             RowType rowType,
@@ -84,6 +86,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
                         .map(DataField::name)
                         .collect(Collectors.toList());
         this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
+        this.defaultValueRow = DefaultValueRow.create(rowType);
     }
 
     @Override
@@ -162,6 +165,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
     @Nullable
     public SinkRecord writeAndReturn(InternalRow row) throws Exception {
         checkNullability(row);
+        row = wrapDefaultValue(row);
         RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
         if (ignoreDelete && rowKind.isRetract()) {
             return null;
@@ -193,6 +197,10 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
         }
     }
 
+    private InternalRow wrapDefaultValue(InternalRow row) {
+        return defaultValueRow == null ? row : defaultValueRow.replaceRow(row);
+    }
+
     private SinkRecord toSinkRecord(InternalRow row) {
         keyAndBucketExtractor.setRecord(row);
         return new SinkRecord(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
index 0554cc0134..7cb31fba27 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
@@ -57,12 +57,16 @@ class DefaultValueAssignerTest {
         options.put(
                 String.format(
                         "%s.%s.%s",
-                        CoreOptions.FIELDS_PREFIX, "col4", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
+                        CoreOptions.FIELDS_PREFIX,
+                        "col4",
+                        DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
                 "0");
         options.put(
                 String.format(
                         "%s.%s.%s",
-                        CoreOptions.FIELDS_PREFIX, "col5", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
+                        CoreOptions.FIELDS_PREFIX,
+                        "col5",
+                        DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
                 "1");
         Schema schema =
                 new Schema(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java
index 2397af83aa..808ce70aee 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java
@@ -168,6 +168,17 @@ public class DataTypeJsonParserTest {
                                                         "f1",
                                                         new BooleanType(),
                                                         "This as well.")))),
+                TestSpec.forString(
+                                
"{\"type\":\"ROW\",\"fields\":[{\"id\":0,\"name\":\"f0\",\"type\":\"INT NOT 
NULL\",\"description\":\"my_comment\",\"defaultValue\":\"55\"}]}")
+                        .expectType(
+                                new RowType(
+                                        Collections.singletonList(
+                                                new DataField(
+                                                        0,
+                                                        "f0",
+                                                        new IntType(false),
+                                                        "my_comment",
+                                                        "55")))),
 
                 // error message testing
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 35928e2335..693c378a21 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -1291,7 +1292,7 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
                                             "%s.%s.%s",
                                             CoreOptions.FIELDS_PREFIX,
                                             "b",
-                                            CoreOptions.DEFAULT_VALUE_SUFFIX),
+                                            
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
                                     "0");
                         });
         StreamTableWrite write = table.newWrite(commitUser);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 76a63eb06e..5d20e46ec2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -56,7 +56,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 
@@ -84,126 +83,6 @@ public class SchemaEvolutionTest {
         commitUser = UUID.randomUUID().toString();
     }
 
-    @Test
-    public void testDefaultValue() throws Exception {
-        {
-            Map<String, String> option = new HashMap<>();
-            option.put(
-                    String.format(
-                            "%s.%s.%s",
-                            CoreOptions.FIELDS_PREFIX, "a", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
-                    "1");
-            Schema schema =
-                    new Schema(
-                            RowType.of(
-                                            new DataType[] {
-                                                DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()),
-                                                DataTypes.BIGINT()
-                                            },
-                                            new String[] {"a", "b"})
-                                    .getFields(),
-                            Collections.emptyList(),
-                            Collections.emptyList(),
-                            option,
-                            "");
-
-            assertThatThrownBy(() -> schemaManager.createTable(schema))
-                    .isInstanceOf(IllegalArgumentException.class)
-                    .hasMessageContaining(
-                            "The column %s with datatype %s is currently not 
supported for default value.",
-                            "a", DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()).asSQLString());
-        }
-
-        {
-            Map<String, String> option = new HashMap<>();
-            option.put(
-                    String.format(
-                            "%s.%s.%s",
-                            CoreOptions.FIELDS_PREFIX, "a", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
-                    "abcxxxx");
-            Schema schema =
-                    new Schema(
-                            RowType.of(
-                                            new DataType[] 
{DataTypes.BIGINT(), DataTypes.BIGINT()},
-                                            new String[] {"a", "b"})
-                                    .getFields(),
-                            Collections.emptyList(),
-                            Collections.emptyList(),
-                            option,
-                            "");
-            assertThatThrownBy(() -> schemaManager.createTable(schema))
-                    .isInstanceOf(IllegalArgumentException.class)
-                    .hasMessageContaining(
-                            "The default value %s of the column a can not be 
cast to datatype: %s",
-                            "abcxxxx", DataTypes.BIGINT().asSQLString());
-        }
-
-        {
-            Schema schema =
-                    new Schema(
-                            RowType.of(
-                                            new DataType[] {
-                                                DataTypes.BIGINT(),
-                                                DataTypes.BIGINT(),
-                                                DataTypes.BIGINT()
-                                            },
-                                            new String[] {"a", "b", "c"})
-                                    .getFields(),
-                            Lists.newArrayList("c"),
-                            Lists.newArrayList("a", "c"),
-                            new HashMap<>(),
-                            "");
-
-            schemaManager.createTable(schema);
-
-            assertThatThrownBy(
-                            () ->
-                                    schemaManager.commitChanges(
-                                            Collections.singletonList(
-                                                    SchemaChange.setOption(
-                                                            String.format(
-                                                                    "%s.%s.%s",
-                                                                    
CoreOptions.FIELDS_PREFIX,
-                                                                    "b",
-                                                                    CoreOptions
-                                                                            
.DEFAULT_VALUE_SUFFIX),
-                                                            "abcxxxx"))))
-                    .hasCauseInstanceOf(IllegalArgumentException.class)
-                    .hasMessageContaining(
-                            "The default value %s of the column b can not be 
cast to datatype: %s",
-                            "abcxxxx", DataTypes.BIGINT().asSQLString());
-            assertThatThrownBy(
-                            () ->
-                                    schemaManager.commitChanges(
-                                            Collections.singletonList(
-                                                    SchemaChange.setOption(
-                                                            String.format(
-                                                                    "%s.%s.%s",
-                                                                    
CoreOptions.FIELDS_PREFIX,
-                                                                    "a",
-                                                                    CoreOptions
-                                                                            
.DEFAULT_VALUE_SUFFIX),
-                                                            "abc"))))
-                    .hasCauseInstanceOf(IllegalArgumentException.class)
-                    .hasMessageContaining("Primary key a should not be assign 
default column.");
-
-            assertThatThrownBy(
-                            () ->
-                                    schemaManager.commitChanges(
-                                            Collections.singletonList(
-                                                    SchemaChange.setOption(
-                                                            String.format(
-                                                                    "%s.%s.%s",
-                                                                    
CoreOptions.FIELDS_PREFIX,
-                                                                    "c",
-                                                                    CoreOptions
-                                                                            
.DEFAULT_VALUE_SUFFIX),
-                                                            "abc"))))
-                    .hasCauseInstanceOf(IllegalArgumentException.class)
-                    .hasMessageContaining("Partition key c should not be 
assign default column.");
-        }
-    }
-
     @Test
     public void testAddField() throws Exception {
         Schema schema =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
index a99d6b234e..ec253143a7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source.snapshot;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -84,7 +85,7 @@ public class DefaultValueScannerTest extends ScannerTestBase {
         options.set(
                 String.format(
                         "%s.%s.%s",
-                        CoreOptions.FIELDS_PREFIX, "b", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
+                        CoreOptions.FIELDS_PREFIX, "b", 
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
                 "100");
         return createFileStoreTable(options);
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 6f16609372..171b489d42 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -750,8 +750,8 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
                         anyCauseMatches(
                                 IllegalArgumentException.class,
                                 "Column v1 have different types when merging 
schemas.\n"
-                                        + "Current table 
'{paimon_sync_table.incompatible_field_1}' field: `v1` TIMESTAMP(0) ''\n"
-                                        + "To be merged table 
'paimon_sync_table.incompatible_field_2' field: `v1` INT ''"));
+                                        + "Current table 
'{paimon_sync_table.incompatible_field_1}' field: `v1` TIMESTAMP(0)\n"
+                                        + "To be merged table 
'paimon_sync_table.incompatible_field_2' field: `v1` INT"));
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java
new file mode 100644
index 0000000000..b85dfd28e5
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Alter column default value procedure. Usage:
+ *
+ * <pre><code>
+ *  CALL sys.alter_column_default_value('table_identifier', 'column', 
'default_value')
+ * </code></pre>
+ */
+public class AlterColumnDefaultValueProcedure extends ProcedureBase {
+
+    @Override
+    public String identifier() {
+        return "alter_column_default_value";
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "column", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "default_value", type = 
@DataTypeHint("STRING"))
+            })
+    public String[] call(
+            ProcedureContext procedureContext, String table, String column, 
String defaultValue)
+            throws Catalog.ColumnAlreadyExistException, 
Catalog.TableNotExistException,
+                    Catalog.ColumnNotExistException {
+        Identifier identifier = Identifier.fromString(table);
+        String[] fieldNames = StringUtils.split(column, ".");
+        SchemaChange schemaChange = 
SchemaChange.updateColumnDefaultValue(fieldNames, defaultValue);
+        catalog.alterTable(identifier, ImmutableList.of(schemaChange), false);
+        return new String[] {"Success"};
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index df615478c5..df8b368c0a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -95,3 +95,4 @@ org.apache.paimon.flink.procedure.AlterViewDialectProcedure
 org.apache.paimon.flink.procedure.CreateFunctionProcedure
 org.apache.paimon.flink.procedure.DropFunctionProcedure
 org.apache.paimon.flink.procedure.AlterFunctionProcedure
+org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 57a6746fea..41313e0e9c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -40,6 +40,15 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 /** IT cases for table with branches using SQL. */
 public class BranchSqlITCase extends CatalogITCaseBase {
 
+    @Test
+    public void testDefaultValue() throws Exception {
+        sql("CREATE TABLE T (a INT, b INT)");
+        sql("CALL sys.alter_column_default_value('default.T', 'b', '5')");
+        sql("INSERT INTO T (a) VALUES (1), (2)");
+        assertThat(collectResult("SELECT * FROM T"))
+                .containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]");
+    }
+
     @Test
     public void testAlterBranchTable() throws Exception {
         sql(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 285b245c4a..676836d15c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.operation.DefaultValueAssigner;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -1589,7 +1590,8 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
     public void testDefaultValueWithoutPrimaryKey() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put(
-                CoreOptions.FIELDS_PREFIX + ".rate." + 
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
+                CoreOptions.FIELDS_PREFIX + ".rate." + 
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX,
+                "1000");
 
         String table =
                 createTable(
@@ -1625,7 +1627,8 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
             throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put(
-                CoreOptions.FIELDS_PREFIX + ".rate." + 
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
+                CoreOptions.FIELDS_PREFIX + ".rate." + 
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX,
+                "1000");
         options.put(MERGE_ENGINE.key(), mergeEngine.toString());
         if (mergeEngine == FIRST_ROW) {
             options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index be257aef4b..1cff47d7d0 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -203,7 +203,8 @@ public class OrcFileFormat extends FileFormat {
                                                         f.id(),
                                                         f.name(),
                                                         
refineDataType(f.type()),
-                                                        f.description()))
+                                                        f.description(),
+                                                        f.defaultValue()))
                                 .collect(Collectors.toList()));
             default:
                 return type;
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
new file mode 100644
index 0000000000..ce375ff56d
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+/**
+ * Capabilities that can be provided by a {@link TableCatalog} implementation.
+ */
+public enum TableCatalogCapability {
+
+  /**
+   * Signals that the TableCatalog supports defining generated columns upon 
table creation in SQL.
+   * <p>
+   * Without this capability, any create/replace table statements with a 
generated column defined
+   * in the table schema will throw an exception during analysis.
+   * <p>
+   * A generated column is defined with syntax: {@code colName colType 
GENERATED ALWAYS AS (expr)}
+   * <p>
+   * Generation expression are included in the column definition for APIs like
+   * {@link TableCatalog#createTable}.
+   */
+  SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+
+  /**
+   * Signals that the TableCatalog supports defining column default value as 
expression in
+   * CREATE/REPLACE/ALTER TABLE.
+   * <p>
+   * Without this capability, any CREATE/REPLACE/ALTER TABLE statement with a 
column default value
+   * defined in the table schema will throw an exception during analysis.
+   * <p>
+   * A column default value is defined with syntax: {@code colName colType 
DEFAULT expr}
+   * <p>
+   * Column default value expression is included in the column definition for 
APIs like
+   * {@link TableCatalog#createTable}.
+   */
+  SUPPORT_COLUMN_DEFAULT_VALUE
+}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
 
b/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
new file mode 100644
index 0000000000..94fde80128
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+/** Capabilities that can be provided by a {@link TableCatalog} 
implementation. */
+public enum TableCatalogCapability {
+
+    /**
+     * Signals that the TableCatalog supports defining generated columns upon 
table creation in SQL.
+     *
+     * <p>Without this capability, any create/replace table statements with a 
generated column
+     * defined in the table schema will throw an exception during analysis.
+     *
+     * <p>A generated column is defined with syntax: {@code colName colType 
GENERATED ALWAYS AS
+     * (expr)}
+     *
+     * <p>Generation expression are included in the column definition for APIs 
like {@link
+     * TableCatalog#createTable}.
+     */
+    SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+
+    /**
+     * Signals that the TableCatalog supports defining column default value as 
expression in
+     * CREATE/REPLACE/ALTER TABLE.
+     *
+     * <p>Without this capability, any CREATE/REPLACE/ALTER TABLE statement 
with a column default
+     * value defined in the table schema will throw an exception during 
analysis.
+     *
+     * <p>A column default value is defined with syntax: {@code colName 
colType DEFAULT expr}
+     *
+     * <p>Column default value expression is included in the column definition 
for APIs like {@link
+     * TableCatalog#createTable}.
+     */
+    SUPPORT_COLUMN_DEFAULT_VALUE
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 695f03d9ca..d910ade8fa 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -36,6 +36,7 @@ import org.apache.paimon.spark.utils.CatalogUtils;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.FormatTableOptions;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.spark.sql.PaimonSparkSession$;
@@ -50,6 +51,7 @@ import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableCatalogCapability;
 import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.expressions.FieldReference;
@@ -77,18 +79,24 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
+import static 
org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static 
org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
 import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
+import static org.apache.paimon.spark.utils.CatalogUtils.checkNoDefaultValue;
+import static 
org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultValue;
 import static org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
 import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
+import static 
org.apache.paimon.spark.utils.CatalogUtils.toUpdateColumnDefaultValue;
+import static 
org.apache.spark.sql.connector.catalog.TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE;
 
 /** Spark {@link TableCatalog} for paimon. */
 public class SparkCatalog extends SparkBaseCatalog
@@ -103,6 +111,10 @@ public class SparkCatalog extends SparkBaseCatalog
 
     private String defaultDatabase;
 
+    public Set<TableCatalogCapability> capabilities() {
+        return Collections.singleton(SUPPORT_COLUMN_DEFAULT_VALUE);
+    }
+
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
         checkRequiredConfigurations();
@@ -353,6 +365,7 @@ public class SparkCatalog extends SparkBaseCatalog
         } else if (change instanceof TableChange.AddColumn) {
             TableChange.AddColumn add = (TableChange.AddColumn) change;
             SchemaChange.Move move = getMove(add.position(), add.fieldNames());
+            checkNoDefaultValue(add);
             return SchemaChange.addColumn(
                     add.fieldNames(),
                     toPaimonType(add.dataType()).copy(add.isNullable()),
@@ -379,6 +392,8 @@ public class SparkCatalog extends SparkBaseCatalog
             TableChange.UpdateColumnPosition update = 
(TableChange.UpdateColumnPosition) change;
             SchemaChange.Move move = getMove(update.position(), 
update.fieldNames());
             return SchemaChange.updateColumnPosition(move);
+        } else if (isUpdateColumnDefaultValue(change)) {
+            return toUpdateColumnDefaultValue(change);
         } else {
             throw new UnsupportedOperationException(
                     "Change is not supported: " + change.getClass());
@@ -428,10 +443,16 @@ public class SparkCatalog extends SparkBaseCatalog
                         
.comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, null));
 
         for (StructField field : schema.fields()) {
-            schemaBuilder.column(
-                    field.name(),
-                    toPaimonType(field.dataType()).copy(field.nullable()),
-                    field.getComment().getOrElse(() -> null));
+            String name = field.name();
+            DataType type = 
toPaimonType(field.dataType()).copy(field.nullable());
+            String comment = field.getComment().getOrElse(() -> null);
+            if 
(field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+                String defaultValue =
+                        
field.metadata().getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY);
+                schemaBuilder.column(name, type, comment, defaultValue);
+            } else {
+                schemaBuilder.column(name, type, comment);
+            }
         }
         return schemaBuilder.build();
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
index ddda188d18..25882f5e86 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.utils;
 
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DecimalType;
@@ -27,7 +28,9 @@ import org.apache.paimon.types.MultisetType;
 
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
 import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.connector.catalog.ColumnDefaultValue;
 import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.types.BooleanType;
 import org.apache.spark.sql.types.ByteType;
 import org.apache.spark.sql.types.DataTypes;
@@ -248,4 +251,31 @@ public class CatalogUtils {
 
         throw new IllegalArgumentException("Unsupported Spark data type: " + 
sparkType);
     }
+
+    public static void checkNoDefaultValue(TableChange.AddColumn addColumn) {
+        try {
+            ColumnDefaultValue defaultValue = addColumn.defaultValue();
+            if (defaultValue != null) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "Cannot add column %s with default value %s.",
+                                Arrays.toString(addColumn.fieldNames()), 
defaultValue));
+            }
+        } catch (NoClassDefFoundError | NoSuchMethodError ignored) {
+        }
+    }
+
+    public static boolean isUpdateColumnDefaultValue(TableChange tableChange) {
+        try {
+            return tableChange instanceof TableChange.UpdateColumnDefaultValue;
+        } catch (NoClassDefFoundError ignored) {
+            return false;
+        }
+    }
+
+    public static SchemaChange toUpdateColumnDefaultValue(TableChange 
tableChange) {
+        TableChange.UpdateColumnDefaultValue update =
+                (TableChange.UpdateColumnDefaultValue) tableChange;
+        return SchemaChange.updateColumnDefaultValue(update.fieldNames(), 
update.newDefaultValue());
+    }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index ae95881621..de14ef4316 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -48,6 +48,7 @@ import org.apache.spark.sql.paimon.shims.SparkShimLoader;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.MetadataBuilder;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.UserDefinedType;
@@ -62,6 +63,11 @@ public class SparkTypeUtils {
 
     private SparkTypeUtils() {}
 
+    /**
+     * Copy here from Spark ResolveDefaultColumnsUtils for old Spark versions.
+     */
+    public static final String CURRENT_DEFAULT_COLUMN_METADATA_KEY = 
"CURRENT_DEFAULT";
+
     public static RowType toPartitionType(Table table) {
         int[] projections = 
table.rowType().getFieldIndices(table.partitionKeys());
         List<DataField> partitionTypes = new ArrayList<>();
@@ -252,9 +258,17 @@ public class SparkTypeUtils {
         public DataType visit(RowType rowType) {
             List<StructField> fields = new 
ArrayList<>(rowType.getFieldCount());
             for (DataField field : rowType.getFields()) {
+                MetadataBuilder metadataBuilder = new MetadataBuilder();
+                if (field.defaultValue() != null) {
+                    
metadataBuilder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, 
field.defaultValue());
+                }
                 StructField structField =
                         DataTypes.createStructField(
-                                field.name(), field.type().accept(this), 
field.type().isNullable());
+                                field.name(),
+                                field.type().accept(this),
+                                field.type().isNullable(),
+                                metadataBuilder.build()
+                                );
                 structField =
                         Optional.ofNullable(field.description())
                                 .map(structField::withComment)
@@ -329,9 +343,14 @@ public class SparkTypeUtils {
                 org.apache.paimon.types.DataType fieldType =
                         fieldResults.get(i).copy(field.nullable());
                 String comment = field.getComment().getOrElse(() -> null);
+                String defaultValue = null;
+                if 
(field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+                    defaultValue =
+                            
field.metadata().getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY);
+                }
                 newFields.add(
                         new DataField(
-                                atomicInteger.incrementAndGet(), field.name(), 
fieldType, comment));
+                                atomicInteger.incrementAndGet(), field.name(), 
fieldType, comment, defaultValue));
             }
 
             return new RowType(newFields);
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index fff94ce037..b39e44fc6f 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for spark writer. */
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -78,7 +79,49 @@ public class SparkWriteITCase {
 
     @AfterEach
     public void afterEach() {
-        spark.sql("DROP TABLE T");
+        spark.sql("DROP TABLE IF EXISTS T");
+    }
+
+    @Test
+    public void testWriteWithDefaultValue() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT DEFAULT 2, c STRING DEFAULT 
'my_value') TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        // test show create table
+        List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+        assertThat(show.toString())
+                .contains("a INT,\n" + "  b INT DEFAULT 2,\n" + "  c STRING 
DEFAULT 'my_value'");
+
+        // test partial write
+        spark.sql("INSERT INTO T (a) VALUES (1), (2)").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,2,my_value], 
[2,2,my_value]]");
+
+        // test write with DEFAULT
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT, 
DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,2,my_value], 
[2,2,my_value], [3,2,my_value]]");
+
+        // test add column with DEFAULT not support
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE T ADD COLUMN d INT 
DEFAULT 5"))
+                .hasMessageContaining(
+                        "Unsupported table change: Cannot add column [d] with 
default value");
+
+        // test alter type to default column
+        spark.sql("ALTER TABLE T ALTER COLUMN b TYPE STRING").collectAsList();
+        spark.sql("INSERT INTO T (a) VALUES (4)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .isEqualTo("[[1,2,my_value], [2,2,my_value], [3,2,my_value], 
[4,2,my_value]]");
+
+        // test alter default column
+        spark.sql("ALTER TABLE T ALTER COLUMN b SET DEFAULT '3'");
+        spark.sql("INSERT INTO T (a) VALUES (5)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .isEqualTo(
+                        "[[1,2,my_value], [2,2,my_value], [3,2,my_value], 
[4,2,my_value], [5,3,my_value]]");
     }
 
     @Test

Reply via email to