JingsongLi commented on code in PR #1425:
URL: https://github.com/apache/incubator-paimon/pull/1425#discussion_r1250356757
##########
paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java:
##########
@@ -57,7 +57,10 @@ public AppendOnlyFileStoreScan(
checkNumOfBuckets,
scanManifestParallelism);
this.fieldStatsConverters =
- new FieldStatsConverters(sid -> scanTableSchema(sid).fields(),
schemaId);
+ new FieldStatsConverters(
+ sid -> scanTableSchema(sid).fields(),
+ schemaId,
+ schemaManager.getDefaultValueFieldindex(schemaId));
Review Comment:
Rename `getDefaultValueFieldindex` to `getDefaultValueFieldIndex`.
##########
paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.casting.DefaultValueRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.Projection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * the field Default value assigner. note that the invoke of assigning should
be after merge and
+ * schema evolution
+ */
+public class DefaultValueAssiger {
+ private int[][] project;
+
+ private TableSchema tableSchema;
+
+ private RowType valueType;
+
+ public DefaultValueAssiger(int[][] project, TableSchema tableSchema,
RowType valueType) {
+ this.project = project;
+ this.tableSchema = tableSchema;
+ this.valueType = valueType;
+ }
+
+ /**
+ * assign default value for colomn which value is null.
+ *
+ * @return
Review Comment:
Remove the empty `@return` tag.
##########
paimon-core/src/main/java/org/apache/paimon/CoreOptions.java:
##########
@@ -685,6 +687,15 @@ public class CoreOptions implements Serializable {
"Parallelism of assigner operator for dynamic
bucket mode, it is"
+ " related to the number of initialized
bucket, too small will lead to"
+ " insufficient processing speed of
assigner.");
+ public static final ConfigOption<String> FIELDS_DEFAULTVALUE =
Review Comment:
Can we just remove this? It is useless, since we have already added documentation.
##########
paimon-core/src/main/java/org/apache/paimon/operation/DeletePredicateWithFieldNameVisitor.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateReplaceVisitor;
+import org.apache.paimon.predicate.PredicateVisitor;
+
+import java.util.Optional;
+import java.util.Set;
+
+/** A {@link PredicateVisitor} to remove default value. */
+public class DeletePredicateWithFieldNameVisitor implements
PredicateReplaceVisitor {
Review Comment:
This class can be inlined into `DefaultValueAssigner`; it is very simple.
##########
docs/content/how-to/creating-tables.md:
##########
@@ -235,6 +235,108 @@ The following three types of fields may be defined as
partition fields in the wa
if you declare the primary key containing partition field, you can achieve
the unique effect.
- CDC op_ts: It cannot be defined as a partition field, unable to know
previous record timestamp.
+
+### Field Default Value
+
+Paimon table currently supports setting default values for fields in table
properties,
+note that partition fields and primary key fields can not be specified.
+{{< tabs "default-value-example" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING,
+ PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
+) PARTITIONED BY (dt, hh)
+with(
+ 'fields.item_id.deafult-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING
+) PARTITIONED BY (dt, hh) TBLPROPERTIES (
+ 'primary-key' = 'dt,hh,user_id',
+ 'fields.item_id.deafult-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< tab "Hive" >}}
+
+```sql
+SET hive.metastore.warehouse.dir=warehouse_path;
+
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING
+)
+STORED BY 'org.apache.paimon.hive.PaimonStorageHandler'
+TBLPROPERTIES (
+ 'primary-key' = 'dt,hh,user_id',
+ 'partition'='dt,hh',
+ 'fields.item_id.deafult-value'='0'
+);
+```
+
+{{< /tab >}}
+
+{{< tab "Trino" >}}
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior VARCHAR,
+ dt VARCHAR,
+ hh VARCHAR
+) WITH (
+ primary_key = ARRAY['dt', 'hh', 'user_id'],
+ partitioned_by = ARRAY['dt', 'hh'],
+ fields.item_id.deafult-value='0'
Review Comment:
Trino and Presto do not support `-`; remove them.
##########
paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssiger.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.casting.DefaultValueRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.Projection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * the field Default value assigner. note that the invoke of assigning should
be after merge and
+ * schema evolution
+ */
+public class DefaultValueAssiger {
+ private int[][] project;
+
+ private TableSchema tableSchema;
+
+ private RowType valueType;
+
+ public DefaultValueAssiger(int[][] project, TableSchema tableSchema,
RowType valueType) {
+ this.project = project;
+ this.tableSchema = tableSchema;
+ this.valueType = valueType;
+ }
+
+ /**
+ * assign default value for colomn which value is null.
+ *
+ * @return
+ */
+ public RecordReader<InternalRow>
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
+ RecordReader<InternalRow> result = reader;
+
+ CoreOptions coreOptions = new CoreOptions(tableSchema.options());
+ Options defaultValues = coreOptions.getFieldDefaultValues();
+ List<DataField> fields = Collections.emptyList();
+ if (!defaultValues.keySet().isEmpty()) {
+ if (project != null) {
+ fields = Projection.of(project).project(valueType).getFields();
+ } else {
+ fields = valueType.getFields();
+ }
+ }
+
+ if (!fields.isEmpty()) {
+ GenericRow defaultValueMapping = new GenericRow(fields.size());
+ for (int i = 0; i < fields.size(); i++) {
+ DataField dataField = fields.get(i);
+ String defaultValueStr = defaultValues.get(dataField.name());
+ if (defaultValueStr == null) {
+ continue;
+ }
+
+ CastExecutor<Object, Object> resolve =
+ (CastExecutor<Object, Object>)
+ CastExecutors.resolve(VarCharType.STRING_TYPE,
dataField.type());
+ if (resolve != null) {
+ Object defaultValue =
resolve.cast(BinaryString.fromString(defaultValueStr));
+ defaultValueMapping.setField(i, defaultValue);
+ }
+ }
+
+ if (defaultValueMapping.getFieldCount() > 0) {
+ DefaultValueRow defaultValueRow =
DefaultValueRow.from(defaultValueMapping);
+ result = reader.transform(defaultValueRow::replaceRow);
+ }
+ }
+
+ return result;
+ }
+
+ public static ArrayList<Predicate> filterPredicate(
Review Comment:
Why do we need this method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]