This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b6dd27a91 [core] Fix dv table with partial-update and aggregate (#3036)
b6dd27a91 is described below
commit b6dd27a91fd3b40dcf0621ec86a485be326cc321
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Mar 18 17:02:15 2024 +0800
[core] Fix dv table with partial-update and aggregate (#3036)
---
.../paimon/operation/KeyValueFileStoreWrite.java | 10 ++-
.../org/apache/paimon/schema/SchemaValidation.java | 3 +-
.../apache/paimon/flink/DeletionVectorITCase.java | 89 +++++++++++++++++++++-
3 files changed, 94 insertions(+), 8 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index e06ed9ea2..6f74e6720 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -80,6 +80,7 @@ import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
+import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
@@ -255,8 +256,6 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
@Nullable FieldsComparator userDefinedSeqComparator,
Levels levels,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
- KeyValueFileReaderFactory.Builder readerFactoryBuilder =
- this.readerFactoryBuilder.copyWithoutProjection();
DeletionVector.Factory dvFactory =
DeletionVector.factory(dvMaintainer);
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(partition, bucket, dvFactory);
@@ -286,10 +285,11 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
if (mergeEngine == FIRST_ROW) {
if (options.deletionVectorsEnabled()) {
throw new UnsupportedOperationException(
- "Deletion vectors mode is not supported for first
row merge engine now.");
+ "First row merge engine does not need deletion
vectors because there is no deletion of old data in this merge engine.");
}
lookupReaderFactory =
readerFactoryBuilder
+ .copyWithoutProjection()
.withValueProjection(new int[0][])
.build(partition, bucket, dvFactory);
processor = new ContainsValueProcessor();
@@ -298,7 +298,9 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> {
processor =
lookupStrategy.deletionVector
? new PositionedKeyValueProcessor(
- valueType,
lookupStrategy.produceChangelog)
+ valueType,
+ lookupStrategy.produceChangelog
+ || mergeEngine != DEDUPLICATE)
: new KeyValueProcessor(valueType);
wrapperFactory =
new LookupMergeFunctionWrapperFactory<>(
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index cbdcb64e5..cb52570cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -463,10 +463,9 @@ public class SchemaValidation {
|| options.changelogProducer() ==
ChangelogProducer.LOOKUP,
"Deletion vectors mode is only supported for none or lookup
changelog producer now.");
- // todo: implement it
checkArgument(
!options.mergeEngine().equals(MergeEngine.FIRST_ROW),
- "Deletion vectors mode is not supported for first row merge
engine now.");
+ "First row merge engine does not need deletion vectors because
there is no deletion of old data in this merge engine.");
}
private static void validateSequenceField(TableSchema schema, CoreOptions
options) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 162e29eec..1fe424b33 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -48,7 +48,7 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
// test read from APPEND snapshot
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
- "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */"); ) {
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
assertThat(iter.collect(12))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
@@ -68,7 +68,7 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
// test read from COMPACT snapshot
try (BlockingIterator<Row, Row> iter =
streamSqlBlockIter(
- "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */"); ) {
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
assertThat(iter.collect(8))
.containsExactlyInAnyOrder(
Row.ofKind(RowKind.INSERT, 1, "111111111"),
@@ -113,4 +113,89 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
.containsExactlyInAnyOrder(
Row.of(1, "111111111"), Row.of(2, "2_1"), Row.of(3,
"3_1"), Row.of(4, "4"));
}
+
+ @ParameterizedTest
+ @ValueSource(strings = {"none", "lookup"})
+ public void testDVTableWithAggregationMergeEngine(String
changelogProducer) throws Exception {
+ sql(
+ String.format(
+ "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v
INT) "
+ + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
+ + "'merge-engine'='aggregation',
'fields.v.aggregate-function'='sum')",
+ changelogProducer));
+
+ sql("INSERT INTO T VALUES (1, 111111111), (2, 2), (3, 3), (4, 4)");
+
+ sql("INSERT INTO T VALUES (2, 1), (3, 1)");
+
+ sql("INSERT INTO T VALUES (2, 1), (4, 1)");
+
+ // test batch read
+ assertThat(batchSql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, 111111111), Row.of(2, 4), Row.of(3, 4),
Row.of(4, 5));
+
+ // test streaming read
+ if (changelogProducer.equals("lookup")) {
+ try (BlockingIterator<Row, Row> iter =
+ streamSqlBlockIter(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
+ assertThat(iter.collect(8))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, 111111111),
+ Row.ofKind(RowKind.INSERT, 2, 3),
+ Row.ofKind(RowKind.INSERT, 3, 4),
+ Row.ofKind(RowKind.INSERT, 4, 4),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 2, 3),
+ Row.ofKind(RowKind.UPDATE_AFTER, 2, 4),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 4, 4),
+ Row.ofKind(RowKind.UPDATE_AFTER, 4, 5));
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"none", "lookup"})
+ public void testDVTableWithPartialUpdateMergeEngine(String
changelogProducer) throws Exception {
+ sql(
+ String.format(
+ "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, v1
STRING, v2 STRING) "
+ + "WITH ('deletion-vectors.enabled' = 'true',
'changelog-producer' = '%s', "
+ + "'merge-engine'='partial-update')",
+ changelogProducer));
+
+ sql(
+ "INSERT INTO T VALUES (1, '111111111', '1'), (2, '2',
CAST(NULL AS STRING)), (3, '3', '3'), (4, CAST(NULL AS STRING), '4')");
+
+ sql("INSERT INTO T VALUES (2, CAST(NULL AS STRING), '2'), (3, '3_1',
'3_1')");
+
+ sql(
+ "INSERT INTO T VALUES (2, '2_1', CAST(NULL AS STRING)), (4,
'4', CAST(NULL AS STRING))");
+
+ // test batch read
+ assertThat(batchSql("SELECT * FROM T"))
+ .containsExactlyInAnyOrder(
+ Row.of(1, "111111111", "1"),
+ Row.of(2, "2_1", "2"),
+ Row.of(3, "3_1", "3_1"),
+ Row.of(4, "4", "4"));
+
+ // test streaming read
+ if (changelogProducer.equals("lookup")) {
+ try (BlockingIterator<Row, Row> iter =
+ streamSqlBlockIter(
+ "SELECT * FROM T /*+
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
+ assertThat(iter.collect(8))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, "111111111",
"1"),
+ Row.ofKind(RowKind.INSERT, 2, "2", "2"),
+ Row.ofKind(RowKind.INSERT, 3, "3_1", "3_1"),
+ Row.ofKind(RowKind.INSERT, 4, null, "4"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 2, "2", "2"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 2, "2_1",
"2"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 4, null,
"4"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 4, "4", "4"));
+ }
+ }
+ }
}