This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.6
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit abf3fe3ecb0d436dd47053acca2e95bbc5e253a7
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon Dec 11 10:58:15 2023 +0800

    [core] Introduce 'rowkind.field' to define rowkind from data (#2476)
---
 docs/content/concepts/primary-key-table.md         |  7 +++
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 13 +++++
 .../main/java/org/apache/paimon/types/RowKind.java | 21 +++++++
 .../org/apache/paimon/schema/SchemaValidation.java |  9 +++
 .../paimon/table/PrimaryKeyFileStoreTable.java     | 21 ++++---
 .../apache/paimon/table/sink/RowKindGenerator.java | 65 ++++++++++++++++++++++
 .../paimon/table/sink/SequenceGenerator.java       |  1 +
 .../paimon/flink/sink/LocalMergeOperator.java      |  6 +-
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 11 ++++
 10 files changed, 151 insertions(+), 9 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index 8c3b69516..eb3ade035 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -458,3 +458,10 @@ of sequence number will be made up to microsecond by 
system.
 
 3. Composite pattern: for example, "second-to-micro,row-kind-flag", first, add 
the micro to the second, and then
 pad the row kind flag.
+
+## Row Kind Field
+
+By default, the primary key table determines the row kind from the input row. You can also set
+`'rowkind.field'` to read the row kind from a string field of the row instead.
+
+The field must be of string type, and its value must be one of `'+I'`, `'-U'`, `'+U'` or `'-D'` (case-insensitive); a null value fails the write.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index a6024d99a..82a8fc798 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -419,6 +419,12 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Integer</td>
             <td>Read batch size for orc and parquet.</td>
         </tr>
+        <tr>
+            <td><h5>rowkind.field</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>The field that generates the row kind for primary key table, 
the row kind determines which data is '+I', '-U', '+U' or '-D'.</td>
+        </tr>
         <tr>
             <td><h5>scan.bounded.watermark</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 7a24523e9..4274ea8d2 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -427,6 +427,15 @@ public class CoreOptions implements Serializable {
                             "The field that generates the sequence number for 
primary key table,"
                                     + " the sequence number determines which 
data is the most recent.");
 
+    @Immutable
+    public static final ConfigOption<String> ROWKIND_FIELD = // name of a string field; see rowkindField()
+            key("rowkind.field")
+                    .stringType()
+                    .noDefaultValue() // unset: rows keep the row kind they arrive with
+                    .withDescription(
+                            "The field that generates the row kind for primary 
key table,"
+                                    + " the row kind determines which data is 
'+I', '-U', '+U' or '-D'.");
+
     public static final ConfigOption<String> SEQUENCE_AUTO_PADDING =
             key("sequence.auto-padding")
                     .stringType()
@@ -1311,6 +1320,10 @@ public class CoreOptions implements Serializable {
         return options.getOptional(SEQUENCE_FIELD);
     }
 
+    public Optional<String> rowkindField() { // empty unless 'rowkind.field' is configured
+        return options.getOptional(ROWKIND_FIELD);
+    }
+
     public List<String> sequenceAutoPadding() {
         String padding = options.get(SEQUENCE_AUTO_PADDING);
         if (padding == null) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java 
b/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java
index 6903ab524..17096dfbf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowKind.java
@@ -128,4 +128,25 @@ public enum RowKind {
                         "Unsupported byte value '" + value + "' for row 
kind.");
         }
     }
+
+    /**
+     * Creates a {@link RowKind} from the given short string.
+     *
+     * @see #shortString() for mapping of string and {@link RowKind}.
+     */
+    public static RowKind fromShortString(String value) {
+        switch (value.toUpperCase()) {
+            case "+I":
+                return INSERT;
+            case "-U":
+                return UPDATE_BEFORE;
+            case "+U":
+                return UPDATE_AFTER;
+            case "-D":
+                return DELETE;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported short string '" + value + "' for row 
kind.");
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index c4c13b5c7..538cff2dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -170,6 +170,15 @@ public class SchemaValidation {
                                 schema.fieldNames().contains(field),
                                 "Nonexistent sequence field: '%s'",
                                 field));
+
+        Optional<String> rowkindField = options.rowkindField();
+        rowkindField.ifPresent(
+                field ->
+                        checkArgument(
+                                schema.fieldNames().contains(field),
+                                "Nonexistent rowkind field: '%s'",
+                                field));
+
         sequenceField.ifPresent(
                 field ->
                         checkArgument(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index feb643937..84a586066 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -36,6 +36,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.RowKindGenerator;
 import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.InnerTableRead;
@@ -43,6 +44,7 @@ import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.MergeTreeSplitGenerator;
 import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.table.source.ValueContentRowDataRecordIterator;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import java.util.List;
@@ -181,22 +183,25 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
     @Override
     public TableWriteImpl<KeyValue> newWrite(
             String commitUser, ManifestCacheFilter manifestFilter) {
-        final SequenceGenerator sequenceGenerator =
-                SequenceGenerator.create(schema(), store().options());
+        TableSchema schema = schema(); // shared by the sequence and row-kind generators below
+        CoreOptions options = store().options();
+        final SequenceGenerator sequenceGenerator = 
SequenceGenerator.create(schema, options);
+        final RowKindGenerator rowKindGenerator = 
RowKindGenerator.create(schema, options); // null unless 'rowkind.field' is set
         final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store().newWrite(commitUser, manifestFilter),
                 createRowKeyExtractor(),
                 record -> {
+                    InternalRow row = record.row(); // read once, used by both generators
                     long sequenceNumber =
                             sequenceGenerator == null
                                     ? KeyValue.UNKNOWN_SEQUENCE
-                                    : sequenceGenerator.generate(record.row());
-                    return kv.replace(
-                            record.primaryKey(),
-                            sequenceNumber,
-                            record.row().getRowKind(),
-                            record.row());
+                                    : sequenceGenerator.generate(row);
+                    RowKind rowKind = // field-derived if configured, else the row's own kind
+                            rowKindGenerator == null
+                                    ? row.getRowKind()
+                                    : rowKindGenerator.generate(row);
+                    return kv.replace(record.primaryKey(), sequenceNumber, 
rowKind, row);
                 });
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java
new file mode 100644
index 000000000..2735339b4
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import static org.apache.paimon.types.DataTypeFamily.CHARACTER_STRING;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Generates the {@link RowKind} of a row from the string field configured by 'rowkind.field'. */
+public class RowKindGenerator {
+
+    private final int index;
+
+    public RowKindGenerator(String field, RowType rowType) {
+        this.index = rowType.getFieldNames().indexOf(field);
+        checkArgument(
+                index != -1, "Can not find rowkind %s in table schema: %s", field, rowType);
+        DataType fieldType = rowType.getTypeAt(index);
+        checkArgument(
+                fieldType.is(CHARACTER_STRING),
+                "only support string type for rowkind, but %s is %s",
+                field,
+                fieldType);
+    }
+
+    /** Parses the row kind held in the field; a null or unknown value throws an exception. */
+    public RowKind generate(InternalRow row) {
+        if (row.isNullAt(index)) {
+            throw new IllegalArgumentException("Row kind cannot be null.");
+        }
+        return RowKind.fromShortString(row.getString(index).toString());
+    }
+
+    /** Returns a generator when 'rowkind.field' is set, otherwise {@code null}. */
+    @Nullable
+    public static RowKindGenerator create(TableSchema schema, CoreOptions options) {
+        return options.rowkindField()
+                .map(field -> new RowKindGenerator(field, schema.logicalRowType()))
+                .orElse(null);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
index 293c9df54..4fe0315aa 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/SequenceGenerator.java
@@ -85,6 +85,7 @@ public class SequenceGenerator {
         generator = fieldType.accept(new SequenceGeneratorVisitor());
     }
 
+    @Nullable
     public static SequenceGenerator create(TableSchema schema, CoreOptions 
options) {
         List<SequenceAutoPadding> sequenceAutoPadding =
                 options.sequenceAutoPadding().stream()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index e85c155d2..8a37b03bf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -30,6 +30,7 @@ import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.PrimaryKeyTableUtils;
+import org.apache.paimon.table.sink.RowKindGenerator;
 import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowKind;
@@ -62,6 +63,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
 
     private transient long recordCount;
     private transient SequenceGenerator sequenceGenerator;
+    private transient RowKindGenerator rowKindGenerator;
     private transient MergeFunction<KeyValue> mergeFunction;
 
     private transient SortBufferWriteBuffer buffer;
@@ -92,6 +94,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
 
         recordCount = 0;
         sequenceGenerator = SequenceGenerator.create(schema, options);
+        rowKindGenerator = RowKindGenerator.create(schema, options);
         mergeFunction =
                 PrimaryKeyTableUtils.createMergeFunctionFactory(
                                 schema,
@@ -131,7 +134,8 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
         recordCount++;
         InternalRow row = record.getValue();
 
-        RowKind rowKind = row.getRowKind();
+        RowKind rowKind =
+                rowKindGenerator == null ? row.getRowKind() : 
rowKindGenerator.generate(row);
         // row kind must be INSERT when it is divided into key and value
         row.setRowKind(RowKind.INSERT);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 00c7b4aa4..fae1f5acf 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -357,4 +357,15 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         
assertThat(tEnv.explainSql(joinSql)).contains("DynamicFilteringDataCollector");
         assertThat(sql(joinSql).toString()).isEqualTo(expected2);
     }
+
+    @Test
+    public void testRowKindField() { // row kinds are read from column 'rf' via 'rowkind.field'
+        sql(
+                "CREATE TABLE R_T (pk INT PRIMARY KEY NOT ENFORCED, v INT, rf 
STRING) "
+                        + "WITH ('rowkind.field'='rf')");
+        sql("INSERT INTO R_T VALUES (1, 1, '+I')"); // '+I' in rf: stored as an insert
+        assertThat(sql("SELECT * FROM R_T")).containsExactly(Row.of(1, 1, 
"+I"));
+        sql("INSERT INTO R_T VALUES (1, 2, '-D')"); // '-D' in rf: deletes pk 1 despite INSERT INTO
+        assertThat(sql("SELECT * FROM R_T")).isEmpty(); // the only key was deleted
+    }
 }

Reply via email to