This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 169bea95c [core] Fix tag read for deletion vectors table (#4121)
169bea95c is described below

commit 169bea95cd881581922db246ee054936446d45a6
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 4 13:26:21 2024 +0800

    [core] Fix tag read for deletion vectors table (#4121)
---
 .../org/apache/paimon/index/IndexFileHandler.java  | 17 ++++++++---
 .../paimon/operation/AbstractFileStoreScan.java    |  9 ++----
 .../org/apache/paimon/operation/FileStoreScan.java |  6 ++--
 .../table/source/snapshot/SnapshotReaderImpl.java  | 33 ++++++++++++----------
 .../operation/KeyValueFileStoreScanTest.java       |  3 +-
 .../apache/paimon/flink/DeletionVectorITCase.java  | 17 +++++++++++
 6 files changed, 54 insertions(+), 31 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index bdf47b16a..7e5efccdd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -146,9 +146,14 @@ public class IndexFileHandler {
     }
 
     public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scan(
-            long snapshotId, String indexType, Set<BinaryRow> partitions) {
+            long snapshot, String indexType, Set<BinaryRow> partitions) {
+        return scan(snapshotManager.snapshot(snapshot), indexType, partitions);
+    }
+
+    public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scan(
+            Snapshot snapshot, String indexType, Set<BinaryRow> partitions) {
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> result = new 
HashMap<>();
-        for (IndexManifestEntry file : scanEntries(snapshotId, indexType, 
partitions)) {
+        for (IndexManifestEntry file : scanEntries(snapshot, indexType, 
partitions)) {
             result.computeIfAbsent(Pair.of(file.partition(), file.bucket()), k 
-> new ArrayList<>())
                     .add(file.indexFile());
         }
@@ -179,8 +184,12 @@ public class IndexFileHandler {
     }
 
     public List<IndexManifestEntry> scanEntries(
-            long snapshotId, String indexType, Set<BinaryRow> partitions) {
-        Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+            long snapshot, String indexType, Set<BinaryRow> partitions) {
+        return scanEntries(snapshotManager.snapshot(snapshot), indexType, 
partitions);
+    }
+
+    public List<IndexManifestEntry> scanEntries(
+            Snapshot snapshot, String indexType, Set<BinaryRow> partitions) {
         String indexManifest = snapshot.indexManifest();
         if (indexManifest == null) {
             return Collections.emptyList();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index ec3d4a239..f1a95aec9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -236,13 +236,8 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
             @Nullable
             @Override
-            public Long snapshotId() {
-                return readSnapshot == null ? null : readSnapshot.id();
-            }
-
-            @Override
-            public ScanMode scanMode() {
-                return scanMode;
+            public Snapshot snapshot() {
+                return readSnapshot;
             }
 
             @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index c7b0e8cdf..427e958e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -111,13 +111,11 @@ public interface FileStoreScan {
         Long watermark();
 
         /**
-         * Snapshot id of this plan, return null if the table is empty or the 
manifest list is
+         * Snapshot of this plan, return null if the table is empty or the 
manifest list is
          * specified.
          */
         @Nullable
-        Long snapshotId();
-
-        ScanMode scanMode();
+        Snapshot snapshot();
 
         /** Result {@link ManifestEntry} files. */
         List<ManifestEntry> files();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index ef3523dfd..47e697130 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -47,6 +47,8 @@ import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -58,6 +60,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.BiConsumer;
 
+import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -254,7 +257,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
     @Override
     public Plan read() {
         FileStoreScan.Plan plan = scan.plan();
-        Long snapshotId = plan.snapshotId();
+        @Nullable Snapshot snapshot = plan.snapshot();
 
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> files =
                 groupByPartFiles(plan.files(FileKind.ADD));
@@ -266,25 +269,22 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
             files = newFiles;
         }
         List<DataSplit> splits =
-                generateSplits(
-                        snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 : 
snapshotId,
-                        scanMode != ScanMode.ALL,
-                        splitGenerator,
-                        files);
-        return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) 
splits);
+                generateSplits(snapshot, scanMode != ScanMode.ALL, 
splitGenerator, files);
+        return new PlanImpl(
+                plan.watermark(), snapshot == null ? null : snapshot.id(), 
(List) splits);
     }
 
     private List<DataSplit> generateSplits(
-            long snapshotId,
+            @Nullable Snapshot snapshot,
             boolean isStreaming,
             SplitGenerator splitGenerator,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles) 
{
         List<DataSplit> splits = new ArrayList<>();
         // Read deletion indexes at once to reduce file IO
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
deletionIndexFilesMap =
-                deletionVectors
+                deletionVectors && snapshot != null
                         ? indexFileHandler.scan(
-                                snapshotId, DELETION_VECTORS_INDEX, 
groupedDataFiles.keySet())
+                                snapshot, DELETION_VECTORS_INDEX, 
groupedDataFiles.keySet())
                         : Collections.emptyMap();
         for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
                 groupedDataFiles.entrySet()) {
@@ -295,7 +295,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 List<DataFileMeta> bucketFiles = bucketEntry.getValue();
                 DataSplit.Builder builder =
                         DataSplit.builder()
-                                .withSnapshot(snapshotId)
+                                .withSnapshot(
+                                        snapshot == null ? FIRST_SNAPSHOT_ID - 
1 : snapshot.id())
                                 .withPartition(partition)
                                 .withBucket(bucket)
                                 .isStreaming(isStreaming);
@@ -344,7 +345,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
                 groupByPartFiles(plan.files(FileKind.ADD));
 
-        return toChangesPlan(true, plan, plan.snapshotId() - 1, beforeFiles, 
dataFiles);
+        return toChangesPlan(true, plan, plan.snapshot().id() - 1, 
beforeFiles, dataFiles);
     }
 
     private Plan toChangesPlan(
@@ -353,6 +354,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
             long beforeSnapshotId,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
             Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
+        Snapshot snapshot = plan.snapshot();
         List<DataSplit> splits = new ArrayList<>();
         Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
         beforeFiles.forEach(
@@ -372,7 +374,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> 
deletionIndexFilesMap =
                 deletionVectors
                         ? indexFileHandler.scan(
-                                plan.snapshotId(), DELETION_VECTORS_INDEX, 
dataFiles.keySet())
+                                snapshot, DELETION_VECTORS_INDEX, 
dataFiles.keySet())
                         : Collections.emptyMap();
 
         for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
@@ -392,7 +394,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
                 DataSplit.Builder builder =
                         DataSplit.builder()
-                                .withSnapshot(plan.snapshotId())
+                                .withSnapshot(snapshot.id())
                                 .withPartition(part)
                                 .withBucket(bucket)
                                 .withBeforeFiles(before)
@@ -415,7 +417,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
             }
         }
 
-        return new PlanImpl(plan.watermark(), plan.snapshotId(), (List) 
splits);
+        return new PlanImpl(
+                plan.watermark(), snapshot == null ? null : snapshot.id(), 
(List) splits);
     }
 
     @Override
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index e4b74c13c..ce1745053 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -294,7 +294,8 @@ public class KeyValueFileStoreScanTest {
     private Map<BinaryRow, BinaryRow> getActualKvMap(FileStoreScan scan, Long 
expectedSnapshotId)
             throws Exception {
         FileStoreScan.Plan plan = scan.plan();
-        assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
+        Snapshot snapshot = plan.snapshot();
+        assertThat(snapshot == null ? null : 
snapshot.id()).isEqualTo(expectedSnapshotId);
 
         List<KeyValue> actualKvs = 
store.readKvsFromManifestEntries(plan.files(), false);
         gen.sort(actualKvs);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 832cdf286..ad29709f5 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -262,4 +263,20 @@ public class DeletionVectorITCase extends 
CatalogITCaseBase {
         assertThat(batchSql("SELECT * FROM T"))
                 .containsExactlyInAnyOrder(Row.of(1, 3, "1_2"), Row.of(2, 2, 
"2_1"));
     }
+
+    @Test
+    public void testReadTagWithDv() {
+        sql(
+                "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING) 
WITH ("
+                        + "'deletion-vectors.enabled' = 'true', "
+                        + "'snapshot.num-retained.min' = '1', "
+                        + "'snapshot.num-retained.max' = '1')");
+
+        sql("INSERT INTO T VALUES (1, '1'), (2, '2')");
+        sql("CALL sys.create_tag('default.T', 'my_tag')");
+        sql("INSERT INTO T VALUES (3, '3'), (4, '4')");
+
+        assertThat(batchSql("SELECT * FROM T /*+ 
OPTIONS('scan.tag-name'='my_tag') */"))
+                .containsExactlyInAnyOrder(Row.of(1, "1"), Row.of(2, "2"));
+    }
 }

Reply via email to