This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fc22edc03 [core] Support to ignore delete/update_before records in 
first_row merge engine (#2349)
fc22edc03 is described below

commit fc22edc0331440765e30d91d16b97075f33dd938
Author: Aitozi <[email protected]>
AuthorDate: Tue Nov 21 11:26:54 2023 +0800

    [core] Support to ignore delete/update_before records in first_row merge 
engine (#2349)
---
 docs/content/concepts/primary-key-table.md         |  5 +++-
 .../shortcodes/generated/core_configuration.html   |  6 ++++
 .../main/java/org/apache/paimon/CoreOptions.java   |  6 ++++
 .../mergetree/compact/FirstRowMergeFunction.java   | 34 +++++++++++++++-------
 .../apache/paimon/table/PrimaryKeyTableUtils.java  |  2 +-
 .../mergetree/SortBufferWriteBufferTestBase.java   |  3 +-
 .../LookupChangelogMergeFunctionWrapperTest.java   |  3 +-
 .../mergetree/compact/SortMergeReaderTestBase.java |  2 +-
 .../org/apache/paimon/flink/FirstRowITCase.java    | 29 ++++++++++++++++++
 9 files changed, 75 insertions(+), 15 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index 85326dcac..1d62887ae 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -300,9 +300,12 @@ This is an experimental feature.
 By specifying `'merge-engine' = 'first-row'`, users can keep the first row of 
the same primary key. It differs from the
 `deduplicate` merge engine in that the `first-row` merge engine will 
generate an insert-only changelog. 
 
+{{< hint info >}}
 1. `first-row` merge engine must be used together with `lookup` [changelog 
producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}).
 2. You can not specify `sequence.field`.
-3. Not accept `DELETE` and `UPDATE_BEFORE` message.
+3. `DELETE` and `UPDATE_BEFORE` messages are not accepted. You can configure 
`first-row.ignore-delete` to ignore these two kinds of records.
+{{< /hint >}}
+
 
 This is of great help in replacing log deduplication in streaming computation.
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 96ceb92fd..1450ded16 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -182,6 +182,12 @@ under the License.
             <td>Map</td>
             <td>Define different file format for different level, you can add 
the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file 
format for level is not provided, the default format which set by `file.format` 
will be used.</td>
         </tr>
+        <tr>
+            <td><h5>first-row.ignore-delete</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to ignore delete records in first-row mode.</td>
+        </tr>
         <tr>
             <td><h5>full-compaction.delta-commits</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 358c370eb..cbc16cc75 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -232,6 +232,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether to ignore delete records in 
partial-update mode.");
 
+    public static final ConfigOption<Boolean> FIRST_ROW_IGNORE_DELETE =
+            key("first-row.ignore-delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to ignore delete records in 
first-row mode.");
+
     public static final ConfigOption<SortEngine> SORT_ENGINE =
             key("sort-engine")
                     .enumType(SortEngine.class)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index f8c1c4403..34b9bc74a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -20,11 +20,11 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.types.RowKind;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -38,9 +38,12 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
     private final InternalRowSerializer valueSerializer;
     private KeyValue first;
 
-    protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
+    private final boolean ignoreDelete;
+
+    protected FirstRowMergeFunction(RowType keyType, RowType valueType, 
boolean ignoreDelete) {
         this.keySerializer = new InternalRowSerializer(keyType);
         this.valueSerializer = new InternalRowSerializer(valueType);
+        this.ignoreDelete = ignoreDelete;
     }
 
     @Override
@@ -50,9 +53,15 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
 
     @Override
     public void add(KeyValue kv) {
-        RowKind rowKind = kv.valueKind();
-        Preconditions.checkArgument(
-                rowKind.isAdd(), "First row merge engine don't accept %s 
message", rowKind);
+        if (kv.valueKind().isRetract()) {
+            if (ignoreDelete) {
+                return;
+            } else {
+                throw new IllegalArgumentException(
+                        "By default, First row merge engine can not accept 
DELETE/UPDATE_BEFORE records.\n"
+                                + "You can config 'first-row.ignore-delete' to 
ignore the DELETE/UPDATE_BEFORE records.");
+            }
+        }
         if (first == null) {
             this.first = kv.copy(keySerializer, valueSerializer);
         }
@@ -64,8 +73,9 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         return first;
     }
 
-    public static MergeFunctionFactory<KeyValue> factory(RowType keyType, 
RowType valueType) {
-        return new FirstRowMergeFunction.Factory(keyType, valueType);
+    public static MergeFunctionFactory<KeyValue> factory(
+            RowType keyType, RowType valueType, Options options) {
+        return new FirstRowMergeFunction.Factory(keyType, valueType, options);
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -74,14 +84,18 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         private final RowType keyType;
         private final RowType valueType;
 
-        public Factory(RowType keyType, RowType valueType) {
+        private final Options options;
+
+        public Factory(RowType keyType, RowType valueType, Options options) {
             this.keyType = keyType;
             this.valueType = valueType;
+            this.options = options;
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new FirstRowMergeFunction(keyType, valueType);
+            return new FirstRowMergeFunction(
+                    keyType, valueType, 
options.get(CoreOptions.FIRST_ROW_IGNORE_DELETE));
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index 8ab18ca4d..727664da2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -75,7 +75,7 @@ public class PrimaryKeyTableUtils {
                         tableSchema.primaryKeys());
             case FIRST_ROW:
                 return FirstRowMergeFunction.factory(
-                        new RowType(extractor.keyFields(tableSchema)), 
rowType);
+                        new RowType(extractor.keyFields(tableSchema)), 
rowType, conf);
             default:
                 throw new UnsupportedOperationException("Unsupported merge 
engine: " + mergeEngine);
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index bdd57777a..b7d76ff9d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -256,7 +256,8 @@ public abstract class SortBufferWriteBufferTestBase {
         protected MergeFunction<KeyValue> createMergeFunction() {
             return FirstRowMergeFunction.factory(
                             new RowType(Lists.list(new DataField(0, "f0", new 
IntType()))),
-                            new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))))
+                            new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))),
+                            new Options())
                     .create();
         }
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 5cf976313..07c93e1a7 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -300,7 +300,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                         new RowType(
                                                 Lists.list(new DataField(0, 
"f0", new IntType()))),
                                         new RowType(
-                                                Lists.list(new DataField(1, 
"f1", new IntType())))),
+                                                Lists.list(new DataField(1, 
"f1", new IntType()))),
+                                        false),
                         highLevel::contains);
 
         // Without level-0
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 9782fd8af..876280125 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -129,7 +129,7 @@ public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestB
             RowType keyType = new RowType(Lists.list(new DataField(0, "f0", 
new IntType())));
             RowType valueType = new RowType(Lists.list(new DataField(1, "f1", 
new BigIntType())));
             return new LookupMergeFunction(
-                    new FirstRowMergeFunction(keyType, valueType), keyType, 
valueType);
+                    new FirstRowMergeFunction(keyType, valueType, false), 
keyType, valueType);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index 3ed345d52..c4492596f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -22,6 +22,9 @@ package org.apache.paimon.flink;
 
 import org.apache.paimon.utils.BlockingIterator;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.junit.jupiter.api.Test;
@@ -103,4 +106,30 @@ public class FirstRowITCase extends CatalogITCaseBase {
                         Row.ofKind(RowKind.INSERT, 1, 1, "1"),
                         Row.ofKind(RowKind.INSERT, 2, 3, "3"));
     }
+
+    @Test
+    public void testIgnoreDelete() {
+        sql(
+                "CREATE TABLE IF NOT EXISTS T1 ("
+                        + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)"
+                        + " WITH ('merge-engine'='first-row', 
'first-row.ignore-delete' = 'true',"
+                        + " 'changelog-producer' = 'lookup');");
+
+        List<Row> input =
+                ImmutableList.of(
+                        Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, 1, "1"),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 1, 2, "2"));
+
+        String id = TestValuesTableFactory.registerData(input);
+        sql(
+                "CREATE TEMPORARY TABLE source (a INT, b INT, c STRING) WITH "
+                        + "('connector'='values', 'bounded'='true', 
'data-id'='%s')",
+                id);
+
+        batchSql("INSERT INTO T1 SELECT * FROM source");
+        List<Row> result = batchSql("SELECT * FROM T1");
+
+        assertThat(result).containsExactly(Row.ofKind(RowKind.INSERT, 1, 1, 
"1"));
+    }
 }

Reply via email to