This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new eb5af93b3 [core] add default column value (#1425)
eb5af93b3 is described below
commit eb5af93b396c5662cddf3942e5ae476bb3924679
Author: wgcn <[email protected]>
AuthorDate: Tue Jul 11 12:08:56 2023 +0800
[core] add default column value (#1425)
---
docs/content/concepts/primary-key-table.md | 41 +++++
docs/content/how-to/creating-tables.md | 69 +++++++-
.../main/java/org/apache/paimon/CoreOptions.java | 15 ++
.../org/apache/paimon/casting/DefaultValueRow.java | 182 +++++++++++++++++++++
.../paimon/operation/DefaultValueAssiger.java | 157 ++++++++++++++++++
.../org/apache/paimon/schema/SchemaValidation.java | 64 ++++++++
.../paimon/table/AbstractFileStoreTable.java | 53 +++++-
.../paimon/table/AppendOnlyFileStoreTable.java | 2 +-
.../table/ChangelogValueCountFileStoreTable.java | 2 +-
.../table/ChangelogWithKeyFileStoreTable.java | 2 +-
.../table/source/InnerStreamTableScanImpl.java | 9 +-
.../paimon/table/source/InnerTableScanImpl.java | 11 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 10 +-
.../paimon/operation/DefaultValueAssigerTest.java | 131 +++++++++++++++
.../table/ChangelogWithKeyFileStoreTableTest.java | 51 ++++++
.../apache/paimon/table/SchemaEvolutionTest.java | 124 ++++++++++++++
.../source/snapshot/DefaultValueScannerTest.java | 89 ++++++++++
.../apache/paimon/flink/ReadWriteTableITCase.java | 74 ++++++++-
18 files changed, 1071 insertions(+), 15 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index 60d68503f..bf8e21f95 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -122,6 +122,47 @@ INSERT INTO T VALUES (1, 3, 3, 1, 3, 3, 3);
SELECT * FROM T; -- output 1, 2, 2, 2, 3, 3, 3
```
+#### Default Value
+If the order of the data cannot be guaranteed and a field is written only by
+overwriting null values,
+fields that have not been overwritten will be displayed as null when reading the
+table.
+
+```sql
+CREATE TABLE T (
+ k INT,
+ a INT,
+ b INT,
+ c INT,
+ PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+ 'merge-engine'='partial-update'
+ );
+INSERT INTO T VALUES (1, 1,null,null);
+INSERT INTO T VALUES (1, null,null,1);
+
+SELECT * FROM T; -- output 1, 1, null, 1
+```
+If it is expected that fields which have not been overwritten have a default
+value instead of null when reading the table,
+'fields.name.default-value' is required.
+```sql
+CREATE TABLE T (
+ k INT,
+ a INT,
+ b INT,
+ c INT,
+ PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+ 'merge-engine'='partial-update',
+ 'fields.b.default-value'='0'
+);
+
+INSERT INTO T VALUES (1, 1,null,null);
+INSERT INTO T VALUES (1, null,null,1);
+
+SELECT * FROM T; -- output 1, 1, 0, 1
+```
+
+
+
### Aggregation
{{< hint info >}}
diff --git a/docs/content/how-to/creating-tables.md
b/docs/content/how-to/creating-tables.md
index 04795702c..87b99d5bd 100644
--- a/docs/content/how-to/creating-tables.md
+++ b/docs/content/how-to/creating-tables.md
@@ -221,7 +221,7 @@ CREATE TABLE MyTable (
By configuring [partition.expiration-time]({{< ref
"maintenance/manage-partition" >}}), expired partitions can be automatically
deleted.
{{< /hint >}}
-### Pick Partition Fields
+#### Pick Partition Fields
{{< hint info >}}
Partition fields must be a subset of primary keys if primary keys are defined.
@@ -248,6 +248,73 @@ Paimon will automatically collect the statistics of the
data file for speeding u
The statistics collector mode can be configured by `'metadata.stats-mode'`, by
default is `'truncate(16)'`.
You can configure the field level by setting
`'fields.{field_name}.stats-mode'`.
+
+### Field Default Value
+
+Paimon table currently supports setting default values for fields in table
properties,
+note that partition fields and primary key fields can not be specified.
+{{< tabs "default-value-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING,
+ PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
+) PARTITIONED BY (dt, hh)
+with(
+    'fields.item_id.default-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING
+) PARTITIONED BY (dt, hh) TBLPROPERTIES (
+ 'primary-key' = 'dt,hh,user_id',
+    'fields.item_id.default-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< tab "Hive" >}}
+
+```sql
+SET hive.metastore.warehouse.dir=warehouse_path;
+
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING
+)
+STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
+TBLPROPERTIES (
+ 'primary-key' = 'dt,hh,user_id',
+ 'partition'='dt,hh',
+    'fields.item_id.default-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+
## Create Table As
Table can be created and populated by the results of a query, for example, we
have a sql like this: `CREATE TABLE table_b AS SELECT id, name FORM table_a`,
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 5b983dbee..06187b8b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -53,6 +53,7 @@ import static
org.apache.paimon.options.description.TextElement.text;
/** Core options for paimon. */
public class CoreOptions implements Serializable {
+ public static final String DEFAULT_VALUE_SUFFIX = "default-value";
public static final String FIELDS_PREFIX = "fields";
@@ -1086,6 +1087,20 @@ public class CoreOptions implements Serializable {
return options.get(CONSUMER_EXPIRATION_TIME);
}
+ public Options getFieldDefaultValues() {
+ Map<String, String> defultValues = new HashMap<>();
+ for (Map.Entry<String, String> option : options.toMap().entrySet()) {
+ String key = option.getKey();
+ String fieldPrefix = FIELDS_PREFIX + ".";
+ String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
+ if (key != null && key.startsWith(fieldPrefix) &&
key.endsWith(defaultValueSuffix)) {
+ String fieldName = key.replace(fieldPrefix,
"").replace(defaultValueSuffix, "");
+ defultValues.put(fieldName, option.getValue());
+ }
+ }
+ return new Options(defultValues);
+ }
+
public List<CommitCallback> commitCallbacks() {
List<CommitCallback> result = new ArrayList<>();
for (String className : options.get(COMMIT_CALLBACKS).split(",")) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
b/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
new file mode 100644
index 000000000..10f2e5cd1
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.casting;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.types.RowKind;
+
+/**
+ * An implementation of {@link InternalRow} which provides a default volue for
the underlying {@link
+ * InternalRow}.
+ */
+public class DefaultValueRow implements InternalRow {
+ private InternalRow row;
+
+ private InternalRow defaultValueRow;
+
+ private DefaultValueRow(InternalRow defaultValueRow) {
+ this.defaultValueRow = defaultValueRow;
+ }
+
+ public DefaultValueRow replaceRow(InternalRow row) {
+ this.row = row;
+ return this;
+ }
+
+ @Override
+ public int getFieldCount() {
+ return row.getFieldCount();
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return row.getRowKind();
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ row.setRowKind(kind);
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ return row.isNullAt(pos) && defaultValueRow.isNullAt(pos);
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getBoolean(pos);
+ }
+ return defaultValueRow.getBoolean(pos);
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getByte(pos);
+ }
+ return defaultValueRow.getByte(pos);
+ }
+
+ @Override
+ public short getShort(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getShort(pos);
+ }
+ return defaultValueRow.getShort(pos);
+ }
+
+ @Override
+ public int getInt(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getInt(pos);
+ }
+ return defaultValueRow.getInt(pos);
+ }
+
+ @Override
+ public long getLong(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getLong(pos);
+ }
+ return defaultValueRow.getLong(pos);
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getFloat(pos);
+ }
+ return defaultValueRow.getFloat(pos);
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getDouble(pos);
+ }
+ return defaultValueRow.getDouble(pos);
+ }
+
+ @Override
+ public BinaryString getString(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getString(pos);
+ }
+ return defaultValueRow.getString(pos);
+ }
+
+ @Override
+ public Decimal getDecimal(int pos, int precision, int scale) {
+ if (!row.isNullAt(pos)) {
+ return row.getDecimal(pos, precision, scale);
+ }
+ return defaultValueRow.getDecimal(pos, precision, scale);
+ }
+
+ @Override
+ public Timestamp getTimestamp(int pos, int precision) {
+ if (!row.isNullAt(pos)) {
+ return row.getTimestamp(pos, precision);
+ }
+ return defaultValueRow.getTimestamp(pos, precision);
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getBinary(pos);
+ }
+ return defaultValueRow.getBinary(pos);
+ }
+
+ @Override
+ public InternalArray getArray(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getArray(pos);
+ }
+ return defaultValueRow.getArray(pos);
+ }
+
+ @Override
+ public InternalMap getMap(int pos) {
+ if (!row.isNullAt(pos)) {
+ return row.getMap(pos);
+ }
+ return defaultValueRow.getMap(pos);
+ }
+
+ @Override
+ public InternalRow getRow(int pos, int numFields) {
+ if (!row.isNullAt(pos)) {
+ return row.getRow(pos, numFields);
+ }
+ return defaultValueRow.getRow(pos, numFields);
+ }
+
+ public static DefaultValueRow from(InternalRow defaultValueRow) {
+ return new DefaultValueRow(defaultValueRow);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
new file mode 100644
index 000000000..adaa645fe
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.casting.DefaultValueRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.PredicateReplaceVisitor;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.Projection;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * The field default value assigner. Note that default-value assignment must be
+ * invoked after merge and
+ * schema evolution
+ */
+public class DefaultValueAssiger {
+
+ private GenericRow defaultValueMapping;
+ private TableSchema tableSchema;
+
+ private Map<String, String> defaultValues;
+
+ private int[][] project;
+
+ private boolean isCacheDefaultMapping;
+
+ public DefaultValueAssiger(TableSchema tableSchema) {
+ this.tableSchema = tableSchema;
+
+ CoreOptions coreOptions = new CoreOptions(tableSchema.options());
+ defaultValues = coreOptions.getFieldDefaultValues().toMap();
+ }
+
+ public DefaultValueAssiger handleProject(int[][] project) {
+ this.project = project;
+ return this;
+ }
+
+    /** Assign default values to columns whose value is null. */
+ public RecordReader<InternalRow>
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
+ if (defaultValues.isEmpty()) {
+ return reader;
+ }
+
+ if (!isCacheDefaultMapping) {
+ isCacheDefaultMapping = true;
+ this.defaultValueMapping = createDefaultValueMapping();
+ }
+
+ RecordReader<InternalRow> result = reader;
+ if (defaultValueMapping != null) {
+ DefaultValueRow defaultValueRow =
DefaultValueRow.from(defaultValueMapping);
+ result = reader.transform(defaultValueRow::replaceRow);
+ }
+ return result;
+ }
+
+ GenericRow createDefaultValueMapping() {
+
+ RowType valueType = tableSchema.logicalRowType();
+
+ List<DataField> fields;
+ if (project != null) {
+ fields = Projection.of(project).project(valueType).getFields();
+ } else {
+ fields = valueType.getFields();
+ }
+
+ GenericRow defaultValuesMa = null;
+ if (!fields.isEmpty()) {
+ defaultValuesMa = new GenericRow(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ DataField dataField = fields.get(i);
+ String defaultValueStr = defaultValues.get(dataField.name());
+ if (defaultValueStr == null) {
+ continue;
+ }
+
+ CastExecutor<Object, Object> resolve =
+ (CastExecutor<Object, Object>)
+ CastExecutors.resolve(VarCharType.STRING_TYPE,
dataField.type());
+
+ if (resolve == null) {
+ throw new RuntimeException(
+ "Default value do not support the type of " +
dataField.type());
+ }
+ Object defaultValue =
resolve.cast(BinaryString.fromString(defaultValueStr));
+ defaultValuesMa.setField(i, defaultValue);
+ }
+ }
+
+ return defaultValuesMa;
+ }
+
+ public Predicate handlePredicate(Predicate filters) {
+ Predicate result = filters;
+ if (!defaultValues.isEmpty()) {
+ if (filters != null) {
+ // TODO improve predicate tree with replacing always true and
always false
+ PredicateReplaceVisitor deletePredicateWithFieldNameVisitor =
+ predicate -> {
+ if
(defaultValues.containsKey(predicate.fieldName())) {
+ return Optional.empty();
+ }
+ return Optional.of(predicate);
+ };
+
+ ArrayList<Predicate> filterWithouDefaultValueField = new
ArrayList<>();
+
+ List<Predicate> predicates =
PredicateBuilder.splitAnd(filters);
+ for (Predicate predicate : predicates) {
+ predicate
+ .visit(deletePredicateWithFieldNameVisitor)
+ .ifPresent(filterWithouDefaultValueField::add);
+ }
+
+ if (!filterWithouDefaultValueField.isEmpty()) {
+ result =
PredicateBuilder.and(filterWithouDefaultValueField);
+ } else {
+ result = null;
+ }
+ }
+ }
+ return result;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 044f6b1f6..b04554a83 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -20,6 +20,9 @@ package org.apache.paimon.schema;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.WriteMode;
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
@@ -29,6 +32,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
import java.util.Arrays;
import java.util.Collections;
@@ -74,6 +78,8 @@ public class SchemaValidation {
CoreOptions options = new CoreOptions(schema.options());
+ validateDefaultValues(schema);
+
validateStartupMode(options);
if (options.writeMode() == WriteMode.APPEND_ONLY
@@ -288,4 +294,62 @@ public class SchemaValidation {
private static String concatConfigKeys(List<ConfigOption<?>>
configOptions) {
return
configOptions.stream().map(ConfigOption::key).collect(Collectors.joining(","));
}
+
+ private static void validateDefaultValues(TableSchema schema) {
+ CoreOptions coreOptions = new CoreOptions(schema.options());
+ Map<String, String> defaultValues =
coreOptions.getFieldDefaultValues().toMap();
+
+ if (!defaultValues.isEmpty()) {
+
+ List<String> partitionKeys = schema.partitionKeys();
+ for (String partitionKey : partitionKeys) {
+ if (defaultValues.containsKey(partitionKey)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Partition key %s should not be assign
default column.",
+ partitionKey));
+ }
+ }
+
+ List<String> primaryKeys = schema.primaryKeys();
+ for (String primaryKey : primaryKeys) {
+ if (defaultValues.containsKey(primaryKey)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Primary key %s should not be assign
default column.",
+ primaryKey));
+ }
+ }
+
+ List<DataField> fields = schema.fields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ DataField dataField = fields.get(i);
+ String defaultValueStr = defaultValues.get(dataField.name());
+ if (defaultValueStr == null) {
+ continue;
+ }
+
+ CastExecutor<Object, Object> resolve =
+ (CastExecutor<Object, Object>)
+ CastExecutors.resolve(VarCharType.STRING_TYPE,
dataField.type());
+ if (resolve == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The column %s with datatype %s is
currently not supported for default value.",
+ dataField.name(),
dataField.type().asSQLString()));
+ }
+
+ try {
+ resolve.cast(BinaryString.fromString(defaultValueStr));
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ String.format(
+ "The default value %s of the column %s can
not be cast to datatype: %s",
+ defaultValueStr, dataField.name(),
dataField.type()),
+ e);
+ }
+ }
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 536bc1c34..619fc4151 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -21,12 +21,15 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.operation.DefaultValueAssiger;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaValidation;
import org.apache.paimon.schema.TableSchema;
@@ -37,8 +40,10 @@ import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
import org.apache.paimon.table.source.InnerStreamTableScan;
import org.apache.paimon.table.source.InnerStreamTableScanImpl;
+import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.InnerTableScan;
import org.apache.paimon.table.source.InnerTableScanImpl;
+import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.SplitGenerator;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.SnapshotReaderImpl;
@@ -107,12 +112,17 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
coreOptions(),
snapshotManager(),
splitGenerator(),
- nonPartitionFilterConsumer());
+ nonPartitionFilterConsumer(),
+ new DefaultValueAssiger(tableSchema));
}
@Override
public InnerTableScan newScan() {
- return new InnerTableScanImpl(coreOptions(), newSnapshotReader(),
snapshotManager());
+ return new InnerTableScanImpl(
+ coreOptions(),
+ newSnapshotReader(),
+ snapshotManager(),
+ new DefaultValueAssiger(tableSchema));
}
@Override
@@ -121,7 +131,8 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
coreOptions(),
newSnapshotReader(),
snapshotManager(),
- supportStreamingReadOverwrite());
+ supportStreamingReadOverwrite(),
+ new DefaultValueAssiger(tableSchema));
}
public abstract SplitGenerator splitGenerator();
@@ -282,6 +293,42 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
rollbackHelper().cleanLargerThan(snapshotManager.snapshot(snapshotId));
}
+ abstract InnerTableRead innerRead();
+
+ @Override
+ public InnerTableRead newRead() {
+ InnerTableRead innerTableRead = innerRead();
+ DefaultValueAssiger defaultValueAssiger = new
DefaultValueAssiger(tableSchema);
+ return new InnerTableRead() {
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+
innerTableRead.withFilter(defaultValueAssiger.handlePredicate(predicate));
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withProjection(int[][] projection) {
+ defaultValueAssiger.handleProject(projection);
+ innerTableRead.withProjection(projection);
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ RecordReader<InternalRow> reader =
+ defaultValueAssiger.assignFieldsDefaultValue(
+ innerTableRead.createReader(split));
+ return reader;
+ }
+
+ @Override
+ public InnerTableRead forceKeepDelete() {
+ innerTableRead.forceKeepDelete();
+ return this;
+ }
+ };
+ }
+
@Override
public void createTag(String tagName, long fromSnapshotId) {
SnapshotManager snapshotManager = snapshotManager();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index beb703b84..6dfe02848 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -105,7 +105,7 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
}
@Override
- public InnerTableRead newRead() {
+ public InnerTableRead innerRead() {
AppendOnlyFileStoreRead read = store().newRead();
return new InnerTableRead() {
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index 42b5fa18a..ec8105cea 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -118,7 +118,7 @@ public class ChangelogValueCountFileStoreTable extends
AbstractFileStoreTable {
}
@Override
- public InnerTableRead newRead() {
+ public InnerTableRead innerRead() {
return new KeyValueTableRead(store().newRead()) {
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index ec692a013..95a9f0049 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -192,7 +192,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
}
@Override
- public InnerTableRead newRead() {
+ public InnerTableRead innerRead() {
return new KeyValueTableRead(store().newRead()) {
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 44f696945..9b50e7831 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.operation.DefaultValueAssiger;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.BoundedChecker;
import
org.apache.paimon.table.source.snapshot.CompactionChangelogFollowUpScanner;
@@ -58,20 +59,24 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
@Nullable private Long currentWatermark;
@Nullable private Long nextSnapshotId;
+ private DefaultValueAssiger defaultValueAssiger;
+
public InnerStreamTableScanImpl(
CoreOptions options,
SnapshotReader snapshotReader,
SnapshotManager snapshotManager,
- boolean supportStreamingReadOverwrite) {
+ boolean supportStreamingReadOverwrite,
+ DefaultValueAssiger defaultValueAssiger) {
super(options, snapshotReader);
this.options = options;
this.snapshotManager = snapshotManager;
this.supportStreamingReadOverwrite = supportStreamingReadOverwrite;
+ this.defaultValueAssiger = defaultValueAssiger;
}
@Override
public InnerStreamTableScanImpl withFilter(Predicate predicate) {
- snapshotReader.withFilter(predicate);
+
snapshotReader.withFilter(defaultValueAssiger.handlePredicate(predicate));
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index a9d72a123..caa8fbaf3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.DefaultValueAssiger;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
@@ -33,16 +34,22 @@ public class InnerTableScanImpl extends
AbstractInnerTableScan {
private boolean hasNext;
+ private DefaultValueAssiger defaultValueAssiger;
+
public InnerTableScanImpl(
- CoreOptions options, SnapshotReader snapshotReader,
SnapshotManager snapshotManager) {
+ CoreOptions options,
+ SnapshotReader snapshotReader,
+ SnapshotManager snapshotManager,
+ DefaultValueAssiger defaultValueAssiger) {
super(options, snapshotReader);
this.snapshotManager = snapshotManager;
this.hasNext = true;
+ this.defaultValueAssiger = defaultValueAssiger;
}
@Override
public InnerTableScan withFilter(Predicate predicate) {
- snapshotReader.withFilter(predicate);
+
snapshotReader.withFilter(defaultValueAssiger.handlePredicate(predicate));
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 768eb3f0e..136ecd1ab 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.DefaultValueAssiger;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.operation.ScanKind;
import org.apache.paimon.predicate.Predicate;
@@ -70,13 +71,16 @@ public class SnapshotReaderImpl implements SnapshotReader {
private ScanKind scanKind = ScanKind.ALL;
private RecordComparator lazyPartitionComparator;
+ private DefaultValueAssiger defaultValueAssiger;
+
public SnapshotReaderImpl(
FileStoreScan scan,
TableSchema tableSchema,
CoreOptions options,
SnapshotManager snapshotManager,
SplitGenerator splitGenerator,
- BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer) {
+ BiConsumer<FileStoreScan, Predicate> nonPartitionFilterConsumer,
+ DefaultValueAssiger defaultValueAssiger) {
this.scan = scan;
this.tableSchema = tableSchema;
this.options = options;
@@ -85,6 +89,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
this.splitGenerator = splitGenerator;
this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
+ this.defaultValueAssiger = defaultValueAssiger;
}
@Override
@@ -124,7 +129,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
List<Predicate> partitionFilters = new ArrayList<>();
List<Predicate> nonPartitionFilters = new ArrayList<>();
- for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
+ for (Predicate p :
+
PredicateBuilder.splitAnd(defaultValueAssiger.handlePredicate(predicate))) {
Optional<Predicate> mapped = transformFieldMapping(p,
fieldIdxToPartitionIdx);
if (mapped.isPresent()) {
partitionFilters.add(mapped.get());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
new file mode 100644
index 000000000..608964ad0
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssigerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.Projection;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class DefaultValueAssigerTest {
+ @TempDir java.nio.file.Path tempDir;
+
+ private TableSchema tableSchema;
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ Path tablePath = new Path(tempDir.toUri());
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
+ Map<String, String> options = new HashMap<>();
+ options.put(
+ String.format(
+ "%s.%s.%s",
+ CoreOptions.FIELDS_PREFIX, "col4",
CoreOptions.DEFAULT_VALUE_SUFFIX),
+ "0");
+ options.put(
+ String.format(
+ "%s.%s.%s",
+ CoreOptions.FIELDS_PREFIX, "col5",
CoreOptions.DEFAULT_VALUE_SUFFIX),
+ "1");
+ Schema schema =
+ new Schema(
+ Lists.newArrayList(
+ new DataField(0, "col0", DataTypes.STRING()),
+ new DataField(1, "col1", DataTypes.STRING()),
+ new DataField(2, "col2", DataTypes.STRING()),
+ new DataField(3, "col3", DataTypes.STRING()),
+ new DataField(4, "col4", DataTypes.STRING()),
+ new DataField(5, "col5", DataTypes.STRING())),
+ Lists.newArrayList("col0"),
+ Collections.emptyList(),
+ options,
+ "");
+ tableSchema = schemaManager.createTable(schema);
+ }
+
+ @Test
+ public void testGeneralRow() {
+ DefaultValueAssiger defaultValueAssiger = new
DefaultValueAssiger(tableSchema);
+ int[] projection = tableSchema.projection(Lists.newArrayList("col5",
"col4", "col0"));
+ Projection top = Projection.of(projection);
+ int[][] nest = top.toNestedIndexes();
+ defaultValueAssiger = defaultValueAssiger.handleProject(nest);
+ GenericRow row = defaultValueAssiger.createDefaultValueMapping();
+ assertEquals(
+ "1|0|null",
+ String.format("%s|%s|%s", row.getString(0), row.getString(1),
row.getString(2)));
+ }
+
+ @Test
+ public void testHanldePredicate() {
+ DefaultValueAssiger defaultValueAssiger = new
DefaultValueAssiger(tableSchema);
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(tableSchema.logicalRowType());
+
+ {
+ Predicate predicate =
+ PredicateBuilder.and(
+
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
+
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
+ Predicate actual = defaultValueAssiger.handlePredicate(predicate);
+ assertEquals(actual,
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
+ }
+
+ {
+ Predicate predicate =
+ PredicateBuilder.and(
+
predicateBuilder.equal(predicateBuilder.indexOf("col5"), "100"),
+
predicateBuilder.equal(predicateBuilder.indexOf("col4"), "1"));
+ Predicate actual = defaultValueAssiger.handlePredicate(predicate);
+ Assertions.assertThat(actual).isNull();
+ }
+
+ {
+ Predicate actual = defaultValueAssiger.handlePredicate(null);
+ Assertions.assertThat(actual).isNull();
+ }
+
+ {
+ Predicate actual =
+ defaultValueAssiger.handlePredicate(
+
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
+ assertEquals(actual,
predicateBuilder.equal(predicateBuilder.indexOf("col1"), "1"));
+ }
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
index 703f82167..7191d26fa 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
@@ -46,6 +46,7 @@ import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -726,6 +727,56 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
.isNotEqualTo(splits0.get(0).dataFiles().get(0).fileName());
}
+ @Test
+ public void testAuditLogWithDefaultValueFields() throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(
+ conf -> {
+ conf.set(CoreOptions.CHANGELOG_PRODUCER,
ChangelogProducer.INPUT);
+ conf.set(
+ String.format(
+ "%s.%s.%s",
+ CoreOptions.FIELDS_PREFIX,
+ "b",
+ CoreOptions.DEFAULT_VALUE_SUFFIX),
+ "0");
+ });
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowDataWithKind(RowKind.INSERT, 2, 20, 200L));
+ write.write(rowDataWithKind(RowKind.INSERT, 2, 21, null));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.close();
+ commit.close();
+
+ AuditLogTable auditLogTable = new AuditLogTable(table);
+ Function<InternalRow, String> rowDataToString =
+ row ->
+ internalRowToString(
+ row,
+ DataTypes.ROW(
+ DataTypes.STRING(),
+ DataTypes.INT(),
+ DataTypes.INT(),
+ DataTypes.BIGINT()));
+
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(auditLogTable.rowType());
+
+ SnapshotReader snapshotReader =
+ auditLogTable
+ .newSnapshotReader()
+ .withFilter(
+ PredicateBuilder.and(
+
predicateBuilder.equal(predicateBuilder.indexOf("b"), 300),
+
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2)));
+ InnerTableRead read = auditLogTable.newRead();
+ List<String> result =
+ getResult(read, toSplits(snapshotReader.read().dataSplits()),
rowDataToString);
+ assertThat(result).containsExactlyInAnyOrder("+I[+I, 2, 20, 200]",
"+I[+I, 2, 21, 0]");
+ }
+
@Test
public void testAuditLog() throws Exception {
FileStoreTable table =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 163434cc5..0eba5f2f7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.DataFormatTestUtil;
@@ -39,6 +40,8 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -49,6 +52,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
@@ -75,6 +79,126 @@ public class SchemaEvolutionTest {
commitUser = UUID.randomUUID().toString();
}
+ @Test
+ public void testDefaultValue() throws Exception {
+ {
+ Map<String, String> option = new HashMap<>();
+ option.put(
+ String.format(
+ "%s.%s.%s",
+ CoreOptions.FIELDS_PREFIX, "a",
CoreOptions.DEFAULT_VALUE_SUFFIX),
+ "1");
+ Schema schema =
+ new Schema(
+ RowType.of(
+ new DataType[] {
+ DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING()),
+ DataTypes.BIGINT()
+ },
+ new String[] {"a", "b"})
+ .getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ option,
+ "");
+
+ assertThatThrownBy(() -> schemaManager.createTable(schema))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "The column %s with datatype %s is currently not
supported for default value.",
+ "a", DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING()).asSQLString());
+ }
+
+ {
+ Map<String, String> option = new HashMap<>();
+ option.put(
+ String.format(
+ "%s.%s.%s",
+ CoreOptions.FIELDS_PREFIX, "a",
CoreOptions.DEFAULT_VALUE_SUFFIX),
+ "abcxxxx");
+ Schema schema =
+ new Schema(
+ RowType.of(
+ new DataType[]
{DataTypes.BIGINT(), DataTypes.BIGINT()},
+ new String[] {"a", "b"})
+ .getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ option,
+ "");
+ assertThatThrownBy(() -> schemaManager.createTable(schema))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "The default value %s of the column a can not be
cast to datatype: %s",
+ "abcxxxx", DataTypes.BIGINT().asSQLString());
+ }
+
+ {
+ Schema schema =
+ new Schema(
+ RowType.of(
+ new DataType[] {
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT(),
+ DataTypes.BIGINT()
+ },
+ new String[] {"a", "b", "c"})
+ .getFields(),
+ Lists.newArrayList("c"),
+ Lists.newArrayList("a", "c"),
+ new HashMap<>(),
+ "");
+
+ schemaManager.createTable(schema);
+
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.setOption(
+ String.format(
+ "%s.%s.%s",
+
CoreOptions.FIELDS_PREFIX,
+ "b",
+ CoreOptions
+
.DEFAULT_VALUE_SUFFIX),
+ "abcxxxx"))))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining(
+ "The default value %s of the column b can not be
cast to datatype: %s",
+ "abcxxxx", DataTypes.BIGINT().asSQLString());
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.setOption(
+ String.format(
+ "%s.%s.%s",
+
CoreOptions.FIELDS_PREFIX,
+ "a",
+ CoreOptions
+
.DEFAULT_VALUE_SUFFIX),
+ "abc"))))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Primary key a should not be assign
default column.");
+
+ assertThatThrownBy(
+ () ->
+ schemaManager.commitChanges(
+ Collections.singletonList(
+ SchemaChange.setOption(
+ String.format(
+ "%s.%s.%s",
+
CoreOptions.FIELDS_PREFIX,
+ "c",
+ CoreOptions
+
.DEFAULT_VALUE_SUFFIX),
+ "abc"))))
+ .hasCauseInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("Partition key c should not be
assign default column.");
+ }
+ }
+
@Test
public void testAddField() throws Exception {
Schema schema =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
new file mode 100644
index 000000000..843f74a1a
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.InnerStreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** test default value on streaming scan. */
+public class DefaultValueScannerTest extends ScannerTestBase {
+ @Test
+ public void testDefaultValue() throws Exception {
+ TableRead read = table.newRead();
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+ InnerStreamTableScan scan = table.newStreamScan();
+
+ write.write(rowData(1, 10, 101L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(1, 10, null));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ {
+ TableScan.Plan plan = scan.plan();
+ assertThat(getResult(read, plan.splits()))
+ .hasSameElementsAs(Arrays.asList("+I 1|10|100"));
+ }
+
+ write.write(rowData(2, 11, 200L));
+ write.write(rowData(2, 12, null));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ {
+ // Predicate pushdown for default fields will not work
+ PredicateBuilder predicateBuilder =
+ new PredicateBuilder(table.schema().logicalRowType());
+
+ Predicate ptEqual =
predicateBuilder.equal(predicateBuilder.indexOf("pt"), 2);
+ Predicate bEqual =
predicateBuilder.equal(predicateBuilder.indexOf("b"), 200);
+ Predicate predicate = PredicateBuilder.and(ptEqual, bEqual);
+
+ TableScan.Plan plan = scan.withFilter(predicate).plan();
+ read = table.newRead().withFilter(predicate);
+ List<String> result = getResult(read, plan.splits());
+ assertThat(result).hasSameElementsAs(Arrays.asList("+I 2|11|200",
"+I 2|12|100"));
+ }
+ }
+
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Options options = new Options();
+ options.set(
+ String.format(
+ "%s.%s.%s",
+ CoreOptions.FIELDS_PREFIX, "b",
CoreOptions.DEFAULT_VALUE_SUFFIX),
+ "100");
+ return createFileStoreTable(options);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index af9736e9d..0c7636efb 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -32,6 +32,7 @@ import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
@@ -72,6 +73,7 @@ import java.util.stream.Collectors;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.CoreOptions.BUCKET;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST;
import static org.apache.paimon.CoreOptions.SOURCE_SPLIT_TARGET_SIZE;
import static org.apache.paimon.CoreOptions.WRITE_MODE;
@@ -1474,7 +1476,7 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
@EnumSource(CoreOptions.MergeEngine.class)
public void testUpdateWithPrimaryKey(CoreOptions.MergeEngine mergeEngine)
throws Exception {
Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
- supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
+ supportUpdateEngines.add(DEDUPLICATE);
supportUpdateEngines.add(CoreOptions.MergeEngine.PARTIAL_UPDATE);
// Step1: define table schema
Map<String, String> options = new HashMap<>();
@@ -1531,6 +1533,74 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
}
}
+ @Test
+ public void testDefaultValueWithoutPrimaryKey() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(WRITE_MODE.key(), WriteMode.AUTO.name());
+ options.put(
+ CoreOptions.FIELDS_PREFIX + ".rate." +
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
+
+ String table =
+ createTable(
+ Arrays.asList(
+ "id BIGINT NOT NULL",
+ "currency STRING",
+ "rate BIGINT",
+ "dt String"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options);
+ insertInto(
+ table,
+ "(1, 'US Dollar', 114, '2022-01-01')",
+ "(2, 'Yen', cast(null as int), '2022-01-01')",
+ "(3, 'Euro', cast(null as int), '2022-01-01')",
+ "(3, 'Euro', 119, '2022-01-02')");
+
+ List<Row> expectedRecords =
+ Arrays.asList(
+ // part = 2022-01-01
+ changelogRow("+I", 2L, "Yen", 1000L, "2022-01-01"),
+ changelogRow("+I", 3L, "Euro", 1000L, "2022-01-01"));
+
+ String querySql = String.format("SELECT * FROM %s where rate = 1000",
table);
+ testBatchRead(querySql, expectedRecords);
+ }
+
+ @ParameterizedTest
+ @EnumSource(CoreOptions.MergeEngine.class)
+ public void testDefaultValueWithPrimaryKey(CoreOptions.MergeEngine
mergeEngine)
+ throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(WRITE_MODE.key(), WriteMode.AUTO.name());
+ options.put(
+ CoreOptions.FIELDS_PREFIX + ".rate." +
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
+ options.put(MERGE_ENGINE.key(), mergeEngine.toString());
+ String table =
+ createTable(
+ Arrays.asList(
+ "id BIGINT NOT NULL",
+ "currency STRING",
+ "rate BIGINT",
+ "dt String"),
+ Lists.newArrayList("id", "dt"),
+ Lists.newArrayList("dt"),
+ options);
+ insertInto(
+ table,
+ "(1, 'US Dollar', 114, '2022-01-01')",
+ "(2, 'Yen', cast(null as int), '2022-01-01')",
+ "(2, 'Yen', cast(null as int), '2022-01-01')",
+ "(3, 'Euro', cast(null as int) , '2022-01-02')");
+
+ List<Row> expectedRecords =
+ Arrays.asList(changelogRow("+I", 3L, "Euro", 1000L,
"2022-01-02"));
+
+ String querySql =
+ String.format("SELECT * FROM %s where rate = 1000 and currency
='Euro'", table);
+ testBatchRead(querySql, expectedRecords);
+ }
+
@ParameterizedTest
@EnumSource(WriteMode.class)
public void testUpdateWithoutPrimaryKey(WriteMode writeMode) throws
Exception {
@@ -1580,7 +1650,7 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
@EnumSource(CoreOptions.MergeEngine.class)
public void testDeleteWithPrimaryKey(CoreOptions.MergeEngine mergeEngine)
throws Exception {
Set<CoreOptions.MergeEngine> supportUpdateEngines = new HashSet<>();
- supportUpdateEngines.add(CoreOptions.MergeEngine.DEDUPLICATE);
+ supportUpdateEngines.add(DEDUPLICATE);
// Step1: define table schema
Map<String, String> options = new HashMap<>();