This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new eb5af93b3 [core] add default column value (#1425)
eb5af93b3 is described below

commit eb5af93b396c5662cddf3942e5ae476bb3924679
Author: wgcn <[email protected]>
AuthorDate: Tue Jul 11 12:08:56 2023 +0800

    [core] add default column value (#1425)
---
 docs/content/concepts/primary-key-table.md         |  41 +++++
 docs/content/how-to/creating-tables.md             |  69 +++++++-
 .../main/java/org/apache/paimon/CoreOptions.java   |  15 ++
 .../org/apache/paimon/casting/DefaultValueRow.java | 182 +++++++++++++++++++++
 .../paimon/operation/DefaultValueAssiger.java      | 157 ++++++++++++++++++
 .../org/apache/paimon/schema/SchemaValidation.java |  64 ++++++++
 .../paimon/table/AbstractFileStoreTable.java       |  53 +++++-
 .../paimon/table/AppendOnlyFileStoreTable.java     |   2 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   2 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   2 +-
 .../table/source/InnerStreamTableScanImpl.java     |   9 +-
 .../paimon/table/source/InnerTableScanImpl.java    |  11 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |  10 +-
 .../paimon/operation/DefaultValueAssigerTest.java  | 131 +++++++++++++++
 .../table/ChangelogWithKeyFileStoreTableTest.java  |  51 ++++++
 .../apache/paimon/table/SchemaEvolutionTest.java   | 124 ++++++++++++++
 .../source/snapshot/DefaultValueScannerTest.java   |  89 ++++++++++
 .../apache/paimon/flink/ReadWriteTableITCase.java  |  74 ++++++++-
 18 files changed, 1071 insertions(+), 15 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index 60d68503f..bf8e21f95 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -122,6 +122,47 @@ INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3);
 SELECT * FROM T; -- output 1, 2, 2, 2, 3, 3, 3
 ```
 
+#### Default Value
+If the order of the data cannot be guaranteed and a field is written only by 
overwriting null values,
+fields that have not been overwritten will be displayed as null when reading 
the table.
+
+```sql
+CREATE TABLE T (
+                  k INT,
+                  a INT,
+                  b INT,
+                  c INT,
+                  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+     'merge-engine'='partial-update'
+     );
+INSERT INTO T VALUES (1, 1,null,null);
+INSERT INTO T VALUES (1, null,null,1);
+
+SELECT * FROM T; -- output 1, 1, null, 1
+```
+If fields which have not been overwritten are expected to have a default 
value instead of null when reading the table,
+'fields.{field_name}.default-value' is required.
+```sql
+CREATE TABLE T (
+    k INT,
+    a INT,
+    b INT,
+    c INT,
+    PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+    'merge-engine'='partial-update',
+    'fields.b.default-value'='0'
+);
+
+INSERT INTO T VALUES (1, 1,null,null);
+INSERT INTO T VALUES (1, null,null,1);
+
+SELECT * FROM T; -- output 1, 1, 0, 1
+```
+
+
+
 ### Aggregation
 
 {{< hint info >}}
diff --git a/docs/content/how-to/creating-tables.md 
b/docs/content/how-to/creating-tables.md
index 04795702c..87b99d5bd 100644
--- a/docs/content/how-to/creating-tables.md
+++ b/docs/content/how-to/creating-tables.md
@@ -221,7 +221,7 @@ CREATE TABLE MyTable (
 By configuring [partition.expiration-time]({{< ref 
"maintenance/manage-partition" >}}), expired partitions can be automatically 
deleted.
 {{< /hint >}}
 
-### Pick Partition Fields
+#### Pick Partition Fields
 
 {{< hint info >}}
 Partition fields must be a subset of primary keys if primary keys are defined.
@@ -248,6 +248,73 @@ Paimon will automatically collect the statistics of the 
data file for speeding u
 The statistics collector mode can be configured by `'metadata.stats-mode'`, by 
default is `'truncate(16)'`.
 You can configure the field level by setting 
`'fields.{field_name}.stats-mode'`.
 
+
+### Field Default Value
+
+Paimon table currently supports setting default values for fields in table 
properties;
+note that partition fields and primary key fields cannot be specified.
+{{< tabs "default-value-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE MyTable (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    dt STRING,
+    hh STRING,
+    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
+) PARTITIONED BY (dt, hh)
+with(
+    'fields.item_id.default-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+CREATE TABLE MyTable (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    dt STRING,
+    hh STRING
+) PARTITIONED BY (dt, hh) TBLPROPERTIES (
+    'primary-key' = 'dt,hh,user_id',
+    'fields.item_id.default-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< tab "Hive" >}}
+
+```sql
+SET hive.metastore.warehouse.dir=warehouse_path;
+
+CREATE TABLE MyTable (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    dt STRING,
+    hh STRING
+)
+STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
+TBLPROPERTIES (
+    'primary-key' = 'dt,hh,user_id',
+    'partition'='dt,hh',
+    'fields.item_id.default-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+
 ## Create Table As
 
 Table can be created and populated by the results of a query, for example, we 
have a sql like this: `CREATE TABLE table_b AS SELECT id, name FORM table_a`,
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 5b983dbee..06187b8b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -53,6 +53,7 @@ import static 
org.apache.paimon.options.description.TextElement.text;
 
 /** Core options for paimon. */
 public class CoreOptions implements Serializable {
+    public static final String DEFAULT_VALUE_SUFFIX = "default-value";
 
     public static final String FIELDS_PREFIX = "fields";
 
@@ -1086,6 +1087,20 @@ public class CoreOptions implements Serializable {
         return options.get(CONSUMER_EXPIRATION_TIME);
     }
 
+    public Options getFieldDefaultValues() {
+        Map<String, String> defultValues = new HashMap<>();
+        for (Map.Entry<String, String> option : options.toMap().entrySet()) {
+            String key = option.getKey();
+            String fieldPrefix = FIELDS_PREFIX + ".";
+            String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
+            if (key != null && key.startsWith(fieldPrefix) && 
key.endsWith(defaultValueSuffix)) {
+                String fieldName = key.replace(fieldPrefix, 
"").replace(defaultValueSuffix, "");
+                defultValues.put(fieldName, option.getValue());
+            }
+        }
+        return new Options(defultValues);
+    }
+
     public List<CommitCallback> commitCallbacks() {
         List<CommitCallback> result = new ArrayList<>();
         for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java 
b/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
new file mode 100644
index 000000000..10f2e5cd1
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+/**
+ * An implementation of {@link InternalRow} which provides a default value for 
the underlying {@link
+ * InternalRow}.
+ */
+public class DefaultValueRow implements InternalRow {
+    private InternalRow row;
+
+    private InternalRow defaultValueRow;
+
+    private DefaultValueRow(InternalRow defaultValueRow) {
+        this.defaultValueRow = defaultValueRow;
+    }
+
+    public DefaultValueRow replaceRow(InternalRow row) {
+        this.row = row;
+        return this;
+    }
+
+    @Override
+    public int getFieldCount() {
+        return row.getFieldCount();
+    }
+
+    @Override
+    public RowKind getRowKind() {
+        return row.getRowKind();
+    }
+
+    @Override
+    public void setRowKind(RowKind kind) {
+        row.setRowKind(kind);
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return row.isNullAt(pos) && defaultValueRow.isNullAt(pos);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getBoolean(pos);
+        }
+        return defaultValueRow.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getByte(pos);
+        }
+        return defaultValueRow.getByte(pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getShort(pos);
+        }
+        return defaultValueRow.getShort(pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getInt(pos);
+        }
+        return defaultValueRow.getInt(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getLong(pos);
+        }
+        return defaultValueRow.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getFloat(pos);
+        }
+        return defaultValueRow.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getDouble(pos);
+        }
+        return defaultValueRow.getDouble(pos);
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getString(pos);
+        }
+        return defaultValueRow.getString(pos);
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        if (!row.isNullAt(pos)) {
+            return row.getDecimal(pos, precision, scale);
+        }
+        return defaultValueRow.getDecimal(pos, precision, scale);
+    }
+
+    @Override
+    public Timestamp getTimestamp(int pos, int precision) {
+        if (!row.isNullAt(pos)) {
+            return row.getTimestamp(pos, precision);
+        }
+        return defaultValueRow.getTimestamp(pos, precision);
+    }
+
+    @Override
+    public byte[] getBinary(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getBinary(pos);
+        }
+        return defaultValueRow.getBinary(pos);
+    }
+
+    @Override
+    public InternalArray getArray(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getArray(pos);
+        }
+        return defaultValueRow.getArray(pos);
+    }
+
+    @Override
+    public InternalMap getMap(int pos) {
+        if (!row.isNullAt(pos)) {
+            return row.getMap(pos);
+        }
+        return defaultValueRow.getMap(pos);
+    }
+
+    @Override
+    public InternalRow getRow(int pos, int numFields) {
+        if (!row.isNullAt(pos)) {
+            return row.getRow(pos, numFields);
+        }
+        return defaultValueRow.getRow(pos, numFields);
+    }
+
+    public static DefaultValueRow from(InternalRow defaultValueRow) {
+        return new DefaultValueRow(defaultValueRow);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
new file mode 100644
index 000000000..adaa645fe
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.casting.DefaultValueRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.PredicateReplaceVisitor;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.Projection;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The field default value assigner. Note that assigning should be invoked 
after merge and
+ * schema evolution.
+ */
+public class DefaultValueAssiger {
+
+    private GenericRow defaultValueMapping;
+    private TableSchema tableSchema;
+
+    private Map<String, String> defaultValues;
+
+    private int[][] project;
+
+    private boolean isCacheDefaultMapping;
+
+    public DefaultValueAssiger(TableSchema tableSchema) {
+        this.tableSchema = tableSchema;
+
+        CoreOptions coreOptions = new CoreOptions(tableSchema.options());
+        defaultValues = coreOptions.getFieldDefaultValues().toMap();
+    }
+
+    public DefaultValueAssiger handleProject(int[][] project) {
+        this.project = project;
+        return this;
+    }
+
+    /** Assign default value for a column whose value is null. */
+    public RecordReader<InternalRow> 
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
+        if (defaultValues.isEmpty()) {
+            return reader;
+        }
+
+        if (!isCacheDefaultMapping) {
+            isCacheDefaultMapping = true;
+            this.defaultValueMapping = createDefaultValueMapping();
+        }
+
+        RecordReader<InternalRow> result = reader;
+        if (defaultValueMapping != null) {
+            DefaultValueRow defaultValueRow = 
DefaultValueRow.from(defaultValueMapping);
+            result = reader.transform(defaultValueRow::replaceRow);
+        }
+        return result;
+    }
+
+    GenericRow createDefaultValueMapping() {
+
+        RowType valueType = tableSchema.logicalRowType();
+
+        List<DataField> fields;
+        if (project != null) {
+            fields = Projection.of(project).project(valueType).getFields();
+        } else {
+            fields = valueType.getFields();
+        }
+
+        GenericRow defaultValuesMa = null;
+        if (!fields.isEmpty()) {
+            defaultValuesMa = new GenericRow(fields.size());
+            for (int i = 0; i < fields.size(); i++) {
+                DataField dataField = fields.get(i);
+                String defaultValueStr = defaultValues.get(dataField.name());
+                if (defaultValueStr == null) {
+                    continue;
+                }
+
+                CastExecutor<Object, Object> resolve =
+                        (CastExecutor<Object, Object>)
+                                CastExecutors.resolve(VarCharType.STRING_TYPE, 
dataField.type());
+
+                if (resolve == null) {
+                    throw new RuntimeException(
+                            "Default value do not support the type of " + 
dataField.type());
+                }
+                Object defaultValue = 
resolve.cast(BinaryString.fromString(defaultValueStr));
+                defaultValuesMa.setField(i, defaultValue);
+            }
+        }
+
+        return defaultValuesMa;
+    }
+
+    public Predicate handlePredicate(Predicate filters) {
+        Predicate result = filters;
+        if (!defaultValues.isEmpty()) {
+            if (filters != null) {
+                // TODO improve predicate tree with replacing always true and 
always false
+                PredicateReplaceVisitor deletePredicateWithFieldNameVisitor =
+                        predicate -> {
+                            if 
(defaultValues.containsKey(predicate.fieldName())) {
+                                return Optional.empty();
+                            }
+                            return Optional.of(predicate);
+                        };
+
+                ArrayList<Predicate> filterWithouDefaultValueField = new 
ArrayList<>();
+
+                List<Predicate> predicates = 
PredicateBuilder.splitAnd(filters);
+                for (Predicate predicate : predicates) {
+                    predicate
+                            .visit(deletePredicateWithFieldNameVisitor)
+                            .ifPresent(filterWithouDefaultValueField::add);
+                }
+
+                if (!filterWithouDefaultValueField.isEmpty()) {
+                    result = 
PredicateBuilder.and(filterWithouDefaultValueField);
+                } else {
+                    result = null;
+                }
+            }
+        }
+        return result;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 044f6b1f6..b04554a83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -20,6 +20,9 @@ package org.apache.paimon.schema;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.WriteMode;
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
@@ -29,6 +32,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -74,6 +78,8 @@ public class SchemaValidation {
 
         CoreOptions options = new CoreOptions(schema.options());
 
+        validateDefaultValues(schema);
+
         validateStartupMode(options);
 
         if (options.writeMode() == WriteMode.APPEND_ONLY
@@ -288,4 +294,62 @@ public class SchemaValidation {
     private static String concatConfigKeys(List<ConfigOption<?>> 
configOptions) {
         return 
configOptions.stream().map(ConfigOption::key).collect(Collectors.joining(","));
     }
+
+    private static void validateDefaultValues(TableSchema schema) {
+        CoreOptions coreOptions = new CoreOptions(schema.options());
+        Map<String, String> defaultValues = 
coreOptions.getFieldDefaultValues().toMap();
+
+        if (!defaultValues.isEmpty()) {
+
+            List<String> partitionKeys = schema.partitionKeys();
+            for (String partitionKey : partitionKeys) {
+                if (defaultValues.containsKey(partitionKey)) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Partition key %s should not be assign 
default column.",
+                                    partitionKey));
+                }
+            }
+
+            List<String> primaryKeys = schema.primaryKeys();
+            for (String primaryKey : primaryKeys) {
+                if (defaultValues.containsKey(primaryKey)) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Primary key %s should not be assign 
default column.",
+                                    primaryKey));
+                }
+            }
+
+            List<DataField> fields = schema.fields();
+
+            for (int i = 0; i < fields.size(); i++) {
+                DataField dataField = fields.get(i);
+                String defaultValueStr = defaultValues.get(dataField.name());
+                if (defaultValueStr == null) {
+                    continue;
+                }
+
+                CastExecutor<Object, Object> resolve =
+                        (CastExecutor<Object, Object>)
+                                CastExecutors.resolve(VarCharType.STRING_TYPE, 
dataField.type());
+                if (resolve == null) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "The column %s with datatype %s is 
currently not supported for default value.",
+                                    dataField.name(), 
dataField.type().asSQLString()));
+                }
+
+                try {
+                    resolve.cast(BinaryString.fromString(defaultValueStr));
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "The default value %s of the column %s can 
not be cast to datatype: %s",
+                                    defaultValueStr, dataField.name(), 
dataField.type()),
+                            e);
+                }
+            }
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 536bc1c34..619fc4151 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -21,12 +21,15 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.DefaultValueAssiger;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaValidation;
 import org.apache.paimon.schema.TableSchema;
@@ -37,8 +40,10 @@ import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
 import org.apache.paimon.table.source.InnerStreamTableScan;
 import org.apache.paimon.table.source.InnerStreamTableScanImpl;
+import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.InnerTableScan;
 import org.apache.paimon.table.source.InnerTableScanImpl;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
@@ -107,12 +112,17 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 coreOptions(),
                 snapshotManager(),
                 splitGenerator(),
-                nonPartitionFilterConsumer());
+                nonPartitionFilterConsumer(),
+                new DefaultValueAssiger(tableSchema));
     }
 
     @Override
     public InnerTableScan newScan() {
-        return new InnerTableScanImpl(coreOptions(), newSnapshotReader(), 
snapshotManager());
+        return new InnerTableScanImpl(
+                coreOptions(),
+                newSnapshotReader(),
+                snapshotManager(),
+                new DefaultValueAssiger(tableSchema));
     }
 
     @Override
@@ -121,7 +131,8 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
                 coreOptions(),
                 newSnapshotReader(),
                 snapshotManager(),
-                supportStreamingReadOverwrite());
+                supportStreamingReadOverwrite(),
+                new DefaultValueAssiger(tableSchema));
     }
 
     public abstract SplitGenerator splitGenerator();
@@ -282,6 +293,42 @@ public abstract class AbstractFileStoreTable implements 
FileStoreTable {
         rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
     }
 
+    abstract InnerTableRead innerRead();
+
+    @Override
+    public InnerTableRead newRead() {
+        InnerTableRead innerTableRead = innerRead();
+        DefaultValueAssiger defaultValueAssiger = new 
DefaultValueAssiger(tableSchema);
+        return new InnerTableRead() {
+            @Override
+            public InnerTableRead withFilter(Predicate predicate) {
+                
innerTableRead.withFilter(defaultValueAssiger.handlePredicate(predicate));
+                return this;
+            }
+
+            @Override
+            public InnerTableRead withProjection(int[][] projection) {
+                defaultValueAssiger.handleProject(projection);
+                innerTableRead.withProjection(projection);
+                return this;
+            }
+
+            @Override
+            public RecordReader<InternalRow> createReader(Split split) throws 
IOException {
+                RecordReader<InternalRow> reader =
+                        defaultValueAssiger.assignFieldsDefaultValue(
+                                innerTableRead.createReader(split));
+                return reader;
+            }
+
+            @Override
+            public InnerTableRead forceKeepDelete() {
+                innerTableRead.forceKeepDelete();
+                return this;
+            }
+        };
+    }
+
     @Override
     public void createTag(String tagName, long fromSnapshotId) {
         SnapshotManager snapshotManager = snapshotManager();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index beb703b84..6dfe02848 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -105,7 +105,7 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public InnerTableRead newRead() {
+    public InnerTableRead innerRead() {
         AppendOnlyFileStoreRead read = store().newRead();
         return new InnerTableRead() {
             @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index 42b5fa18a..ec8105cea 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -118,7 +118,7 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public InnerTableRead newRead() {
+    public InnerTableRead innerRead() {
         return new KeyValueTableRead(store().newRead()) {
 
             @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index ec692a013..95a9f0049 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -192,7 +192,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
     }
 
     @Override
-    public InnerTableRead newRead() {
+    public InnerTableRead innerRead() {
         return new KeyValueTableRead(store().newRead()) {
 
             @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 44f696945..9b50e7831 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.operation.DefaultValueAssiger;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import 
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -58,20 +59,24 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
     @Nullable private Long currentWatermark;
     @Nullable private Long nextSnapshotId;
 
+    private DefaultValueAssiger defaultValueAssiger;
+
     public InnerStreamTableScanImpl(
             CoreOptions options,
             SnapshotReader snapshotReader,
             SnapshotManager snapshotManager,
-            boolean supportStreamingReadOverwrite) {
+            boolean supportStreamingReadOverwrite,
+            DefaultValueAssiger defaultValueAssiger) {
         super(options, snapshotReader);
         this.options = options;
         this.snapshotManager = snapshotManager;
         this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
+        this.defaultValueAssiger = defaultValueAssiger;
     }
 
     @Override
     public InnerStreamTableScanImpl withFilter(Predicate predicate) {
-        snapshotReader.withFilter(predicate);
+        
snapshotReader.withFilter(defaultValueAssiger.handlePredicate(predicate));
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index a9d72a123..caa8fbaf3 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.DefaultValueAssiger;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -33,16 +34,22 @@ public class InnerTableScanImpl extends 
AbstractInnerTableScan {
 
     private boolean hasNext;
 
+    private DefaultValueAssiger defaultValueAssiger;
+
     public InnerTableScanImpl(
-            CoreOptions options, SnapshotReader snapshotReader, 
SnapshotManager snapshotManager) {
+            CoreOptions options,
+            SnapshotReader snapshotReader,
+            SnapshotManager snapshotManager,
+            DefaultValueAssiger defaultValueAssiger) {
         super(options, snapshotReader);
         this.snapshotManager = snapshotManager;
         this.hasNext = true;
+        this.defaultValueAssiger = defaultValueAssiger;
     }
 
     @Override
     public InnerTableScan withFilter(Predicate predicate) {
-        snapshotReader.withFilter(predicate);
+        
snapshotReader.withFilter(defaultValueAssiger.handlePredicate(predicate));
         return this;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 768eb3f0e..136ecd1ab 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.DefaultValueAssiger;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.ScanKind;
 import org.apache.paimon.predicate.Predicate;
@@ -70,13 +71,16 @@ public class SnapshotReaderImpl implements SnapshotReader {
     private ScanKind scanKind = ScanKind.ALL;
     private RecordComparator lazyPartitionComparator;
 
+    private DefaultValueAssiger defaultValueAssiger;
+
     public SnapshotReaderImpl(
             FileStoreScan scan,
             TableSchema tableSchema,
             CoreOptions options,
             SnapshotManager snapshotManager,
             SplitGenerator splitGenerator,
-            BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer) {
+            BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
+            DefaultValueAssiger defaultValueAssiger) {
         this.scan = scan;
         this.tableSchema = tableSchema;
         this.options = options;
@@ -85,6 +89,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 new ConsumerManager(snapshotManager.fileIO(), 
snapshotManager.tablePath());
         this.splitGenerator = splitGenerator;
         this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
+        this.defaultValueAssiger = defaultValueAssiger;
     }
 
     @Override
@@ -124,7 +129,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
         List<Predicate> partitionFilters = new ArrayList<>();
         List<Predicate> nonPartitionFilters = new ArrayList<>();
-        for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
+        for (Predicate p :
+                
PredicateBuilder.splitAnd(defaultValueAssiger.handlePredicate(predicate))) {
             Optional<Predicate> mapped = transformFieldMapping(p, 
fieldIdxToPartitionIdx);
             if (mapped.isPresent()) {
                 partitionFilters.add(mapped.get());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
new file mode 100644
index 000000000..608964ad0
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Projection;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class DefaultValueAssigerTest {
+    @TempDir java.nio.file.Path tempDir;
+
+    private TableSchema tableSchema;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        Path tablePath = new Path(tempDir.toUri());
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
+        Map<String, String> options = new HashMap<>();
+        options.put(
+                String.format(
+                        "%s.%s.%s",
+                        CoreOptions.FIELDS_PREFIX, "col4", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
+                "0");
+        options.put(
+                String.format(
+                        "%s.%s.%s",
+                        CoreOptions.FIELDS_PREFIX, "col5", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
+                "1");
+        Schema schema =
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "col0", DataTypes.STRING()),
+                                new DataField(1, "col1", DataTypes.STRING()),
+                                new DataField(2, "col2", DataTypes.STRING()),
+                                new DataField(3, "col3", DataTypes.STRING()),
+                                new DataField(4, "col4", DataTypes.STRING()),
+                                new DataField(5, "col5", DataTypes.STRING())),
+                        Lists.newArrayList("col0"),
+                        Collections.emptyList(),
+                        options,
+                        "");
+        tableSchema = schemaManager.createTable(schema);
+    }
+
+    @Test
+    public void testGeneralRow() {
+        DefaultValueAssiger defaultValueAssiger = new 
DefaultValueAssiger(tableSchema);
+        int[] projection = tableSchema.projection(Lists.newArrayList("col5", 
"col4", "col0"));
+        Projection top = Projection.of(projection);
+        int[][] nest = top.toNestedIndexes();
+        defaultValueAssiger = defaultValueAssiger.handleProject(nest);
+        GenericRow row = defaultValueAssiger.createDefaultValueMapping();
+        assertEquals(
+                "1|0|null",
+                String.format("%s|%s|%s", row.getString(0), row.getString(1), 
row.getString(2)));
+    }
+
+    @Test
+    public void testHanldePredicate() {
+        DefaultValueAssiger defaultValueAssiger = new 
DefaultValueAssiger(tableSchema);
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(tableSchema.logicalRowType());
+
+        {
+            Predicate predicate =
+                    PredicateBuilder.and(
+                            
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
+                            
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
+            Predicate actual = defaultValueAssiger.handlePredicate(predicate);
+            assertEquals(actual, 
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
+        }
+
+        {
+            Predicate predicate =
+                    PredicateBuilder.and(
+                            
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
+                            
predicateBuilder.equal(predicateBuilder.indexOf("col4"), "1"));
+            Predicate actual = defaultValueAssiger.handlePredicate(predicate);
+            Assertions.assertThat(actual).isNull();
+        }
+
+        {
+            Predicate actual = defaultValueAssiger.handlePredicate(null);
+            Assertions.assertThat(actual).isNull();
+        }
+
+        {
+            Predicate actual =
+                    defaultValueAssiger.handlePredicate(
+                            
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
+            assertEquals(actual, 
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 703f82167..7191d26fa 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -46,6 +46,7 @@ import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -726,6 +727,56 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                 .isNotEqualTo(splits0.get(0).dataFiles().get(0).fileName());
     }
 
+    @Test
+    public void testAuditLogWithDefaultValueFields() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CoreOptions.CHANGELOG_PRODUCER, 
ChangelogProducer.INPUT);
+                            conf.set(
+                                    String.format(
+                                            "%s.%s.%s",
+                                            CoreOptions.FIELDS_PREFIX,
+                                            "b",
+                                            CoreOptions.DEFAULT_VALUE_SUFFIX),
+                                    "0");
+                        });
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
+        write.write(rowDataWithKind(RowKind.INSERT, 2, 21, null));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.close();
+        commit.close();
+
+        AuditLogTable auditLogTable = new AuditLogTable(table);
+        Function<InternalRow, String> rowDataToString =
+                row ->
+                        internalRowToString(
+                                row,
+                                DataTypes.ROW(
+                                        DataTypes.STRING(),
+                                        DataTypes.INT(),
+                                        DataTypes.INT(),
+                                        DataTypes.BIGINT()));
+
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(auditLogTable.rowType());
+
+        SnapshotReader snapshotReader =
+                auditLogTable
+                        .newSnapshotReader()
+                        .withFilter(
+                                PredicateBuilder.and(
+                                        
predicateBuilder.equal(predicateBuilder.indexOf("b"), 300),
+                                        
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2)));
+        InnerTableRead read = auditLogTable.newRead();
+        List<String> result =
+                getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowDataToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[+I, 2, 20, 200]", 
"+I[+I, 2, 21, 0]");
+    }
+
     @Test
     public void testAuditLog() throws Exception {
         FileStoreTable table =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 163434cc5..0eba5f2f7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.DataFormatTestUtil;
@@ -39,6 +40,8 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -49,6 +52,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
 
@@ -75,6 +79,126 @@ public class SchemaEvolutionTest {
         commitUser = UUID.randomUUID().toString();
     }
 
+    @Test
+    public void testDefaultValue() throws Exception {
+        {
+            Map<String, String> option = new HashMap<>();
+            option.put(
+                    String.format(
+                            "%s.%s.%s",
+                            CoreOptions.FIELDS_PREFIX, "a", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
+                    "1");
+            Schema schema =
+                    new Schema(
+                            RowType.of(
+                                            new DataType[] {
+                                                DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()),
+                                                DataTypes.BIGINT()
+                                            },
+                                            new String[] {"a", "b"})
+                                    .getFields(),
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            option,
+                            "");
+
+            assertThatThrownBy(() -> schemaManager.createTable(schema))
+                    .isInstanceOf(IllegalArgumentException.class)
+                    .hasMessageContaining(
+                            "The column %s with datatype %s is currently not 
supported for default value.",
+                            "a", DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()).asSQLString());
+        }
+
+        {
+            Map<String, String> option = new HashMap<>();
+            option.put(
+                    String.format(
+                            "%s.%s.%s",
+                            CoreOptions.FIELDS_PREFIX, "a", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
+                    "abcxxxx");
+            Schema schema =
+                    new Schema(
+                            RowType.of(
+                                            new DataType[] 
{DataTypes.BIGINT(), DataTypes.BIGINT()},
+                                            new String[] {"a", "b"})
+                                    .getFields(),
+                            Collections.emptyList(),
+                            Collections.emptyList(),
+                            option,
+                            "");
+            assertThatThrownBy(() -> schemaManager.createTable(schema))
+                    .isInstanceOf(IllegalArgumentException.class)
+                    .hasMessageContaining(
+                            "The default value %s of the column a can not be 
cast to datatype: %s",
+                            "abcxxxx", DataTypes.BIGINT().asSQLString());
+        }
+
+        {
+            Schema schema =
+                    new Schema(
+                            RowType.of(
+                                            new DataType[] {
+                                                DataTypes.BIGINT(),
+                                                DataTypes.BIGINT(),
+                                                DataTypes.BIGINT()
+                                            },
+                                            new String[] {"a", "b", "c"})
+                                    .getFields(),
+                            Lists.newArrayList("c"),
+                            Lists.newArrayList("a", "c"),
+                            new HashMap<>(),
+                            "");
+
+            schemaManager.createTable(schema);
+
+            assertThatThrownBy(
+                            () ->
+                                    schemaManager.commitChanges(
+                                            Collections.singletonList(
+                                                    SchemaChange.setOption(
+                                                            String.format(
+                                                                    "%s.%s.%s",
+                                                                    
CoreOptions.FIELDS_PREFIX,
+                                                                    "b",
+                                                                    CoreOptions
+                                                                            
.DEFAULT_VALUE_SUFFIX),
+                                                            "abcxxxx"))))
+                    .hasCauseInstanceOf(IllegalArgumentException.class)
+                    .hasMessageContaining(
+                            "The default value %s of the column b can not be 
cast to datatype: %s",
+                            "abcxxxx", DataTypes.BIGINT().asSQLString());
+            assertThatThrownBy(
+                            () ->
+                                    schemaManager.commitChanges(
+                                            Collections.singletonList(
+                                                    SchemaChange.setOption(
+                                                            String.format(
+                                                                    "%s.%s.%s",
+                                                                    
CoreOptions.FIELDS_PREFIX,
+                                                                    "a",
+                                                                    CoreOptions
+                                                                            
.DEFAULT_VALUE_SUFFIX),
+                                                            "abc"))))
+                    .hasCauseInstanceOf(IllegalArgumentException.class)
+                    .hasMessageContaining("Primary key a should not be assign 
default column.");
+
+            assertThatThrownBy(
+                            () ->
+                                    schemaManager.commitChanges(
+                                            Collections.singletonList(
+                                                    SchemaChange.setOption(
+                                                            String.format(
+                                                                    "%s.%s.%s",
+                                                                    
CoreOptions.FIELDS_PREFIX,
+                                                                    "c",
+                                                                    CoreOptions
+                                                                            
.DEFAULT_VALUE_SUFFIX),
+                                                            "abc"))))
+                    .hasCauseInstanceOf(IllegalArgumentException.class)
+                    .hasMessageContaining("Partition key c should not be 
assign default column.");
+        }
+    }
+
     @Test
     public void testAddField() throws Exception {
         Schema schema =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
new file mode 100644
index 000000000..843f74a1a
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests default value assignment on streaming scan. */
+public class DefaultValueScannerTest extends ScannerTestBase {
+    @Test
+    public void testDefaultValue() throws Exception {
+        TableRead read = table.newRead();
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+        InnerStreamTableScan scan = table.newStreamScan();
+
+        write.write(rowData(1, 10, 101L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(1, 10, null));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        {
+            TableScan.Plan plan = scan.plan();
+            assertThat(getResult(read, plan.splits()))
+                    .hasSameElementsAs(Arrays.asList("+I 1|10|100"));
+        }
+
+        write.write(rowData(2, 11, 200L));
+        write.write(rowData(2, 12, null));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        {
+            // Predicate pushdown for fields with default values will not work
+            PredicateBuilder predicateBuilder =
+                    new PredicateBuilder(table.schema().logicalRowType());
+
+            Predicate ptEqual = 
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2);
+            Predicate bEqual = 
predicateBuilder.equal(predicateBuilder.indexOf("b"), 200);
+            Predicate predicate = PredicateBuilder.and(ptEqual, bEqual);
+
+            TableScan.Plan plan = scan.withFilter(predicate).plan();
+            read = table.newRead().withFilter(predicate);
+            List<String> result = getResult(read, plan.splits());
+            assertThat(result).hasSameElementsAs(Arrays.asList("+I 2|11|200", 
"+I 2|12|100"));
+        }
+    }
+
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Options options = new Options();
+        options.set(
+                String.format(
+                        "%s.%s.%s",
+                        CoreOptions.FIELDS_PREFIX, "b", 
CoreOptions.DEFAULT_VALUE_SUFFIX),
+                "100");
+        return createFileStoreTable(options);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index af9736e9d..0c7636efb 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -32,6 +32,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
@@ -72,6 +73,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static org.apache.paimon.CoreOptions.BUCKET;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
 import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
 import static org.apache.paimon.CoreOptions.WRITE_MODE;
@@ -1474,7 +1476,7 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
     @EnumSource(CoreOptions.MergeEngine.class)
     public void testUpdateWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) 
throws Exception {
         Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
-        supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
+        supportUpdateEngines.add(DEDUPLICATE);
         supportUpdateEngines.add(CoreOptions.MergeEngine.PARTIAL_UPDATE);
         // Step1: define table schema
         Map<String, String> options = new HashMap<>();
@@ -1531,6 +1533,74 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
         }
     }
 
+    @Test
+    public void testDefaultValueWithoutPrimaryKey() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(WRITE_MODE.key(), WriteMode.AUTO.name());
+        options.put(
+                CoreOptions.FIELDS_PREFIX + ".rate." + 
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
+
+        String table =
+                createTable(
+                        Arrays.asList(
+                                "id BIGINT NOT NULL",
+                                "currency STRING",
+                                "rate BIGINT",
+                                "dt String"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options);
+        insertInto(
+                table,
+                "(1, 'US Dollar', 114, '2022-01-01')",
+                "(2, 'Yen', cast(null as int), '2022-01-01')",
+                "(3, 'Euro', cast(null as int), '2022-01-01')",
+                "(3, 'Euro', 119, '2022-01-02')");
+
+        List<Row> expectedRecords =
+                Arrays.asList(
+                        // part = 2022-01-01
+                        changelogRow("+I", 2L, "Yen", 1000L, "2022-01-01"),
+                        changelogRow("+I", 3L, "Euro", 1000L, "2022-01-01"));
+
+        String querySql = String.format("SELECT * FROM %s where rate = 1000", 
table);
+        testBatchRead(querySql, expectedRecords);
+    }
+
+    @ParameterizedTest
+    @EnumSource(CoreOptions.MergeEngine.class)
+    public void testDefaultValueWithPrimaryKey(CoreOptions.MergeEngine 
mergeEngine)
+            throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(WRITE_MODE.key(), WriteMode.AUTO.name());
+        options.put(
+                CoreOptions.FIELDS_PREFIX + ".rate." + 
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
+        options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+        String table =
+                createTable(
+                        Arrays.asList(
+                                "id BIGINT NOT NULL",
+                                "currency STRING",
+                                "rate BIGINT",
+                                "dt String"),
+                        Lists.newArrayList("id", "dt"),
+                        Lists.newArrayList("dt"),
+                        options);
+        insertInto(
+                table,
+                "(1, 'US Dollar', 114, '2022-01-01')",
+                "(2, 'Yen', cast(null as int), '2022-01-01')",
+                "(2, 'Yen', cast(null as int), '2022-01-01')",
+                "(3, 'Euro', cast(null as int) , '2022-01-02')");
+
+        List<Row> expectedRecords =
+                Arrays.asList(changelogRow("+I", 3L, "Euro", 1000L, 
"2022-01-02"));
+
+        String querySql =
+                String.format("SELECT * FROM %s where rate = 1000 and currency 
='Euro'", table);
+        testBatchRead(querySql, expectedRecords);
+    }
+
     @ParameterizedTest
     @EnumSource(WriteMode.class)
     public void testUpdateWithoutPrimaryKey(WriteMode writeMode) throws 
Exception {
@@ -1580,7 +1650,7 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
     @EnumSource(CoreOptions.MergeEngine.class)
     public void testDeleteWithPrimaryKey(CoreOptions.MergeEngine mergeEngine) 
throws Exception {
         Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
-        supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
+        supportUpdateEngines.add(DEDUPLICATE);
 
         // Step1: define table schema
         Map<String, String> options = new HashMap<>();

Reply via email to