This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit e3e4303ab5e8a9686ed348b971e09b9160baa09f Author: yuzelin <[email protected]> AuthorDate: Wed May 22 16:47:18 2024 +0800 [core] Fix that ignore-delete hasn't handle rowkind.field (#3365) --- .../apache/paimon/table/AbstractFileStoreTable.java | 5 +++++ .../paimon/table/AppendOnlyFileStoreTable.java | 7 ++++--- .../paimon/table/PrimaryKeyFileStoreTable.java | 21 +++++++-------------- .../apache/paimon/table/sink/RowKindGenerator.java | 4 ++++ .../apache/paimon/table/sink/TableWriteImpl.java | 16 +++++++++++----- .../paimon/flink/sink/LocalMergeOperator.java | 3 +-- .../apache/paimon/flink/BatchFileStoreITCase.java | 16 ++++++++++++++++ 7 files changed, 48 insertions(+), 24 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 7d4a4a7c2..9a590a0c4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -40,6 +40,7 @@ import org.apache.paimon.table.sink.CommitCallback; import org.apache.paimon.table.sink.DynamicBucketRowKeyExtractor; import org.apache.paimon.table.sink.FixedBucketRowKeyExtractor; import org.apache.paimon.table.sink.RowKeyExtractor; +import org.apache.paimon.table.sink.RowKindGenerator; import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.table.sink.UnawareBucketRowKeyExtractor; import org.apache.paimon.table.source.InnerStreamTableScan; @@ -588,6 +589,10 @@ abstract class AbstractFileStoreTable implements FileStoreTable { store().newTagDeletion()); } + protected RowKindGenerator rowKindGenerator() { + return RowKindGenerator.create(schema(), store().options()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 810669f2e..f45865d01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -147,13 +147,14 @@ class AppendOnlyFileStoreTable extends AbstractFileStoreTable { return new TableWriteImpl<>( writer, createRowKeyExtractor(), - record -> { + (record, rowKind) -> { Preconditions.checkState( - record.row().getRowKind() == RowKind.INSERT, + rowKind.isAdd(), "Append only writer can not accept row with RowKind %s", - record.row().getRowKind()); + rowKind); return record.row(); }, + rowKindGenerator(), CoreOptions.fromMap(tableSchema.options()).ignoreDelete()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index bf26ec31c..8c6925d73 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -21,7 +21,6 @@ package org.apache.paimon.table; import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; import org.apache.paimon.KeyValueFileStore; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.manifest.ManifestCacheFilter; @@ -35,13 +34,11 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.query.LocalTableQuery; -import org.apache.paimon.table.sink.RowKindGenerator; import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.KeyValueTableRead; import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; -import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import java.util.List; @@ -167,21 +164,17 @@ class PrimaryKeyFileStoreTable extends AbstractFileStoreTable { @Override public TableWriteImpl<KeyValue> newWrite( String commitUser, ManifestCacheFilter manifestFilter) { - TableSchema schema = schema(); - CoreOptions options = store().options(); - RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options); KeyValue kv = new KeyValue(); return new TableWriteImpl<>( store().newWrite(commitUser, manifestFilter), createRowKeyExtractor(), - record -> { - InternalRow row = record.row(); - RowKind rowKind = - rowKindGenerator == null - ? row.getRowKind() - : rowKindGenerator.generate(row); - return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row); - }, + (record, rowKind) -> + kv.replace( + record.primaryKey(), + KeyValue.UNKNOWN_SEQUENCE, + rowKind, + record.row()), + rowKindGenerator(), CoreOptions.fromMap(tableSchema.options()).ignoreDelete()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java index de55afceb..1834fdffa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowKindGenerator.java @@ -62,4 +62,8 @@ public class RowKindGenerator { .map(field -> new RowKindGenerator(field, schema.logicalRowType())) .orElse(null); } + + public static RowKind getRowKind(@Nullable RowKindGenerator rowKindGenerator, InternalRow row) { + return rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index c03ef7e13..d7eaaa4b0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -30,6 +30,7 @@ import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.FileStoreWrite; import org.apache.paimon.operation.FileStoreWrite.State; import org.apache.paimon.table.BucketMode; +import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.Restorable; import javax.annotation.Nullable; @@ -49,6 +50,7 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State private final FileStoreWrite<T> write; private final KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor; private final RecordExtractor<T> recordExtractor; + @Nullable private final RowKindGenerator rowKindGenerator; private final boolean ignoreDelete; private boolean batchCommitted = false; @@ -58,10 +60,12 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State FileStoreWrite<T> write, KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor, RecordExtractor<T> recordExtractor, + @Nullable RowKindGenerator rowKindGenerator, boolean ignoreDelete) { this.write = write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; + this.rowKindGenerator = rowKindGenerator; this.ignoreDelete = ignoreDelete; } @@ -128,21 +132,23 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State @Nullable public SinkRecord writeAndReturn(InternalRow row) throws Exception { - if (ignoreDelete && row.getRowKind().isRetract()) { + RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); + if (ignoreDelete && rowKind.isRetract()) { return null; } SinkRecord record = toSinkRecord(row); - write.write(record.partition(), record.bucket(), recordExtractor.extract(record)); + write.write(record.partition(), record.bucket(), recordExtractor.extract(record, rowKind)); return record; } @Nullable public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { - if (ignoreDelete && row.getRowKind().isRetract()) { + RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); + if (ignoreDelete && rowKind.isRetract()) { return null; } SinkRecord record = toSinkRecord(row, bucket); - write.write(record.partition(), bucket, recordExtractor.extract(record)); + write.write(record.partition(), bucket, recordExtractor.extract(record, rowKind)); return record; } @@ -231,6 +237,6 @@ public class TableWriteImpl<T> implements InnerTableWrite, Restorable<List<State /** Extractor to extract {@link T} from the {@link SinkRecord}. */ public interface RecordExtractor<T> { - T extract(SinkRecord record); + T extract(SinkRecord record, RowKind rowKind); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index a09372443..273f38308 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -137,8 +137,7 @@ public class LocalMergeOperator extends AbstractStreamOperator<InternalRow> recordCount++; InternalRow row = record.getValue(); - RowKind rowKind = - rowKindGenerator == null ? row.getRowKind() : rowKindGenerator.generate(row); + RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); if (ignoreDelete && rowKind.isRetract()) { return; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 2758b7352..55bd886ad 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -422,6 +422,22 @@ public class BatchFileStoreITCase extends CatalogITCaseBase { assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B")); } + @Test + public void testIgnoreDeleteWithRowKindField() { + sql( + "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) " + + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')"); + + sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I")); + + sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I")); + + sql("INSERT INTO ignore_delete VALUES (1, 'B', '+I')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B", "+I")); + } + @Test public void testDeleteWithPkLookup() throws Exception { sql(
