This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 558efab32 [core] Partial update supports retraction with sequence group (#1722)
558efab32 is described below

commit 558efab32037d3493235c3d565af6a7764aeea99
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Aug 3 15:09:11 2023 +0800

    [core] Partial update supports retraction with sequence group (#1722)
    
    * [core] Partial update supports retraction with sequence group
---
 docs/content/concepts/primary-key-table.md         |  4 ++-
 .../compact/PartialUpdateMergeFunction.java        | 36 ++++++++++++++++++++--
 .../compact/PartialUpdateMergeFunctionTest.java    | 17 ++++++++--
 3 files changed, 52 insertions(+), 5 deletions(-)

diff --git a/docs/content/concepts/primary-key-table.md b/docs/content/concepts/primary-key-table.md
index c21e280b8..9218178be 100644
--- a/docs/content/concepts/primary-key-table.md
+++ b/docs/content/concepts/primary-key-table.md
@@ -80,7 +80,9 @@ For streaming queries, `partial-update` merge engine must be used together with
 {{< /hint >}}
 
 {{< hint info >}}
-Partial cannot receive `DELETE` messages because the behavior cannot be defined. You can configure `partial-update.ignore-delete` to ignore `DELETE` messages.
+By default, partial update cannot accept delete records. You can choose one of the following solutions:
+- Configure 'partial-update.ignore-delete' to ignore delete records.
+- Configure 'sequence-group's to retract partial columns.
 {{< /hint >}}
 
 #### Sequence Group
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index d99d96586..540391369 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -89,13 +89,24 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                 return;
             }
 
+            if (fieldSequences.size() > 1) {
+                retractWithSequenceGroup(kv);
+                return;
+            }
+
             if (kv.valueKind() == RowKind.UPDATE_BEFORE) {
                 throw new IllegalArgumentException(
                         "Partial update can not accept update_before records, 
it is a bug.");
             }
 
-            throw new IllegalArgumentException(
-                    "Partial update can not accept delete records. Partial 
delete is not supported!");
+            String msg =
+                    String.join(
+                            "By default, Partial update can not accept delete 
records,"
+                                    + " you can choose one of the following 
solutions:",
+                            "1. Configure 'partial-update.ignore-delete' to 
ignore delete records.",
+                            "2. Configure 'sequence-group's to retract partial 
columns.");
+
+            throw new IllegalArgumentException(msg);
         }
 
         latestSequenceNumber = kv.sequenceNumber();
@@ -136,6 +147,27 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         }
     }
 
+    private void retractWithSequenceGroup(KeyValue kv) {
+        for (int i = 0; i < getters.length; i++) {
+            SequenceGenerator sequenceGen = fieldSequences.get(i);
+            if (sequenceGen != null) {
+                Long currentSeq = sequenceGen.generateNullable(kv.value());
+                if (currentSeq != null) {
+                    Long previousSeq = sequenceGen.generateNullable(row);
+                    if (previousSeq == null || currentSeq >= previousSeq) {
+                        if (sequenceGen.index() == i) {
+                            // update sequence field
+                            row.setField(i, 
getters[i].getFieldOrNull(kv.value()));
+                        } else {
+                            // retract normal field
+                            row.setField(i, null);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
     @Override
     @Nullable
     public KeyValue getResult() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
index 68b274676..113aaac10 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunctionTest.java
@@ -78,6 +78,16 @@ public class PartialUpdateMergeFunctionTest {
         validate(func, 1, 2, 2, 2, 1, 1, 1);
         add(func, 1, 3, 3, 1, 3, 3, 3);
         validate(func, 1, 2, 2, 2, 3, 3, 3);
+
+        // delete
+        add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, null);
+        validate(func, 1, null, null, 3, 3, 3, 3);
+        add(func, RowKind.DELETE, 1, 1, 1, 3, 1, 1, 4);
+        validate(func, 1, null, null, 3, null, null, 4);
+        add(func, 1, 4, 4, 4, 5, 5, 5);
+        validate(func, 1, 4, 4, 4, 5, 5, 5);
+        add(func, RowKind.DELETE, 1, 1, 1, 6, 1, 1, 6);
+        validate(func, 1, null, null, 6, null, null, 6);
     }
 
     @Test
@@ -283,9 +293,12 @@ public class PartialUpdateMergeFunctionTest {
     }
 
     private void add(MergeFunction<KeyValue> function, Integer... f) {
+        add(function, RowKind.INSERT, f);
+    }
+
+    private void add(MergeFunction<KeyValue> function, RowKind rowKind, 
Integer... f) {
         function.add(
-                new KeyValue()
-                        .replace(GenericRow.of(1), sequence++, RowKind.INSERT, 
GenericRow.of(f)));
+                new KeyValue().replace(GenericRow.of(1), sequence++, rowKind, 
GenericRow.of(f)));
     }
 
     private void validate(MergeFunction<KeyValue> function, Integer... f) {

Reply via email to