This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 574f16252 [core] support deletion vector with input ChangelogProducer (#3929)
574f16252 is described below

commit 574f162527f4ff96763b44e68c0682d363365762
Author: wangwj <[email protected]>
AuthorDate: Sun Aug 11 21:24:26 2024 +0800

    [core] support deletion vector with input ChangelogProducer (#3929)
---
 docs/content/concepts/spec/snapshot.md             |  2 +-
 .../org/apache/paimon/append/AppendOnlyWriter.java |  2 +-
 .../org/apache/paimon/io/RecordLevelExpire.java    |  4 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  3 +-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |  4 +-
 .../sink/MultiTablesStoreCompactOperator.java      |  2 +-
 .../apache/paimon/flink/DeletionVectorITCase.java  | 47 ++++++++++++++++++++++
 7 files changed, 56 insertions(+), 8 deletions(-)

diff --git a/docs/content/concepts/spec/snapshot.md b/docs/content/concepts/spec/snapshot.md
index d10598272..30b5fa0d0 100644
--- a/docs/content/concepts/spec/snapshot.md
+++ b/docs/content/concepts/spec/snapshot.md
@@ -61,5 +61,5 @@ Snapshot File is JSON, it includes:
 12. totalRecordCount: record count of all changes occurred in this snapshot.
 13. deltaRecordCount: record count of all new changes occurred in this snapshot.
 14. changelogRecordCount: record count of all changelog produced in this snapshot.
-15. watermark: watermark for input records, from Flink watermark mechanism, null if there is no watermark.
+15. watermark: watermark for input records, from Flink watermark mechanism, Long.MIN_VALUE if there is no watermark.
 16. statistics: stats file name for statistics of this table.
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 17ebe215e..6fda88fd8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -84,7 +84,7 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner
     private final FileIndexOptions fileIndexOptions;
 
     private MemorySegmentPool memorySegmentPool;
-    private MemorySize maxDiskSize;
+    private final MemorySize maxDiskSize;
 
     public AppendOnlyWriter(
             FileIO fileIO,
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
index a49e31fc9..4a61b66a7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java
@@ -69,7 +69,7 @@ public class RecordLevelExpire {
         return new RecordLevelExpire(fieldIndex, (int) 
expireTime.getSeconds());
     }
 
-    public RecordLevelExpire(int timeField, int expireTime) {
+    private RecordLevelExpire(int timeField, int expireTime) {
         this.timeField = timeField;
         this.expireTime = expireTime;
     }
@@ -78,7 +78,7 @@ public class RecordLevelExpire {
         return file -> wrap(readerFactory.createRecordReader(file));
     }
 
-    public RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
+    private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
         int currentTime = (int) (System.currentTimeMillis() / 1000);
         return reader.filter(
                 kv -> {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 791d38b2c..b2c3ed765 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -487,8 +487,9 @@ public class SchemaValidation {
     private static void validateForDeletionVectors(CoreOptions options) {
         checkArgument(
                 options.changelogProducer() == ChangelogProducer.NONE
+                        || options.changelogProducer() == 
ChangelogProducer.INPUT
                         || options.changelogProducer() == 
ChangelogProducer.LOOKUP,
-                "Deletion vectors mode is only supported for none or lookup 
changelog producer now.");
+                "Deletion vectors mode is only supported for NONE/INPUT/LOOKUP 
changelog producer now.");
 
         checkArgument(
                 !options.mergeEngine().equals(MergeEngine.FIRST_ROW),
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index f4b3c69ba..fdb3cb7ff 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -133,7 +133,7 @@ public abstract class ManifestFileMetaTestBase {
                                 path,
                                 getPartitionType(),
                                 "default",
-                                
CoreOptions.FILE_FORMAT.defaultValue().toString()),
+                                CoreOptions.FILE_FORMAT.defaultValue()),
                         Long.MAX_VALUE,
                         null)
                 .create();
@@ -166,7 +166,7 @@ public abstract class ManifestFileMetaTestBase {
 
     protected List<ManifestFileMeta> createBaseManifestFileMetas(boolean 
hasPartition) {
         List<ManifestFileMeta> input = new ArrayList<>();
-        // base with 3 partition ,16 entry each parition
+        // base with 3 partition, 16 entry each partition
         for (int j = 0; j < 3; j++) {
             List<ManifestEntry> entrys = new ArrayList<>();
             for (int i = 0; i < 16; i++) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index f253a3bf8..ccbccaa5f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -51,7 +51,7 @@ import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
  * A dedicated operator for manual triggered compaction.
  *
  * <p>In-coming records are generated by sources built from {@link
- * org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder}. The records will contain
+ * org.apache.paimon.flink.source.operator.MultiTablesReadOperator}. The records will contain
  * partition keys, bucket number, table name and database name.
  */
 public class MultiTablesStoreCompactOperator
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 1fe424b33..9f27f9405 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -30,6 +30,53 @@ import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 /** ITCase for deletion vector table. */
 public class DeletionVectorITCase extends CatalogITCaseBase {
 
+    @ParameterizedTest
+    @ValueSource(strings = {"input"})
+    public void testStreamingReadDVTableWhenChangelogProducerIsInput(String 
changelogProducer)
+            throws Exception {
+        sql(
+                String.format(
+                        "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name 
STRING) "
+                                + "WITH ('deletion-vectors.enabled' = 'true', 
'changelog-producer' = '%s')",
+                        changelogProducer));
+
+        sql("INSERT INTO T VALUES (1, '111111111'), (2, '2'), (3, '3'), (4, 
'4')");
+
+        sql("INSERT INTO T VALUES (2, '2_1'), (3, '3_1')");
+
+        sql("INSERT INTO T VALUES (2, '2_2'), (4, '4_1')");
+
+        // test read from APPEND snapshot
+        try (BlockingIterator<Row, Row> iter =
+                streamSqlBlockIter(
+                        "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '3') */")) {
+            assertThat(iter.collect(8))
+                    .containsExactlyInAnyOrder(
+                            Row.ofKind(RowKind.INSERT, 1, "111111111"),
+                            Row.ofKind(RowKind.INSERT, 2, "2"),
+                            Row.ofKind(RowKind.INSERT, 3, "3"),
+                            Row.ofKind(RowKind.INSERT, 4, "4"),
+                            Row.ofKind(RowKind.INSERT, 2, "2_1"),
+                            Row.ofKind(RowKind.INSERT, 3, "3_1"),
+                            Row.ofKind(RowKind.INSERT, 2, "2_2"),
+                            Row.ofKind(RowKind.INSERT, 4, "4_1"));
+        }
+
+        // test read from COMPACT snapshot
+        try (BlockingIterator<Row, Row> iter =
+                streamSqlBlockIter(
+                        "SELECT * FROM T /*+ 
OPTIONS('scan.mode'='from-snapshot-full','scan.snapshot-id' = '4') */")) {
+            assertThat(iter.collect(6))
+                    .containsExactlyInAnyOrder(
+                            Row.ofKind(RowKind.INSERT, 1, "111111111"),
+                            Row.ofKind(RowKind.INSERT, 2, "2_1"),
+                            Row.ofKind(RowKind.INSERT, 3, "3_1"),
+                            Row.ofKind(RowKind.INSERT, 4, "4"),
+                            Row.ofKind(RowKind.INSERT, 2, "2_2"),
+                            Row.ofKind(RowKind.INSERT, 4, "4_1"));
+        }
+    }
+
     @ParameterizedTest
     @ValueSource(strings = {"none", "lookup"})
     public void testStreamingReadDVTable(String changelogProducer) throws 
Exception {

Reply via email to