This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fc22edc03 [core] Support to ignore delete/update_before records in
first_row merge engine (#2349)
fc22edc03 is described below
commit fc22edc0331440765e30d91d16b97075f33dd938
Author: Aitozi <[email protected]>
AuthorDate: Tue Nov 21 11:26:54 2023 +0800
[core] Support to ignore delete/update_before records in first_row merge
engine (#2349)
---
docs/content/concepts/primary-key-table.md | 5 +++-
.../shortcodes/generated/core_configuration.html | 6 ++++
.../main/java/org/apache/paimon/CoreOptions.java | 6 ++++
.../mergetree/compact/FirstRowMergeFunction.java | 34 +++++++++++++++-------
.../apache/paimon/table/PrimaryKeyTableUtils.java | 2 +-
.../mergetree/SortBufferWriteBufferTestBase.java | 3 +-
.../LookupChangelogMergeFunctionWrapperTest.java | 3 +-
.../mergetree/compact/SortMergeReaderTestBase.java | 2 +-
.../org/apache/paimon/flink/FirstRowITCase.java | 29 ++++++++++++++++++
9 files changed, 75 insertions(+), 15 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index 85326dcac..1d62887ae 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -300,9 +300,12 @@ This is an experimental feature.
By specifying `'merge-engine' = 'first-row'`, users can keep the first row of
the same primary key. It differs from the
`deduplicate` merge engine that in the `first-row` merge engine, it will
generate insert only changelog.
+{{< hint info >}}
1. `first-row` merge engine must be used together with `lookup` [changelog
producer]({{< ref "concepts/primary-key-table#changelog-producers" >}}).
2. You can not specify `sequence.field`.
-3. Not accept `DELETE` and `UPDATE_BEFORE` message.
+3. `DELETE` and `UPDATE_BEFORE` messages are not accepted by default. You can set
`first-row.ignore-delete` to `true` to ignore these two kinds of records.
+{{< /hint >}}
+
This is of great help in replacing log deduplication in streaming computation.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 96ceb92fd..1450ded16 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -182,6 +182,12 @@ under the License.
<td>Map</td>
<td>Define different file format for different level, you can add
the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file
format for level is not provided, the default format which set by `file.format`
will be used.</td>
</tr>
+ <tr>
+ <td><h5>first-row.ignore-delete</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to ignore delete records in first-row mode.</td>
+ </tr>
<tr>
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 358c370eb..cbc16cc75 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -232,6 +232,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether to ignore delete records in
partial-update mode.");
+ public static final ConfigOption<Boolean> FIRST_ROW_IGNORE_DELETE =
+ key("first-row.ignore-delete")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to ignore delete records in
first-row mode.");
+
public static final ConfigOption<SortEngine> SORT_ENGINE =
key("sort-engine")
.enumType(SortEngine.class)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index f8c1c4403..34b9bc74a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -20,11 +20,11 @@
package org.apache.paimon.mergetree.compact;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.types.RowKind;
+import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -38,9 +38,12 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
private final InternalRowSerializer valueSerializer;
private KeyValue first;
- protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
+ private final boolean ignoreDelete;
+
+ protected FirstRowMergeFunction(RowType keyType, RowType valueType,
boolean ignoreDelete) {
this.keySerializer = new InternalRowSerializer(keyType);
this.valueSerializer = new InternalRowSerializer(valueType);
+ this.ignoreDelete = ignoreDelete;
}
@Override
@@ -50,9 +53,15 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void add(KeyValue kv) {
- RowKind rowKind = kv.valueKind();
- Preconditions.checkArgument(
- rowKind.isAdd(), "First row merge engine don't accept %s
message", rowKind);
+ if (kv.valueKind().isRetract()) {
+ if (ignoreDelete) {
+ return;
+ } else {
+ throw new IllegalArgumentException(
+ "By default, First row merge engine can not accept
DELETE/UPDATE_BEFORE records.\n"
+ + "You can config 'first-row.ignore-delete' to
ignore the DELETE/UPDATE_BEFORE records.");
+ }
+ }
if (first == null) {
this.first = kv.copy(keySerializer, valueSerializer);
}
@@ -64,8 +73,9 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
return first;
}
- public static MergeFunctionFactory<KeyValue> factory(RowType keyType,
RowType valueType) {
- return new FirstRowMergeFunction.Factory(keyType, valueType);
+ public static MergeFunctionFactory<KeyValue> factory(
+ RowType keyType, RowType valueType, Options options) {
+ return new FirstRowMergeFunction.Factory(keyType, valueType, options);
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -74,14 +84,18 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
private final RowType keyType;
private final RowType valueType;
- public Factory(RowType keyType, RowType valueType) {
+ private final Options options;
+
+ public Factory(RowType keyType, RowType valueType, Options options) {
this.keyType = keyType;
this.valueType = valueType;
+ this.options = options;
}
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new FirstRowMergeFunction(keyType, valueType);
+ return new FirstRowMergeFunction(
+ keyType, valueType,
options.get(CoreOptions.FIRST_ROW_IGNORE_DELETE));
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index 8ab18ca4d..727664da2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -75,7 +75,7 @@ public class PrimaryKeyTableUtils {
tableSchema.primaryKeys());
case FIRST_ROW:
return FirstRowMergeFunction.factory(
- new RowType(extractor.keyFields(tableSchema)),
rowType);
+ new RowType(extractor.keyFields(tableSchema)),
rowType, conf);
default:
throw new UnsupportedOperationException("Unsupported merge
engine: " + mergeEngine);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index bdd57777a..b7d76ff9d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -256,7 +256,8 @@ public abstract class SortBufferWriteBufferTestBase {
protected MergeFunction<KeyValue> createMergeFunction() {
return FirstRowMergeFunction.factory(
new RowType(Lists.list(new DataField(0, "f0", new
IntType()))),
- new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))))
+ new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))),
+ new Options())
.create();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 5cf976313..07c93e1a7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -300,7 +300,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
new RowType(
Lists.list(new DataField(0,
"f0", new IntType()))),
new RowType(
- Lists.list(new DataField(1,
"f1", new IntType())))),
+ Lists.list(new DataField(1,
"f1", new IntType()))),
+ false),
highLevel::contains);
// Without level-0
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 9782fd8af..876280125 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -129,7 +129,7 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
RowType keyType = new RowType(Lists.list(new DataField(0, "f0",
new IntType())));
RowType valueType = new RowType(Lists.list(new DataField(1, "f1",
new BigIntType())));
return new LookupMergeFunction(
- new FirstRowMergeFunction(keyType, valueType), keyType,
valueType);
+ new FirstRowMergeFunction(keyType, valueType, false),
keyType, valueType);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index 3ed345d52..c4492596f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -22,6 +22,9 @@ package org.apache.paimon.flink;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.Test;
@@ -103,4 +106,30 @@ public class FirstRowITCase extends CatalogITCaseBase {
Row.ofKind(RowKind.INSERT, 1, 1, "1"),
Row.ofKind(RowKind.INSERT, 2, 3, "3"));
}
+
+ @Test
+ public void testIgnoreDelete() {
+ sql(
+ "CREATE TABLE IF NOT EXISTS T1 ("
+ + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT
ENFORCED)"
+ + " WITH ('merge-engine'='first-row',
'first-row.ignore-delete' = 'true',"
+ + " 'changelog-producer' = 'lookup');");
+
+ List<Row> input =
+ ImmutableList.of(
+ Row.ofKind(RowKind.INSERT, 1, 1, "1"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1, 1, "1"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1, 2, "2"));
+
+ String id = TestValuesTableFactory.registerData(input);
+ sql(
+ "CREATE TEMPORARY TABLE source (a INT, b INT, c STRING) WITH "
+ + "('connector'='values', 'bounded'='true',
'data-id'='%s')",
+ id);
+
+ batchSql("INSERT INTO T1 SELECT * FROM source");
+ List<Row> result = batchSql("SELECT * FROM T1");
+
+ assertThat(result).containsExactly(Row.ofKind(RowKind.INSERT, 1, 1,
"1"));
+ }
}