This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cf9cdf9cb6 [core] Optimize deletion vector index scan (#8066)
cf9cdf9cb6 is described below

commit cf9cdf9cb6772b6dcd03bb09c98b9a476adbf00b
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jun 2 12:17:04 2026 +0800

    [core] Optimize deletion vector index scan (#8066)
    
    Optimize deletion vector index scanning during snapshot planning by
    avoiding unnecessary manifest reads, limiting scans when no DV meta
    cache is available, and lazily warming the DV meta cache when it is
    available.
---
 .../org/apache/paimon/index/IndexFileHandler.java  | 49 ++++++++++++
 .../manifest/IndexManifestEntrySerializer.java     | 15 ++++
 .../table/source/snapshot/SnapshotReaderImpl.java  | 60 ++++++++++----
 .../java/org/apache/paimon/utils/DVMetaCache.java  | 86 ++++++++++++++++++--
 .../apache/paimon/index/IndexFileHandlerTest.java  | 62 +++++++++++++++
 .../org/apache/paimon/utils/DVMetaCacheTest.java   | 92 +++++++++++++++++++++-
 6 files changed, 341 insertions(+), 23 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index b211dd593c..cb9525cc5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -20,11 +20,13 @@ package org.apache.paimon.index;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.IndexManifestEntrySerializer;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.utils.Filter;
@@ -37,10 +39,12 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Function;
 
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 
@@ -149,6 +153,20 @@ public class IndexFileHandler {
         return result;
     }
 
+    public Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scanBuckets(
+            Snapshot snapshot, String indexType, Set<Pair<BinaryRow, Integer>> 
buckets) {
+        if (buckets.isEmpty()) {
+            return Collections.emptyMap();
+        }
+
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> result = new 
HashMap<>();
+        for (IndexManifestEntry file : scanBucketEntries(snapshot, indexType, 
buckets)) {
+            result.computeIfAbsent(Pair.of(file.partition(), file.bucket()), k 
-> new ArrayList<>())
+                    .add(file.indexFile());
+        }
+        return result;
+    }
+
     public List<IndexManifestEntry> scanEntries() {
         Snapshot snapshot = snapshotManager.latestSnapshot();
         if (snapshot == null || snapshot.indexManifest() == null) {
@@ -184,6 +202,37 @@ public class IndexFileHandler {
         return result;
     }
 
+    public List<IndexManifestEntry> scanBucketEntries(
+            Snapshot snapshot, String indexType, Set<Pair<BinaryRow, Integer>> 
buckets) {
+        if (snapshot == null || buckets.isEmpty()) {
+            return Collections.emptyList();
+        }
+        String indexManifest = snapshot.indexManifest();
+        if (indexManifest == null) {
+            return Collections.emptyList();
+        }
+
+        Function<InternalRow, BinaryRow> partitionGetter =
+                IndexManifestEntrySerializer.partitionGetter();
+        Function<InternalRow, Integer> bucketGetter = 
IndexManifestEntrySerializer.bucketGetter();
+        Function<InternalRow, String> indexTypeGetter =
+                IndexManifestEntrySerializer.indexTypeGetter();
+        Map<BinaryRow, Set<Integer>> bucketsByPartition = new HashMap<>();
+        for (Pair<BinaryRow, Integer> bucket : buckets) {
+            bucketsByPartition
+                    .computeIfAbsent(bucket.getLeft(), k -> new HashSet<>())
+                    .add(bucket.getRight());
+        }
+        Filter<InternalRow> rowFilter =
+                row ->
+                        indexType.equals(indexTypeGetter.apply(row))
+                                && bucketsByPartition
+                                        .getOrDefault(
+                                                partitionGetter.apply(row), 
Collections.emptySet())
+                                        .contains(bucketGetter.apply(row));
+        return indexManifestFile.read(indexManifest, null, rowFilter, 
Filter.alwaysTrue());
+    }
+
     public Path indexManifestFilePath(String indexManifest) {
         return indexManifestFile.indexManifestFilePath(indexManifest);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
index 98ab4df6f1..60113adff1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.manifest;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
@@ -25,6 +26,8 @@ import org.apache.paimon.index.GlobalIndexMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
+import java.util.function.Function;
+
 import static org.apache.paimon.data.BinaryString.fromString;
 import static 
org.apache.paimon.index.IndexFileMetaSerializer.dvMetasToRowArrayData;
 import static 
org.apache.paimon.index.IndexFileMetaSerializer.rowArrayDataToDvMetas;
@@ -104,4 +107,16 @@ public class IndexManifestEntrySerializer extends VersionedObjectSerializer<Inde
                         row.isNullAt(8) ? null : row.getString(8).toString(),
                         globalIndexMeta));
     }
+
+    public static Function<InternalRow, BinaryRow> partitionGetter() {
+        return row -> deserializeBinaryRow(row.getBinary(2));
+    }
+
+    public static Function<InternalRow, Integer> bucketGetter() {
+        return row -> row.getInt(3);
+    }
+
+    public static Function<InternalRow, String> indexTypeGetter() {
+        return row -> row.getString(4).toString();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index d033e427a6..0a752f2564 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -630,15 +630,16 @@ public class SnapshotReaderImpl implements SnapshotReader {
 
     private Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> 
scanDvIndex(
             @Nullable Snapshot snapshot, Set<Pair<BinaryRow, Integer>> 
buckets) {
-        if (snapshot == null || snapshot.indexManifest() == null) {
+        if (snapshot == null || snapshot.indexManifest() == null || 
buckets.isEmpty()) {
             return Collections.emptyMap();
         }
         Map<Pair<BinaryRow, Integer>, Map<String, DeletionFile>> result = new 
HashMap<>();
         Path indexManifestPath = 
indexFileHandler.indexManifestFilePath(snapshot.indexManifest());
+        Set<Pair<BinaryRow, Integer>> remainingBuckets = new 
HashSet<>(buckets);
 
         // 1. read from cache
         if (dvMetaCache != null) {
-            Iterator<Pair<BinaryRow, Integer>> iterator = buckets.iterator();
+            Iterator<Pair<BinaryRow, Integer>> iterator = 
remainingBuckets.iterator();
             while (iterator.hasNext()) {
                 Pair<BinaryRow, Integer> next = iterator.next();
                 BinaryRow partition = next.getLeft();
@@ -658,31 +659,58 @@ public class SnapshotReaderImpl implements SnapshotReader {
                 }
             }
         }
+        if (remainingBuckets.isEmpty()) {
+            return result;
+        }
 
         // 2. read from file system
         Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> partitionFileMetas =
-                indexFileHandler.scan(
-                        snapshot,
-                        DELETION_VECTORS_INDEX,
-                        
buckets.stream().map(Pair::getLeft).collect(Collectors.toSet()));
+                dvMetaCache == null
+                        ? indexFileHandler.scanBuckets(
+                                snapshot, DELETION_VECTORS_INDEX, 
remainingBuckets)
+                        : indexFileHandler.scan(
+                                snapshot,
+                                DELETION_VECTORS_INDEX,
+                                remainingBuckets.stream()
+                                        .map(Pair::getLeft)
+                                        .collect(Collectors.toSet()));
         partitionFileMetas.forEach(
                 (entry, indexFileMetas) -> {
-                    Map<String, DeletionFile> deletionFiles =
-                            toDeletionFiles(entry, indexFileMetas);
-                    if (dvMetaCache != null) {
-                        dvMetaCache.put(
+                    Pair<BinaryRow, Integer> partitionBucket = entry;
+                    if (remainingBuckets.contains(entry)) {
+                        Map<String, DeletionFile> deletionFiles =
+                                toDeletionFiles(partitionBucket, 
indexFileMetas);
+                        result.put(partitionBucket, deletionFiles);
+                        if (dvMetaCache != null) {
+                            dvMetaCache.put(
+                                    indexManifestPath,
+                                    partitionBucket.getLeft(),
+                                    partitionBucket.getRight(),
+                                    deletionFiles);
+                        }
+                    } else if (dvMetaCache != null) {
+                        dvMetaCache.putLazy(
                                 indexManifestPath,
-                                entry.getLeft(),
-                                entry.getRight(),
-                                deletionFiles);
-                    }
-                    if (buckets.contains(entry)) {
-                        result.put(entry, deletionFiles);
+                                partitionBucket.getLeft(),
+                                partitionBucket.getRight(),
+                                deletionFileNumber(indexFileMetas),
+                                () -> toDeletionFiles(partitionBucket, 
indexFileMetas));
                     }
                 });
         return result;
     }
 
+    private int deletionFileNumber(List<IndexFileMeta> fileMetas) {
+        int count = 0;
+        for (IndexFileMeta indexFile : fileMetas) {
+            LinkedHashMap<String, DeletionVectorMeta> dvRanges = 
indexFile.dvRanges();
+            if (dvRanges != null) {
+                count += dvRanges.size();
+            }
+        }
+        return count;
+    }
+
     private Map<String, DeletionFile> toDeletionFiles(
             Pair<BinaryRow, Integer> partitionBucket, List<IndexFileMeta> 
fileMetas) {
         Map<String, DeletionFile> deletionFiles = new HashMap<>();
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
index 34b92f9bc3..b81241257b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/DVMetaCache.java
@@ -29,11 +29,12 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Supplier;
 
 /** Cache for deletion vector meta. */
 public class DVMetaCache {
 
-    private final Cache<DVMetaCacheKey, Map<String, DeletionFile>> cache;
+    private final Cache<DVMetaCacheKey, DVMetaCacheValue> cache;
 
     public DVMetaCache(long maxValueNumber) {
         this.cache =
@@ -45,20 +46,95 @@ public class DVMetaCache {
                         .build();
     }
 
-    private static int weigh(DVMetaCacheKey cacheKey, Map<String, 
DeletionFile> cacheValue) {
-        return cacheValue.size() + 1;
+    private static int weigh(DVMetaCacheKey cacheKey, DVMetaCacheValue 
cacheValue) {
+        return cacheValue.weight();
     }
 
     @Nullable
     public Map<String, DeletionFile> read(Path manifestPath, BinaryRow 
partition, int bucket) {
         DVMetaCacheKey cacheKey = new DVMetaCacheKey(manifestPath, partition, 
bucket);
-        return this.cache.getIfPresent(cacheKey);
+        DVMetaCacheValue cacheValue = this.cache.getIfPresent(cacheKey);
+        return cacheValue == null ? null : cacheValue.get();
     }
 
     public void put(
             Path path, BinaryRow partition, int bucket, Map<String, 
DeletionFile> dvFilesMap) {
         DVMetaCacheKey key = new DVMetaCacheKey(path, partition, bucket);
-        this.cache.put(key, dvFilesMap);
+        this.cache.put(key, DVMetaCacheValue.eager(dvFilesMap));
+    }
+
+    public void putLazy(
+            Path path,
+            BinaryRow partition,
+            int bucket,
+            int valueNumber,
+            Supplier<Map<String, DeletionFile>> dvFilesSupplier) {
+        DVMetaCacheKey key = new DVMetaCacheKey(path, partition, bucket);
+        this.cache.put(key, DVMetaCacheValue.lazy(valueNumber, 
dvFilesSupplier));
+    }
+
+    /** Cache value for deletion vector meta at bucket level. */
+    private static final class DVMetaCacheValue {
+
+        private final int weight;
+        private final DeletionFilesField deletionFilesField;
+
+        private DVMetaCacheValue(int weight, DeletionFilesField 
deletionFilesField) {
+            this.weight = weight;
+            this.deletionFilesField = deletionFilesField;
+        }
+
+        private static DVMetaCacheValue eager(Map<String, DeletionFile> 
deletionFiles) {
+            return new DVMetaCacheValue(
+                    deletionFiles.size() + 1, new 
ExistingDeletionFilesField(deletionFiles));
+        }
+
+        private static DVMetaCacheValue lazy(
+                int valueNumber, Supplier<Map<String, DeletionFile>> 
deletionFilesSupplier) {
+            return new DVMetaCacheValue(
+                    valueNumber + 1, new 
LazyDeletionFilesField(deletionFilesSupplier));
+        }
+
+        private int weight() {
+            return weight;
+        }
+
+        private Map<String, DeletionFile> get() {
+            return deletionFilesField.get();
+        }
+    }
+
+    private interface DeletionFilesField {
+
+        Map<String, DeletionFile> get();
+    }
+
+    private static final class ExistingDeletionFilesField implements 
DeletionFilesField {
+
+        private final Map<String, DeletionFile> deletionFiles;
+
+        private ExistingDeletionFilesField(Map<String, DeletionFile> 
deletionFiles) {
+            this.deletionFiles = deletionFiles;
+        }
+
+        @Override
+        public Map<String, DeletionFile> get() {
+            return deletionFiles;
+        }
+    }
+
+    private static final class LazyDeletionFilesField implements 
DeletionFilesField {
+
+        private final LazyField<Map<String, DeletionFile>> deletionFiles;
+
+        private LazyDeletionFilesField(Supplier<Map<String, DeletionFile>> 
deletionFilesSupplier) {
+            this.deletionFiles = new LazyField<>(deletionFilesSupplier);
+        }
+
+        @Override
+        public synchronized Map<String, DeletionFile> get() {
+            return deletionFiles.get();
+        }
     }
 
     /** Cache key for deletion vector meta at bucket level. */
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileHandlerTest.java
index 24972642c4..70d5881e34 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileHandlerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileHandlerTest.java
@@ -19,25 +19,39 @@
 package org.apache.paimon.index;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestAppendFileStore;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IndexFilePathFactories;
+import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -110,4 +124,52 @@ public class IndexFileHandlerTest {
 
         assertThat(handler.existsIndexFile(entry)).isFalse();
     }
+
+    @Test
+    void testScanBucketsOnlyReturnsRequestedBuckets() throws Exception {
+        TestAppendFileStore store =
+                TestAppendFileStore.createAppendStore(tempPath, new 
HashMap<>());
+        Map<String, List<Integer>> bucket0Dvs = new HashMap<>();
+        bucket0Dvs.put("f0", Arrays.asList(1, 2));
+        Map<String, List<Integer>> bucket1Dvs = new HashMap<>();
+        bucket1Dvs.put("f1", Collections.singletonList(3));
+        IndexFileMeta hashIndex =
+                store.newIndexFileHandler()
+                        .hashIndex(BinaryRow.EMPTY_ROW, 1)
+                        .write(new int[] {1, 2, 3});
+        store.commit(
+                store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 0, bucket0Dvs),
+                store.writeDVIndexFiles(BinaryRow.EMPTY_ROW, 1, bucket1Dvs),
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        1,
+                        1,
+                        
DataIncrement.indexIncrement(Collections.singletonList(hashIndex)),
+                        CompactIncrement.emptyIncrement()));
+
+        Snapshot snapshot = store.snapshotManager().latestSnapshot();
+        IndexFileHandler indexFileHandler = store.newIndexFileHandler();
+        assertThat(
+                        indexFileHandler.scanBuckets(
+                                snapshot, DELETION_VECTORS_INDEX, 
Collections.emptySet()))
+                .isEmpty();
+
+        Map<Pair<BinaryRow, Integer>, List<IndexFileMeta>> scanned =
+                indexFileHandler.scanBuckets(
+                        snapshot,
+                        DELETION_VECTORS_INDEX,
+                        Collections.singleton(Pair.of(BinaryRow.EMPTY_ROW, 
1)));
+
+        assertThat(scanned).containsOnlyKeys(Pair.of(BinaryRow.EMPTY_ROW, 1));
+        assertThat(scanned.get(Pair.of(BinaryRow.EMPTY_ROW, 1)))
+                .extracting(IndexFileMeta::dvRanges)
+                .allSatisfy(dvRanges -> 
assertThat(dvRanges).containsOnlyKeys("f1"));
+
+        assertThat(
+                        indexFileHandler.scanBuckets(
+                                snapshot,
+                                HASH_INDEX,
+                                
Collections.singleton(Pair.of(BinaryRow.EMPTY_ROW, 1))))
+                .containsOnlyKeys(Pair.of(BinaryRow.EMPTY_ROW, 1));
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
index c0c28ce3ab..ab1ae19cfb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/DVMetaCacheTest.java
@@ -31,6 +31,12 @@ import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -101,10 +107,92 @@ public class DVMetaCacheTest {
     }
 
     @Test
-    public void testCacheEviction() {
-        DVMetaCache cache = new DVMetaCache(5);
+    public void testLazyValue() {
+        DVMetaCache cache = new DVMetaCache(100);
+        Path path = new Path("manifest/index-manifest-00003");
+        BinaryRow partition = partition("year=2023/month=10");
+        AtomicInteger invoked = new AtomicInteger();
+
+        cache.putLazy(
+                path,
+                partition,
+                1,
+                1,
+                () -> {
+                    invoked.incrementAndGet();
+                    Map<String, DeletionFile> dvFiles = new HashMap<>();
+                    dvFiles.put(
+                            
"data-d4e5f6g7-h8i9-0123-defg-456789012345-1.parquet",
+                            new DeletionFile(
+                                    
"index-d4e5f6g7-h8i9-0123-defg-456789012345-1", 0L, 100L, 1L));
+                    return dvFiles;
+                });
+
+        assertThat(invoked).hasValue(0);
+
+        Map<String, DeletionFile> result1 = cache.read(path, partition, 1);
+        assertThat(result1).isNotNull().hasSize(1);
+        assertThat(invoked).hasValue(1);
+
+        Map<String, DeletionFile> result2 = cache.read(path, partition, 1);
+        assertThat(result2).isSameAs(result1);
+        assertThat(invoked).hasValue(1);
+    }
+
+    @Test
+    public void testLazyValueInitializedOnceConcurrently() throws Exception {
+        DVMetaCache cache = new DVMetaCache(100);
         Path path = new Path("manifest/index-manifest-00004");
         BinaryRow partition = partition("year=2023/month=09");
+        AtomicInteger invoked = new AtomicInteger();
+        CountDownLatch supplierEntered = new CountDownLatch(1);
+        CountDownLatch releaseSupplier = new CountDownLatch(1);
+        Map<String, DeletionFile> dvFiles = new HashMap<>();
+        dvFiles.put(
+                "data-d4e5f6g7-h8i9-0123-defg-456789012345-1.parquet",
+                new 
DeletionFile("index-d4e5f6g7-h8i9-0123-defg-456789012345-1", 0L, 100L, 1L));
+
+        cache.putLazy(
+                path,
+                partition,
+                1,
+                1,
+                () -> {
+                    invoked.incrementAndGet();
+                    supplierEntered.countDown();
+                    try {
+                        assertThat(releaseSupplier.await(5, 
TimeUnit.SECONDS)).isTrue();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                    return dvFiles;
+                });
+
+        ExecutorService executor = Executors.newFixedThreadPool(2);
+        try {
+            Future<Map<String, DeletionFile>> first =
+                    executor.submit(() -> cache.read(path, partition, 1));
+            assertThat(supplierEntered.await(5, TimeUnit.SECONDS)).isTrue();
+
+            Future<Map<String, DeletionFile>> second =
+                    executor.submit(() -> cache.read(path, partition, 1));
+            releaseSupplier.countDown();
+
+            assertThat(first.get(5, TimeUnit.SECONDS)).isSameAs(dvFiles);
+            assertThat(second.get(5, TimeUnit.SECONDS)).isSameAs(dvFiles);
+            assertThat(invoked).hasValue(1);
+        } finally {
+            releaseSupplier.countDown();
+            executor.shutdownNow();
+        }
+    }
+
+    @Test
+    public void testCacheEviction() {
+        DVMetaCache cache = new DVMetaCache(5);
+        Path path = new Path("manifest/index-manifest-00005");
+        BinaryRow partition = partition("year=2023/month=08");
 
         // Fill cache to capacity
         Map<String, DeletionFile> dvFiles1 = new HashMap<>();

Reply via email to