This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed9698b683 [core]Improve partial update merge engine configuration 
ignore-delete and remove-record-on-sequence-group cannot be enabled at the same 
time. (#5609)
ed9698b683 is described below

commit ed9698b683bb3e237bbaf952c36f556d25db793a
Author: Kerwin <[email protected]>
AuthorDate: Tue May 20 10:19:02 2025 +0800

    [core]Improve partial update merge engine configuration ignore-delete and 
remove-record-on-sequence-group cannot be enabled at the same time. (#5609)
---
 .../merge-engine/partial-update.md                 |  3 +-
 .../compact/PartialUpdateMergeFunction.java        | 71 ++++++++++++----------
 .../apache/paimon/flink/PartialUpdateITCase.java   | 37 ++++++++++-
 3 files changed, 74 insertions(+), 37 deletions(-)

diff --git a/docs/content/primary-key-table/merge-engine/partial-update.md 
b/docs/content/primary-key-table/merge-engine/partial-update.md
index 2329d72cba..e9dc313fab 100644
--- a/docs/content/primary-key-table/merge-engine/partial-update.md
+++ b/docs/content/primary-key-table/merge-engine/partial-update.md
@@ -50,8 +50,7 @@ By default, Partial update can not accept delete records, you 
can choose one of
 
 - Configure 'ignore-delete' to ignore delete records.
 - Configure 'partial-update.remove-record-on-delete' to remove the whole row 
when receiving delete records.
-- Configure 'sequence-group's to retract partial columns.
-  * Configure 'partial-update.remove-record-on-sequence-group' to remove the 
whole row when receiving delete records of specified sequence group.
+- Configure 'sequence-group's to retract partial columns. Also configure 
'partial-update.remove-record-on-sequence-group' to remove the whole row when 
receiving delete records of the specified sequence group.
 {{< /hint >}}
 
 ## Sequence Group
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index f061192979..d0633781ca 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -161,7 +161,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                     + " you can choose one of the following 
solutions:",
                             "1. Configure 'ignore-delete' to ignore delete 
records.",
                             "2. Configure 
'partial-update.remove-record-on-delete' to remove the whole row when receiving 
delete records.",
-                            "3. Configure 'sequence-group's to retract partial 
columns.");
+                            "3. Configure 'sequence-group's to retract partial 
columns. Also configure 'partial-update.remove-record-on-sequence-group' to 
remove the whole row when receiving deleted records of `specified sequence 
group`.");
 
             throw new IllegalArgumentException(msg);
         }
@@ -391,27 +391,26 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
         private final boolean removeRecordOnDelete;
 
-        private final String removeRecordOnSequenceGroup;
-
         private Set<Integer> sequenceGroupPartialDelete;
 
         private Factory(Options options, RowType rowType, List<String> 
primaryKeys) {
             this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
             this.rowType = rowType;
             this.tableTypes = rowType.getFieldTypes();
-            this.removeRecordOnSequenceGroup =
+            this.removeRecordOnDelete = 
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
+            String removeRecordOnSequenceGroup =
                     
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP);
             this.sequenceGroupPartialDelete = new HashSet<>();
 
             List<String> fieldNames = rowType.getFieldNames();
             this.fieldSeqComparators = new HashMap<>();
             Map<String, Integer> sequenceGroupMap = new HashMap<>();
-            List<String> allSequenceFields = new ArrayList<>();
+            List<Integer> allSequenceFields = new ArrayList<>();
             for (Map.Entry<String, String> entry : options.toMap().entrySet()) 
{
                 String k = entry.getKey();
                 String v = entry.getValue();
                 if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) 
{
-                    List<String> sequenceFields =
+                    int[] sequenceFields =
                             Arrays.stream(
                                             k.substring(
                                                             
FIELDS_PREFIX.length() + 1,
@@ -419,17 +418,13 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                                                     - 
SEQUENCE_GROUP.length()
                                                                     - 1)
                                                     .split(FIELDS_SEPARATOR))
-                                    .map(fieldName -> 
validateFieldName(fieldName, fieldNames))
-                                    .collect(Collectors.toList());
-                    allSequenceFields.addAll(sequenceFields);
+                                    .mapToInt(fieldName -> 
requireField(fieldName, fieldNames))
+                                    .toArray();
 
                     Supplier<FieldsComparator> userDefinedSeqComparator =
                             () -> UserDefinedSeqComparator.create(rowType, 
sequenceFields, true);
                     Arrays.stream(v.split(FIELDS_SEPARATOR))
-                            .map(
-                                    fieldName ->
-                                            fieldNames.indexOf(
-                                                    
validateFieldName(fieldName, fieldNames)))
+                            .map(fieldName -> requireField(fieldName, 
fieldNames))
                             .forEach(
                                     field -> {
                                         if 
(fieldSeqComparators.containsKey(field)) {
@@ -442,42 +437,54 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                                     });
 
                     // add self
-                    sequenceFields.forEach(
-                            fieldName -> {
-                                int index = fieldNames.indexOf(fieldName);
-                                fieldSeqComparators.put(index, 
userDefinedSeqComparator);
-                                sequenceGroupMap.put(fieldName, index);
-                            });
+                    for (int index : sequenceFields) {
+                        allSequenceFields.add(index);
+                        String fieldName = fieldNames.get(index);
+                        fieldSeqComparators.put(index, 
userDefinedSeqComparator);
+                        sequenceGroupMap.put(fieldName, index);
+                    }
                 }
             }
             this.fieldAggregators =
                     createFieldAggregators(
                             rowType, primaryKeys, allSequenceFields, new 
CoreOptions(options));
 
-            removeRecordOnDelete = 
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
-
+            // check if partial-update.remove-record-on-delete and 
ignore-delete are enabled at the same time.
             Preconditions.checkState(
                     !(removeRecordOnDelete && ignoreDelete),
                     String.format(
                             "%s and %s have conflicting behavior so should not 
be enabled at the same time.",
-                            CoreOptions.IGNORE_DELETE, 
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
+                            CoreOptions.IGNORE_DELETE.key(),
+                            PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key()));
+
+            // check if partial-update.remove-record-on-sequence-group and 
ignore-delete are enabled
+            // at the same time.
+            Preconditions.checkState(
+                    !(removeRecordOnSequenceGroup != null && ignoreDelete),
+                    String.format(
+                            "%s and %s have conflicting behavior so should not 
be enabled at the same time.",
+                            CoreOptions.IGNORE_DELETE.key(),
+                            
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key()));
+
+            // check if partial-update.remove-record-on-delete and 
sequence-group are enabled at the
+            // same time.
             Preconditions.checkState(
                     !removeRecordOnDelete || fieldSeqComparators.isEmpty(),
                     String.format(
-                            "sequence group and %s have conflicting behavior 
so should not be enabled at the same time.",
-                            PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
+                            "%s and %s have conflicting behavior so should not 
be enabled at the same time.",
+                            SEQUENCE_GROUP, 
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key()));
 
             if (removeRecordOnSequenceGroup != null) {
-                String[] sequenceGroupArr = 
removeRecordOnSequenceGroup.split(FIELDS_SEPARATOR);
+                List<String> sequenceGroupFields =
+                        
Arrays.asList(removeRecordOnSequenceGroup.split(FIELDS_SEPARATOR));
                 Preconditions.checkState(
-                        
sequenceGroupMap.keySet().containsAll(Arrays.asList(sequenceGroupArr)),
+                        
sequenceGroupMap.keySet().containsAll(sequenceGroupFields),
                         String.format(
                                 "field '%s' defined in '%s' option must be 
part of sequence groups",
                                 removeRecordOnSequenceGroup,
                                 
PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP.key()));
                 sequenceGroupPartialDelete =
-                        Arrays.stream(sequenceGroupArr)
-                                .filter(sequenceGroupMap::containsKey)
+                        sequenceGroupFields.stream()
                                 .map(sequenceGroupMap::get)
                                 .collect(Collectors.toSet());
             }
@@ -608,14 +615,14 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
             return new AdjustedProjection(pushDown, outer);
         }
 
-        private String validateFieldName(String fieldName, List<String> 
fieldNames) {
+        private int requireField(String fieldName, List<String> fieldNames) {
             int field = fieldNames.indexOf(fieldName);
             if (field == -1) {
                 throw new IllegalArgumentException(
                         String.format("Field %s can not be found in table 
schema", fieldName));
             }
 
-            return fieldName;
+            return field;
         }
 
         /**
@@ -626,7 +633,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         private Map<Integer, Supplier<FieldAggregator>> createFieldAggregators(
                 RowType rowType,
                 List<String> primaryKeys,
-                List<String> allSequenceFields,
+                List<Integer> allSequenceFields,
                 CoreOptions options) {
 
             List<String> fieldNames = rowType.getFieldNames();
@@ -636,7 +643,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                 String fieldName = fieldNames.get(i);
                 DataType fieldType = fieldTypes.get(i);
 
-                if (allSequenceFields.contains(fieldName)) {
+                if (allSequenceFields.contains(i)) {
                     // no agg for sequence fields
                     continue;
                 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index d5f1f7bbea..2da837d2f4 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -623,7 +623,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
     }
 
     @Test
-    public void testRemoveRecordOnDeleteWithoutSequenceGroup() {
+    public void testRemoveRecordOnDeleteWithoutSequenceGroup() throws 
Exception {
         sql(
                 "CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT 
ENFORCED, a STRING, b STRING) WITH ("
                         + " 'merge-engine' = 'partial-update',"
@@ -645,6 +645,19 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
         // batch read
         assertThat(sql("SELECT * FROM remove_record_on_delete"))
                 .containsExactlyInAnyOrder(Row.of(1, "A", "apache"));
+
+        // delete record with changelog stream
+        String id =
+                TestValuesTableFactory.registerData(
+                        Collections.singletonList(Row.ofKind(RowKind.DELETE, 
1, "A", null)));
+        sEnv.executeSql(
+                String.format(
+                        "CREATE TEMPORARY TABLE delete_source1 (pk INT, a 
STRING, b STRING) "
+                                + "WITH ('connector'='values', 
'bounded'='true', 'data-id'='%s', "
+                                + "'changelog-mode' = 'I,D,UA,UB')",
+                        id));
+        sEnv.executeSql("INSERT INTO remove_record_on_delete SELECT * FROM 
delete_source1").await();
+        assertThat(sql("SELECT * FROM remove_record_on_delete")).isEmpty();
     }
 
     @Test
@@ -680,8 +693,26 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
         assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group"))
                 .containsExactlyInAnyOrder(Row.of(1, "apple", 2, null, 2));
 
-        // delete record
-        sql("DELETE FROM remove_record_on_delete_sequence_group WHERE pk = 1");
+        // delete record with seq_a
+        String id2 =
+                TestValuesTableFactory.registerData(
+                        Collections.singletonList(
+                                Row.ofKind(RowKind.DELETE, 1, "apple", 2, 
null, null)));
+        sEnv.executeSql(
+                String.format(
+                        "CREATE TEMPORARY TABLE delete_source2 (pk INT, a 
STRING, seq_a INT, b STRING, seq_b INT) "
+                                + "WITH ('connector'='values', 
'bounded'='true', 'data-id'='%s', "
+                                + "'changelog-mode' = 'I,D,UA,UB')",
+                        id2));
+        sEnv.executeSql(
+                        "INSERT INTO remove_record_on_delete_sequence_group 
SELECT * FROM delete_source2")
+                .await();
+        assertThat(sql("SELECT * FROM 
remove_record_on_delete_sequence_group")).isEmpty();
+
+        // batch delete record
+        sql(
+                "INSERT INTO remove_record_on_delete_sequence_group VALUES (2, 
'flink', 2, 'paimon', 1)");
+        sql("DELETE FROM remove_record_on_delete_sequence_group WHERE pk = 2");
         assertThat(sql("SELECT * FROM 
remove_record_on_delete_sequence_group")).isEmpty();
     }
 

Reply via email to