This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 558efab32 [core] Partial update supports retraction with sequence
group (#1722)
558efab32 is described below
commit 558efab32037d3493235c3d565af6a7764aeea99
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Aug 3 15:09:11 2023 +0800
[core] Partial update supports retraction with sequence group (#1722)
* [core] Partial update supports retraction with sequence group
---
docs/content/concepts/primary-key-table.md | 4 ++-
.../compact/PartialUpdateMergeFunction.java | 36 ++++++++++++++++++++--
.../compact/PartialUpdateMergeFunctionTest.java | 17 ++++++++--
3 files changed, 52 insertions(+), 5 deletions(-)
diff --git a/docs/content/concepts/primary-key-table.md
b/docs/content/concepts/primary-key-table.md
index c21e280b8..9218178be 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -80,7 +80,9 @@ For streaming queries, `partial-update` merge engine must be
used together with
{{< /hint >}}
{{< hint info >}}
-Partial cannot receive `DELETE` messages because the behavior cannot be
defined. You can configure `partial-update.ignore-delete` to ignore `DELETE`
messages.
+By default, Partial update can not accept delete records, you can choose one
of the following solutions:
+- Configure 'partial-update.ignore-delete' to ignore delete records.
+- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}
#### Sequence Group
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index d99d96586..540391369 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -89,13 +89,24 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
return;
}
+ if (fieldSequences.size() > 1) {
+ retractWithSequenceGroup(kv);
+ return;
+ }
+
if (kv.valueKind() == RowKind.UPDATE_BEFORE) {
throw new IllegalArgumentException(
"Partial update can not accept update_before records,
it is a bug.");
}
- throw new IllegalArgumentException(
- "Partial update can not accept delete records. Partial
delete is not supported!");
+ String msg =
+ String.join(
+ "By default, Partial update can not accept delete
records,"
+ + " you can choose one of the following
solutions:",
+ "1. Configure 'partial-update.ignore-delete' to
ignore delete records.",
+ "2. Configure 'sequence-group's to retract partial
columns.");
+
+ throw new IllegalArgumentException(msg);
}
latestSequenceNumber = kv.sequenceNumber();
@@ -136,6 +147,27 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
}
}
+ private void retractWithSequenceGroup(KeyValue kv) {
+ for (int i = 0; i < getters.length; i++) {
+ SequenceGenerator sequenceGen = fieldSequences.get(i);
+ if (sequenceGen != null) {
+ Long currentSeq = sequenceGen.generateNullable(kv.value());
+ if (currentSeq != null) {
+ Long previousSeq = sequenceGen.generateNullable(row);
+ if (previousSeq == null || currentSeq >= previousSeq) {
+ if (sequenceGen.index() == i) {
+ // update sequence field
+ row.setField(i,
getters[i].getFieldOrNull(kv.value()));
+ } else {
+ // retract normal field
+ row.setField(i, null);
+ }
+ }
+ }
+ }
+ }
+ }
+
@Override
@Nullable
public KeyValue getResult() {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 68b274676..113aaac10 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -78,6 +78,16 @@ public class PartialUpdateMergeFunctionTest {
validate(func, 1, 2, 2, 2, 1, 1, 1);
add(func, 1, 3, 3, 1, 3, 3, 3);
validate(func, 1, 2, 2, 2, 3, 3, 3);
+
+ // delete
+ add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, null);
+ validate(func, 1, null, null, 3, 3, 3, 3);
+ add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, 4);
+ validate(func, 1, null, null, 3, null, null, 4);
+ add(func, 1, 4, 4, 4, 5, 5, 5);
+ validate(func, 1, 4, 4, 4, 5, 5, 5);
+ add(func, RowKind.DELETE, 1, 1, 1, 6, 1, 1, 6);
+ validate(func, 1, null, null, 6, null, null, 6);
}
@Test
@@ -283,9 +293,12 @@ public class PartialUpdateMergeFunctionTest {
}
private void add(MergeFunction<KeyValue> function, Integer... f) {
+ add(function, RowKind.INSERT, f);
+ }
+
+ private void add(MergeFunction<KeyValue> function, RowKind rowKind,
Integer... f) {
function.add(
- new KeyValue()
- .replace(GenericRow.of(1), sequence++, RowKind.INSERT,
GenericRow.of(f)));
+ new KeyValue().replace(GenericRow.of(1), sequence++, rowKind,
GenericRow.of(f)));
}
private void validate(MergeFunction<KeyValue> function, Integer... f) {