This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6378e75a2 [core] Move ignore-delete from MergeFunction to Table writer
(#3128)
6378e75a2 is described below
commit 6378e75a23242c76366c2819fd04c36dad6beddd
Author: yuzelin <[email protected]>
AuthorDate: Mon Apr 1 17:15:13 2024 +0800
[core] Move ignore-delete from MergeFunction to Table writer (#3128)
---
.../concepts/primary-key-table/merge-engine.md | 6 +-
.../shortcodes/generated/core_configuration.html | 24 ++-----
.../main/java/org/apache/paimon/CoreOptions.java | 26 ++++----
.../org/apache/paimon/append/AppendOnlyWriter.java | 6 +-
.../compact/DeduplicateMergeFunction.java | 25 +-------
.../mergetree/compact/FirstRowMergeFunction.java | 34 +++-------
.../compact/PartialUpdateMergeFunction.java | 14 +---
.../paimon/table/AppendOnlyFileStoreTable.java | 3 +-
.../paimon/table/PrimaryKeyFileStoreTable.java | 3 +-
.../apache/paimon/table/PrimaryKeyTableUtils.java | 4 +-
.../apache/paimon/table/sink/TableWriteImpl.java | 15 ++++-
.../mergetree/SortBufferWriteBufferTestBase.java | 5 +-
.../LookupChangelogMergeFunctionWrapperTest.java | 32 +---------
.../mergetree/compact/SortMergeReaderTestBase.java | 2 +-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 2 +-
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 2 +-
.../apache/paimon/flink/action/DeleteAction.java | 1 -
.../paimon/flink/action/MergeIntoAction.java | 20 ++++++
.../paimon/flink/action/TableActionBase.java | 22 -------
.../flink/sink/GlobalFullCompactionSinkWrite.java | 10 ++-
.../flink/sink/RowDataStoreWriteOperator.java | 2 +-
.../apache/paimon/flink/sink/StoreSinkWrite.java | 2 +
.../paimon/flink/sink/StoreSinkWriteImpl.java | 2 +
.../apache/paimon/flink/BatchFileStoreITCase.java | 17 +++--
.../paimon/flink/ContinuousFileStoreITCase.java | 18 +++---
.../org/apache/paimon/flink/FirstRowITCase.java | 2 +-
.../apache/paimon/flink/PartialUpdateITCase.java | 28 +++++++-
.../paimon/flink/action/DeleteActionITCase.java | 74 ----------------------
.../paimon/flink/action/MergeIntoActionITCase.java | 2 +-
29 files changed, 141 insertions(+), 262 deletions(-)
diff --git a/docs/content/concepts/primary-key-table/merge-engine.md
b/docs/content/concepts/primary-key-table/merge-engine.md
index 9d06020fa..5e87c4f76 100644
--- a/docs/content/concepts/primary-key-table/merge-engine.md
+++ b/docs/content/concepts/primary-key-table/merge-engine.md
@@ -38,7 +38,7 @@ result in strange behavior. When the input is out of order,
we recommend that yo
`deduplicate` merge engine is the default merge engine. Paimon will only keep
the latest record and throw away other records with the same primary keys.
-Specifically, if the latest record is a `DELETE` record, all records with the
same primary keys will be deleted.
+Specifically, if the latest record is a `DELETE` record, all records with the
same primary keys will be deleted. You can configure `ignore-delete` to ignore delete records.
## Partial Update
@@ -59,7 +59,7 @@ For streaming queries, `partial-update` merge engine must be
used together with
{{< hint info >}}
By default, Partial update can not accept delete records, you can choose one
of the following solutions:
-- Configure 'partial-update.ignore-delete' to ignore delete records.
+- Configure 'ignore-delete' to ignore delete records.
- Configure 'sequence-group's to retract partial columns.
{{< /hint >}}
@@ -328,7 +328,7 @@ By specifying `'merge-engine' = 'first-row'`, users can
keep the first row of th
{{< hint info >}}
1. `first-row` merge engine must be used together with `lookup` [changelog
producer]({{< ref "concepts/primary-key-table/changelog-producer" >}}).
2. You can not specify `sequence.field`.
-3. Not accept `DELETE` and `UPDATE_BEFORE` message. You can config
`first-row.ignore-delete` to ignore these two kinds records.
+3. `DELETE` and `UPDATE_BEFORE` messages are not accepted. You can configure
`ignore-delete` to ignore these two kinds of records.
{{< /hint >}}
This is of great help in replacing log deduplication in streaming computation.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 3f4c76bf0..e1a52ec2a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -176,12 +176,6 @@ under the License.
<td>Duration</td>
<td>The TTL in rocksdb index for cross partition upsert (primary
keys not contain all partition fields), this can avoid maintaining too many
indexes and lead to worse and worse performance, but please note that this may
also cause data duplication.</td>
</tr>
- <tr>
- <td><h5>deduplicate.ignore-delete</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to ignore delete records in deduplicate mode.</td>
- </tr>
<tr>
<td><h5>deletion-vectors.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
@@ -242,18 +236,18 @@ under the License.
<td>Map</td>
<td>Define different file format for different level, you can add
the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file
format for level is not provided, the default format which set by `file.format`
will be used.</td>
</tr>
- <tr>
- <td><h5>first-row.ignore-delete</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to ignore delete records in first-row mode.</td>
- </tr>
<tr>
<td><h5>full-compaction.delta-commits</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Full compaction will be constantly triggered after delta
commits.</td>
</tr>
+ <tr>
+ <td><h5>ignore-delete</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to ignore delete records.</td>
+ </tr>
<tr>
<td><h5>incremental-between</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -413,12 +407,6 @@ This config option does not affect the default filesystem
metastore.</td>
<td>Integer</td>
<td>Turn off the dictionary encoding for all fields in
parquet.</td>
</tr>
- <tr>
- <td><h5>partial-update.ignore-delete</h5></td>
- <td style="word-wrap: break-word;">false</td>
- <td>Boolean</td>
- <td>Whether to ignore delete records in partial-update mode.</td>
- </tr>
<tr>
<td><h5>partition</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 895da7fe8..287dc6e16 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -255,23 +255,15 @@ public class CoreOptions implements Serializable {
.defaultValue(MergeEngine.DEDUPLICATE)
.withDescription("Specify the merge engine for table with
primary key.");
- public static final ConfigOption<Boolean> DEDUPLICATE_IGNORE_DELETE =
- key("deduplicate.ignore-delete")
+ public static final ConfigOption<Boolean> IGNORE_DELETE =
+ key("ignore-delete")
.booleanType()
.defaultValue(false)
- .withDescription("Whether to ignore delete records in
deduplicate mode.");
-
- public static final ConfigOption<Boolean> PARTIAL_UPDATE_IGNORE_DELETE =
- key("partial-update.ignore-delete")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to ignore delete records in
partial-update mode.");
-
- public static final ConfigOption<Boolean> FIRST_ROW_IGNORE_DELETE =
- key("first-row.ignore-delete")
- .booleanType()
- .defaultValue(false)
- .withDescription("Whether to ignore delete records in
first-row mode.");
+ .withDeprecatedKeys(
+ "first-row.ignore-delete",
+ "deduplicate.ignore-delete",
+ "partial-update.ignore-delete")
+ .withDescription("Whether to ignore delete records.");
public static final ConfigOption<SortEngine> SORT_ENGINE =
key("sort-engine")
@@ -1275,6 +1267,10 @@ public class CoreOptions implements Serializable {
return options.get(MERGE_ENGINE);
}
+ public boolean ignoreDelete() {
+ return options.get(IGNORE_DELETE);
+ }
+
public SortEngine sortEngine() {
return options.get(SORT_ENGINE);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 1ab054531..9566dd372 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -35,7 +35,6 @@ import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.statistics.FieldStatsCollector;
-import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.IOUtils;
@@ -130,8 +129,9 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
@Override
public void write(InternalRow rowData) throws Exception {
Preconditions.checkArgument(
- rowData.getRowKind() == RowKind.INSERT,
- "Append-only writer can only accept insert row kind, but
current row kind is: %s",
+ rowData.getRowKind().isAdd(),
+ "Append-only writer can only accept insert or update_after row
kind, but current row kind is: %s. "
+ + "You can configure 'ignore-delete' to ignore retract
records.",
rowData.getRowKind());
boolean success = sinkWriter.write(rowData);
if (!success) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
index b062ce9b6..9c1cf0f5e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/DeduplicateMergeFunction.java
@@ -18,9 +18,7 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
-import org.apache.paimon.options.Options;
import javax.annotation.Nullable;
@@ -30,14 +28,8 @@ import javax.annotation.Nullable;
*/
public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {
- private final boolean ignoreDelete;
-
private KeyValue latestKv;
- protected DeduplicateMergeFunction(boolean ignoreDelete) {
- this.ignoreDelete = ignoreDelete;
- }
-
@Override
public void reset() {
latestKv = null;
@@ -45,9 +37,6 @@ public class DeduplicateMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void add(KeyValue kv) {
- if (ignoreDelete && kv.valueKind().isRetract()) {
- return;
- }
latestKv = kv;
}
@@ -58,26 +47,16 @@ public class DeduplicateMergeFunction implements
MergeFunction<KeyValue> {
}
public static MergeFunctionFactory<KeyValue> factory() {
- return new Factory(new Options());
- }
-
- public static MergeFunctionFactory<KeyValue> factory(Options options) {
- return new Factory(options);
+ return new Factory();
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
private static final long serialVersionUID = 1L;
- private final Options options;
-
- private Factory(Options options) {
- this.options = options;
- }
-
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new
DeduplicateMergeFunction(options.get(CoreOptions.DEDUPLICATE_IGNORE_DELETE));
+ return new DeduplicateMergeFunction();
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
index f0551e0fa..b955e0fea 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/FirstRowMergeFunction.java
@@ -18,11 +18,10 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.options.Options;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -36,12 +35,9 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
private final InternalRowSerializer valueSerializer;
private KeyValue first;
- private final boolean ignoreDelete;
-
- protected FirstRowMergeFunction(RowType keyType, RowType valueType,
boolean ignoreDelete) {
+ protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
this.keySerializer = new InternalRowSerializer(keyType);
this.valueSerializer = new InternalRowSerializer(valueType);
- this.ignoreDelete = ignoreDelete;
}
@Override
@@ -51,15 +47,10 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
@Override
public void add(KeyValue kv) {
- if (kv.valueKind().isRetract()) {
- if (ignoreDelete) {
- return;
- } else {
- throw new IllegalArgumentException(
- "By default, First row merge engine can not accept
DELETE/UPDATE_BEFORE records.\n"
- + "You can config 'first-row.ignore-delete' to
ignore the DELETE/UPDATE_BEFORE records.");
- }
- }
+ Preconditions.checkArgument(
+ kv.valueKind().isAdd(),
+ "By default, First row merge engine can not accept
DELETE/UPDATE_BEFORE records.\n"
+ + "You can config 'ignore-delete' to ignore the
DELETE/UPDATE_BEFORE records.");
if (first == null) {
this.first = kv.copy(keySerializer, valueSerializer);
}
@@ -71,9 +62,8 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
return first;
}
- public static MergeFunctionFactory<KeyValue> factory(
- RowType keyType, RowType valueType, Options options) {
- return new FirstRowMergeFunction.Factory(keyType, valueType, options);
+ public static MergeFunctionFactory<KeyValue> factory(RowType keyType,
RowType valueType) {
+ return new FirstRowMergeFunction.Factory(keyType, valueType);
}
private static class Factory implements MergeFunctionFactory<KeyValue> {
@@ -82,18 +72,14 @@ public class FirstRowMergeFunction implements
MergeFunction<KeyValue> {
private final RowType keyType;
private final RowType valueType;
- private final Options options;
-
- public Factory(RowType keyType, RowType valueType, Options options) {
+ public Factory(RowType keyType, RowType valueType) {
this.keyType = keyType;
this.valueType = valueType;
- this.options = options;
}
@Override
public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
- return new FirstRowMergeFunction(
- keyType, valueType,
options.get(CoreOptions.FIRST_ROW_IGNORE_DELETE));
+ return new FirstRowMergeFunction(keyType, valueType);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index dbf6dfd75..c1fc9293f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -67,7 +67,6 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
public static final String SEQUENCE_GROUP = "sequence-group";
private final InternalRow.FieldGetter[] getters;
- private final boolean ignoreDelete;
private final Map<Integer, SequenceGenerator> fieldSequences;
private final boolean fieldSequenceEnabled;
private final Map<Integer, FieldAggregator> fieldAggregators;
@@ -80,12 +79,10 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
protected PartialUpdateMergeFunction(
InternalRow.FieldGetter[] getters,
- boolean ignoreDelete,
Map<Integer, SequenceGenerator> fieldSequences,
Map<Integer, FieldAggregator> fieldAggregators,
boolean fieldSequenceEnabled) {
this.getters = getters;
- this.ignoreDelete = ignoreDelete;
this.fieldSequences = fieldSequences;
this.fieldAggregators = fieldAggregators;
this.fieldSequenceEnabled = fieldSequenceEnabled;
@@ -104,12 +101,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
// refresh key object to avoid reference overwritten
currentKey = kv.key();
- // ignore delete?
if (kv.valueKind().isRetract()) {
- if (ignoreDelete) {
- return;
- }
-
if (fieldSequenceEnabled) {
retractWithSequenceGroup(kv);
return;
@@ -120,7 +112,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
"\n",
"By default, Partial update can not accept delete
records,"
+ " you can choose one of the following
solutions:",
- "1. Configure 'partial-update.ignore-delete' to
ignore delete records.",
+ "1. Configure 'ignore-delete' to ignore delete
records.",
"2. Configure 'sequence-group's to retract partial
columns.");
throw new IllegalArgumentException(msg);
@@ -232,14 +224,12 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
private static final long serialVersionUID = 1L;
- private final boolean ignoreDelete;
private final List<DataType> tableTypes;
private final Map<Integer, SequenceGenerator> fieldSequences;
private final Map<Integer, FieldAggregator> fieldAggregators;
private Factory(Options options, RowType rowType, List<String>
primaryKeys) {
- this.ignoreDelete =
options.get(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE);
this.tableTypes = rowType.getFieldTypes();
List<String> fieldNames = rowType.getFieldNames();
@@ -325,14 +315,12 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
return new PartialUpdateMergeFunction(
createFieldGetters(Projection.of(projection).project(tableTypes)),
- ignoreDelete,
projectedSequences,
projectedAggregators,
!fieldSequences.isEmpty());
} else {
return new PartialUpdateMergeFunction(
createFieldGetters(tableTypes),
- ignoreDelete,
fieldSequences,
fieldAggregators,
!fieldSequences.isEmpty());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4d91328f2..2eb41fdd5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -147,7 +147,8 @@ class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
"Append only writer can not accept row with
RowKind %s",
record.row().getRowKind());
return record.row();
- });
+ },
+ CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index fea783259..f2ab323d2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -200,7 +200,8 @@ class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
? row.getRowKind()
: rowKindGenerator.generate(row);
return kv.replace(record.primaryKey(),
KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
- });
+ },
+ CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index 112ab0890..572e488c6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -64,7 +64,7 @@ public class PrimaryKeyTableUtils {
switch (mergeEngine) {
case DEDUPLICATE:
- return DeduplicateMergeFunction.factory(conf);
+ return DeduplicateMergeFunction.factory();
case PARTIAL_UPDATE:
return PartialUpdateMergeFunction.factory(conf, rowType,
tableSchema.primaryKeys());
case AGGREGATE:
@@ -75,7 +75,7 @@ public class PrimaryKeyTableUtils {
tableSchema.primaryKeys());
case FIRST_ROW:
return FirstRowMergeFunction.factory(
- new RowType(extractor.keyFields(tableSchema)),
rowType, conf);
+ new RowType(extractor.keyFields(tableSchema)),
rowType);
default:
throw new UnsupportedOperationException("Unsupported merge
engine: " + mergeEngine);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index b1cacc4c6..c03ef7e13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -32,6 +32,8 @@ import org.apache.paimon.operation.FileStoreWrite.State;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.Restorable;
+import javax.annotation.Nullable;
+
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -47,6 +49,7 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
private final FileStoreWrite<T> write;
private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
private final RecordExtractor<T> recordExtractor;
+ private final boolean ignoreDelete;
private boolean batchCommitted = false;
private BucketMode bucketMode;
@@ -54,10 +57,12 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
public TableWriteImpl(
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
- RecordExtractor<T> recordExtractor) {
+ RecordExtractor<T> recordExtractor,
+ boolean ignoreDelete) {
this.write = write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
+ this.ignoreDelete = ignoreDelete;
}
@Override
@@ -121,13 +126,21 @@ public class TableWriteImpl<T> implements
InnerTableWrite, Restorable<List<State
writeAndReturn(row, bucket);
}
+ @Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
+ if (ignoreDelete && row.getRowKind().isRetract()) {
+ return null;
+ }
SinkRecord record = toSinkRecord(row);
write.write(record.partition(), record.bucket(),
recordExtractor.extract(record));
return record;
}
+ @Nullable
public SinkRecord writeAndReturn(InternalRow row, int bucket) throws
Exception {
+ if (ignoreDelete && row.getRowKind().isRetract()) {
+ return null;
+ }
SinkRecord record = toSinkRecord(row, bucket);
write.write(record.partition(), bucket,
recordExtractor.extract(record));
return record;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
index b71971ab6..89b217b5a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/SortBufferWriteBufferTestBase.java
@@ -18,7 +18,6 @@
package org.apache.paimon.mergetree;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.memory.HeapMemorySegmentPool;
@@ -179,7 +178,6 @@ public abstract class SortBufferWriteBufferTestBase {
@Override
protected MergeFunction<KeyValue> createMergeFunction() {
Options options = new Options();
- options.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true);
return PartialUpdateMergeFunction.factory(
options, RowType.of(DataTypes.BIGINT()),
ImmutableList.of("key"))
.create();
@@ -260,8 +258,7 @@ public abstract class SortBufferWriteBufferTestBase {
protected MergeFunction<KeyValue> createMergeFunction() {
return FirstRowMergeFunction.factory(
new RowType(Lists.list(new DataField(0, "f0", new
IntType()))),
- new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))),
- new Options())
+ new RowType(Lists.list(new DataField(1, "f1", new
BigIntType()))))
.create();
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index 4c377af17..96aa57684 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -18,7 +18,6 @@
package org.apache.paimon.mergetree.compact;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
@@ -27,7 +26,6 @@ import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
-import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.IntType;
@@ -38,7 +36,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -308,8 +305,7 @@ public class LookupChangelogMergeFunctionWrapperTest {
new RowType(
Lists.list(new DataField(0,
"f0", new IntType()))),
new RowType(
- Lists.list(new DataField(1,
"f1", new IntType()))),
- false),
+ Lists.list(new DataField(1,
"f1", new IntType())))),
highLevel::contains);
// Without level-0
@@ -373,30 +369,4 @@ public class LookupChangelogMergeFunctionWrapperTest {
kv = result.result();
assertThat(kv).isNull();
}
-
- @Test
- public void testPartialUpdateIgnoreDelete() {
- Options options = new Options();
- options.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE, true);
- LookupChangelogMergeFunctionWrapper function =
- new LookupChangelogMergeFunctionWrapper(
- LookupMergeFunction.wrap(
- PartialUpdateMergeFunction.factory(
- options,
- DataTypes.ROW(DataTypes.INT()),
- Collections.singletonList("f0")),
- RowType.of(DataTypes.INT()),
- RowType.of(DataTypes.INT())),
- key -> null,
- EQUALISER,
- false,
- LookupStrategy.CHANGELOG_ONLY,
- null);
-
- function.reset();
- function.add(new KeyValue().replace(row(1), 1, DELETE,
row(1)).setLevel(2));
- ChangelogResult result = function.getResult();
- assertThat(result).isNotNull();
- assertThat(result.result()).isNull();
- }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
index 81e49b81d..3ae461532 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/SortMergeReaderTestBase.java
@@ -130,7 +130,7 @@ public abstract class SortMergeReaderTestBase extends
CombiningRecordReaderTestB
RowType keyType = new RowType(Lists.list(new DataField(0, "f0",
new IntType())));
RowType valueType = new RowType(Lists.list(new DataField(1, "f1",
new BigIntType())));
return new LookupMergeFunction(
- new FirstRowMergeFunction(keyType, valueType, false),
keyType, valueType);
+ new FirstRowMergeFunction(keyType, valueType), keyType,
valueType);
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index c245a2ce7..db6e12305 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -747,7 +747,7 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
conf.set(
CoreOptions.MERGE_ENGINE,
CoreOptions.MergeEngine.PARTIAL_UPDATE);
- conf.set(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE,
true);
+ conf.set(CoreOptions.IGNORE_DELETE, true);
});
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 40743fae6..9ad54f04d 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -995,7 +995,7 @@ public class KafkaCanalSyncTableActionITCase extends
KafkaSyncTableActionITCase
kafkaConfig.put(TOPIC.key(), topic);
Map<String, String> tableConfig = getBasicTableConfig();
- tableConfig.put(CoreOptions.DEDUPLICATE_IGNORE_DELETE.key(),
String.valueOf(ignoreDelete));
+ tableConfig.put(CoreOptions.IGNORE_DELETE.key(),
String.valueOf(ignoreDelete));
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withTableConfig(tableConfig).build();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
index 19f77fe91..2fdb658ad 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteAction.java
@@ -46,7 +46,6 @@ public class DeleteAction extends TableActionBase {
String filter,
Map<String, String> catalogConfig) {
super(warehouse, databaseName, tableName, catalogConfig);
- changeIgnoreMergeEngine();
this.filter = filter;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
index 687ea5869..41f8b7677 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
@@ -38,6 +39,7 @@ import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -159,6 +161,24 @@ public class MergeIntoAction extends TableActionBase {
.collect(Collectors.toList());
}
+ /**
+ * The {@link CoreOptions.MergeEngine}s will process -U/-D records in
different ways, but we
+ * want these records to be sunk directly. This method is a workaround
which disables merge
+ * engine settings and forces compaction.
+ */
+ private void changeIgnoreMergeEngine() {
+ if (CoreOptions.fromMap(table.options()).mergeEngine()
+ != CoreOptions.MergeEngine.DEDUPLICATE) {
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(
+ CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.DEDUPLICATE.toString());
+ dynamicOptions.put(CoreOptions.IGNORE_DELETE.key(), "false");
+ // force compaction
+
dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+ table = ((FileStoreTable)
table).internalCopyWithoutCheck(dynamicOptions);
+ }
+ }
+
public MergeIntoAction withTargetAlias(String targetAlias) {
this.targetAlias = targetAlias;
return this;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index a97335cd2..f10f3d625 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -18,13 +18,11 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -35,7 +33,6 @@ import org.apache.flink.table.data.RowData;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -99,23 +96,4 @@ public abstract class TableActionBase extends ActionBase {
e);
}
}
-
- /**
- * The {@link CoreOptions.MergeEngine}s will process -U/-D records in
different ways, but we
- * want these records to be sunk directly. This method is a workaround.
Actions that may produce
- * -U/-D records can call this to disable merge engine settings and force
compaction.
- */
- protected void changeIgnoreMergeEngine() {
- if (CoreOptions.fromMap(table.options()).mergeEngine()
- != CoreOptions.MergeEngine.DEDUPLICATE) {
- Map<String, String> dynamicOptions = new HashMap<>();
- dynamicOptions.put(
- CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.DEDUPLICATE.toString());
- // force compaction
-
dynamicOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
- Preconditions.checkArgument(
- table instanceof FileStoreTable, "Only supports
FileStoreTable.");
- table = ((FileStoreTable)
table).internalCopyWithoutCheck(dynamicOptions);
- }
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 3fb6fe6ae..62341a180 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -108,16 +108,22 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
}
@Override
+ @Nullable
public SinkRecord write(InternalRow rowData) throws Exception {
SinkRecord sinkRecord = super.write(rowData);
- touchBucket(sinkRecord.partition(), sinkRecord.bucket());
+ if (sinkRecord != null) {
+ touchBucket(sinkRecord.partition(), sinkRecord.bucket());
+ }
return sinkRecord;
}
@Override
+ @Nullable
public SinkRecord write(InternalRow rowData, int bucket) throws Exception {
SinkRecord sinkRecord = super.write(rowData, bucket);
- touchBucket(sinkRecord.partition(), bucket);
+ if (sinkRecord != null) {
+ touchBucket(sinkRecord.partition(), bucket);
+ }
return sinkRecord;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
index b61fecab5..07fe27554 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataStoreWriteOperator.java
@@ -130,7 +130,7 @@ public class RowDataStoreWriteOperator extends
TableWriteOperator<InternalRow> {
throw new IOException(e);
}
- if (logSinkFunction != null) {
+ if (record != null && logSinkFunction != null) {
// write to log store, need to preserve original pk (which
includes partition fields)
SinkRecord logRecord = write.toLogRecord(record);
logSinkFunction.invoke(logRecord, sinkContext);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 7278e1f21..6001721b7 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -39,8 +39,10 @@ import java.util.List;
/** Helper class of {@link PrepareCommitOperator} for different types of
paimon sinks. */
public interface StoreSinkWrite {
+ @Nullable
SinkRecord write(InternalRow rowData) throws Exception;
+ @Nullable
SinkRecord write(InternalRow rowData, int bucket) throws Exception;
SinkRecord toLogRecord(SinkRecord record);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index c70f6038e..3ecc80bb6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -170,11 +170,13 @@ public class StoreSinkWriteImpl implements StoreSinkWrite
{
}
@Override
+ @Nullable
public SinkRecord write(InternalRow rowData) throws Exception {
return write.writeAndReturn(rowData);
}
@Override
+ @Nullable
public SinkRecord write(InternalRow rowData, int bucket) throws Exception {
return write.writeAndReturn(rowData, bucket);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 9829e1040..7c8ffb6d6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -372,20 +372,19 @@ public class BatchFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testIgnoreDelete() throws Exception {
+ public void testIgnoreDelete() {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
v STRING) "
- + "WITH ('deduplicate.ignore-delete' = 'true',
'bucket' = '1')");
- BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT *
FROM ignore_delete");
+ + "WITH ('merge-engine' = 'deduplicate',
'ignore-delete' = 'true', 'bucket' = '1')");
+
+ sql("INSERT INTO ignore_delete VALUES (1, 'A')");
+ assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "A"));
- sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
sql("DELETE FROM ignore_delete WHERE pk = 1");
- sql("INSERT INTO ignore_delete VALUES (1, 'B')");
+ assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "A"));
- assertThat(iterator.collect(2))
- .containsExactlyInAnyOrder(
- Row.ofKind(RowKind.INSERT, 1, "B"),
Row.ofKind(RowKind.INSERT, 2, "B"));
- iterator.close();
+ sql("INSERT INTO ignore_delete VALUES (1, 'B')");
+ assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "B"));
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index 66a154ba4..1fbed646e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -29,6 +29,7 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -549,19 +550,20 @@ public class ContinuousFileStoreITCase extends
CatalogITCaseBase {
}
@Test
- public void testIgnoreDelete() {
+ public void testIgnoreDelete() throws Exception {
sql(
"CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
v STRING) "
- + "WITH ('deduplicate.ignore-delete' = 'true',
'bucket' = '1')");
-
- sql("INSERT INTO ignore_delete VALUES (1, 'A')");
- assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "A"));
+ + "WITH ('merge-engine' = 'deduplicate',
'ignore-delete' = 'true', 'bucket' = '1')");
+ BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT *
FROM ignore_delete");
+ sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
sql("DELETE FROM ignore_delete WHERE pk = 1");
- assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "A"));
-
sql("INSERT INTO ignore_delete VALUES (1, 'B')");
- assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "B"));
+
+ assertThat(iterator.collect(2))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, "B"),
Row.ofKind(RowKind.INSERT, 2, "B"));
+ iterator.close();
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
index 5b9909d9c..5d7927a97 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FirstRowITCase.java
@@ -110,7 +110,7 @@ public class FirstRowITCase extends CatalogITCaseBase {
sql(
"CREATE TABLE IF NOT EXISTS T1 ("
+ "a INT, b INT, c STRING, PRIMARY KEY (a) NOT
ENFORCED)"
- + " WITH ('merge-engine'='first-row',
'first-row.ignore-delete' = 'true',"
+ + " WITH ('merge-engine'='first-row', 'ignore-delete'
= 'true',"
+ " 'changelog-producer' = 'lookup');");
List<Row> input =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index f4cf0b67f..23a574fb5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -52,7 +52,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
+ " WITH ('merge-engine'='partial-update');",
"CREATE TABLE IF NOT EXISTS dwd_orders ("
+ "OrderID INT, OrderNumber INT, PersonID INT,
LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (OrderID) NOT ENFORCED)"
- + " WITH ('merge-engine'='partial-update',
'partial-update.ignore-delete'='true');",
+ + " WITH ('merge-engine'='partial-update',
'ignore-delete'='true');",
"CREATE TABLE IF NOT EXISTS ods_orders (OrderID INT,
OrderNumber INT, PersonID INT, PRIMARY KEY (OrderID) NOT ENFORCED) WITH
('changelog-producer'='input', 'continuous.discovery-interval'='1s');",
"CREATE TABLE IF NOT EXISTS dim_persons (PersonID INT,
LastName STRING, FirstName STRING, Age INT, PRIMARY KEY (PersonID) NOT
ENFORCED) WITH ('changelog-producer'='input',
'continuous.discovery-interval'='1s');");
}
@@ -408,4 +408,30 @@ public class PartialUpdateITCase extends CatalogITCaseBase
{
insert1.close();
insert2.close();
}
+
+ @Test
+ public void testIgnoreDelete() throws Exception {
+ sql(
+ "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED,
a INT, g INT) WITH ("
+ + " 'merge-engine' = 'partial-update',"
+ + " 'ignore-delete' = 'true',"
+ + " 'fields.a.aggregate-function' = 'sum',"
+ + " 'fields.g.sequence-group'='a')");
+
+ String id =
+ TestValuesTableFactory.registerData(
+ Arrays.asList(
+ Row.ofKind(RowKind.INSERT, 1, 10, 1),
+ Row.ofKind(RowKind.DELETE, 1, 10, 2),
+ Row.ofKind(RowKind.INSERT, 1, 20, 3)));
+ streamSqlIter(
+ "CREATE TEMPORARY TABLE input (pk INT PRIMARY KEY NOT
ENFORCED, a INT, g INT) "
+ + "WITH ('connector'='values',
'bounded'='true', 'data-id'='%s', "
+ + "'changelog-mode' = 'I,D')",
+ id)
+ .close();
+ sEnv.executeSql("INSERT INTO ignore_delete SELECT * FROM
input").await();
+
+ assertThat(sql("SELECT * FROM
ignore_delete")).containsExactlyInAnyOrder(Row.of(1, 30, 3));
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index da1cf763a..c67228c25 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.table.FileStoreTable;
@@ -38,13 +37,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
-import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.insertInto;
-import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingRead;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.validateStreamingReadResult;
import static org.assertj.core.api.Assertions.assertThat;
@@ -99,76 +95,6 @@ public class DeleteActionITCase extends ActionITCaseBase {
iterator.close();
}
- @Test
- public void testWorkWithPartialUpdateTable() throws Exception {
- createFileStoreTable(
- RowType.of(
- new DataType[] {DataTypes.INT(), DataTypes.STRING(),
DataTypes.STRING()},
- new String[] {"k", "a", "b"}),
- Collections.emptyList(),
- Collections.singletonList("k"),
- new HashMap<String, String>() {
- {
- put(
- CoreOptions.MERGE_ENGINE.key(),
-
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
- put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(),
"true");
- put(
- CoreOptions.CHANGELOG_PRODUCER.key(),
- ThreadLocalRandom.current().nextBoolean()
- ?
CoreOptions.ChangelogProducer.LOOKUP.toString()
- :
CoreOptions.ChangelogProducer.FULL_COMPACTION.toString());
- }
- });
-
- DeleteAction action =
- createAction(
- DeleteAction.class,
- "delete",
- "--warehouse",
- warehouse,
- "--database",
- database,
- "--table",
- tableName,
- "--where",
- "k<3");
-
- insertInto(
- tableName, "(1, 'Say', 'A'), (2, 'Hi', 'B'), (3, 'To', 'C'),
(4, 'Paimon', 'D')");
-
- BlockingIterator<Row, Row> streamItr =
- testStreamingRead(
- buildSimpleQuery(tableName),
- Arrays.asList(
- changelogRow("+I", 1, "Say", "A"),
- changelogRow("+I", 2, "Hi", "B"),
- changelogRow("+I", 3, "To", "C"),
- changelogRow("+I", 4, "Paimon", "D")));
-
- action.run();
-
- // test delete records hasn't been thrown
- validateStreamingReadResult(
- streamItr,
- Arrays.asList(changelogRow("-D", 1, "Say", "A"),
changelogRow("-D", 2, "Hi", "B")));
-
- // test partial update still works after action
- insertInto(
- tableName, "(4, CAST (NULL AS STRING), '$')", "(4, 'Test',
CAST (NULL AS STRING))");
-
- validateStreamingReadResult(
- streamItr,
- Arrays.asList(
- changelogRow("-U", 4, "Paimon", "D"),
changelogRow("+U", 4, "Test", "$")));
- streamItr.close();
-
- testBatchRead(
- buildSimpleQuery(tableName),
- Arrays.asList(
- changelogRow("+I", 3, "To", "C"), changelogRow("+I",
4, "Test", "$")));
- }
-
private void prepareTable() throws Exception {
Map<String, String> options = new HashMap<>();
FileStoreTable table =
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index a1ca5eb55..3e27f0c0c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -557,7 +557,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
put(
CoreOptions.MERGE_ENGINE.key(),
CoreOptions.MergeEngine.PARTIAL_UPDATE.toString());
-
put(CoreOptions.PARTIAL_UPDATE_IGNORE_DELETE.key(), "true");
+ put(CoreOptions.IGNORE_DELETE.key(),
"true");
}
}
}));