This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 169bea95c [core] Fix tag read for deletion vectors table (#4121)
169bea95c is described below
commit 169bea95cd881581922db246ee054936446d45a6
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Sep 4 13:26:21 2024 +0800
[core] Fix tag read for deletion vectors table (#4121)
---
.../org/apache/paimon/index/IndexFileHandler.java | 17 ++++++++---
.../paimon/operation/AbstractFileStoreScan.java | 9 ++----
.../org/apache/paimon/operation/FileStoreScan.java | 6 ++--
.../table/source/snapshot/SnapshotReaderImpl.java | 33 ++++++++++++----------
.../operation/KeyValueFileStoreScanTest.java | 3 +-
.../apache/paimon/flink/DeletionVectorITCase.java | 17 +++++++++++
6 files changed, 54 insertions(+), 31 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index bdf47b16a..7e5efccdd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -146,9 +146,14 @@ public class IndexFileHandler {
}
public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scan(
- long snapshotId, String indexType, Set<BinaryRow> partitions) {
+ long snapshot, String indexType, Set<BinaryRow> partitions) {
+ return scan(snapshotManager.snapshot(snapshot), indexType, partitions);
+ }
+
+ public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scan(
+ Snapshot snapshot, String indexType, Set<BinaryRow> partitions) {
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> result = new
HashMap<>();
- for (IndexManifestEntry file : scanEntries(snapshotId, indexType,
partitions)) {
+ for (IndexManifestEntry file : scanEntries(snapshot, indexType,
partitions)) {
result.computeIfAbsent(Pair.of(file.partition(), file.bucket()), k
-> new ArrayList<>())
.add(file.indexFile());
}
@@ -179,8 +184,12 @@ public class IndexFileHandler {
}
public List<IndexManifestEntry> scanEntries(
- long snapshotId, String indexType, Set<BinaryRow> partitions) {
- Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+ long snapshot, String indexType, Set<BinaryRow> partitions) {
+ return scanEntries(snapshotManager.snapshot(snapshot), indexType,
partitions);
+ }
+
+ public List<IndexManifestEntry> scanEntries(
+ Snapshot snapshot, String indexType, Set<BinaryRow> partitions) {
String indexManifest = snapshot.indexManifest();
if (indexManifest == null) {
return Collections.emptyList();
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index ec3d4a239..f1a95aec9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -236,13 +236,8 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
@Nullable
@Override
- public Long snapshotId() {
- return readSnapshot == null ? null : readSnapshot.id();
- }
-
- @Override
- public ScanMode scanMode() {
- return scanMode;
+ public Snapshot snapshot() {
+ return readSnapshot;
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index c7b0e8cdf..427e958e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -111,13 +111,11 @@ public interface FileStoreScan {
Long watermark();
/**
- * Snapshot id of this plan, return null if the table is empty or the
manifest list is
+ * Snapshot of this plan, return null if the table is empty or the
manifest list is
* specified.
*/
@Nullable
- Long snapshotId();
-
- ScanMode scanMode();
+ Snapshot snapshot();
/** Result {@link ManifestEntry} files. */
List<ManifestEntry> files();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index ef3523dfd..47e697130 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -47,6 +47,8 @@ import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -58,6 +60,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;
+import static org.apache.paimon.Snapshot.FIRST_SNAPSHOT_ID;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -254,7 +257,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
@Override
public Plan read() {
FileStoreScan.Plan plan = scan.plan();
- Long snapshotId = plan.snapshotId();
+ @Nullable Snapshot snapshot = plan.snapshot();
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> files =
groupByPartFiles(plan.files(FileKind.ADD));
@@ -266,25 +269,22 @@ public class SnapshotReaderImpl implements SnapshotReader {
files = newFiles;
}
List<DataSplit> splits =
- generateSplits(
- snapshotId == null ? Snapshot.FIRST_SNAPSHOT_ID - 1 :
snapshotId,
- scanMode != ScanMode.ALL,
- splitGenerator,
- files);
- return new PlanImpl(plan.watermark(), plan.snapshotId(), (List)
splits);
+ generateSplits(snapshot, scanMode != ScanMode.ALL,
splitGenerator, files);
+ return new PlanImpl(
+ plan.watermark(), snapshot == null ? null : snapshot.id(),
(List) splits);
}
private List<DataSplit> generateSplits(
- long snapshotId,
+ @Nullable Snapshot snapshot,
boolean isStreaming,
SplitGenerator splitGenerator,
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> groupedDataFiles)
{
List<DataSplit> splits = new ArrayList<>();
// Read deletion indexes at once to reduce file IO
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>>
deletionIndexFilesMap =
- deletionVectors
+ deletionVectors && snapshot != null
? indexFileHandler.scan(
- snapshotId, DELETION_VECTORS_INDEX,
groupedDataFiles.keySet())
+ snapshot, DELETION_VECTORS_INDEX,
groupedDataFiles.keySet())
: Collections.emptyMap();
for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry :
groupedDataFiles.entrySet()) {
@@ -295,7 +295,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
List<DataFileMeta> bucketFiles = bucketEntry.getValue();
DataSplit.Builder builder =
DataSplit.builder()
- .withSnapshot(snapshotId)
+ .withSnapshot(
+ snapshot == null ? FIRST_SNAPSHOT_ID -
1 : snapshot.id())
.withPartition(partition)
.withBucket(bucket)
.isStreaming(isStreaming);
@@ -344,7 +345,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles =
groupByPartFiles(plan.files(FileKind.ADD));
- return toChangesPlan(true, plan, plan.snapshotId() - 1, beforeFiles,
dataFiles);
+ return toChangesPlan(true, plan, plan.snapshot().id() - 1,
beforeFiles, dataFiles);
}
private Plan toChangesPlan(
@@ -353,6 +354,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
long beforeSnapshotId,
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> beforeFiles,
Map<BinaryRow, Map<Integer, List<DataFileMeta>>> dataFiles) {
+ Snapshot snapshot = plan.snapshot();
List<DataSplit> splits = new ArrayList<>();
Map<BinaryRow, Set<Integer>> buckets = new HashMap<>();
beforeFiles.forEach(
@@ -372,7 +374,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>>
deletionIndexFilesMap =
deletionVectors
? indexFileHandler.scan(
- plan.snapshotId(), DELETION_VECTORS_INDEX,
dataFiles.keySet())
+ snapshot, DELETION_VECTORS_INDEX,
dataFiles.keySet())
: Collections.emptyMap();
for (Map.Entry<BinaryRow, Set<Integer>> entry : buckets.entrySet()) {
@@ -392,7 +394,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
DataSplit.Builder builder =
DataSplit.builder()
- .withSnapshot(plan.snapshotId())
+ .withSnapshot(snapshot.id())
.withPartition(part)
.withBucket(bucket)
.withBeforeFiles(before)
@@ -415,7 +417,8 @@ public class SnapshotReaderImpl implements SnapshotReader {
}
}
- return new PlanImpl(plan.watermark(), plan.snapshotId(), (List)
splits);
+ return new PlanImpl(
+ plan.watermark(), snapshot == null ? null : snapshot.id(),
(List) splits);
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
index e4b74c13c..ce1745053 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java
@@ -294,7 +294,8 @@ public class KeyValueFileStoreScanTest {
private Map<BinaryRow, BinaryRow> getActualKvMap(FileStoreScan scan, Long
expectedSnapshotId)
throws Exception {
FileStoreScan.Plan plan = scan.plan();
- assertThat(plan.snapshotId()).isEqualTo(expectedSnapshotId);
+ Snapshot snapshot = plan.snapshot();
+ assertThat(snapshot == null ? null :
snapshot.id()).isEqualTo(expectedSnapshotId);
List<KeyValue> actualKvs =
store.readKvsFromManifestEntries(plan.files(), false);
gen.sort(actualKvs);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
index 832cdf286..ad29709f5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/DeletionVectorITCase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -262,4 +263,20 @@ public class DeletionVectorITCase extends CatalogITCaseBase {
assertThat(batchSql("SELECT * FROM T"))
.containsExactlyInAnyOrder(Row.of(1, 3, "1_2"), Row.of(2, 2,
"2_1"));
}
+
+ @Test
+ public void testReadTagWithDv() {
+ sql(
+ "CREATE TABLE T (id INT PRIMARY KEY NOT ENFORCED, name STRING)
WITH ("
+ + "'deletion-vectors.enabled' = 'true', "
+ + "'snapshot.num-retained.min' = '1', "
+ + "'snapshot.num-retained.max' = '1')");
+
+ sql("INSERT INTO T VALUES (1, '1'), (2, '2')");
+ sql("CALL sys.create_tag('default.T', 'my_tag')");
+ sql("INSERT INTO T VALUES (3, '3'), (4, '4')");
+
+ assertThat(batchSql("SELECT * FROM T /*+
OPTIONS('scan.tag-name'='my_tag') */"))
+ .containsExactlyInAnyOrder(Row.of(1, "1"), Row.of(2, "2"));
+ }
}