This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f457d35e4 [core] Support delete row in partial updates (#3602)
f457d35e4 is described below

commit f457d35e460a2d84c8e58b2fa87b31b339b5b302
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Jul 17 16:13:28 2024 +0800

    [core] Support delete row in partial updates (#3602)
---
 .../merge-engine/partial-update.md                 |   1 +
 .../shortcodes/generated/core_configuration.html   |   6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   |   8 ++
 .../compact/PartialUpdateMergeFunction.java        |  39 +++++++-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java | 101 +++++++++++++++++++++
 5 files changed, 152 insertions(+), 3 deletions(-)

diff --git a/docs/content/primary-key-table/merge-engine/partial-update.md b/docs/content/primary-key-table/merge-engine/partial-update.md
index 70b12618e..09c376c25 100644
--- a/docs/content/primary-key-table/merge-engine/partial-update.md
+++ b/docs/content/primary-key-table/merge-engine/partial-update.md
@@ -49,6 +49,7 @@ but only returns input records.)
 By default, Partial update can not accept delete records, you can choose one of the following solutions:
 
 - Configure 'ignore-delete' to ignore delete records.
+- Configure 'partial-update.remove-record-on-delete' to remove the whole row when receiving delete records.
 - Configure 'sequence-group's to retract partial columns.
   {{< /hint >}}
 
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 89db43b43..19a73cc5d 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -485,6 +485,12 @@ This config option does not affect the default filesystem metastore.</td>
             <td>Integer</td>
             <td>Turn off the dictionary encoding for all fields in parquet.</td>
         </tr>
+        <tr>
+            <td><h5>partial-update.remove-record-on-delete</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to remove the whole row in partial-update engine when -D records are received.</td>
+        </tr>
         <tr>
             <td><h5>partition</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 8864bc3d3..9bb6ab91f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -553,6 +553,14 @@ public class CoreOptions implements Serializable {
                             "The field that generates the sequence number for 
primary key table,"
                                     + " the sequence number determines which 
data is the most recent.");
 
+    @Immutable
+    public static final ConfigOption<Boolean> 
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE =
+            key("partial-update.remove-record-on-delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to remove the whole row in partial-update 
engine when -D records are received.");
+
     @Immutable
     public static final ConfigOption<String> ROWKIND_FIELD =
             key("rowkind.field")
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index 154595859..3013d6ad5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -29,6 +29,7 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FieldsComparator;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.Projection;
 import org.apache.paimon.utils.UserDefinedSeqComparator;
 
@@ -48,6 +49,7 @@ import java.util.stream.Stream;
 
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
+import static 
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
 import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
 
 /**
@@ -63,6 +65,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
     private final Map<Integer, FieldsComparator> fieldSeqComparators;
     private final boolean fieldSequenceEnabled;
     private final Map<Integer, FieldAggregator> fieldAggregators;
+    private final boolean removeRecordOnDelete;
 
     private InternalRow currentKey;
     private long latestSequenceNumber;
@@ -74,12 +77,14 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
             boolean ignoreDelete,
             Map<Integer, FieldsComparator> fieldSeqComparators,
             Map<Integer, FieldAggregator> fieldAggregators,
-            boolean fieldSequenceEnabled) {
+            boolean fieldSequenceEnabled,
+            boolean removeRecordOnDelete) {
         this.getters = getters;
         this.ignoreDelete = ignoreDelete;
         this.fieldSeqComparators = fieldSeqComparators;
         this.fieldAggregators = fieldAggregators;
         this.fieldSequenceEnabled = fieldSequenceEnabled;
+        this.removeRecordOnDelete = removeRecordOnDelete;
     }
 
     @Override
@@ -106,6 +111,14 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
                 return;
             }
 
+            if (removeRecordOnDelete) {
+                if (kv.valueKind() == RowKind.DELETE) {
+                    row = null;
+                }
+                // ignore -U records
+                return;
+            }
+
             String msg =
                     String.join(
                             "\n",
@@ -235,6 +248,9 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
         if (reused == null) {
             reused = new KeyValue();
         }
+        if (removeRecordOnDelete && row == null) {
+            return null;
+        }
         return reused.replace(currentKey, latestSequenceNumber, 
RowKind.INSERT, row);
     }
 
@@ -256,6 +272,8 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
 
         private final Map<Integer, FieldAggregator> fieldAggregators;
 
+        private final boolean removeRecordOnDelete;
+
         private Factory(Options options, RowType rowType, List<String> 
primaryKeys) {
             this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
             this.rowType = rowType;
@@ -310,6 +328,19 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
                 throw new IllegalArgumentException(
                         "Must use sequence group for aggregation functions.");
             }
+
+            removeRecordOnDelete = 
options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE);
+
+            Preconditions.checkState(
+                    !(removeRecordOnDelete && ignoreDelete),
+                    String.format(
+                            "%s and %s have conflicting behavior so should not 
be enabled at the same time.",
+                            CoreOptions.IGNORE_DELETE, 
PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
+            Preconditions.checkState(
+                    !removeRecordOnDelete || fieldSeqComparators.isEmpty(),
+                    String.format(
+                            "sequence group and %s have conflicting behavior 
so should not be enabled at the same time.",
+                            PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE));
         }
 
         @Override
@@ -368,14 +399,16 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
                         ignoreDelete,
                         projectedSeqComparators,
                         projectedAggregators,
-                        !fieldSeqComparators.isEmpty());
+                        !fieldSeqComparators.isEmpty(),
+                        removeRecordOnDelete);
             } else {
                 return new PartialUpdateMergeFunction(
                         createFieldGetters(tableTypes),
                         ignoreDelete,
                         fieldSeqComparators,
                         fieldAggregators,
-                        !fieldSeqComparators.isEmpty());
+                        !fieldSeqComparators.isEmpty(),
+                        removeRecordOnDelete);
             }
         }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index c4867bcac..b74f242e1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -1023,6 +1023,107 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
         assertThat(result).containsExactlyInAnyOrder("+I[20, 2]", "+I[30, 1]", 
"+I[10, 1]");
     }
 
+    @Test
+    public void testPartialUpdateRemoveRecordOnDelete() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), 
DataTypes.INT()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            options.set("merge-engine", "partial-update");
+                            
options.set("partial-update.remove-record-on-delete", "true");
+                        },
+                        rowType);
+        Function<InternalRow, String> rowToString = row -> 
internalRowToString(row, rowType);
+        SnapshotReader snapshotReader = table.newSnapshotReader();
+        TableRead read = table.newRead();
+        StreamTableWrite write = table.newWrite("");
+        StreamTableCommit commit = table.newCommit("");
+        // 1. inserts
+        write.write(GenericRow.of(1, 1, 3, 3));
+        write.write(GenericRow.of(1, 1, 1, 1));
+        write.write(GenericRow.of(1, 1, 2, 2));
+        commit.commit(0, write.prepareCommit(true, 0));
+        List<String> result =
+                getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
+
+        // 2. Update Before
+        write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
+        commit.commit(1, write.prepareCommit(true, 1));
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 2]");
+
+        // 3. Update After
+        write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
+        commit.commit(2, write.prepareCommit(true, 2));
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 3]");
+
+        // 4. Retracts
+        write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
+        commit.commit(3, write.prepareCommit(true, 3));
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
+        assertThat(result).isEmpty();
+        write.close();
+        commit.close();
+    }
+
+    @Test
+    public void testPartialUpdateWithAgg() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), 
DataTypes.INT()
+                        },
+                        new String[] {"pt", "a", "b", "c"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        options -> {
+                            options.set("merge-engine", "partial-update");
+                            options.set("fields.a.sequence-group", "c");
+                            options.set("fields.c.aggregate-function", "sum");
+                        },
+                        rowType);
+        Function<InternalRow, String> rowToString = row -> 
internalRowToString(row, rowType);
+        SnapshotReader snapshotReader = table.newSnapshotReader();
+        TableRead read = table.newRead();
+        StreamTableWrite write = table.newWrite("");
+        StreamTableCommit commit = table.newCommit("");
+        // 1. inserts
+        write.write(GenericRow.of(1, 1, 3, 3));
+        write.write(GenericRow.of(1, 1, 1, 1));
+        write.write(GenericRow.of(1, 1, 2, 2));
+        commit.commit(0, write.prepareCommit(true, 0));
+        List<String> result =
+                getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 6]");
+
+        // 2. Update Before
+        write.write(GenericRow.ofKind(RowKind.UPDATE_BEFORE, 1, 1, 2, 2));
+        commit.commit(1, write.prepareCommit(true, 1));
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
+
+        // 3. Update After
+        write.write(GenericRow.ofKind(RowKind.UPDATE_AFTER, 1, 1, 2, 3));
+        commit.commit(2, write.prepareCommit(true, 2));
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 7]");
+
+        // 4. Retracts
+        write.write(GenericRow.ofKind(RowKind.DELETE, 1, 1, 2, 3));
+        commit.commit(3, write.prepareCommit(true, 3));
+        result = getResult(read, toSplits(snapshotReader.read().dataSplits()), 
rowToString);
+        assertThat(result).containsExactlyInAnyOrder("+I[1, 1, 2, 4]");
+        write.close();
+        commit.close();
+    }
+
     @Test
     public void testAggMergeFunc() throws Exception {
         RowType rowType =

Reply via email to