This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6378e75a2 [core] Move ignore-delete from MergeFunction to Table writer 
(#3128)
6378e75a2 is described below

commit 6378e75a23242c76366c2819fd04c36dad6beddd
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 1 17:15:13 2024 +0800

    [core] Move ignore-delete from MergeFunction to Table writer (#3128)
---
 .../concepts/primary-key-table/merge-engine.md     |  6 +-
 .../shortcodes/generated/core_configuration.html   | 24 ++-----
 .../main/java/org/apache/paimon/CoreOptions.java   | 26 ++++----
 .../org/apache/paimon/append/AppendOnlyWriter.java |  6 +-
 .../compact/DeduplicateMergeFunction.java          | 25 +-------
 .../mergetree/compact/FirstRowMergeFunction.java   | 34 +++-------
 .../compact/PartialUpdateMergeFunction.java        | 14 +---
 .../paimon/table/AppendOnlyFileStoreTable.java     |  3 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |  3 +-
 .../apache/paimon/table/PrimaryKeyTableUtils.java  |  4 +-
 .../apache/paimon/table/sink/TableWriteImpl.java   | 15 ++++-
 .../mergetree/SortBufferWriteBufferTestBase.java   |  5 +-
 .../LookupChangelogMergeFunctionWrapperTest.java   | 32 +---------
 .../mergetree/compact/SortMergeReaderTestBase.java |  2 +-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  2 +-
 .../cdc/kafka/KafkaCanalSyncTableActionITCase.java |  2 +-
 .../apache/paimon/flink/action/DeleteAction.java   |  1 -
 .../paimon/flink/action/MergeIntoAction.java       | 20 ++++++
 .../paimon/flink/action/TableActionBase.java       | 22 -------
 .../flink/sink/GlobalFullCompactionSinkWrite.java  | 10 ++-
 .../flink/sink/RowDataStoreWriteOperator.java      |  2 +-
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |  2 +
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  2 +
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 17 +++--
 .../paimon/flink/ContinuousFileStoreITCase.java    | 18 +++---
 .../org/apache/paimon/flink/FirstRowITCase.java    |  2 +-
 .../apache/paimon/flink/PartialUpdateITCase.java   | 28 +++++++-
 .../paimon/flink/action/DeleteActionITCase.java    | 74 ----------------------
 .../paimon/flink/action/MergeIntoActionITCase.java |  2 +-
 29 files changed, 141 insertions(+), 262 deletions(-)

diff --git a/docs/content/concepts/primary-key-table/merge-engine.md 
b/docs/content/concepts/primary-key-table/merge-engine.md
index 9d06020fa..5e87c4f76 100644
--- a/docs/content/concepts/primary-key-table/merge-engine.md
+++ b/docs/content/concepts/primary-key-table/merge-engine.md
@@ -38,7 +38,7 @@ result in strange behavior. When the input is out of order, 
we recommend that yo
 
 `deduplicate` merge engine is the default merge engine. Paimon will only keep 
the latest record and throw away other records with the same primary keys.
 
-Specifically, if the latest record is a `DELETE` record, all records with the 
same primary keys will be deleted.
+Specifically, if the latest record is a `DELETE` record, all records with the 
same primary keys will be deleted. You can configure `ignore-delete` to ignore `DELETE` records.
 
 ## Partial Update
 
@@ -59,7 +59,7 @@ For streaming queries, `partial-update` merge engine must be 
used together with
 
 {{< hint info >}}
 By default, Partial update can not accept delete records, you can choose one 
of the following solutions:
-- Configure 'partial-update.ignore-delete' to ignore delete records.
+- Configure 'ignore-delete' to ignore delete records.
 - Configure 'sequence-group's to retract partial columns.
   {{< /hint >}}
 
@@ -328,7 +328,7 @@ By specifying `'merge-engine' = 'first-row'`, users can 
keep the first row of th
 {{< hint info >}}
 1. `first-row` merge engine must be used together with `lookup` [changelog 
producer]({{< ref "concepts/primary-key-table/changelog-producer" >}}).
 2. You can not specify `sequence.field`.
-3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config 
`first-row.ignore-delete` to ignore these two kinds records.
+3. `DELETE` and `UPDATE_BEFORE` messages are not accepted. You can configure 
`ignore-delete` to ignore these two kinds of records.
    {{< /hint >}}
 
 This is of great help in replacing log deduplication in streaming computation.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 3f4c76bf0..e1a52ec2a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -176,12 +176,6 @@ under the License.
             <td>Duration</td>
             <td>The TTL in rocksdb index for cross partition upsert (primary 
keys not contain all partition fields), this can avoid maintaining too many 
indexes and lead to worse and worse performance, but please note that this may 
also cause data duplication.</td>
         </tr>
-        <tr>
-            <td><h5>deduplicate.ignore-delete</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to ignore delete records in deduplicate mode.</td>
-        </tr>
         <tr>
             <td><h5>deletion-vectors.enabled</h5></td>
             <td style="word-wrap: break-word;">false</td>
@@ -242,18 +236,18 @@ under the License.
             <td>Map</td>
             <td>Define different file format for different level, you can add 
the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file 
format for level is not provided, the default format which set by `file.format` 
will be used.</td>
         </tr>
-        <tr>
-            <td><h5>first-row.ignore-delete</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to ignore delete records in first-row mode.</td>
-        </tr>
         <tr>
             <td><h5>full-compaction.delta-commits</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Integer</td>
             <td>Full compaction will be constantly triggered after delta 
commits.</td>
         </tr>
+        <tr>
+            <td><h5>ignore-delete</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to ignore delete records.</td>
+        </tr>
         <tr>
             <td><h5>incremental-between</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -413,12 +407,6 @@ This config option does not affect the default filesystem 
metastore.</td>
             <td>Integer</td>
             <td>Turn off the dictionary encoding for all fields in 
parquet.</td>
         </tr>
-        <tr>
-            <td><h5>partial-update.ignore-delete</h5></td>
-            <td style="word-wrap: break-word;">false</td>
-            <td>Boolean</td>
-            <td>Whether to ignore delete records in partial-update mode.</td>
-        </tr>
         <tr>
             <td><h5>partition</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 895da7fe8..287dc6e16 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -255,23 +255,15 @@ public class CoreOptions implements Serializable {
                     .defaultValue(MergeEngine.DEDUPLICATE)
                     .withDescription("Specify the merge engine for table with 
primary key.");
 
-    public static final ConfigOption<Boolean> DEDUPLICATE_IGNORE_DELETE =
-            key("deduplicate.ignore-delete")
+    public static final ConfigOption<Boolean> IGNORE_DELETE =
+            key("ignore-delete")
                     .booleanType()
                     .defaultValue(false)
-                    .withDescription("Whether to ignore delete records in 
deduplicate mode.");
-
-    public static final ConfigOption<Boolean> PARTIAL_UPDATE_IGNORE_DELETE =
-            key("partial-update.ignore-delete")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Whether to ignore delete records in 
partial-update mode.");
-
-    public static final ConfigOption<Boolean> FIRST_ROW_IGNORE_DELETE =
-            key("first-row.ignore-delete")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("Whether to ignore delete records in 
first-row mode.");
+                    .withDeprecatedKeys(
+                            "first-row.ignore-delete",
+                            "deduplicate.ignore-delete",
+                            "partial-update.ignore-delete")
+                    .withDescription("Whether to ignore delete records.");
 
     public static final ConfigOption<SortEngine> SORT_ENGINE =
             key("sort-engine")
@@ -1275,6 +1267,10 @@ public class CoreOptions implements Serializable {
         return options.get(MERGE_ENGINE);
     }
 
+    public boolean ignoreDelete() {
+        return options.get(IGNORE_DELETE);
+    }
+
     public SortEngine sortEngine() {
         return options.get(SORT_ENGINE);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 1ab054531..9566dd372 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -35,7 +35,6 @@ import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.statistics.FieldStatsCollector;
-import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.IOUtils;
@@ -130,8 +129,9 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
     @Override
     public void write(InternalRow rowData) throws Exception {
         Preconditions.checkArgument(
-                rowData.getRowKind() == RowKind.INSERT,
-                "Append-only writer can only accept insert row kind, but 
current row kind is: %s",
+                rowData.getRowKind().isAdd(),
+                "Append-only writer can only accept insert or update_after row 
kind, but current row kind is: %s. "
+                        + "You can configure 'ignore-delete' to ignore retract 
records.",
                 rowData.getRowKind());
         boolean success = sinkWriter.write(rowData);
         if (!success) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index b062ce9b6..9c1cf0f5e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -18,9 +18,7 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
-import org.apache.paimon.options.Options;
 
 import javax.annotation.Nullable;
 
@@ -30,14 +28,8 @@ import javax.annotation.Nullable;
  */
 public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {
 
-    private final boolean ignoreDelete;
-
     private KeyValue latestKv;
 
-    protected DeduplicateMergeFunction(boolean ignoreDelete) {
-        this.ignoreDelete = ignoreDelete;
-    }
-
     @Override
     public void reset() {
         latestKv = null;
@@ -45,9 +37,6 @@ public class DeduplicateMergeFunction implements 
MergeFunction<KeyValue> {
 
     @Override
     public void add(KeyValue kv) {
-        if (ignoreDelete && kv.valueKind().isRetract()) {
-            return;
-        }
         latestKv = kv;
     }
 
@@ -58,26 +47,16 @@ public class DeduplicateMergeFunction implements 
MergeFunction<KeyValue> {
     }
 
     public static MergeFunctionFactory<KeyValue> factory() {
-        return new Factory(new Options());
-    }
-
-    public static MergeFunctionFactory<KeyValue> factory(Options options) {
-        return new Factory(options);
+        return new Factory();
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
         private static final long serialVersionUID = 1L;
 
-        private final Options options;
-
-        private Factory(Options options) {
-            this.options = options;
-        }
-
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new 
DeduplicateMergeFunction(options.get(CoreOptions.DEDUPLICATE_IGNORE_DELETE));
+            return new DeduplicateMergeFunction();
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index f0551e0fa..b955e0fea 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -18,11 +18,10 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -36,12 +35,9 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
     private final InternalRowSerializer valueSerializer;
     private KeyValue first;
 
-    private final boolean ignoreDelete;
-
-    protected FirstRowMergeFunction(RowType keyType, RowType valueType, 
boolean ignoreDelete) {
+    protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
         this.keySerializer = new InternalRowSerializer(keyType);
         this.valueSerializer = new InternalRowSerializer(valueType);
-        this.ignoreDelete = ignoreDelete;
     }
 
     @Override
@@ -51,15 +47,10 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
 
     @Override
     public void add(KeyValue kv) {
-        if (kv.valueKind().isRetract()) {
-            if (ignoreDelete) {
-                return;
-            } else {
-                throw new IllegalArgumentException(
-                        "By default, First row merge engine can not accept 
DELETE/UPDATE_BEFORE records.\n"
-                                + "You can config 'first-row.ignore-delete' to 
ignore the DELETE/UPDATE_BEFORE records.");
-            }
-        }
+        Preconditions.checkArgument(
+                kv.valueKind().isAdd(),
+                "By default, First row merge engine can not accept 
DELETE/UPDATE_BEFORE records.\n"
+                        + "You can config 'ignore-delete' to ignore the 
DELETE/UPDATE_BEFORE records.");
         if (first == null) {
             this.first = kv.copy(keySerializer, valueSerializer);
         }
@@ -71,9 +62,8 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         return first;
     }
 
-    public static MergeFunctionFactory<KeyValue> factory(
-            RowType keyType, RowType valueType, Options options) {
-        return new FirstRowMergeFunction.Factory(keyType, valueType, options);
+    public static MergeFunctionFactory<KeyValue> factory(RowType keyType, 
RowType valueType) {
+        return new FirstRowMergeFunction.Factory(keyType, valueType);
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -82,18 +72,14 @@ public class FirstRowMergeFunction implements 
MergeFunction<KeyValue> {
         private final RowType keyType;
         private final RowType valueType;
 
-        private final Options options;
-
-        public Factory(RowType keyType, RowType valueType, Options options) {
+        public Factory(RowType keyType, RowType valueType) {
             this.keyType = keyType;
             this.valueType = valueType;
-            this.options = options;
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new FirstRowMergeFunction(
-                    keyType, valueType, 
options.get(CoreOptions.FIRST_ROW_IGNORE_DELETE));
+            return new FirstRowMergeFunction(keyType, valueType);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index dbf6dfd75..c1fc9293f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -67,7 +67,6 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
     public static final String SEQUENCE_GROUP = "sequence-group";
 
     private final InternalRow.FieldGetter[] getters;
-    private final boolean ignoreDelete;
     private final Map<Integer, SequenceGenerator> fieldSequences;
     private final boolean fieldSequenceEnabled;
     private final Map<Integer, FieldAggregator> fieldAggregators;
@@ -80,12 +79,10 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
     protected PartialUpdateMergeFunction(
             InternalRow.FieldGetter[] getters,
-            boolean ignoreDelete,
             Map<Integer, SequenceGenerator> fieldSequences,
             Map<Integer, FieldAggregator> fieldAggregators,
             boolean fieldSequenceEnabled) {
         this.getters = getters;
-        this.ignoreDelete = ignoreDelete;
         this.fieldSequences = fieldSequences;
         this.fieldAggregators = fieldAggregators;
         this.fieldSequenceEnabled = fieldSequenceEnabled;
@@ -104,12 +101,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
         // refresh key object to avoid reference overwritten
         currentKey = kv.key();
 
-        // ignore delete?
         if (kv.valueKind().isRetract()) {
-            if (ignoreDelete) {
-                return;
-            }
-
             if (fieldSequenceEnabled) {
                 retractWithSequenceGroup(kv);
                 return;
@@ -120,7 +112,7 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
                             "\n",
                             "By default, Partial update can not accept delete 
records,"
                                     + " you can choose one of the following 
solutions:",
-                            "1. Configure 'partial-update.ignore-delete' to 
ignore delete records.",
+                            "1. Configure 'ignore-delete' to ignore delete 
records.",
                             "2. Configure 'sequence-group's to retract partial 
columns.");
 
             throw new IllegalArgumentException(msg);
@@ -232,14 +224,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
         private static final long serialVersionUID = 1L;
 
-        private final boolean ignoreDelete;
         private final List<DataType> tableTypes;
         private final Map<Integer, SequenceGenerator> fieldSequences;
 
         private final Map<Integer, FieldAggregator> fieldAggregators;
 
         private Factory(Options options, RowType rowType, List<String> 
primaryKeys) {
-            this.ignoreDelete = 
options.get(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE);
             this.tableTypes = rowType.getFieldTypes();
 
             List<String> fieldNames = rowType.getFieldNames();
@@ -325,14 +315,12 @@ public class PartialUpdateMergeFunction implements 
MergeFunction<KeyValue> {
 
                 return new PartialUpdateMergeFunction(
                         
createFieldGetters(Projection.of(projection).project(tableTypes)),
-                        ignoreDelete,
                         projectedSequences,
                         projectedAggregators,
                         !fieldSequences.isEmpty());
             } else {
                 return new PartialUpdateMergeFunction(
                         createFieldGetters(tableTypes),
-                        ignoreDelete,
                         fieldSequences,
                         fieldAggregators,
                         !fieldSequences.isEmpty());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4d91328f2..2eb41fdd5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -147,7 +147,8 @@ class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                             "Append only writer can not accept row with 
RowKind %s",
                             record.row().getRowKind());
                     return record.row();
-                });
+                },
+                CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index fea783259..f2ab323d2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -200,7 +200,8 @@ class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
                                     ? row.getRowKind()
                                     : rowKindGenerator.generate(row);
                     return kv.replace(record.primaryKey(), 
KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
-                });
+                },
+                CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index 112ab0890..572e488c6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -64,7 +64,7 @@ public class PrimaryKeyTableUtils {
 
         switch (mergeEngine) {
             case DEDUPLICATE:
-                return DeduplicateMergeFunction.factory(conf);
+                return DeduplicateMergeFunction.factory();
             case PARTIAL_UPDATE:
                 return PartialUpdateMergeFunction.factory(conf, rowType, 
tableSchema.primaryKeys());
             case AGGREGATE:
@@ -75,7 +75,7 @@ public class PrimaryKeyTableUtils {
                         tableSchema.primaryKeys());
             case FIRST_ROW:
                 return FirstRowMergeFunction.factory(
-                        new RowType(extractor.keyFields(tableSchema)), 
rowType, conf);
+                        new RowType(extractor.keyFields(tableSchema)), 
rowType);
             default:
                 throw new UnsupportedOperationException("Unsupported merge 
engine: " + mergeEngine);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index b1cacc4c6..c03ef7e13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -32,6 +32,8 @@ import org.apache.paimon.operation.FileStoreWrite.State;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.utils.Restorable;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
@@ -47,6 +49,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
     private final FileStoreWrite<T> write;
     private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
     private final RecordExtractor<T> recordExtractor;
+    private final boolean ignoreDelete;
 
     private boolean batchCommitted = false;
     private BucketMode bucketMode;
@@ -54,10 +57,12 @@ public class TableWriteImpl<T> implements InnerTableWrite, 
Restorable<List<State
     public TableWriteImpl(
             FileStoreWrite<T> write,
             KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
-            RecordExtractor<T> recordExtractor) {
+            RecordExtractor<T> recordExtractor,
+            boolean ignoreDelete) {
         this.write = write;
         this.keyAndBucketExtractor = keyAndBucketExtractor;
         this.recordExtractor = recordExtractor;
+        this.ignoreDelete = ignoreDelete;
     }
 
     @Override
@@ -121,13 +126,21 @@ public class TableWriteImpl<T> implements 
InnerTableWrite, Restorable<List<State
         writeAndReturn(row, bucket);
     }
 
+    @Nullable
     public SinkRecord writeAndReturn(InternalRow row) throws Exception {
+        if (ignoreDelete && row.getRowKind().isRetract()) {
+            return null;
+        }
         SinkRecord record = toSinkRecord(row);
         write.write(record.partition(), record.bucket(), 
recordExtractor.extract(record));
         return record;
     }
 
+    @Nullable
     public SinkRecord writeAndReturn(InternalRow row, int bucket) throws 
Exception {
+        if (ignoreDelete && row.getRowKind().isRetract()) {
+            return null;
+        }
         SinkRecord record = toSinkRecord(row, bucket);
         write.write(record.partition(), bucket, 
recordExtractor.extract(record));
         return record;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index b71971ab6..89b217b5a 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.mergetree;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
@@ -179,7 +178,6 @@ public abstract class SortBufferWriteBufferTestBase {
         @Override
         protected MergeFunction<KeyValue> createMergeFunction() {
             Options options = new Options();
-            options.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true);
             return PartialUpdateMergeFunction.factory(
                             options, RowType.of(DataTypes.BIGINT()), 
ImmutableList.of("key"))
                     .create();
@@ -260,8 +258,7 @@ public abstract class SortBufferWriteBufferTestBase {
         protected MergeFunction<KeyValue> createMergeFunction() {
             return FirstRowMergeFunction.factory(
                             new RowType(Lists.list(new DataField(0, "f0", new 
IntType()))),
-                            new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))),
-                            new Options())
+                            new RowType(Lists.list(new DataField(1, "f1", new 
BigIntType()))))
                     .create();
         }
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 4c377af17..96aa57684 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.mergetree.compact;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
@@ -27,7 +26,6 @@ import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
-import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
@@ -38,7 +36,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -308,8 +305,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                         new RowType(
                                                 Lists.list(new DataField(0, 
"f0", new IntType()))),
                                         new RowType(
-                                                Lists.list(new DataField(1, 
"f1", new IntType()))),
-                                        false),
+                                                Lists.list(new DataField(1, 
"f1", new IntType())))),
                         highLevel::contains);
 
         // Without level-0
@@ -373,30 +369,4 @@ public class LookupChangelogMergeFunctionWrapperTest {
         kv = result.result();
         assertThat(kv).isNull();
     }
-
-    @Test
-    public void testPartialUpdateIgnoreDelete() {
-        Options options = new Options();
-        options.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true);
-        LookupChangelogMergeFunctionWrapper function =
-                new LookupChangelogMergeFunctionWrapper(
-                        LookupMergeFunction.wrap(
-                                PartialUpdateMergeFunction.factory(
-                                        options,
-                                        DataTypes.ROW(DataTypes.INT()),
-                                        Collections.singletonList("f0")),
-                                RowType.of(DataTypes.INT()),
-                                RowType.of(DataTypes.INT())),
-                        key -> null,
-                        EQUALISER,
-                        false,
-                        LookupStrategy.CHANGELOG_ONLY,
-                        null);
-
-        function.reset();
-        function.add(new KeyValue().replace(row(1), 1, DELETE, 
row(1)).setLevel(2));
-        ChangelogResult result = function.getResult();
-        assertThat(result).isNotNull();
-        assertThat(result.result()).isNull();
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 81e49b81d..3ae461532 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -130,7 +130,7 @@ public abstract class SortMergeReaderTestBase extends 
CombiningRecordReaderTestB
             RowType keyType = new RowType(Lists.list(new DataField(0, "f0", 
new IntType())));
             RowType valueType = new RowType(Lists.list(new DataField(1, "f1", 
new BigIntType())));
             return new LookupMergeFunction(
-                    new FirstRowMergeFunction(keyType, valueType, false), 
keyType, valueType);
+                    new FirstRowMergeFunction(keyType, valueType), keyType, 
valueType);
         }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index c245a2ce7..db6e12305 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -747,7 +747,7 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                             conf.set(
                                     CoreOptions.MERGE_ENGINE,
                                     CoreOptions.MergeEngine.PARTIAL_UPDATE);
-                            conf.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, 
true);
+                            conf.set(CoreOptions.IGNORE_DELETE, true);
                         });
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 40743fae6..9ad54f04d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -995,7 +995,7 @@ public class KafkaCanalSyncTableActionITCase extends 
KafkaSyncTableActionITCase
         kafkaConfig.put(TOPIC.key(), topic);
 
         Map<String, String> tableConfig = getBasicTableConfig();
-        tableConfig.put(CoreOptions.DEDUPLICATE_IGNORE_DELETE.key(), 
String.valueOf(ignoreDelete));
+        tableConfig.put(CoreOptions.IGNORE_DELETE.key(), 
String.valueOf(ignoreDelete));
 
         KafkaSyncTableAction action =
                 
syncTableActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index 19f77fe91..2fdb658ad 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -46,7 +46,6 @@ public class DeleteAction extends TableActionBase {
             String filter,
             Map<String, String> catalogConfig) {
         super(warehouse, databaseName, tableName, catalogConfig);
-        changeIgnoreMergeEngine();
         this.filter = filter;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 687ea5869..41f8b7677 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
@@ -38,6 +39,7 @@ import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -159,6 +161,24 @@ public class MergeIntoAction extends TableActionBase {
                         .collect(Collectors.toList());
     }
 
+    /**
+     * The {@link CoreOptions.MergeEngine}s will process -U/-D records in 
different ways, but we
+     * want these records to be sunk directly. This method is a workaround 
which disables merge
+     * engine settings and forces compaction.
+     */
+    private void changeIgnoreMergeEngine() {
+        if (CoreOptions.fromMap(table.options()).mergeEngine()
+                != CoreOptions.MergeEngine.DEDUPLICATE) {
+            Map<String, String> dynamicOptions = new HashMap<>();
+            dynamicOptions.put(
+                    CoreOptions.MERGE_ENGINE.key(), 
CoreOptions.MergeEngine.DEDUPLICATE.toString());
+            dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false");
+            // force compaction
+            
dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+            table = ((FileStoreTable) 
table).internalCopyWithoutCheck(dynamicOptions);
+        }
+    }
+
     public MergeIntoAction withTargetAlias(String targetAlias) {
         this.targetAlias = targetAlias;
         return this;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index a97335cd2..f10f3d625 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -18,13 +18,11 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -35,7 +33,6 @@ import org.apache.flink.table.data.RowData;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -99,23 +96,4 @@ public abstract class TableActionBase extends ActionBase {
                     e);
         }
     }
-
-    /**
-     * The {@link CoreOptions.MergeEngine}s will process -U/-D records in 
different ways, but we
-     * want these records to be sunk directly. This method is a workaround. 
Actions that may produce
-     * -U/-D records can call this to disable merge engine settings and force 
compaction.
-     */
-    protected void changeIgnoreMergeEngine() {
-        if (CoreOptions.fromMap(table.options()).mergeEngine()
-                != CoreOptions.MergeEngine.DEDUPLICATE) {
-            Map<String, String> dynamicOptions = new HashMap<>();
-            dynamicOptions.put(
-                    CoreOptions.MERGE_ENGINE.key(), 
CoreOptions.MergeEngine.DEDUPLICATE.toString());
-            // force compaction
-            
dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
-            Preconditions.checkArgument(
-                    table instanceof FileStoreTable, "Only supports 
FileStoreTable.");
-            table = ((FileStoreTable) 
table).internalCopyWithoutCheck(dynamicOptions);
-        }
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 3fb6fe6ae..62341a180 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -108,16 +108,22 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
     }
 
     @Override
+    @Nullable
     public SinkRecord write(InternalRow rowData) throws Exception {
         SinkRecord sinkRecord = super.write(rowData);
-        touchBucket(sinkRecord.partition(), sinkRecord.bucket());
+        if (sinkRecord != null) {
+            touchBucket(sinkRecord.partition(), sinkRecord.bucket());
+        }
         return sinkRecord;
     }
 
     @Override
+    @Nullable
     public SinkRecord write(InternalRow rowData, int bucket) throws Exception {
         SinkRecord sinkRecord = super.write(rowData, bucket);
-        touchBucket(sinkRecord.partition(), bucket);
+        if (sinkRecord != null) {
+            touchBucket(sinkRecord.partition(), bucket);
+        }
         return sinkRecord;
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index b61fecab5..07fe27554 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -130,7 +130,7 @@ public class RowDataStoreWriteOperator extends 
TableWriteOperator<InternalRow> {
             throw new IOException(e);
         }
 
-        if (logSinkFunction != null) {
+        if (record != null && logSinkFunction != null) {
             // write to log store, need to preserve original pk (which 
includes partition fields)
             SinkRecord logRecord = write.toLogRecord(record);
             logSinkFunction.invoke(logRecord, sinkContext);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 7278e1f21..6001721b7 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -39,8 +39,10 @@ import java.util.List;
 /** Helper class of {@link PrepareCommitOperator} for different types of 
paimon sinks. */
 public interface StoreSinkWrite {
 
+    @Nullable
     SinkRecord write(InternalRow rowData) throws Exception;
 
+    @Nullable
     SinkRecord write(InternalRow rowData, int bucket) throws Exception;
 
     SinkRecord toLogRecord(SinkRecord record);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index c70f6038e..3ecc80bb6 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -170,11 +170,13 @@ public class StoreSinkWriteImpl implements StoreSinkWrite 
{
     }
 
     @Override
+    @Nullable
     public SinkRecord write(InternalRow rowData) throws Exception {
         return write.writeAndReturn(rowData);
     }
 
     @Override
+    @Nullable
     public SinkRecord write(InternalRow rowData, int bucket) throws Exception {
         return write.writeAndReturn(rowData, bucket);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 9829e1040..7c8ffb6d6 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -372,20 +372,19 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testIgnoreDelete() throws Exception {
+    public void testIgnoreDelete() {
         sql(
                 "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
v STRING) "
-                        + "WITH ('deduplicate.ignore-delete' = 'true', 
'bucket' = '1')");
-        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM ignore_delete");
+                        + "WITH ('merge-engine' = 'deduplicate', 
'ignore-delete' = 'true', 'bucket' = '1')");
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'A')");
+        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "A"));
 
-        sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
         sql("DELETE FROM ignore_delete WHERE pk = 1");
-        sql("INSERT INTO ignore_delete VALUES (1, 'B')");
+        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "A"));
 
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(
-                        Row.ofKind(RowKind.INSERT, 1, "B"), 
Row.ofKind(RowKind.INSERT, 2, "B"));
-        iterator.close();
+        sql("INSERT INTO ignore_delete VALUES (1, 'B')");
+        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "B"));
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 66a154ba4..1fbed646e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -29,6 +29,7 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -549,19 +550,20 @@ public class ContinuousFileStoreITCase extends 
CatalogITCaseBase {
     }
 
     @Test
-    public void testIgnoreDelete() {
+    public void testIgnoreDelete() throws Exception {
         sql(
                 "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
v STRING) "
-                        + "WITH ('deduplicate.ignore-delete' = 'true', 
'bucket' = '1')");
-
-        sql("INSERT INTO ignore_delete VALUES (1, 'A')");
-        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "A"));
+                        + "WITH ('merge-engine' = 'deduplicate', 
'ignore-delete' = 'true', 'bucket' = '1')");
+        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * 
FROM ignore_delete");
 
+        sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
         sql("DELETE FROM ignore_delete WHERE pk = 1");
-        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "A"));
-
         sql("INSERT INTO ignore_delete VALUES (1, 'B')");
-        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactly(Row.of(1, "B"));
+
+        assertThat(iterator.collect(2))
+                .containsExactlyInAnyOrder(
+                        Row.ofKind(RowKind.INSERT, 1, "B"), 
Row.ofKind(RowKind.INSERT, 2, "B"));
+        iterator.close();
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index 5b9909d9c..5d7927a97 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -110,7 +110,7 @@ public class FirstRowITCase extends CatalogITCaseBase {
         sql(
                 "CREATE TABLE IF NOT EXISTS T1 ("
                         + "a INT, b INT, c STRING, PRIMARY KEY (a) NOT 
ENFORCED)"
-                        + " WITH ('merge-engine'='first-row', 
'first-row.ignore-delete' = 'true',"
+                        + " WITH ('merge-engine'='first-row', 'ignore-delete' 
= 'true',"
                         + " 'changelog-producer' = 'lookup');");
 
         List<Row> input =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index f4cf0b67f..23a574fb5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -52,7 +52,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
                         + " WITH ('merge-engine'='partial-update');",
                 "CREATE TABLE IF NOT EXISTS dwd_orders ("
                         + "OrderID INT, OrderNumber INT, PersonID INT, 
LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (OrderID) NOT ENFORCED)"
-                        + " WITH ('merge-engine'='partial-update', 
'partial-update.ignore-delete'='true');",
+                        + " WITH ('merge-engine'='partial-update', 
'ignore-delete'='true');",
                 "CREATE TABLE IF NOT EXISTS ods_orders (OrderID INT, 
OrderNumber INT, PersonID INT, PRIMARY KEY (OrderID) NOT ENFORCED) WITH 
('changelog-producer'='input', 'continuous.discovery-interval'='1s');",
                 "CREATE TABLE IF NOT EXISTS dim_persons (PersonID INT, 
LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (PersonID) NOT 
ENFORCED) WITH ('changelog-producer'='input', 
'continuous.discovery-interval'='1s');");
     }
@@ -408,4 +408,30 @@ public class PartialUpdateITCase extends CatalogITCaseBase 
{
         insert1.close();
         insert2.close();
     }
+
+    @Test
+    public void testIgnoreDelete() throws Exception {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, 
a INT, g INT) WITH ("
+                        + " 'merge-engine' = 'partial-update',"
+                        + " 'ignore-delete' = 'true',"
+                        + " 'fields.a.aggregate-function' = 'sum',"
+                        + " 'fields.g.sequence-group'='a')");
+
+        String id =
+                TestValuesTableFactory.registerData(
+                        Arrays.asList(
+                                Row.ofKind(RowKind.INSERT, 1, 10, 1),
+                                Row.ofKind(RowKind.DELETE, 1, 10, 2),
+                                Row.ofKind(RowKind.INSERT, 1, 20, 3)));
+        streamSqlIter(
+                        "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT 
ENFORCED, a INT, g INT) "
+                                + "WITH ('connector'='values', 
'bounded'='true', 'data-id'='%s', "
+                                + "'changelog-mode' = 'I,D')",
+                        id)
+                .close();
+        sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM 
input").await();
+
+        assertThat(sql("SELECT * FROM 
ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index da1cf763a..c67228c25 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.table.FileStoreTable;
@@ -38,13 +37,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
 
 import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
 import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
 import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
-import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
-import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
 import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingRead;
 import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStreamingReadResult;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -99,76 +95,6 @@ public class DeleteActionITCase extends ActionITCaseBase {
         iterator.close();
     }
 
-    @Test
-    public void testWorkWithPartialUpdateTable() throws Exception {
-        createFileStoreTable(
-                RowType.of(
-                        new DataType[] {DataTypes.INT(), DataTypes.STRING(), 
DataTypes.STRING()},
-                        new String[] {"k", "a", "b"}),
-                Collections.emptyList(),
-                Collections.singletonList("k"),
-                new HashMap<String, String>() {
-                    {
-                        put(
-                                CoreOptions.MERGE_ENGINE.key(),
-                                
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
-                        put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), 
"true");
-                        put(
-                                CoreOptions.CHANGELOG_PRODUCER.key(),
-                                ThreadLocalRandom.current().nextBoolean()
-                                        ? 
CoreOptions.ChangelogProducer.LOOKUP.toString()
-                                        : 
CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
-                    }
-                });
-
-        DeleteAction action =
-                createAction(
-                        DeleteAction.class,
-                        "delete",
-                        "--warehouse",
-                        warehouse,
-                        "--database",
-                        database,
-                        "--table",
-                        tableName,
-                        "--where",
-                        "k<3");
-
-        insertInto(
-                tableName, "(1, 'Say', 'A'), (2, 'Hi', 'B'), (3, 'To', 'C'), 
(4, 'Paimon', 'D')");
-
-        BlockingIterator<Row, Row> streamItr =
-                testStreamingRead(
-                        buildSimpleQuery(tableName),
-                        Arrays.asList(
-                                changelogRow("+I", 1, "Say", "A"),
-                                changelogRow("+I", 2, "Hi", "B"),
-                                changelogRow("+I", 3, "To", "C"),
-                                changelogRow("+I", 4, "Paimon", "D")));
-
-        action.run();
-
-        // test delete records hasn't been thrown
-        validateStreamingReadResult(
-                streamItr,
-                Arrays.asList(changelogRow("-D", 1, "Say", "A"), 
changelogRow("-D", 2, "Hi", "B")));
-
-        // test partial update still works after action
-        insertInto(
-                tableName, "(4, CAST (NULL AS STRING), '$')", "(4, 'Test', 
CAST (NULL AS STRING))");
-
-        validateStreamingReadResult(
-                streamItr,
-                Arrays.asList(
-                        changelogRow("-U", 4, "Paimon", "D"), 
changelogRow("+U", 4, "Test", "$")));
-        streamItr.close();
-
-        testBatchRead(
-                buildSimpleQuery(tableName),
-                Arrays.asList(
-                        changelogRow("+I", 3, "To", "C"), changelogRow("+I", 
4, "Test", "$")));
-    }
-
     private void prepareTable() throws Exception {
         Map<String, String> options = new HashMap<>();
         FileStoreTable table =
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index a1ca5eb55..3e27f0c0c 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -557,7 +557,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase 
{
                                     put(
                                             CoreOptions.MERGE_ENGINE.key(),
                                             
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
-                                    
put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), "true");
+                                    put(CoreOptions.IGNORE_DELETE.key(), 
"true");
                                 }
                             }
                         }));

Reply via email to