This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 818b9ea2a [core] Add support of first_row merge engine (#1557)
818b9ea2a is described below
commit 818b9ea2ad15039d15981c3b4f838de35e972c01
Author: Aitozi <[email protected]>
AuthorDate: Tue Jul 25 09:22:28 2023 +0800
[core] Add support of first_row merge engine (#1557)
---
docs/content/concepts/primary-key-table.md | 12 +++
.../shortcodes/generated/core_configuration.html | 2 +-
.../main/java/org/apache/paimon/CoreOptions.java | 4 +-
.../mergetree/compact/FirstRowMergeFunction.java | 87 +++++++++++++++
.../compact/FullChangelogMergeFunctionWrapper.java | 6 ++
.../LookupChangelogMergeFunctionWrapper.java | 12 ++-
.../mergetree/compact/LookupMergeFunction.java | 6 +-
.../org/apache/paimon/schema/SchemaValidation.java | 9 +-
.../table/ChangelogWithKeyFileStoreTable.java | 9 +-
.../mergetree/SortBufferWriteBufferTestBase.java | 24 +++++
.../FullChangelogMergeFunctionWrapperTestBase.java | 120 ++++++++++++++++++++-
.../LookupChangelogMergeFunctionWrapperTest.java | 95 +++++++++++++++-
.../mergetree/compact/MergeFunctionTestUtils.java | 14 +++
.../mergetree/compact/SortMergeReaderTestBase.java | 26 +++++
.../paimon/flink/source/DataTableSource.java | 4 +
.../apache/paimon/flink/utils/TableScanUtils.java | 1 +
.../org/apache/paimon/flink/FirstRowITCase.java | 106 ++++++++++++++++++
17 files changed, 525 insertions(+), 12 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index 09d387e30..868497995 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -214,6 +214,18 @@ If you allow some functions to ignore retraction messages,
you can configure:
For streaming queries, `aggregation` merge engine must be used together with
`lookup` or `full-compaction` [changelog producer]({{< ref
"concepts/primary-key-table#changelog-producers" >}}).
{{< /hint >}}
+### First Row
+
+By specifying `'merge-engine' = 'first-row'`, users can keep the first row of
the same primary key. It differs from the `deduplicate` merge engine in that
the `first-row` merge engine generates an insert-only changelog.
+
+{{< hint info >}}
+For streaming queries, `first-row` merge engine must be used together with
`lookup` or `full-compaction` [changelog producer]({{< ref
"concepts/primary-key-table#changelog-producers" >}}).
+{{< /hint >}}
+
+{{< hint info >}}
+Currently, only the first row in insertion order is supported, so you cannot
specify `sequence.field` for this merge engine. It also does not accept
`DELETE` and `UPDATE_BEFORE` messages.
+{{< /hint >}}
+
## Changelog Producers
Streaming queries will continuously produce the latest changes. These changes
can come from the underlying table files or from an [external log system]({{<
ref "concepts/external-log-systems" >}}) like Kafka. Compared to the external
log system, changes from table files have lower cost but higher latency
(depending on how often snapshots are created).
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index d5a09f48a..01b22cbab 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -264,7 +264,7 @@ under the License.
<td><h5>merge-engine</h5></td>
<td style="word-wrap: break-word;">deduplicate</td>
<td><p>Enum</p></td>
- <td>Specify the merge engine for table with primary key.<br /><br
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last
row.</li><li>"partial-update": Partial update non-null
fields.</li><li>"aggregation": Aggregate fields with same primary
key.</li></ul></td>
+ <td>Specify the merge engine for table with primary key.<br /><br
/>Possible values:<ul><li>"deduplicate": De-duplicate and keep the last
row.</li><li>"partial-update": Partial update non-null
fields.</li><li>"aggregation": Aggregate fields with same primary
key.</li><li>"first-row": De-duplicate and keep the first row.</li></ul></td>
</tr>
<tr>
<td><h5>metadata.stats-mode</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 084cf45df..85bb06460 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1218,7 +1218,9 @@ public class CoreOptions implements Serializable {
PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
- AGGREGATE("aggregation", "Aggregate fields with same primary key.");
+ AGGREGATE("aggregation", "Aggregate fields with same primary key."),
+
+ FIRST_ROW("first-row", "De-duplicate and keep the first row.");
private final String value;
private final String description;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
new file mode 100644
index 000000000..f8c1c4403
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -0,0 +1,87 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.mergetree.compact;
+
+import org.apache.paimon.KeyValue;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link MergeFunction} where key is primary key (unique) and value is the
full record, only keep
+ * the first one.
+ */
+public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
+
+ private final InternalRowSerializer keySerializer;
+ private final InternalRowSerializer valueSerializer;
+ private KeyValue first;
+
+ protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
+ this.keySerializer = new InternalRowSerializer(keyType);
+ this.valueSerializer = new InternalRowSerializer(valueType);
+ }
+
+ @Override
+ public void reset() {
+ this.first = null;
+ }
+
+ @Override
+ public void add(KeyValue kv) {
+ RowKind rowKind = kv.valueKind();
+ Preconditions.checkArgument(
+ rowKind.isAdd(), "First row merge engine don't accept %s
message", rowKind);
+ if (first == null) {
+ this.first = kv.copy(keySerializer, valueSerializer);
+ }
+ }
+
+ @Nullable
+ @Override
+ public KeyValue getResult() {
+ return first;
+ }
+
+ public static MergeFunctionFactory<KeyValue> factory(RowType keyType,
RowType valueType) {
+ return new FirstRowMergeFunction.Factory(keyType, valueType);
+ }
+
+ private static class Factory implements MergeFunctionFactory<KeyValue> {
+
+ private static final long serialVersionUID = 1L;
+ private final RowType keyType;
+ private final RowType valueType;
+
+ public Factory(RowType keyType, RowType valueType) {
+ this.keyType = keyType;
+ this.valueType = valueType;
+ }
+
+ @Override
+ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
+ return new FirstRowMergeFunction(keyType, valueType);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
index 0b1e41450..86eb16698 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapper.java
@@ -41,6 +41,7 @@ public class FullChangelogMergeFunctionWrapper implements
MergeFunctionWrapper<C
private final int maxLevel;
private final RecordEqualiser valueEqualiser;
private final boolean changelogRowDeduplicate;
+ private final boolean isFirstRow;
// only full compaction will write files into maxLevel, see
UniversalCompaction class
private KeyValue topLevelKv;
@@ -61,6 +62,7 @@ public class FullChangelogMergeFunctionWrapper implements
MergeFunctionWrapper<C
"Value count merge function does not need to produce changelog
from full compaction. "
+ "Please set changelog producer to 'input'.");
this.mergeFunction = mergeFunction;
+ this.isFirstRow = mergeFunction instanceof FirstRowMergeFunction;
this.maxLevel = maxLevel;
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
@@ -108,6 +110,10 @@ public class FullChangelogMergeFunctionWrapper implements
MergeFunctionWrapper<C
reusedResult.addChangelog(replace(reusedAfter,
RowKind.INSERT, merged));
}
} else {
+            // For first row, we should just return the old value and
produce no changelog.
+ if (isFirstRow) {
+ return reusedResult.setResultIfNotRetract(merged);
+ }
if (merged == null || !isAdd(merged)) {
reusedResult.addChangelog(replace(reusedBefore,
RowKind.DELETE, topLevelKv));
} else if (!changelogRowDeduplicate
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index a53091a3f..e862789b2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -53,6 +53,7 @@ public class LookupChangelogMergeFunctionWrapper implements
MergeFunctionWrapper
private final KeyValue reusedAfter = new KeyValue();
private final RecordEqualiser valueEqualiser;
private final boolean changelogRowDeduplicate;
+ private final boolean isFirstRow;
public LookupChangelogMergeFunctionWrapper(
MergeFunctionFactory<KeyValue> mergeFunctionFactory,
@@ -65,6 +66,7 @@ public class LookupChangelogMergeFunctionWrapper implements
MergeFunctionWrapper
"Merge function should be a LookupMergeFunction, but is %s,
there is a bug.",
mergeFunction.getClass().getName());
this.mergeFunction = (LookupMergeFunction) mergeFunction;
+ this.isFirstRow = this.mergeFunction.isFirstRow;
this.mergeFunction2 = mergeFunctionFactory.create();
this.lookup = lookup;
this.valueEqualiser = valueEqualiser;
@@ -97,18 +99,24 @@ public class LookupChangelogMergeFunctionWrapper implements
MergeFunctionWrapper
// 2. With level 0, with the latest high level, return changelog
if (highLevel != null) {
- setChangelog(highLevel, result);
+            // For first row, we should just return the old value and produce
no changelog.
+ if (!isFirstRow) {
+ setChangelog(highLevel, result);
+ }
return reusedResult.setResult(result);
}
// 3. Lookup to find the latest high level record
highLevel = lookup.apply(result.key());
+
if (highLevel != null) {
mergeFunction2.reset();
mergeFunction2.add(highLevel);
mergeFunction2.add(result);
result = mergeFunction2.getResult();
- setChangelog(highLevel, result);
+ if (!isFirstRow) {
+ setChangelog(highLevel, result);
+ }
} else {
setChangelog(null, result);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
index 7d3d8c050..fe8056e4c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeFunction.java
@@ -41,10 +41,12 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
KeyValue highLevel;
boolean containLevel0;
+ protected final boolean isFirstRow;
public LookupMergeFunction(
MergeFunction<KeyValue> mergeFunction, RowType keyType, RowType
valueType) {
this.mergeFunction = mergeFunction;
+ this.isFirstRow = mergeFunction instanceof FirstRowMergeFunction;
this.keySerializer = new InternalRowSerializer(keyType);
this.valueSerializer = new InternalRowSerializer(valueType);
}
@@ -64,7 +66,9 @@ public class LookupMergeFunction implements
MergeFunction<KeyValue> {
@Override
public KeyValue getResult() {
// 1. Find the latest high level record
- Iterator<KeyValue> descending = candidates.descendingIterator();
+        // For the first row, the candidates should be in the highest level.
+ Iterator<KeyValue> descending =
+ isFirstRow ? candidates.iterator() :
candidates.descendingIterator();
while (descending.hasNext()) {
KeyValue kv = descending.next();
if (kv.level() > 0) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index df2d2c11a..72ea13649 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -99,7 +99,8 @@ public class SchemaValidation {
+ " should not be larger than "
+ SNAPSHOT_NUM_RETAINED_MAX.key());
- // Only changelog tables with primary keys support full compaction or
lookup changelog
+ // Only changelog tables with primary keys support full compaction or
lookup
+ // changelog
// producer
if (options.writeMode() == WriteMode.CHANGE_LOG) {
switch (options.changelogProducer()) {
@@ -170,6 +171,12 @@ public class SchemaValidation {
schema.fieldNames().contains(field),
"Nonexistent sequence field: '%s'",
field));
+
+ CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+ if (mergeEngine == CoreOptions.MergeEngine.FIRST_ROW &&
sequenceField.isPresent()) {
+ throw new IllegalArgumentException(
+ "Do not support use sequence field on FIRST_MERGE merge
engine");
+ }
}
private static void validatePrimaryKeysType(List<DataField> fields,
List<String> primaryKeys) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 512354f0f..8c764bca4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -28,6 +28,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
import org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction;
@@ -95,9 +96,10 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
Options conf = Options.fromMap(tableSchema.options());
CoreOptions options = new CoreOptions(conf);
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
- MergeFunctionFactory<KeyValue> mfFactory;
KeyValueFieldsExtractor extractor =
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+ MergeFunctionFactory<KeyValue> mfFactory;
+
switch (mergeEngine) {
case DEDUPLICATE:
mfFactory = DeduplicateMergeFunction.factory();
@@ -113,6 +115,11 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
rowType.getFieldTypes(),
tableSchema.primaryKeys());
break;
+ case FIRST_ROW:
+ mfFactory =
+ FirstRowMergeFunction.factory(
+ new
RowType(extractor.keyFields(tableSchema)), rowType);
+ break;
default:
throw new UnsupportedOperationException(
"Unsupported merge engine: " + mergeEngine);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index 0e51dbe8e..d415f17c8 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.mergetree.compact.FirstRowMergeFunction;
import org.apache.paimon.mergetree.compact.LookupMergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
@@ -40,6 +41,7 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ReusingKeyValue;
import org.apache.paimon.utils.ReusingTestData;
+import org.assertj.core.util.Lists;
import org.junit.jupiter.api.Test;
import java.io.EOFException;
@@ -266,4 +268,26 @@ public abstract class SortBufferWriteBufferTestBase {
.create();
}
}
+
+ /** Test for {@link SortBufferWriteBuffer} with {@link
FirstRowMergeFunction}. */
+ public static class WithFirstRowMergeFunctionTest extends
SortBufferWriteBufferTestBase {
+
+ @Override
+ protected boolean addOnly() {
+ return true;
+ }
+
+ @Override
+ protected List<ReusingTestData> getExpected(List<ReusingTestData>
input) {
+ return MergeFunctionTestUtils.getExpectedForFirstRow(input);
+ }
+
+ @Override
+ protected MergeFunction<KeyValue> createMergeFunction() {
+ return FirstRowMergeFunction.factory(
+ new RowType(Lists.list(new DataField(0, "f0", new
IntType()))),
+ new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))))
+ .create();
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
index 0d4dce736..bddd37111 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/FullChangelogMergeFunctionWrapperTestBase.java
@@ -20,7 +20,9 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -46,6 +48,10 @@ public abstract class
FullChangelogMergeFunctionWrapperTestBase {
protected abstract boolean changelogRowDeduplicate();
+ protected List<List<KeyValue>> getInputKvs() {
+ return INPUT_KVS;
+ }
+
@BeforeEach
public void beforeEach() {
wrapper =
@@ -107,9 +113,10 @@ public abstract class
FullChangelogMergeFunctionWrapperTestBase {
@Test
public void testFullChangelogMergeFunctionWrapper() {
- for (int i = 0; i < INPUT_KVS.size(); i++) {
+ List<List<KeyValue>> inputs = getInputKvs();
+ for (int i = 0; i < inputs.size(); i++) {
wrapper.reset();
- List<KeyValue> kvs = INPUT_KVS.get(i);
+ List<KeyValue> kvs = inputs.get(i);
kvs.forEach(kv -> wrapper.add(kv));
ChangelogResult actualResult = wrapper.getResult();
List<KeyValue> expectedChangelogs = new ArrayList<>();
@@ -215,4 +222,113 @@ public abstract class
FullChangelogMergeFunctionWrapperTestBase {
return true;
}
}
+
+ /** Test for {@link FirstRowMergeFunction} with {@link
FullChangelogMergeFunctionWrapper}. */
+ public static class FirstRowMergeFunctionTest
+ extends FullChangelogMergeFunctionWrapperTestBase {
+
+ private static final List<List<KeyValue>> INPUT_KVS =
+ Arrays.asList(
+ // only 1 insert record, not from top level
+ Collections.singletonList(
+ new KeyValue()
+ .replace(row(1), 1, RowKind.INSERT,
row(1))
+ .setLevel(0)),
+ // only 1 delete record, not from top level
+ Collections.singletonList(
+ new KeyValue()
+ .replace(row(2), 2, RowKind.DELETE,
row(0))
+ .setLevel(0)),
+ // only 1 insert record, from top level
+ Collections.singletonList(
+ new KeyValue()
+ .replace(row(3), 3, RowKind.INSERT,
row(3))
+ .setLevel(MAX_LEVEL)),
+ // multiple records, none from top level
+ Arrays.asList(
+ new KeyValue()
+ .replace(row(4), 4, RowKind.INSERT,
row(3))
+ .setLevel(0),
+ new KeyValue()
+ .replace(row(4), 5, RowKind.INSERT,
row(-3))
+ .setLevel(0)),
+ // multiple records, one from top level
+ Arrays.asList(
+ new KeyValue()
+ .replace(row(6), 8, RowKind.INSERT,
row(3))
+ .setLevel(MAX_LEVEL),
+ new KeyValue()
+ .replace(row(6), 9, RowKind.INSERT,
row(-3))
+ .setLevel(0)),
+ Arrays.asList(
+ new KeyValue()
+ .replace(row(8), 14, RowKind.INSERT,
row(3))
+ .setLevel(MAX_LEVEL),
+ new KeyValue()
+ .replace(row(8), 15, RowKind.INSERT,
row(3))
+ .setLevel(0)));
+
+ @Override
+ protected List<List<KeyValue>> getInputKvs() {
+ return INPUT_KVS;
+ }
+
+ private final List<KeyValue> expectedBefore =
+ Arrays.asList(
+ null,
+ null,
+ null,
+ null,
+ null,
+ changelogRowDeduplicate()
+ ? null
+ : new KeyValue()
+ .replace(row(8), 14,
RowKind.UPDATE_BEFORE, row(3)));
+
+ private final List<KeyValue> expectedAfter =
+ Arrays.asList(
+ new KeyValue().replace(row(1), 1, RowKind.INSERT,
row(1)),
+ null,
+ null,
+ new KeyValue().replace(row(4), 4, RowKind.INSERT,
row(3)),
+ null,
+ changelogRowDeduplicate()
+ ? null
+ : new KeyValue().replace(row(8), 15,
RowKind.UPDATE_AFTER, row(3)));
+
+ private final List<KeyValue> expectedResult =
+ Arrays.asList(
+ new KeyValue().replace(row(1), 1, RowKind.INSERT,
row(1)),
+ null,
+ new KeyValue().replace(row(3), 3, RowKind.INSERT,
row(3)),
+ new KeyValue().replace(row(4), 4, RowKind.INSERT,
row(3)),
+ new KeyValue().replace(row(6), 8, RowKind.INSERT,
row(3)),
+ new KeyValue().replace(row(8), 14, RowKind.INSERT,
row(3)));
+
+ @Override
+ protected MergeFunction<KeyValue> createMergeFunction() {
+ return new FirstRowMergeFunction(
+ RowType.of(DataTypes.INT()), RowType.of(DataTypes.INT()));
+ }
+
+ @Override
+ protected boolean changelogRowDeduplicate() {
+ return true;
+ }
+
+ @Override
+ protected KeyValue getExpectedBefore(int idx) {
+ return expectedBefore.get(idx);
+ }
+
+ @Override
+ protected KeyValue getExpectedAfter(int idx) {
+ return expectedAfter.get(idx);
+ }
+
+ @Override
+ protected KeyValue getExpectedResult(int idx) {
+ return expectedResult.get(idx);
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index a468bbef5..892446da6 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -25,9 +25,13 @@ import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
+import org.assertj.core.util.Lists;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -200,7 +204,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
@ParameterizedTest
@ValueSource(booleans = {false, true})
- public void testSum(boolean changlogRowDeduplicate) {
+ public void testSum(boolean changelogRowDeduplicate) {
LookupChangelogMergeFunctionWrapper function =
new LookupChangelogMergeFunctionWrapper(
LookupMergeFunction.wrap(
@@ -216,7 +220,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
RowType.of(DataTypes.INT())),
key -> null,
EQUALISER,
- changlogRowDeduplicate);
+ changelogRowDeduplicate);
// Without level-0
function.reset();
@@ -270,7 +274,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
result = function.getResult();
assertThat(result).isNotNull();
changelogs = result.changelogs();
- if (changlogRowDeduplicate) {
+ if (changelogRowDeduplicate) {
assertThat(changelogs).isEmpty();
} else {
assertThat(changelogs).hasSize(2);
@@ -283,4 +287,89 @@ public class LookupChangelogMergeFunctionWrapperTest {
assertThat(kv).isNotNull();
assertThat(kv.value().getInt(0)).isEqualTo(2);
}
+
+ @Test
+ public void testFirstRow() {
+ Map<InternalRow, KeyValue> highLevel = new HashMap<>();
+ LookupChangelogMergeFunctionWrapper function =
+ new LookupChangelogMergeFunctionWrapper(
+ LookupMergeFunction.wrap(
+ projection ->
+ new FirstRowMergeFunction(
+ new RowType(
+ Lists.list(
+ new DataField(
+ 0,
"f0", new IntType()))),
+ new RowType(
+ Lists.list(
+ new DataField(
+ 1,
"f1", new IntType())))),
+ RowType.of(DataTypes.INT()),
+ RowType.of(DataTypes.INT())),
+ highLevel::get,
+ EQUALISER,
+ false);
+
+ // Without level-0
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(2));
+ function.add(new KeyValue().replace(row(1), 2, INSERT,
row(2)).setLevel(1));
+ ChangelogResult result = function.getResult();
+ assertThat(result).isNotNull();
+ assertThat(result.changelogs()).isEmpty();
+ KeyValue kv = result.result();
+ assertThat(kv).isNotNull();
+ assertThat(kv.value().getInt(0)).isEqualTo(1);
+
+ // With level-0 record, with level-x (x > 0) record
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(1));
+ function.add(new KeyValue().replace(row(1), 2, INSERT,
row(2)).setLevel(0));
+ result = function.getResult();
+ assertThat(result).isNotNull();
+ List<KeyValue> changelogs = result.changelogs();
+ assertThat(changelogs).isEmpty();
+ kv = result.result();
+ assertThat(kv).isNotNull();
+ assertThat(kv.value().getInt(0)).isEqualTo(1);
+
+ // With level-0 record, with multiple level-x (x > 0) record
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 1, INSERT,
row(1)).setLevel(3));
+ function.add(new KeyValue().replace(row(1), 2, INSERT,
row(1)).setLevel(2));
+ function.add(new KeyValue().replace(row(1), 3, INSERT,
row(2)).setLevel(1));
+ function.add(new KeyValue().replace(row(1), 4, INSERT,
row(2)).setLevel(0));
+ result = function.getResult();
+ assertThat(result).isNotNull();
+ changelogs = result.changelogs();
+ assertThat(changelogs).isEmpty();
+ assertThat(kv.value().getInt(0)).isEqualTo(1);
+
+ // Without high level value
+ function.reset();
+ function.add(new KeyValue().replace(row(1), 2, INSERT,
row(0)).setLevel(0));
+
+ result = function.getResult();
+ assertThat(result).isNotNull();
+ changelogs = result.changelogs();
+ assertThat(changelogs).hasSize(1);
+ assertThat(changelogs.get(0).valueKind()).isEqualTo(INSERT);
+ assertThat(changelogs.get(0).value().getInt(0)).isEqualTo(0);
+ kv = result.result();
+ assertThat(kv).isNotNull();
+ assertThat(kv.value().getInt(0)).isEqualTo(0);
+
+ // with high level value
+ function.reset();
+ highLevel.put(row(1), new KeyValue().replace(row(1), INSERT, row(10)));
+ function.add(new KeyValue().replace(row(1), 2, INSERT,
row(0)).setLevel(0));
+
+ result = function.getResult();
+ assertThat(result).isNotNull();
+ changelogs = result.changelogs();
+ assertThat(changelogs).hasSize(0);
+ kv = result.result();
+ assertThat(kv).isNotNull();
+ assertThat(kv.value().getInt(0)).isEqualTo(10);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
index 4b9da284c..53e03d4d2 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/MergeFunctionTestUtils.java
@@ -121,6 +121,20 @@ public class MergeFunctionTestUtils {
return expected;
}
+ public static List<ReusingTestData>
getExpectedForFirstRow(List<ReusingTestData> input) {
+ input = new ArrayList<>(input);
+ Collections.sort(input);
+
+ List<ReusingTestData> expected = new ArrayList<>();
+ for (int i = 0; i < input.size(); i++) {
+ if (i == 0 || input.get(i).key != input.get(i - 1).key) {
+ expected.add(input.get(i));
+ }
+ }
+
+ return expected;
+ }
+
public static void assertKvsEquals(List<KeyValue> expected, List<KeyValue>
actual) {
assertThat(actual).hasSize(expected.size());
for (int i = 0; i < actual.size(); i++) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index e19b8113e..a12c5d152 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -22,9 +22,14 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.SortEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ReusingTestData;
import org.apache.paimon.utils.TestReusingRecordReader;
+import org.assertj.core.util.Lists;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -139,4 +144,25 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
runTest(parseData("1, 2, +, 100", "1, 1, +, -100"), sortEngine);
}
}
+
+ /** Test for {@link SortMergeReader} with {@link FirstRowMergeFunction}. */
+ public static class WithFirstRowMergeFunctionTest extends
SortMergeReaderTestBase {
+
+ @Override
+ protected boolean addOnly() {
+ return true;
+ }
+
+ @Override
+ protected List<ReusingTestData> getExpected(List<ReusingTestData>
input) {
+ return MergeFunctionTestUtils.getExpectedForFirstRow(input);
+ }
+
+ @Override
+ protected MergeFunction<KeyValue> createMergeFunction() {
+ return new FirstRowMergeFunction(
+ new RowType(Lists.list(new DataField(0, "f0", new
IntType()))),
+ new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))));
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index accc39953..d9f72d335 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -139,6 +139,10 @@ public class DataTableSource extends FlinkTableSource {
} else if (table instanceof ChangelogWithKeyFileStoreTable) {
Options options = Options.fromMap(table.options());
+ if (new CoreOptions(options).mergeEngine() ==
CoreOptions.MergeEngine.FIRST_ROW) {
+ return ChangelogMode.insertOnly();
+ }
+
if (options.get(LOG_SCAN_REMOVE_NORMALIZE)) {
return ChangelogMode.all();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index 7d4911d3e..b9c6bb912 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -35,6 +35,7 @@ public class TableScanUtils {
{
put(CoreOptions.MergeEngine.PARTIAL_UPDATE, "Partial
update");
put(CoreOptions.MergeEngine.AGGREGATE,
"Pre-aggregate");
+ put(CoreOptions.MergeEngine.FIRST_ROW, "First row");
}
};
if (table.primaryKeys().size() > 0 &&
mergeEngineDesc.containsKey(mergeEngine)) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
new file mode 100644
index 000000000..cd625b5ac
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.utils.BlockingIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for first row merge engine. */
+public class FirstRowITCase extends CatalogITCaseBase {
+
+ @Override
+ protected List<String> ddl() {
+ return Arrays.asList(
+ "CREATE TABLE IF NOT EXISTS T ("
+ + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT
ENFORCED)"
+ + " WITH ('merge-engine'='first-row');",
+ "CREATE TABLE IF NOT EXISTS T1 ("
+ + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT
ENFORCED)"
+ + " WITH ('merge-engine'='first-row',
'changelog-producer' = 'lookup');",
+ "CREATE TABLE IF NOT EXISTS T2 ("
+ + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT
ENFORCED)"
+ + " WITH ('merge-engine'='first-row',
'changelog-producer' = 'full-compaction', 'full-compaction.delta-commits' =
'3');");
+ }
+
+ @Test
+ public void testBatchQuery() {
+ batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
+ List<Row> result = batchSql("SELECT * FROM T");
+
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1,
"1"));
+
+ result = batchSql("SELECT c FROM T");
+
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1"));
+ }
+
+ @Test
+ public void testReadAfterFullCompaction() {
+ batchSql("ALTER TABLE T SET ('full-compaction.delta-commits'='1')");
+
+ batchSql("INSERT INTO T VALUES (1, 1, '1'), (1, 2, '2')");
+ List<Row> result = batchSql("SELECT * FROM T");
+
assertThat(result).containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 1, 1,
"1"));
+
+ batchSql("INSERT INTO T VALUES (1, 1, '1'), (2, 2, '2')");
+ result = batchSql("SELECT * FROM T");
+ assertThat(result)
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+ Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+ }
+
+ @Test
+ public void testStreamingReadOnFullCompaction() throws Exception {
+ BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT *
FROM T2");
+
+ sql("INSERT INTO T2 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1,
4, '4')");
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+ Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+
+ sql("INSERT INTO T2 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3,
3, '3')");
+ assertThat(iterator.collect(1))
+ .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 3, 3,
"3"));
+ }
+
+ @Test
+ public void testStreamingReadOnLookup() throws Exception {
+ BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT *
FROM T1");
+
+ sql("INSERT INTO T1 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (1,
4, '4')");
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+ Row.ofKind(RowKind.INSERT, 2, 2, "2"));
+
+ sql("INSERT INTO T1 VALUES(1, 1, '1'), (2, 2, '2'), (1, 3, '3'), (3,
3, '3')");
+ assertThat(iterator.collect(1))
+ .containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, 3, 3,
"3"));
+ }
+}