This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ab90c670c [flink] Flink batch delete supports partial-update.remove-record-on-sequence-group option (#4861)
1ab90c670c is described below

commit 1ab90c670c8eff9a70e41e0d79b79911da837ca9
Author: yuzelin <[email protected]>
AuthorDate: Wed Jan 8 16:22:00 2025 +0800

    [flink] Flink batch delete supports partial-update.remove-record-on-sequence-group option (#4861)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  4 ---
 .../SupportsRowLevelOperationFlinkTableSink.java   | 37 ++++++++++++++------
 .../apache/paimon/flink/PartialUpdateITCase.java   | 40 +++++++++++++++++++++-
 3 files changed, 66 insertions(+), 15 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 9a59bad356..93bc23a41f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2208,10 +2208,6 @@ public class CoreOptions implements Serializable {
         return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
     }
 
-    public boolean partialUpdateRemoveRecordOnDelete() {
-        return options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
-    }
-
     public Optional<String> rowkindField() {
         return options.getOptional(ROWKIND_FIELD);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 4e4c2ff2c6..c0d19abff2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -58,7 +58,10 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
+import static 
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
+import static 
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
 import static org.apache.paimon.CoreOptions.createCommitUser;
+import static 
org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Flink table sink that supports row level update and delete. */
@@ -185,17 +188,31 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
                             table.getClass().getName()));
         }
 
-        CoreOptions coreOptions = CoreOptions.fromMap(table.options());
-        if (coreOptions.mergeEngine() == DEDUPLICATE
-                || (coreOptions.mergeEngine() == PARTIAL_UPDATE
-                        && coreOptions.partialUpdateRemoveRecordOnDelete())) {
-            return;
-        }
+        Options options = Options.fromMap(table.options());
+        MergeEngine mergeEngine = options.get(MERGE_ENGINE);
 
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Merge engine %s can not support batch delete.",
-                        coreOptions.mergeEngine()));
+        switch (mergeEngine) {
+            case DEDUPLICATE:
+                return;
+            case PARTIAL_UPDATE:
+                if (options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE)
+                        || 
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null) {
+                    return;
+                } else {
+                    throw new UnsupportedOperationException(
+                            String.format(
+                                    "Merge engine %s doesn't support batch 
delete by default. To support batch delete, "
+                                            + "please set %s to true when 
there is no %s or set %s.",
+                                    mergeEngine,
+                                    
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key(),
+                                    SEQUENCE_GROUP,
+                                    
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP));
+                }
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Merge engine %s can not support batch 
delete.", mergeEngine));
+        }
     }
 
     private boolean canPushDownDeleteFilter() {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 76ee8309e8..be2d6b3433 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -623,7 +623,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testRemoveRecordOnDelete() {
+    public void testRemoveRecordOnDeleteWithoutSequenceGroup() {
         sql(
                 "CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT 
ENFORCED, a STRING, b STRING) WITH ("
                         + " 'merge-engine' = 'partial-update',"
@@ -647,6 +647,44 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
                 .containsExactlyInAnyOrder(Row.of(1, "A", "apache"));
     }
 
+    @Test
+    public void testRemoveRecordOnDeleteWithSequenceGroup() throws Exception {
+        sql(
+                "CREATE TABLE remove_record_on_delete_sequence_group"
+                        + " (pk INT PRIMARY KEY NOT ENFORCED, a STRING, seq_a 
INT, b STRING, seq_b INT) WITH ("
+                        + " 'merge-engine' = 'partial-update',"
+                        + " 'fields.seq_a.sequence-group' = 'a',"
+                        + " 'fields.seq_b.sequence-group' = 'b',"
+                        + " 'partial-update.remove-record-on-sequence-group' = 
'seq_a'"
+                        + ")");
+
+        sql("INSERT INTO remove_record_on_delete_sequence_group VALUES (1, 
'apple', 2, 'a', 1)");
+        sql("INSERT INTO remove_record_on_delete_sequence_group VALUES (1, 
'banana', 1, 'b', 2)");
+        assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group"))
+                .containsExactlyInAnyOrder(Row.of(1, "apple", 2, "b", 2));
+
+        // delete with seq_b won't delete record but retract b
+        String id =
+                TestValuesTableFactory.registerData(
+                        Collections.singletonList(
+                                Row.ofKind(RowKind.DELETE, 1, null, null, "b", 
2)));
+        sEnv.executeSql(
+                String.format(
+                        "CREATE TEMPORARY TABLE delete_source1 (pk INT, a 
STRING, seq_a INT, b STRING, seq_b INT) "
+                                + "WITH ('connector'='values', 
'bounded'='true', 'data-id'='%s', "
+                                + "'changelog-mode' = 'I,D,UA,UB')",
+                        id));
+        sEnv.executeSql(
+                        "INSERT INTO remove_record_on_delete_sequence_group 
SELECT * FROM delete_source1")
+                .await();
+        assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group"))
+                .containsExactlyInAnyOrder(Row.of(1, "apple", 2, null, 2));
+
+        // delete record
+        sql("DELETE FROM remove_record_on_delete_sequence_group WHERE pk = 1");
+        assertThat(sql("SELECT * FROM 
remove_record_on_delete_sequence_group")).isEmpty();
+    }
+
     @Test
     public void testRemoveRecordOnDeleteLookup() throws Exception {
         sql(

Reply via email to