This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit e3e4303ab5e8a9686ed348b971e09b9160baa09f
Author: yuzelin <[email protected]>
AuthorDate: Wed May 22 16:47:18 2024 +0800

    [core] Fix that ignore-delete hasn't handle rowkind.field (#3365)
---
 .../apache/paimon/table/AbstractFileStoreTable.java |  5 +++++
 .../paimon/table/AppendOnlyFileStoreTable.java      |  7 ++++---
 .../paimon/table/PrimaryKeyFileStoreTable.java      | 21 +++++++--------------
 .../apache/paimon/table/sink/RowKindGenerator.java  |  4 ++++
 .../apache/paimon/table/sink/TableWriteImpl.java    | 16 +++++++++++-----
 .../paimon/flink/sink/LocalMergeOperator.java       |  3 +--
 .../apache/paimon/flink/BatchFileStoreITCase.java   | 16 ++++++++++++++++
 7 files changed, 48 insertions(+), 24 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7d4a4a7c2..9a590a0c4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -40,6 +40,7 @@ import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor;
 import org.apache.paimon.table.sink.RowKeyExtractor;
+import org.apache.paimon.table.sink.RowKindGenerator;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor;
 import org.apache.paimon.table.source.InnerStreamTableScan;
@@ -588,6 +589,10 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
                 store().newTagDeletion());
     }
 
+    protected RowKindGenerator rowKindGenerator() {
+        return RowKindGenerator.create(schema(), store().options());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 810669f2e..f45865d01 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -147,13 +147,14 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable {
         return new TableWriteImpl<>(
                 writer,
                 createRowKeyExtractor(),
-                record -> {
+                (record, rowKind) -> {
                     Preconditions.checkState(
-                            record.row().getRowKind() == RowKind.INSERT,
+                            rowKind.isAdd(),
                             "Append only writer can not accept row with RowKind %s",
-                            record.row().getRowKind());
+                            rowKind);
                     return record.row();
                 },
+                rowKindGenerator(),
                 CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index bf26ec31c..8c6925d73 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.KeyValueFileStore;
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -35,13 +34,11 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.query.LocalTableQuery;
-import org.apache.paimon.table.sink.RowKindGenerator;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.KeyValueTableRead;
 import org.apache.paimon.table.source.MergeTreeSplitGenerator;
 import org.apache.paimon.table.source.SplitGenerator;
-import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
 import java.util.List;
@@ -167,21 +164,17 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
     @Override
     public TableWriteImpl<KeyValue> newWrite(
             String commitUser, ManifestCacheFilter manifestFilter) {
-        TableSchema schema = schema();
-        CoreOptions options = store().options();
-        RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
         KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
                 store().newWrite(commitUser, manifestFilter),
                 createRowKeyExtractor(),
-                record -> {
-                    InternalRow row = record.row();
-                    RowKind rowKind =
-                            rowKindGenerator == null
-                                    ? row.getRowKind()
-                                    : rowKindGenerator.generate(row);
-                    return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
-                },
+                (record, rowKind) ->
+                        kv.replace(
+                                record.primaryKey(),
+                                KeyValue.UNKNOWN_SEQUENCE,
+                                rowKind,
+                                record.row()),
+                rowKindGenerator(),
                 CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java
index de55afceb..1834fdffa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java
@@ -62,4 +62,8 @@ public class RowKindGenerator {
                 .map(field -> new RowKindGenerator(field, schema.logicalRowType()))
                 .orElse(null);
     }
+
+    public static RowKind getRowKind(@Nullable RowKindGenerator rowKindGenerator, InternalRow row) {
+        return rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index c03ef7e13..d7eaaa4b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -30,6 +30,7 @@ import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.operation.FileStoreWrite.State;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.Restorable;
 
 import javax.annotation.Nullable;
@@ -49,6 +50,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
     private final FileStoreWrite<T> write;
     private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor;
     private final RecordExtractor<T> recordExtractor;
+    @Nullable private final RowKindGenerator rowKindGenerator;
     private final boolean ignoreDelete;
 
     private boolean batchCommitted = false;
@@ -58,10 +60,12 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
             FileStoreWrite<T> write,
             KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
             RecordExtractor<T> recordExtractor,
+            @Nullable RowKindGenerator rowKindGenerator,
             boolean ignoreDelete) {
         this.write = write;
         this.keyAndBucketExtractor = keyAndBucketExtractor;
         this.recordExtractor = recordExtractor;
+        this.rowKindGenerator = rowKindGenerator;
         this.ignoreDelete = ignoreDelete;
     }
 
@@ -128,21 +132,23 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
 
     @Nullable
     public SinkRecord writeAndReturn(InternalRow row) throws Exception {
-        if (ignoreDelete && row.getRowKind().isRetract()) {
+        RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
+        if (ignoreDelete && rowKind.isRetract()) {
             return null;
         }
         SinkRecord record = toSinkRecord(row);
-        write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
+        write.write(record.partition(), record.bucket(), recordExtractor.extract(record, rowKind));
         return record;
     }
 
     @Nullable
     public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception {
-        if (ignoreDelete && row.getRowKind().isRetract()) {
+        RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
+        if (ignoreDelete && rowKind.isRetract()) {
             return null;
         }
         SinkRecord record = toSinkRecord(row, bucket);
-        write.write(record.partition(), bucket, recordExtractor.extract(record));
+        write.write(record.partition(), bucket, recordExtractor.extract(record, rowKind));
         return record;
     }
 
@@ -231,6 +237,6 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State
     /** Extractor to extract {@link T} from the {@link SinkRecord}. */
     public interface RecordExtractor<T> {
 
-        T extract(SinkRecord record);
+        T extract(SinkRecord record, RowKind rowKind);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index a09372443..273f38308 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -137,8 +137,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow>
         recordCount++;
         InternalRow row = record.getValue();
 
-        RowKind rowKind =
-                rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row);
+        RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
         if (ignoreDelete && rowKind.isRetract()) {
             return;
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 2758b7352..55bd886ad 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -422,6 +422,22 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B"));
     }
 
+    @Test
+    public void testIgnoreDeleteWithRowKindField() {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) "
+                        + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')");
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')");
+        assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I"));
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')");
+        assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I"));
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'B', '+I')");
+        assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B", "+I"));
+    }
+
     @Test
     public void testDeleteWithPkLookup() throws Exception {
         sql(

Reply via email to