This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 818b9ea2a [core] Add support of first_row merge engine (#1557)
818b9ea2a is described below

commit 818b9ea2ad15039d15981c3b4f838de35e972c01
Author: Aitozi <[email protected]>
AuthorDate: Tue Jul 25 09:22:28 2023 +0800

    [core] Add support of first_row merge engine (#1557)
---
 docs/content/concepts/primary-key-table.md         |  12 +++
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   4 +-
 .../mergetree/compact/FirstRowMergeFunction.java   |  87 +++++++++++++++
 .../compact/FullChangelogMergeFunctionWrapper.java |   6 ++
 .../LookupChangelogMergeFunctionWrapper.java       |  12 ++-
 .../mergetree/compact/LookupMergeFunction.java     |   6 +-
 .../org/apache/paimon/schema/SchemaValidation.java |   9 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   9 +-
 .../mergetree/SortBufferWriteBufferTestBase.java   |  24 +++++
 .../FullChangelogMergeFunctionWrapperTestBase.java | 120 ++++++++++++++++++++-
 .../LookupChangelogMergeFunctionWrapperTest.java   |  95 +++++++++++++++-
 .../mergetree/compact/MergeFunctionTestUtils.java  |  14 +++
 .../mergetree/compact/SortMergeReaderTestBase.java |  26 +++++
 .../paimon/flink/source/DataTableSource.java       |   4 +
 .../apache/paimon/flink/utils/TableScanUtils.java  |   1 +
 .../org/apache/paimon/flink/FirstRowITCase.java    | 106 ++++++++++++++++++
 17 files changed, 525 insertions(+), 12 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md 
b/docs/content/concepts/primary-key-table.md
index 09d387e30..868497995 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -214,6 +214,18 @@ If you allow some functions to ignore retraction messages, 
you can configure:
 For streaming queries, `aggregation` merge engine must be used together with 
`lookup` or `full-compaction` [changelog producer]({{< ref 
"concepts/primary-key-table#changelog-producers" >}}).
 {{< /hint >}}
 
+### First Row
+
+By specifying `'merge-engine' = 'first-row'`, users can keep the first row of
the same primary key. It differs from the `deduplicate` merge engine in that
the `first-row` merge engine generates insert-only changelog.
+
+{{< hint info >}}
+For streaming queries, `first-row` merge engine must be used together with 
`lookup` or `full-compaction` [changelog producer]({{< ref 
"concepts/primary-key-table#changelog-producers" >}}).
+{{< /hint >}}
+
+{{< hint info >}}
+Currently, only the first row in insert order is supported, so you cannot
specify `sequence.field` for this merge engine. It also does not accept
`DELETE` and `UPDATE_BEFORE` messages.
+{{< /hint >}}
+
 ## Changelog Producers
 
 Streaming queries will continuously produce the latest changes. These changes 
can come from the underlying table files or from an [external log system]({{< 
ref "concepts/external-log-systems" >}}) like Kafka. Compared to the external 
log system, changes from table files have lower cost but higher latency 
(depending on how often snapshots are created).
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index d5a09f48a..01b22cbab 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -264,7 +264,7 @@ under the License.
             <td><h5>merge-engine</h5></td>
             <td style="word-wrap: break-word;">deduplicate</td>
             <td><p>Enum</p></td>
-            <td>Specify the merge engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last 
row.</li><li>"partial-update": Partial update non-null 
fields.</li><li>"aggregation": Aggregate fields with same primary 
key.</li></ul></td>
+            <td>Specify the merge engine for table with primary key.<br /><br 
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last 
row.</li><li>"partial-update": Partial update non-null 
fields.</li><li>"aggregation": Aggregate fields with same primary 
key.</li><li>"first-row": De-duplicate and keep the first row.</li></ul></td>
         </tr>
         <tr>
             <td><h5>metadata.stats-mode</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 084cf45df..85bb06460 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1218,7 +1218,9 @@ public class CoreOptions implements Serializable {
 
         PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
 
-        AGGREGATE("aggregation", "Aggregate fields with same primary key.");
+        AGGREGATE("aggregation", "Aggregate fields with same primary key."),
+
+        FIRST_ROW("first-row", "De-duplicate and keep the first row.");
 
         private final String value;
         private final String description;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
new file mode 100644
index 000000000..f8c1c4403
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -0,0 +1,87 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where the key is the primary key (unique) and the
value is the full record;
+ * only the first record is kept.
+ */
+public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
+
+    private final InternalRowSerializer keySerializer;
+    private final InternalRowSerializer valueSerializer;
+    private KeyValue first;
+
+    protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
+        this.keySerializer = new InternalRowSerializer(keyType);
+        this.valueSerializer = new InternalRowSerializer(valueType);
+    }
+
+    @Override
+    public void reset() {
+        this.first = null;
+    }
+
+    @Override
+    public void add(KeyValue kv) {
+        RowKind rowKind = kv.valueKind();
+        Preconditions.checkArgument(
+                rowKind.isAdd(), "First row merge engine does not accept %s
message", rowKind);
+        if (first == null) {
+            this.first = kv.copy(keySerializer, valueSerializer);
+        }
+    }
+
+    @Nullable
+    @Override
+    public KeyValue getResult() {
+        return first;
+    }
+
+    public static MergeFunctionFactory<KeyValue> factory(RowType keyType, 
RowType valueType) {
+        return new FirstRowMergeFunction.Factory(keyType, valueType);
+    }
+
+    private static class Factory implements MergeFunctionFactory<KeyValue> {
+
+        private static final long serialVersionUID = 1L;
+        private final RowType keyType;
+        private final RowType valueType;
+
+        public Factory(RowType keyType, RowType valueType) {
+            this.keyType = keyType;
+            this.valueType = valueType;
+        }
+
+        @Override
+        public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+            return new FirstRowMergeFunction(keyType, valueType);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
index 0b1e41450..86eb16698 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
@@ -41,6 +41,7 @@ public class FullChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<C
     private final int maxLevel;
     private final RecordEqualiser valueEqualiser;
     private final boolean changelogRowDeduplicate;
+    private final boolean isFirstRow;
 
     // only full compaction will write files into maxLevel, see 
UniversalCompaction class
     private KeyValue topLevelKv;
@@ -61,6 +62,7 @@ public class FullChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<C
                 "Value count merge function does not need to produce changelog 
from full compaction. "
                         + "Please set changelog producer to 'input'.");
         this.mergeFunction = mergeFunction;
+        this.isFirstRow = mergeFunction instanceof FirstRowMergeFunction;
         this.maxLevel = maxLevel;
         this.valueEqualiser = valueEqualiser;
         this.changelogRowDeduplicate = changelogRowDeduplicate;
@@ -108,6 +110,10 @@ public class FullChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper<C
                     reusedResult.addChangelog(replace(reusedAfter, 
RowKind.INSERT, merged));
                 }
             } else {
+                // For first row, just return the old value and produce no
changelog.
+                if (isFirstRow) {
+                    return reusedResult.setResultIfNotRetract(merged);
+                }
                 if (merged == null || !isAdd(merged)) {
                     reusedResult.addChangelog(replace(reusedBefore, 
RowKind.DELETE, topLevelKv));
                 } else if (!changelogRowDeduplicate
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index a53091a3f..e862789b2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -53,6 +53,7 @@ public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper
     private final KeyValue reusedAfter = new KeyValue();
     private final RecordEqualiser valueEqualiser;
     private final boolean changelogRowDeduplicate;
+    private final boolean isFirstRow;
 
     public LookupChangelogMergeFunctionWrapper(
             MergeFunctionFactory<KeyValue> mergeFunctionFactory,
@@ -65,6 +66,7 @@ public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper
                 "Merge function should be a LookupMergeFunction, but is %s, 
there is a bug.",
                 mergeFunction.getClass().getName());
         this.mergeFunction = (LookupMergeFunction) mergeFunction;
+        this.isFirstRow = this.mergeFunction.isFirstRow;
         this.mergeFunction2 = mergeFunctionFactory.create();
         this.lookup = lookup;
         this.valueEqualiser = valueEqualiser;
@@ -97,18 +99,24 @@ public class LookupChangelogMergeFunctionWrapper implements 
MergeFunctionWrapper
 
         // 2. With level 0, with the latest high level, return changelog
         if (highLevel != null) {
-            setChangelog(highLevel, result);
+            // For first row, just return the old value and produce no
changelog.
+            if (!isFirstRow) {
+                setChangelog(highLevel, result);
+            }
             return reusedResult.setResult(result);
         }
 
         // 3. Lookup to find the latest high level record
         highLevel = lookup.apply(result.key());
+
         if (highLevel != null) {
             mergeFunction2.reset();
             mergeFunction2.add(highLevel);
             mergeFunction2.add(result);
             result = mergeFunction2.getResult();
-            setChangelog(highLevel, result);
+            if (!isFirstRow) {
+                setChangelog(highLevel, result);
+            }
         } else {
             setChangelog(null, result);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 7d3d8c050..fe8056e4c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -41,10 +41,12 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
 
     KeyValue highLevel;
     boolean containLevel0;
+    protected final boolean isFirstRow;
 
     public LookupMergeFunction(
             MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType 
valueType) {
         this.mergeFunction = mergeFunction;
+        this.isFirstRow = mergeFunction instanceof FirstRowMergeFunction;
         this.keySerializer = new InternalRowSerializer(keyType);
         this.valueSerializer = new InternalRowSerializer(valueType);
     }
@@ -64,7 +66,9 @@ public class LookupMergeFunction implements 
MergeFunction<KeyValue> {
     @Override
     public KeyValue getResult() {
         // 1. Find the latest high level record
-        Iterator<KeyValue> descending = candidates.descendingIterator();
+        // For the first row, the candidates should be iterated in insert order.
+        Iterator<KeyValue> descending =
+                isFirstRow ? candidates.iterator() : 
candidates.descendingIterator();
         while (descending.hasNext()) {
             KeyValue kv = descending.next();
             if (kv.level() > 0) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index df2d2c11a..72ea13649 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -99,7 +99,8 @@ public class SchemaValidation {
                         + " should not be larger than "
                         + SNAPSHOT_NUM_RETAINED_MAX.key());
 
-        // Only changelog tables with primary keys support full compaction or 
lookup changelog
+        // Only changelog tables with primary keys support full compaction or 
lookup
+        // changelog
         // producer
         if (options.writeMode() == WriteMode.CHANGE_LOG) {
             switch (options.changelogProducer()) {
@@ -170,6 +171,12 @@ public class SchemaValidation {
                                 schema.fieldNames().contains(field),
                                 "Nonexistent sequence field: '%s'",
                                 field));
+
+        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+        if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW && 
sequenceField.isPresent()) {
+            throw new IllegalArgumentException(
+                    "Do not support sequence field on the FIRST_ROW merge
engine");
+        }
     }
 
     private static void validatePrimaryKeysType(List<DataField> fields, 
List<String> primaryKeys) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 512354f0f..8c764bca4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -28,6 +28,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
@@ -95,9 +96,10 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
             Options conf = Options.fromMap(tableSchema.options());
             CoreOptions options = new CoreOptions(conf);
             CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
-            MergeFunctionFactory<KeyValue> mfFactory;
             KeyValueFieldsExtractor extractor = 
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
 
+            MergeFunctionFactory<KeyValue> mfFactory;
+
             switch (mergeEngine) {
                 case DEDUPLICATE:
                     mfFactory = DeduplicateMergeFunction.factory();
@@ -113,6 +115,11 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                                     rowType.getFieldTypes(),
                                     tableSchema.primaryKeys());
                     break;
+                case FIRST_ROW:
+                    mfFactory =
+                            FirstRowMergeFunction.factory(
+                                    new 
RowType(extractor.keyFields(tableSchema)), rowType);
+                    break;
                 default:
                     throw new UnsupportedOperationException(
                             "Unsupported merge engine: " + mergeEngine);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 0e51dbe8e..d415f17c8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
@@ -40,6 +41,7 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ReusingKeyValue;
 import org.apache.paimon.utils.ReusingTestData;
 
+import org.assertj.core.util.Lists;
 import org.junit.jupiter.api.Test;
 
 import java.io.EOFException;
@@ -266,4 +268,26 @@ public abstract class SortBufferWriteBufferTestBase {
                     .create();
         }
     }
+
+    /** Test for {@link SortBufferWriteBuffer} with {@link 
FirstRowMergeFunction}. */
+    public static class WithFirstRowMergeFunctionTest extends 
SortBufferWriteBufferTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return true;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> 
input) {
+            return MergeFunctionTestUtils.getExpectedForFirstRow(input);
+        }
+
+        @Override
+        protected MergeFunction<KeyValue> createMergeFunction() {
+            return FirstRowMergeFunction.factory(
+                            new RowType(Lists.list(new DataField(0, "f0", new 
IntType()))),
+                            new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))))
+                    .create();
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
index 0d4dce736..bddd37111 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
@@ -20,7 +20,9 @@ package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -46,6 +48,10 @@ public abstract class 
FullChangelogMergeFunctionWrapperTestBase {
 
     protected abstract boolean changelogRowDeduplicate();
 
+    protected List<List<KeyValue>> getInputKvs() {
+        return INPUT_KVS;
+    }
+
     @BeforeEach
     public void beforeEach() {
         wrapper =
@@ -107,9 +113,10 @@ public abstract class 
FullChangelogMergeFunctionWrapperTestBase {
 
     @Test
     public void testFullChangelogMergeFunctionWrapper() {
-        for (int i = 0; i < INPUT_KVS.size(); i++) {
+        List<List<KeyValue>> inputs = getInputKvs();
+        for (int i = 0; i < inputs.size(); i++) {
             wrapper.reset();
-            List<KeyValue> kvs = INPUT_KVS.get(i);
+            List<KeyValue> kvs = inputs.get(i);
             kvs.forEach(kv -> wrapper.add(kv));
             ChangelogResult actualResult = wrapper.getResult();
             List<KeyValue> expectedChangelogs = new ArrayList<>();
@@ -215,4 +222,113 @@ public abstract class 
FullChangelogMergeFunctionWrapperTestBase {
             return true;
         }
     }
+
+    /** Test for {@link FirstRowMergeFunction} with {@link 
FullChangelogMergeFunctionWrapper}. */
+    public static class FirstRowMergeFunctionTest
+            extends FullChangelogMergeFunctionWrapperTestBase {
+
+        private static final List<List<KeyValue>> INPUT_KVS =
+                Arrays.asList(
+                        // only 1 insert record, not from top level
+                        Collections.singletonList(
+                                new KeyValue()
+                                        .replace(row(1), 1, RowKind.INSERT, 
row(1))
+                                        .setLevel(0)),
+                        // only 1 delete record, not from top level
+                        Collections.singletonList(
+                                new KeyValue()
+                                        .replace(row(2), 2, RowKind.DELETE, 
row(0))
+                                        .setLevel(0)),
+                        // only 1 insert record, from top level
+                        Collections.singletonList(
+                                new KeyValue()
+                                        .replace(row(3), 3, RowKind.INSERT, 
row(3))
+                                        .setLevel(MAX_LEVEL)),
+                        // multiple records, none from top level
+                        Arrays.asList(
+                                new KeyValue()
+                                        .replace(row(4), 4, RowKind.INSERT, 
row(3))
+                                        .setLevel(0),
+                                new KeyValue()
+                                        .replace(row(4), 5, RowKind.INSERT, 
row(-3))
+                                        .setLevel(0)),
+                        // multiple records, one from top level
+                        Arrays.asList(
+                                new KeyValue()
+                                        .replace(row(6), 8, RowKind.INSERT, 
row(3))
+                                        .setLevel(MAX_LEVEL),
+                                new KeyValue()
+                                        .replace(row(6), 9, RowKind.INSERT, 
row(-3))
+                                        .setLevel(0)),
+                        Arrays.asList(
+                                new KeyValue()
+                                        .replace(row(8), 14, RowKind.INSERT, 
row(3))
+                                        .setLevel(MAX_LEVEL),
+                                new KeyValue()
+                                        .replace(row(8), 15, RowKind.INSERT, 
row(3))
+                                        .setLevel(0)));
+
+        @Override
+        protected List<List<KeyValue>> getInputKvs() {
+            return INPUT_KVS;
+        }
+
+        private final List<KeyValue> expectedBefore =
+                Arrays.asList(
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        changelogRowDeduplicate()
+                                ? null
+                                : new KeyValue()
+                                        .replace(row(8), 14, 
RowKind.UPDATE_BEFORE, row(3)));
+
+        private final List<KeyValue> expectedAfter =
+                Arrays.asList(
+                        new KeyValue().replace(row(1), 1, RowKind.INSERT, 
row(1)),
+                        null,
+                        null,
+                        new KeyValue().replace(row(4), 4, RowKind.INSERT, 
row(3)),
+                        null,
+                        changelogRowDeduplicate()
+                                ? null
+                                : new KeyValue().replace(row(8), 15, 
RowKind.UPDATE_AFTER, row(3)));
+
+        private final List<KeyValue> expectedResult =
+                Arrays.asList(
+                        new KeyValue().replace(row(1), 1, RowKind.INSERT, 
row(1)),
+                        null,
+                        new KeyValue().replace(row(3), 3, RowKind.INSERT, 
row(3)),
+                        new KeyValue().replace(row(4), 4, RowKind.INSERT, 
row(3)),
+                        new KeyValue().replace(row(6), 8, RowKind.INSERT, 
row(3)),
+                        new KeyValue().replace(row(8), 14, RowKind.INSERT, 
row(3)));
+
+        @Override
+        protected MergeFunction<KeyValue> createMergeFunction() {
+            return new FirstRowMergeFunction(
+                    RowType.of(DataTypes.INT()), RowType.of(DataTypes.INT()));
+        }
+
+        @Override
+        protected boolean changelogRowDeduplicate() {
+            return true;
+        }
+
+        @Override
+        protected KeyValue getExpectedBefore(int idx) {
+            return expectedBefore.get(idx);
+        }
+
+        @Override
+        protected KeyValue getExpectedAfter(int idx) {
+            return expectedAfter.get(idx);
+        }
+
+        @Override
+        protected KeyValue getExpectedResult(int idx) {
+            return expectedResult.get(idx);
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index a468bbef5..892446da6 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -25,9 +25,13 @@ import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -200,7 +204,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
 
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
-    public void testSum(boolean changlogRowDeduplicate) {
+    public void testSum(boolean changelogRowDeduplicate) {
         LookupChangelogMergeFunctionWrapper function =
                 new LookupChangelogMergeFunctionWrapper(
                         LookupMergeFunction.wrap(
@@ -216,7 +220,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                 RowType.of(DataTypes.INT())),
                         key -> null,
                         EQUALISER,
-                        changlogRowDeduplicate);
+                        changelogRowDeduplicate);
 
         // Without level-0
         function.reset();
@@ -270,7 +274,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
         result = function.getResult();
         assertThat(result).isNotNull();
         changelogs = result.changelogs();
-        if (changlogRowDeduplicate) {
+        if (changelogRowDeduplicate) {
             assertThat(changelogs).isEmpty();
         } else {
             assertThat(changelogs).hasSize(2);
@@ -283,4 +287,89 @@ public class LookupChangelogMergeFunctionWrapperTest {
         assertThat(kv).isNotNull();
         assertThat(kv.value().getInt(0)).isEqualTo(2);
     }
+
+    @Test
+    public void testFirstRow() {
+        Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+        LookupChangelogMergeFunctionWrapper function =
+                new LookupChangelogMergeFunctionWrapper(
+                        LookupMergeFunction.wrap(
+                                projection ->
+                                        new FirstRowMergeFunction(
+                                                new RowType(
+                                                        Lists.list(
+                                                                new DataField(
+                                                                        0, 
"f0", new IntType()))),
+                                                new RowType(
+                                                        Lists.list(
+                                                                new DataField(
+                                                                        1, 
"f1", new IntType())))),
+                                RowType.of(DataTypes.INT()),
+                                RowType.of(DataTypes.INT())),
+                        highLevel::get,
+                        EQUALISER,
+                        false);
+
+        // Without level-0
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(1)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(2)).setLevel(1));
+        ChangelogResult result = function.getResult();
+        assertThat(result).isNotNull();
+        assertThat(result.changelogs()).isEmpty();
+        KeyValue kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(1);
+
+        // With level-0 record, with level-x (x > 0) record
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(1)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        List<KeyValue> changelogs = result.changelogs();
+        assertThat(changelogs).isEmpty();
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(1);
+
+        // With level-0 record, with multiple level-x (x > 0) record
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 1, INSERT, 
row(1)).setLevel(3));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(1)).setLevel(2));
+        function.add(new KeyValue().replace(row(1), 3, INSERT, 
row(2)).setLevel(1));
+        function.add(new KeyValue().replace(row(1), 4, INSERT, 
row(2)).setLevel(0));
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        assertThat(changelogs).isEmpty();
+        assertThat(kv.value().getInt(0)).isEqualTo(1);
+
+        // Without high level value
+        function.reset();
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(0)).setLevel(0));
+
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(1);
+        assertThat(changelogs.get(0).valueKind()).isEqualTo(INSERT);
+        assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(0);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(0);
+
+        // with high level value
+        function.reset();
+        highLevel.put(row(1), new KeyValue().replace(row(1), INSERT, row(10)));
+        function.add(new KeyValue().replace(row(1), 2, INSERT, 
row(0)).setLevel(0));
+
+        result = function.getResult();
+        assertThat(result).isNotNull();
+        changelogs = result.changelogs();
+        assertThat(changelogs).hasSize(0);
+        kv = result.result();
+        assertThat(kv).isNotNull();
+        assertThat(kv.value().getInt(0)).isEqualTo(10);
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
index 4b9da284c..53e03d4d2 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
@@ -121,6 +121,20 @@ public class MergeFunctionTestUtils {
         return expected;
     }
 
+    public static List<ReusingTestData> 
getExpectedForFirstRow(List<ReusingTestData> input) {
+        input = new ArrayList<>(input);
+        Collections.sort(input);
+
+        List<ReusingTestData> expected = new ArrayList<>();
+        for (int i = 0; i < input.size(); i++) {
+            if (i == 0 || input.get(i).key != input.get(i - 1).key) {
+                expected.add(input.get(i));
+            }
+        }
+
+        return expected;
+    }
+
     public static void assertKvsEquals(List<KeyValue> expected, List<KeyValue> 
actual) {
         assertThat(actual).hasSize(expected.size());
         for (int i = 0; i < actual.size(); i++) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index e19b8113e..a12c5d152 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -22,9 +22,14 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.SortEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ReusingTestData;
 import org.apache.paimon.utils.TestReusingRecordReader;
 
+import org.assertj.core.util.Lists;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
@@ -139,4 +144,25 @@ public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestB
             runTest(parseData("1, 2, +, 100", "1, 1, +, -100"), sortEngine);
         }
     }
+
+    /** Test for {@link SortMergeReader} with {@link FirstRowMergeFunction}. */
+    public static class WithFirstRowMergeFunctionTest extends 
SortMergeReaderTestBase {
+
+        @Override
+        protected boolean addOnly() {
+            return true;
+        }
+
+        @Override
+        protected List<ReusingTestData> getExpected(List<ReusingTestData> 
input) {
+            return MergeFunctionTestUtils.getExpectedForFirstRow(input);
+        }
+
+        @Override
+        protected MergeFunction<KeyValue> createMergeFunction() {
+            return new FirstRowMergeFunction(
+                    new RowType(Lists.list(new DataField(0, "f0", new 
IntType()))),
+                    new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))));
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index accc39953..d9f72d335 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -139,6 +139,10 @@ public class DataTableSource extends FlinkTableSource {
         } else if (table instanceof ChangelogWithKeyFileStoreTable) {
             Options options = Options.fromMap(table.options());
 
+            if (new CoreOptions(options).mergeEngine() == 
CoreOptions.MergeEngine.FIRST_ROW) {
+                return ChangelogMode.insertOnly();
+            }
+
             if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
                 return ChangelogMode.all();
             }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index 7d4911d3e..b9c6bb912 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -35,6 +35,7 @@ public class TableScanUtils {
                     {
                         put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial 
update");
                         put(CoreOptions.MergeEngine.AGGREGATE, 
"Pre-aggregate");
+                        put(CoreOptions.MergeEngine.FIRST_ROW, "First row");
                     }
                 };
         if (table.primaryKeys().size() > 0 && 
mergeEngineDesc.containsKey(mergeEngine)) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
new file mode 100644
index 000000000..cd625b5ac
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -0,0 +1,106 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for first row merge engine. */
+public class FirstRowITCase extends CatalogITCaseBase {
+
+    @Override
+    protected List<String> ddl() {
+        return Arrays.asList(
+                "CREATE TABLE IF NOT EXISTS T ("
+                        + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)"
+                        + " WITH ('merge-engine'='first-row');",
+                "CREATE TABLE IF NOT EXISTS T1 ("
+                        + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)"
+                        + " WITH ('merge-engine'='first-row', 
'changelog-producer' = 'lookup');",
+                "CREATE TABLE IF NOT EXISTS T2 ("
+                        + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)"
+                        + " WITH ('merge-engine'='first-row', 
'changelog-producer' = 'full-compaction', 'full-compaction.delta-commits' = 
'3');");
+    }
+
+    @Test
+    public void testBatchQuery() {
+        batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
+        List<Row> result = batchSql("SELECT * FROM T");
+        
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, 
"1"));
+
+        result = batchSql("SELECT c FROM T");
+        
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1"));
+    }
+
+    @Test
+    public void testReadAfterFullCompaction() {
+        batchSql("ALTER TABLE T SET ('full-compaction.delta-commits'='1')");
+
+        batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
+        List<Row> result = batchSql("SELECT * FROM T");
+        
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1, 
"1"));
+
+        batchSql("INSERT INTO T VALUES (1, 1, '1'), (2, 2, '2')");
+        result = batchSql("SELECT * FROM T");
+        assertThat(result)
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+                        Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+    }
+
+    @Test
+    public void testStreamingReadOnFullCompaction() throws Exception {
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM T2");
+
+        sql("INSERT INTO T2 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 
4, '4')");
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+                        Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+
+        sql("INSERT INTO T2 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3, 
3, '3')");
+        assertThat(iterator.collect(1))
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 3, 3, 
"3"));
+    }
+
+    @Test
+    public void testStreamingReadOnLookup() throws Exception {
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM T1");
+
+        sql("INSERT INTO T1 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1, 
4, '4')");
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+                        Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+
+        sql("INSERT INTO T1 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3, 
3, '3')");
+        assertThat(iterator.collect(1))
+                .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 3, 3, 
"3"));
+    }
+}


Reply via email to