This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 574f16252 [core] support deletion vector with input ChangelogProducer (#3929)
574f16252 is described below
commit 574f162527f4ff96763b44e68c0682d363365762
Author: wangwj <[email protected]>
AuthorDate: Sun Aug 11 21:24:26 2024 +0800
[core] support deletion vector with input ChangelogProducer (#3929)
---
docs/content/concepts/spec/snapshot.md | 2 +-
.../org/apache/paimon/append/AppendOnlyWriter.java | 2 +-
.../org/apache/paimon/io/RecordLevelExpire.java | 4 +-
.../org/apache/paimon/schema/SchemaValidation.java | 3 +-
.../paimon/manifest/ManifestFileMetaTestBase.java | 4 +-
.../sink/MultiTablesStoreCompactOperator.java | 2 +-
.../apache/paimon/flink/DeletionVectorITCase.java | 47 ++++++++++++++++++++++
7 files changed, 56 insertions(+), 8 deletions(-)
diff --git a/docs/content/concepts/spec/snapshot.md b/docs/content/concepts/spec/snapshot.md
index d10598272..30b5fa0d0 100644
--- a/docs/content/concepts/spec/snapshot.md
+++ b/docs/content/concepts/spec/snapshot.md
@@ -61,5 +61,5 @@ Snapshot File is JSON, it includes:
12. totalRecordCount: record count of all changes occurred in this snapshot.
13. deltaRecordCount: record count of all new changes occurred in this snapshot.
14. changelogRecordCount: record count of all changelog produced in this snapshot.
-15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark.
+15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark.
16. statistics: stats file name for statistics of this table.
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 17ebe215e..6fda88fd8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -84,7 +84,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
private final FileIndexOptions fileIndexOptions;
private MemorySegmentPool memorySegmentPool;
- private MemorySize maxDiskSize;
+ private final MemorySize maxDiskSize;
public AppendOnlyWriter(
FileIO fileIO,
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index a49e31fc9..4a61b66a7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -69,7 +69,7 @@ public class RecordLevelExpire {
return new RecordLevelExpire(fieldIndex, (int) expireTime.getSeconds());
}
- public RecordLevelExpire(int timeField, int expireTime) {
+ private RecordLevelExpire(int timeField, int expireTime) {
this.timeField = timeField;
this.expireTime = expireTime;
}
@@ -78,7 +78,7 @@ public class RecordLevelExpire {
return file -> wrap(readerFactory.createRecordReader(file));
}
- public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
+ private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
int currentTime = (int) (System.currentTimeMillis() / 1000);
return reader.filter(
kv -> {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 791d38b2c..b2c3ed765 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -487,8 +487,9 @@ public class SchemaValidation {
private static void validateForDeletionVectors(CoreOptions options) {
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
+ || options.changelogProducer() == ChangelogProducer.INPUT
|| options.changelogProducer() == ChangelogProducer.LOOKUP,
- "Deletion vectors mode is only supported for none or lookup changelog producer now.");
+ "Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now.");
checkArgument(
!options.mergeEngine().equals(MergeEngine.FIRST_ROW),
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index f4b3c69ba..fdb3cb7ff 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -133,7 +133,7 @@ public abstract class ManifestFileMetaTestBase {
path,
getPartitionType(),
"default",
- CoreOptions.FILE_FORMAT.defaultValue().toString()),
+ CoreOptions.FILE_FORMAT.defaultValue()),
Long.MAX_VALUE,
null)
.create();
@@ -166,7 +166,7 @@ public abstract class ManifestFileMetaTestBase {
protected List<ManifestFileMeta> createBaseManifestFileMetas(boolean hasPartition) {
List<ManifestFileMeta> input = new ArrayList<>();
- // base with 3 partition ,16 entry each parition
+ // base with 3 partition, 16 entry each partition
for (int j = 0; j < 3; j++) {
List<ManifestEntry> entrys = new ArrayList<>();
for (int i = 0; i < 16; i++) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index f253a3bf8..ccbccaa5f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -51,7 +51,7 @@ import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
* A dedicated operator for manual triggered compaction.
*
* <p>In-coming records are generated by sources built from {@link
- * org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain
+ * org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain
* partition keys, bucket number, table name and database name.
*/
public class MultiTablesStoreCompactOperator
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 1fe424b33..9f27f9405 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -30,6 +30,53 @@ import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
/** ITCase for deletion vector table. */
public class DeletionVectorITCase extends CatalogITCaseBase {
+ @ParameterizedTest
+ @ValueSource(strings = {"input"})
+ public void testStreamingReadDVTableWhenChangelogProducerIsInput(String changelogProducer)
+ throws Exception {
+ sql(
+ String.format(
+ "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) "
+ + "WITH ('deletion-vectors.enabled' = 'true', 'changelog-producer' = '%s')",
+ changelogProducer));
+
+ sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, '4')");
+
+ sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");
+
+ sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");
+
+ // test read from APPEND snapshot
+ try (BlockingIterator<Row, Row> iter =
+ streamSqlBlockIter(
+ "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
+ assertThat(iter.collect(8))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, "111111111"),
+ Row.ofKind(RowKind.INSERT, 2, "2"),
+ Row.ofKind(RowKind.INSERT, 3, "3"),
+ Row.ofKind(RowKind.INSERT, 4, "4"),
+ Row.ofKind(RowKind.INSERT, 2, "2_1"),
+ Row.ofKind(RowKind.INSERT, 3, "3_1"),
+ Row.ofKind(RowKind.INSERT, 2, "2_2"),
+ Row.ofKind(RowKind.INSERT, 4, "4_1"));
+ }
+
+ // test read from COMPACT snapshot
+ try (BlockingIterator<Row, Row> iter =
+ streamSqlBlockIter(
+ "SELECT * FROM T /*+ OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
+ assertThat(iter.collect(6))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, "111111111"),
+ Row.ofKind(RowKind.INSERT, 2, "2_1"),
+ Row.ofKind(RowKind.INSERT, 3, "3_1"),
+ Row.ofKind(RowKind.INSERT, 4, "4"),
+ Row.ofKind(RowKind.INSERT, 2, "2_2"),
+ Row.ofKind(RowKind.INSERT, 4, "4_1"));
+ }
+ }
+
@ParameterizedTest
@ValueSource(strings = {"none", "lookup"})
public void testStreamingReadDVTable(String changelogProducer) throws Exception {