This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit abf3fe3ecb0d436dd47053acca2e95bbc5e253a7 Author: Jingsong Lee <[email protected]> AuthorDate: Mon Dec 11 10:58:15 2023 +0800 [core] Introduce 'rowkind.field' to define rowkind from data (#2476) --- docs/content/concepts/primary-key-table.md | 7 +++ .../shortcodes/generated/core_configuration.html | 6 ++ .../main/java/org/apache/paimon/CoreOptions.java | 13 +++++ .../main/java/org/apache/paimon/types/RowKind.java | 21 +++++++ .../org/apache/paimon/schema/SchemaValidation.java | 9 +++ .../paimon/table/PrimaryKeyFileStoreTable.java | 21 ++++--- .../apache/paimon/table/sink/RowKindGenerator.java | 65 ++++++++++++++++++++++ .../paimon/table/sink/SequenceGenerator.java | 1 + .../paimon/flink/sink/LocalMergeOperator.java | 6 +- .../apache/paimon/flink/BatchFileStoreITCase.java | 11 ++++ 10 files changed, 151 insertions(+), 9 deletions(-) diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md index 8c3b69516..eb3ade035 100644 --- a/docs/content/concepts/primary-key-table.md +++ b/docs/content/concepts/primary-key-table.md @@ -458,3 +458,10 @@ of sequence number will be made up to microsecond by system. 3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add the micro to the second, and then pad the row kind flag. + +## Row Kind Field + +By default, the primary key table determines the row kind according to the input row. You can also define the +`'rowkind.field'` to use a field to extract row kind. + +The valid row kind string should be `'+I'`, `'-U'`, `'+U'` or `'-D'`. 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index a6024d99a..82a8fc798 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -419,6 +419,12 @@ This config option does not affect the default filesystem metastore.</td> <td>Integer</td> <td>Read batch size for orc and parquet.</td> </tr> + <tr> + <td><h5>rowkind.field</h5></td> + <td style="word-wrap: break-word;">(none)</td> + <td>String</td> + <td>The field that generates the row kind for primary key table, the row kind determines which data is '+I', '-U', '+U' or '-D'.</td> + </tr> <tr> <td><h5>scan.bounded.watermark</h5></td> <td style="word-wrap: break-word;">(none)</td> diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 7a24523e9..4274ea8d2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -427,6 +427,15 @@ public class CoreOptions implements Serializable { "The field that generates the sequence number for primary key table," + " the sequence number determines which data is the most recent."); + @Immutable + public static final ConfigOption<String> ROWKIND_FIELD = + key("rowkind.field") + .stringType() + .noDefaultValue() + .withDescription( + "The field that generates the row kind for primary key table," + + " the row kind determines which data is '+I', '-U', '+U' or '-D'."); + public static final ConfigOption<String> SEQUENCE_AUTO_PADDING = key("sequence.auto-padding") .stringType() @@ -1311,6 +1320,10 @@ public class CoreOptions implements Serializable { return options.getOptional(SEQUENCE_FIELD); } + public Optional<String> rowkindField() { + return options.getOptional(ROWKIND_FIELD); + } + public List<String> sequenceAutoPadding() { String padding = 
options.get(SEQUENCE_AUTO_PADDING); if (padding == null) { diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java b/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java index 6903ab524..17096dfbf 100644 --- a/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java +++ b/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java @@ -128,4 +128,25 @@ public enum RowKind { "Unsupported byte value '" + value + "' for row kind."); } } + + /** + * Creates a {@link RowKind} from the given short string. + * + * @see #shortString() for mapping of string and {@link RowKind}. + */ + public static RowKind fromShortString(String value) { + switch (value.toUpperCase()) { + case "+I": + return INSERT; + case "-U": + return UPDATE_BEFORE; + case "+U": + return UPDATE_AFTER; + case "-D": + return DELETE; + default: + throw new UnsupportedOperationException( + "Unsupported short string '" + value + "' for row kind."); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index c4c13b5c7..538cff2dd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -170,6 +170,15 @@ public class SchemaValidation { schema.fieldNames().contains(field), "Nonexistent sequence field: '%s'", field)); + + Optional<String> rowkindField = options.rowkindField(); + rowkindField.ifPresent( + field -> + checkArgument( + schema.fieldNames().contains(field), + "Nonexistent rowkind field: '%s'", + field)); + sequenceField.ifPresent( field -> checkArgument( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index feb643937..84a586066 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -36,6 +36,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.table.sink.RowKindGenerator; import org.apache.paimon.table.sink.SequenceGenerator; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.InnerTableRead; @@ -43,6 +44,7 @@ import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.table.source.ValueContentRowDataRecordIterator; +import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import java.util.List; @@ -181,22 +183,25 @@ public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable { @Override public TableWriteImpl<KeyValue> newWrite( String commitUser, ManifestCacheFilter manifestFilter) { - final SequenceGenerator sequenceGenerator = - SequenceGenerator.create(schema(), store().options()); + TableSchema schema = schema(); + CoreOptions options = store().options(); + final SequenceGenerator sequenceGenerator = SequenceGenerator.create(schema, options); + final RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options); final KeyValue kv = new KeyValue(); return new TableWriteImpl<>( store().newWrite(commitUser, manifestFilter), createRowKeyExtractor(), record -> { + InternalRow row = record.row(); long sequenceNumber = sequenceGenerator == null ? 
KeyValue.UNKNOWN_SEQUENCE - : sequenceGenerator.generate(record.row()); - return kv.replace( - record.primaryKey(), - sequenceNumber, - record.row().getRowKind(), - record.row()); + : sequenceGenerator.generate(row); + RowKind rowKind = + rowKindGenerator == null + ? row.getRowKind() + : rowKindGenerator.generate(row); + return kv.replace(record.primaryKey(), sequenceNumber, rowKind, row); }); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java new file mode 100644 index 000000000..2735339b4 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.table.sink; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.types.RowType; + +import javax.annotation.Nullable; + +import static org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Generate row kind. */ +public class RowKindGenerator { + + private final int index; + + public RowKindGenerator(String field, RowType rowType) { + this.index = rowType.getFieldNames().indexOf(field); + if (index == -1) { + throw new RuntimeException( + String.format("Can not find rowkind %s in table schema: %s", field, rowType)); + } + DataType fieldType = rowType.getTypeAt(index); + checkArgument( + fieldType.is(CHARACTER_STRING), + "only support string type for rowkind, but %s is %s", + field, + fieldType); + } + + public RowKind generate(InternalRow row) { + if (row.isNullAt(index)) { + throw new RuntimeException("Row kind cannot be null."); + } + return RowKind.fromShortString(row.getString(index).toString()); + } + + @Nullable + public static RowKindGenerator create(TableSchema schema, CoreOptions options) { + return options.rowkindField() + .map(field -> new RowKindGenerator(field, schema.logicalRowType())) + .orElse(null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java index 293c9df54..4fe0315aa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java @@ -85,6 +85,7 @@ public class SequenceGenerator { generator = fieldType.accept(new SequenceGeneratorVisitor()); } + @Nullable public static SequenceGenerator create(TableSchema schema, 
CoreOptions options) { List<SequenceAutoPadding> sequenceAutoPadding = options.sequenceAutoPadding().stream() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index e85c155d2..8a37b03bf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -30,6 +30,7 @@ import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.PrimaryKeyTableUtils; +import org.apache.paimon.table.sink.RowKindGenerator; import org.apache.paimon.table.sink.SequenceGenerator; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowKind; @@ -62,6 +63,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow> private transient long recordCount; private transient SequenceGenerator sequenceGenerator; + private transient RowKindGenerator rowKindGenerator; private transient MergeFunction<KeyValue> mergeFunction; private transient SortBufferWriteBuffer buffer; @@ -92,6 +94,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow> recordCount = 0; sequenceGenerator = SequenceGenerator.create(schema, options); + rowKindGenerator = RowKindGenerator.create(schema, options); mergeFunction = PrimaryKeyTableUtils.createMergeFunctionFactory( schema, @@ -131,7 +134,8 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow> recordCount++; InternalRow row = record.getValue(); - RowKind rowKind = row.getRowKind(); + RowKind rowKind = + rowKindGenerator == null ? 
row.getRowKind() : rowKindGenerator.generate(row); // row kind must be INSERT when it is divided into key and value row.setRowKind(RowKind.INSERT); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 00c7b4aa4..fae1f5acf 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -357,4 +357,15 @@ public class BatchFileStoreITCase extends CatalogITCaseBase { assertThat(tEnv.explainSql(joinSql)).contains("DynamicFilteringDataCollector"); assertThat(sql(joinSql).toString()).isEqualTo(expected2); } + + @Test + public void testRowKindField() { + sql( + "CREATE TABLE R_T (pk INT PRIMARY KEY NOT ENFORCED, v INT, rf STRING) " + + "WITH ('rowkind.field'='rf')"); + sql("INSERT INTO R_T VALUES (1, 1, '+I')"); + assertThat(sql("SELECT * FROM R_T")).containsExactly(Row.of(1, 1, "+I")); + sql("INSERT INTO R_T VALUES (1, 2, '-D')"); + assertThat(sql("SELECT * FROM R_T")).isEmpty(); + } }
