This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b6dd27a91 [core] Fix dv table with partial-update and aggregate (#3036)
b6dd27a91 is described below

commit b6dd27a91fd3b40dcf0621ec86a485be326cc321
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Mar 18 17:02:15 2024 +0800

    [core] Fix dv table with partial-update and aggregate (#3036)
---
 .../paimon/operation/KeyValueFileStoreWrite.java   | 10 ++-
 .../org/apache/paimon/schema/SchemaValidation.java |  3 +-
 .../apache/paimon/flink/DeletionVectorITCase.java  | 89 +++++++++++++++++++++-
 3 files changed, 94 insertions(+), 8 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index e06ed9ea2..6f74e6720 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -80,6 +80,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.Supplier;
 
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
 import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
@@ -255,8 +256,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
             @Nullable FieldsComparator userDefinedSeqComparator,
             Levels levels,
             @Nullable DeletionVectorsMaintainer dvMaintainer) {
-        KeyValueFileReaderFactory.Builder readerFactoryBuilder =
-                this.readerFactoryBuilder.copyWithoutProjection();
         DeletionVector.Factory dvFactory = DeletionVector.factory(dvMaintainer);
         KeyValueFileReaderFactory readerFactory =
                 readerFactoryBuilder.build(partition, bucket, dvFactory);
@@ -286,10 +285,11 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
             if (mergeEngine == FIRST_ROW) {
                 if (options.deletionVectorsEnabled()) {
                     throw new UnsupportedOperationException(
-                            "Deletion vectors mode is not supported for first row merge engine now.");
+                            "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
                 }
                 lookupReaderFactory =
                         readerFactoryBuilder
+                                .copyWithoutProjection()
                                 .withValueProjection(new int[0][])
                                 .build(partition, bucket, dvFactory);
                 processor = new ContainsValueProcessor();
@@ -298,7 +298,9 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
                 processor =
                         lookupStrategy.deletionVector
                                 ? new PositionedKeyValueProcessor(
-                                        valueType, lookupStrategy.produceChangelog)
+                                        valueType,
+                                        lookupStrategy.produceChangelog
+                                                || mergeEngine != DEDUPLICATE)
                                 : new KeyValueProcessor(valueType);
                 wrapperFactory =
                         new LookupMergeFunctionWrapperFactory<>(
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index cbdcb64e5..cb52570cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -463,10 +463,9 @@ public class SchemaValidation {
                         || options.changelogProducer() == ChangelogProducer.LOOKUP,
                 "Deletion vectors mode is only supported for none or lookup changelog producer now.");
 
-        // todo: implement it
         checkArgument(
                 !options.mergeEngine().equals(MergeEngine.FIRST_ROW),
-                "Deletion vectors mode is not supported for first row merge engine now.");
+                "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
     }
 
     private static void validateSequenceField(TableSchema schema, CoreOptions options) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 162e29eec..1fe424b33 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -48,7 +48,7 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
         // test read from APPEND snapshot
         try (BlockingIterator<Row, Row> iter =
                 streamSqlBlockIter(
-                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */"); ) {
+                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
             assertThat(iter.collect(12))
                     .containsExactlyInAnyOrder(
                             Row.ofKind(RowKind.INSERT, 1, "111111111"),
@@ -68,7 +68,7 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
         // test read from COMPACT snapshot
         try (BlockingIterator<Row, Row> iter =
                 streamSqlBlockIter(
-                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */"); ) {
+                        "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
             assertThat(iter.collect(8))
                     .containsExactlyInAnyOrder(
                             Row.ofKind(RowKind.INSERT, 1, "111111111"),
@@ -113,4 +113,89 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
                 .containsExactlyInAnyOrder(
                         Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3, "3_1"), Row.of(4, "4"));
     }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"none", "lookup"})
+    public void testDVTableWithAggregationMergeEngine(String changelogProducer) throws Exception {
+        sql(
+                String.format(
+                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v INT) "
+                                + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', "
+                                + "'merge-engine'='aggregation', 'fields.v.aggregate-function'='sum')",
+                        changelogProducer));
+
+        sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)");
+
+        sql("INSERT INTO T VALUES (2, 1), (3, 1)");
+
+        sql("INSERT INTO T VALUES (2, 1), (4, 1)");
+
+        // test batch read
+        assertThat(batchSql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 111111111), Row.of(2, 4), Row.of(3, 4), Row.of(4, 5));
+
+        // test streaming read
+        if (changelogProducer.equals("lookup")) {
+            try (BlockingIterator<Row, Row> iter =
+                    streamSqlBlockIter(
+                            "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
+                assertThat(iter.collect(8))
+                        .containsExactlyInAnyOrder(
+                                Row.ofKind(RowKind.INSERT, 1, 111111111),
+                                Row.ofKind(RowKind.INSERT, 2, 3),
+                                Row.ofKind(RowKind.INSERT, 3, 4),
+                                Row.ofKind(RowKind.INSERT, 4, 4),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 2, 4),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 4, 4),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 4, 5));
+            }
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"none", "lookup"})
+    public void testDVTableWithPartialUpdateMergeEngine(String changelogProducer) throws Exception {
+        sql(
+                String.format(
+                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1 STRING, v2 STRING) "
+                                + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s', "
+                                + "'merge-engine'='partial-update')",
+                        changelogProducer));
+
+        sql(
+                "INSERT INTO T VALUES (1, '111111111', '1'), (2, '2', CAST(NULL AS STRING)), (3, '3', '3'), (4, CAST(NULL AS STRING), '4')");
+
+        sql("INSERT INTO T VALUES (2, CAST(NULL AS STRING), '2'), (3, '3_1', '3_1')");
+
+        sql(
+                "INSERT INTO T VALUES (2, '2_1', CAST(NULL AS STRING)), (4, '4', CAST(NULL AS STRING))");
+
+        // test batch read
+        assertThat(batchSql("SELECT * FROM T"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "111111111", "1"),
+                        Row.of(2, "2_1", "2"),
+                        Row.of(3, "3_1", "3_1"),
+                        Row.of(4, "4", "4"));
+
+        // test streaming read
+        if (changelogProducer.equals("lookup")) {
+            try (BlockingIterator<Row, Row> iter =
+                    streamSqlBlockIter(
+                            "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
+                assertThat(iter.collect(8))
+                        .containsExactlyInAnyOrder(
+                                Row.ofKind(RowKind.INSERT, 1, "111111111", "1"),
+                                Row.ofKind(RowKind.INSERT, 2, "2", "2"),
+                                Row.ofKind(RowKind.INSERT, 3, "3_1", "3_1"),
+                                Row.ofKind(RowKind.INSERT, 4, null, "4"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2", "2"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1", "2"),
+                                Row.ofKind(RowKind.UPDATE_BEFORE, 4, null, "4"),
+                                Row.ofKind(RowKind.UPDATE_AFTER, 4, "4", "4"));
+            }
+        }
+    }
 }

Reply via email to