This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f970245100 [core] Add partition and bucket arguements to 
FileStorePathFactory.indexFileFactory
f970245100 is described below

commit f9702451006b876b5f3455eeb86bed782bd97f74
Author: JingsongLi <jingsongl...@gmail.com>
AuthorDate: Sat Aug 23 11:50:53 2025 +0800

    [core] Add partition and bucket arguements to 
FileStorePathFactory.indexFileFactory
---
 .../java/org/apache/paimon/AbstractFileStore.java  | 14 ++---
 .../deletionvectors/BucketedDvMaintainer.java      | 13 +++--
 .../append/BaseAppendDeleteFileMaintainer.java     |  9 ++-
 .../paimon/index/DynamicBucketIndexMaintainer.java |  6 +-
 .../org/apache/paimon/index/IndexFileHandler.java  | 65 ++++++++++++----------
 .../org/apache/paimon/index/PartitionIndex.java    |  6 +-
 .../paimon/operation/AbstractFileStoreWrite.java   |  6 +-
 .../apache/paimon/table/sink/TableCommitImpl.java  |  5 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |  6 +-
 .../apache/paimon/utils/FileStorePathFactory.java  |  2 +-
 .../paimon/utils/IndexFilePathFactories.java       | 41 ++++++++++++++
 .../org/apache/paimon/TestAppendFileStore.java     |  2 +-
 .../deletionvectors/BucketedDvMaintainerTest.java  | 46 +++++++--------
 .../append/AppendDeletionFileMaintainerHelper.java |  7 ++-
 .../append/AppendDeletionFileMaintainerTest.java   |  2 +-
 .../index/DynamicBucketIndexMaintainerTest.java    |  3 +-
 .../paimon/index/HashBucketAssignerTest.java       | 50 ++++++++++++-----
 .../paimon/operation/FileStoreCommitTest.java      | 56 +++++++++++++++----
 .../apache/paimon/flink/BatchFileStoreITCase.java  |  4 +-
 19 files changed, 230 insertions(+), 113 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 4d2b7726de..1c12877e75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -22,13 +22,11 @@ import org.apache.paimon.CoreOptions.ExternalPathStrategy;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.SnapshotCommit;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.iceberg.IcebergCommitCallback;
 import org.apache.paimon.iceberg.IcebergOptions;
-import org.apache.paimon.index.HashIndexFile;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.manifest.ManifestFile;
@@ -61,6 +59,7 @@ import org.apache.paimon.tag.TagPreview;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.ChangelogManager;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.IndexFilePathFactories;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.SegmentsCache;
 import org.apache.paimon.utils.SnapshotManager;
@@ -221,15 +220,12 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
     @Override
     public IndexFileHandler newIndexFileHandler() {
         return new IndexFileHandler(
+                fileIO,
                 snapshotManager(),
-                pathFactory().indexFileFactory(),
                 indexManifestFileFactory().create(),
-                new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
-                new DeletionVectorsIndexFile(
-                        fileIO,
-                        pathFactory().indexFileFactory(),
-                        options.dvIndexFileTargetSize(),
-                        options.deletionVectorBitmap64()));
+                new IndexFilePathFactories(pathFactory()),
+                options.dvIndexFileTargetSize(),
+                options.deletionVectorBitmap64());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
index eef21f3b54..788ea1cce8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.deletionvectors;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 
@@ -160,17 +161,19 @@ public class BucketedDvMaintainer {
             return handler;
         }
 
-        public BucketedDvMaintainer create(@Nullable List<IndexFileMeta> 
restoredFiles) {
+        public BucketedDvMaintainer create(
+                BinaryRow partition, int bucket, @Nullable List<IndexFileMeta> 
restoredFiles) {
             if (restoredFiles == null) {
                 restoredFiles = Collections.emptyList();
             }
             Map<String, DeletionVector> deletionVectors =
-                    new 
HashMap<>(handler.readAllDeletionVectors(restoredFiles));
-            return create(deletionVectors);
+                    new HashMap<>(handler.readAllDeletionVectors(partition, 
bucket, restoredFiles));
+            return create(partition, bucket, deletionVectors);
         }
 
-        public BucketedDvMaintainer create(Map<String, DeletionVector> 
deletionVectors) {
-            return new BucketedDvMaintainer(handler.dvIndex(), 
deletionVectors);
+        public BucketedDvMaintainer create(
+                BinaryRow partition, int bucket, Map<String, DeletionVector> 
deletionVectors) {
+            return new BucketedDvMaintainer(handler.dvIndex(partition, 
bucket), deletionVectors);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
index 729f660177..21d1870b76 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
@@ -37,6 +37,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
@@ -68,7 +69,8 @@ public interface BaseAppendDeleteFileMaintainer {
         List<IndexFileMeta> indexFiles =
                 indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX, 
partition, bucket);
         BucketedDvMaintainer maintainer =
-                
BucketedDvMaintainer.factory(indexFileHandler).create(indexFiles);
+                BucketedDvMaintainer.factory(indexFileHandler)
+                        .create(partition, bucket, indexFiles);
         return new BucketedAppendDeleteFileMaintainer(partition, bucket, 
maintainer);
     }
 
@@ -94,6 +96,9 @@ public interface BaseAppendDeleteFileMaintainer {
             }
         }
         return new AppendDeleteFileMaintainer(
-                indexFileHandler.dvIndex(), partition, manifestEntries, 
deletionFiles);
+                indexFileHandler.dvIndex(partition, UNAWARE_BUCKET),
+                partition,
+                manifestEntries,
+                deletionFiles);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
index fbf95d7a14..50397a7a85 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/index/DynamicBucketIndexMaintainer.java
@@ -110,8 +110,10 @@ public class DynamicBucketIndexMaintainer {
             return handler;
         }
 
-        public DynamicBucketIndexMaintainer create(@Nullable IndexFileMeta 
restoredFile) {
-            return new DynamicBucketIndexMaintainer(handler.hashIndex(), 
restoredFile);
+        public DynamicBucketIndexMaintainer create(
+                BinaryRow partition, int bucket, @Nullable IndexFileMeta 
restoredFile) {
+            return new DynamicBucketIndexMaintainer(
+                    handler.hashIndex(partition, bucket), restoredFile);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 188efe9348..186f375633 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -22,11 +22,13 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.IndexFilePathFactories;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.PathFactory;
 import org.apache.paimon.utils.SnapshotManager;
 
 import java.io.IOException;
@@ -40,36 +42,39 @@ import java.util.Set;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Handle index files. */
 public class IndexFileHandler {
 
+    private final FileIO fileIO;
     private final SnapshotManager snapshotManager;
-    private final PathFactory pathFactory;
     private final IndexManifestFile indexManifestFile;
-    private final HashIndexFile hashIndex;
-    private final DeletionVectorsIndexFile dvIndex;
+    private final IndexFilePathFactories pathFactories;
+    private final MemorySize dvTargetFileSize;
+    private final boolean dvBitmap64;
 
     public IndexFileHandler(
+            FileIO fileIO,
             SnapshotManager snapshotManager,
-            PathFactory pathFactory,
             IndexManifestFile indexManifestFile,
-            HashIndexFile hashIndex,
-            DeletionVectorsIndexFile dvIndex) {
+            IndexFilePathFactories pathFactories,
+            MemorySize dvTargetFileSize,
+            boolean dvBitmap64) {
+        this.fileIO = fileIO;
         this.snapshotManager = snapshotManager;
-        this.pathFactory = pathFactory;
+        this.pathFactories = pathFactories;
         this.indexManifestFile = indexManifestFile;
-        this.hashIndex = hashIndex;
-        this.dvIndex = dvIndex;
+        this.dvTargetFileSize = dvTargetFileSize;
+        this.dvBitmap64 = dvBitmap64;
     }
 
-    public HashIndexFile hashIndex() {
-        return this.hashIndex;
+    public HashIndexFile hashIndex(BinaryRow partition, int bucket) {
+        return new HashIndexFile(fileIO, pathFactories.get(partition, bucket));
     }
 
-    public DeletionVectorsIndexFile dvIndex() {
-        return this.dvIndex;
+    public DeletionVectorsIndexFile dvIndex(BinaryRow partition, int bucket) {
+        return new DeletionVectorsIndexFile(
+                fileIO, pathFactories.get(partition, bucket), 
dvTargetFileSize, dvBitmap64);
     }
 
     public Optional<IndexFileMeta> scanHashIndex(
@@ -166,7 +171,9 @@ public class IndexFileHandler {
     }
 
     public Path filePath(IndexManifestEntry entry) {
-        return pathFactory.toPath(entry.indexFile().fileName());
+        return pathFactories
+                .get(entry.partition(), entry.bucket())
+                .toPath(entry.indexFile().fileName());
     }
 
     public boolean existsManifest(String indexManifest) {
@@ -182,36 +189,36 @@ public class IndexFileHandler {
         return indexManifestFile.readWithIOException(indexManifest);
     }
 
-    private IndexFile indexFile(IndexFileMeta file) {
+    private IndexFile indexFile(IndexManifestEntry entry) {
+        IndexFileMeta file = entry.indexFile();
         switch (file.indexType()) {
             case HASH_INDEX:
-                return hashIndex;
+                return hashIndex(entry.partition(), entry.bucket());
             case DELETION_VECTORS_INDEX:
-                return dvIndex;
+                return dvIndex(entry.partition(), entry.bucket());
             default:
                 throw new IllegalArgumentException("Unknown index type: " + 
file.indexType());
         }
     }
 
     public boolean existsIndexFile(IndexManifestEntry file) {
-        return indexFile(file.indexFile()).exists(file.indexFile().fileName());
+        return indexFile(file).exists(file.indexFile().fileName());
     }
 
     public void deleteIndexFile(IndexManifestEntry entry) {
-        IndexFileMeta file = entry.indexFile();
-        indexFile(file).delete(file.fileName());
+        indexFile(entry).delete(entry.indexFile().fileName());
     }
 
     public void deleteManifest(String indexManifest) {
         indexManifestFile.delete(indexManifest);
     }
 
-    public Map<String, DeletionVector> 
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
-        for (IndexFileMeta indexFile : fileMetas) {
-            checkArgument(
-                    indexFile.indexType().equals(DELETION_VECTORS_INDEX),
-                    "Input file is not deletion vectors index " + 
indexFile.indexType());
-        }
-        return dvIndex.readAllDeletionVectors(fileMetas);
+    public Map<String, DeletionVector> readAllDeletionVectors(
+            BinaryRow partition, int bucket, List<IndexFileMeta> fileMetas) {
+        return dvIndex(partition, bucket).readAllDeletionVectors(fileMetas);
+    }
+
+    public Map<String, DeletionVector> 
readAllDeletionVectors(IndexManifestEntry entry) {
+        return dvIndex(entry.partition(), 
entry.bucket()).readAllDeletionVectors(entry.indexFile());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java 
b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
index 958dcfcdee..1e525ea4e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java
@@ -124,11 +124,13 @@ public class PartitionIndex {
             IntPredicate loadFilter,
             IntPredicate bucketFilter) {
         List<IndexManifestEntry> files = 
indexFileHandler.scanEntries(HASH_INDEX, partition);
-        HashIndexFile hashIndex = indexFileHandler.hashIndex();
         Int2ShortHashMap.Builder mapBuilder = Int2ShortHashMap.builder();
         Map<Integer, Long> buckets = new HashMap<>();
         for (IndexManifestEntry file : files) {
-            try (IntIterator iterator = 
hashIndex.read(file.indexFile().fileName())) {
+            try (IntIterator iterator =
+                    indexFileHandler
+                            .hashIndex(file.partition(), file.bucket())
+                            .read(file.indexFile().fileName())) {
                 while (true) {
                     try {
                         int hash = iterator.next();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 15af7eb4fd..7b77fb179f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -429,11 +429,13 @@ public abstract class AbstractFileStoreWrite<T> 
implements FileStoreWrite<T> {
         DynamicBucketIndexMaintainer indexMaintainer =
                 dbMaintainerFactory == null
                         ? null
-                        : 
dbMaintainerFactory.create(restored.dynamicBucketIndex());
+                        : dbMaintainerFactory.create(
+                                partition, bucket, 
restored.dynamicBucketIndex());
         BucketedDvMaintainer dvMaintainer =
                 dvMaintainerFactory == null
                         ? null
-                        : 
dvMaintainerFactory.create(restored.deleteVectorsIndex());
+                        : dvMaintainerFactory.create(
+                                partition, bucket, 
restored.deleteVectorsIndex());
 
         List<DataFileMeta> restoreFiles = restored.dataFiles();
         if (restoreFiles == null) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 40c69289a5..fd2aa13f36 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -35,6 +35,7 @@ import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.tag.TagTimeExpire;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.IndexFilePathFactories;
 import org.apache.paimon.utils.PathFactory;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -278,12 +279,14 @@ public class TableCommitImpl implements InnerTableCommit {
     private void checkFilesExistence(List<ManifestCommittable> committables) {
         List<Path> files = new ArrayList<>();
         DataFilePathFactories factories = new 
DataFilePathFactories(commit.pathFactory());
-        PathFactory indexFileFactory = commit.pathFactory().indexFileFactory();
+        IndexFilePathFactories indexFactories = new 
IndexFilePathFactories(commit.pathFactory());
         for (ManifestCommittable committable : committables) {
             for (CommitMessage message : committable.fileCommittables()) {
                 CommitMessageImpl msg = (CommitMessageImpl) message;
                 DataFilePathFactory pathFactory =
                         factories.get(message.partition(), message.bucket());
+                PathFactory indexFileFactory =
+                        indexFactories.get(message.partition(), 
message.bucket());
                 Consumer<DataFileMeta> collector = f -> 
files.addAll(f.collectFiles(pathFactory));
                 msg.newFilesIncrement().newFiles().forEach(collector);
                 msg.newFilesIncrement().changelogFiles().forEach(collector);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index ce3f2e9c06..e7c8366372 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -393,7 +393,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                     if (deletionVectors && deletionIndexFilesMap != null) {
                         builder.withDataDeletionFiles(
                                 getDeletionFiles(
-                                        indexFileHandler.dvIndex(),
+                                        indexFileHandler.dvIndex(partition, 
bucket),
                                         dataFiles,
                                         deletionIndexFilesMap.getOrDefault(
                                                 Pair.of(partition, bucket),
@@ -520,13 +520,13 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
                         && deletionIndexFilesMap != null) {
                     builder.withBeforeDeletionFiles(
                             getDeletionFiles(
-                                    indexFileHandler.dvIndex(),
+                                    indexFileHandler.dvIndex(part, bucket),
                                     before,
                                     beforDeletionIndexFilesMap.getOrDefault(
                                             Pair.of(part, bucket), 
Collections.emptyList())));
                     builder.withDataDeletionFiles(
                             getDeletionFiles(
-                                    indexFileHandler.dvIndex(),
+                                    indexFileHandler.dvIndex(part, bucket),
                                     data,
                                     deletionIndexFilesMap.getOrDefault(
                                             Pair.of(part, bucket), 
Collections.emptyList())));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index b63023653a..63d2b06dc0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -268,7 +268,7 @@ public class FileStorePathFactory {
         };
     }
 
-    public PathFactory indexFileFactory() {
+    public PathFactory indexFileFactory(BinaryRow partition, int bucket) {
         return new PathFactory() {
             @Override
             public Path newPath() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java
new file mode 100644
index 0000000000..934e6b01f2
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/IndexFilePathFactories.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryRow;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Cache for index {@link PathFactory}s. */
+public class IndexFilePathFactories {
+
+    private final Map<Pair<BinaryRow, Integer>, PathFactory> cache = new 
HashMap<>();
+    private final FileStorePathFactory pathFactory;
+
+    public IndexFilePathFactories(FileStorePathFactory pathFactory) {
+        this.pathFactory = pathFactory;
+    }
+
+    public PathFactory get(BinaryRow partition, int bucket) {
+        return cache.computeIfAbsent(
+                Pair.of(partition, bucket),
+                k -> pathFactory.indexFileFactory(k.getKey(), k.getValue()));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 55ffdcbe53..9caa379884 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -132,7 +132,7 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
         BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
         List<IndexFileMeta> indexFiles =
                 fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, 
partition, bucket);
-        return factory.create(indexFiles);
+        return factory.create(partition, bucket, indexFiles);
     }
 
     public CommitMessageImpl writeDVIndexFiles(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index 5d3b1594ee..c195517731 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -61,7 +62,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         initIndexHandler(bitmap64);
 
         BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
-        BucketedDvMaintainer dvMaintainer = factory.create(emptyList());
+        BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, 
emptyList());
         assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64);
 
         dvMaintainer.notifyNewDeletion("f1", 1);
@@ -74,7 +75,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
 
         Map<String, DeletionVector> deletionVectors =
-                
fileHandler.readAllDeletionVectors(Collections.singletonList(file));
+                fileHandler.readAllDeletionVectors(EMPTY_ROW, 0, 
Collections.singletonList(file));
         assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
         assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
         assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
@@ -89,7 +90,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
 
         BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
 
-        BucketedDvMaintainer dvMaintainer = factory.create(new HashMap<>());
+        BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, new 
HashMap<>());
         DeletionVector deletionVector1 = createDeletionVector(bitmap64);
         deletionVector1.delete(1);
         deletionVector1.delete(3);
@@ -100,7 +101,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage =
                 new CommitMessageImpl(
-                        BinaryRow.EMPTY_ROW,
+                        EMPTY_ROW,
                         0,
                         1,
                         DataIncrement.emptyIncrement(),
@@ -111,8 +112,8 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
 
         Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
         List<IndexFileMeta> indexFiles =
-                fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, 
BinaryRow.EMPTY_ROW, 0);
-        dvMaintainer = factory.create(indexFiles);
+                fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, 
EMPTY_ROW, 0);
+        dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles);
         DeletionVector deletionVector2 = 
dvMaintainer.deletionVectorOf("f1").get();
         assertThat(deletionVector2.isDeleted(1)).isTrue();
         assertThat(deletionVector2.isDeleted(2)).isFalse();
@@ -123,7 +124,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         file = dvMaintainer.writeDeletionVectorsIndex().get();
         commitMessage =
                 new CommitMessageImpl(
-                        BinaryRow.EMPTY_ROW,
+                        EMPTY_ROW,
                         0,
                         1,
                         DataIncrement.emptyIncrement(),
@@ -133,9 +134,8 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         commit.commit(Collections.singletonList(commitMessage));
 
         latestSnapshot = table.snapshotManager().latestSnapshot();
-        indexFiles =
-                fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, 
BinaryRow.EMPTY_ROW, 0);
-        dvMaintainer = factory.create(indexFiles);
+        indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, 
EMPTY_ROW, 0);
+        dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles);
         DeletionVector deletionVector3 = 
dvMaintainer.deletionVectorOf("f1").get();
         assertThat(deletionVector3.isDeleted(1)).isTrue();
         assertThat(deletionVector3.isDeleted(2)).isTrue();
@@ -147,7 +147,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         initIndexHandler(bitmap64);
 
         BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
-        BucketedDvMaintainer dvMaintainer = factory.create(emptyList());
+        BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, 
emptyList());
 
         File indexDir = new File(tempPath.toFile(), "/default.db/T/index");
 
@@ -188,7 +188,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         // write first kind dv
         initIndexHandler(bitmap64);
         BucketedDvMaintainer.Factory factory1 = 
BucketedDvMaintainer.factory(fileHandler);
-        BucketedDvMaintainer dvMaintainer1 = factory1.create(new HashMap<>());
+        BucketedDvMaintainer dvMaintainer1 = factory1.create(EMPTY_ROW, 0, new 
HashMap<>());
         dvMaintainer1.notifyNewDeletion("f1", 1);
         dvMaintainer1.notifyNewDeletion("f1", 3);
         dvMaintainer1.notifyNewDeletion("f2", 1);
@@ -198,7 +198,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage1 =
                 new CommitMessageImpl(
-                        BinaryRow.EMPTY_ROW,
+                        EMPTY_ROW,
                         0,
                         1,
                         DataIncrement.emptyIncrement(),
@@ -212,11 +212,8 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         BucketedDvMaintainer.Factory factory2 = 
BucketedDvMaintainer.factory(fileHandler);
         List<IndexFileMeta> indexFiles =
                 fileHandler.scan(
-                        table.latestSnapshot().get(),
-                        DELETION_VECTORS_INDEX,
-                        BinaryRow.EMPTY_ROW,
-                        0);
-        BucketedDvMaintainer dvMaintainer2 = factory2.create(indexFiles);
+                        table.latestSnapshot().get(), DELETION_VECTORS_INDEX, 
EMPTY_ROW, 0);
+        BucketedDvMaintainer dvMaintainer2 = factory2.create(EMPTY_ROW, 0, 
indexFiles);
         dvMaintainer2.notifyNewDeletion("f1", 10);
         dvMaintainer2.notifyNewDeletion("f3", 1);
         dvMaintainer2.notifyNewDeletion("f3", 3);
@@ -234,7 +231,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         file = dvMaintainer2.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage2 =
                 new CommitMessageImpl(
-                        BinaryRow.EMPTY_ROW,
+                        EMPTY_ROW,
                         0,
                         1,
                         DataIncrement.emptyIncrement(),
@@ -246,11 +243,10 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
         // test read dv index file which contains two kinds of dv
         Map<String, DeletionVector> readDvs =
                 fileHandler.readAllDeletionVectors(
+                        EMPTY_ROW,
+                        0,
                         fileHandler.scan(
-                                table.latestSnapshot().get(),
-                                "DELETION_VECTORS",
-                                BinaryRow.EMPTY_ROW,
-                                0));
+                                table.latestSnapshot().get(), 
"DELETION_VECTORS", EMPTY_ROW, 0));
         assertThat(readDvs.size()).isEqualTo(3);
         assertThat(dvs.get("f1").getCardinality()).isEqualTo(3);
         assertThat(dvs.get("f2").getCardinality()).isEqualTo(2);
@@ -282,7 +278,7 @@ public class BucketedDvMaintainerTest extends 
PrimaryKeyTableTestBase {
                                 .map(IndexManifestEntry::indexFile)
                                 .collect(Collectors.toList());
         Map<String, DeletionVector> deletionVectors =
-                new HashMap<>(handler.readAllDeletionVectors(indexFiles));
-        return factory.create(deletionVectors);
+                new HashMap<>(handler.readAllDeletionVectors(EMPTY_ROW, 0, 
indexFiles));
+        return factory.create(EMPTY_ROW, 0, deletionVectors);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
index 98adf5e8fb..e84d09ae72 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
@@ -28,6 +28,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
+
 /** Helper for {@link BaseAppendDeleteFileMaintainer}. */
 public class AppendDeletionFileMaintainerHelper {
 
@@ -48,6 +50,9 @@ public class AppendDeletionFileMaintainerHelper {
                                                 
indexManifestEntry.indexFile().fileName()))
                         .collect(Collectors.toList());
         return new AppendDeleteFileMaintainer(
-                indexFileHandler.dvIndex(), partition, manifests, 
deletionFiles);
+                indexFileHandler.dvIndex(partition, UNAWARE_BUCKET),
+                partition,
+                manifests,
+                deletionFiles);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 3b81c8478e..ab0376095c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -66,7 +66,7 @@ class AppendDeletionFileMaintainerTest {
                         Collections.singletonMap("f3", Arrays.asList(1, 2, 
3)));
         store.commit(commitMessage1, commitMessage2);
 
-        PathFactory indexPathFactory = store.pathFactory().indexFileFactory();
+        PathFactory indexPathFactory = 
store.pathFactory().indexFileFactory(BinaryRow.EMPTY_ROW, 0);
         Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
         dataFileToDeletionFiles.putAll(
                 createDeletionFileMapFromIndexFileMetas(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
index 4c6c79d17e..769817d63b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/DynamicBucketIndexMaintainerTest.java
@@ -80,7 +80,8 @@ public class DynamicBucketIndexMaintainerTest extends 
PrimaryKeyTableTestBase {
                 continue;
             }
             int[] ints =
-                    
fileHandler.hashIndex().readList(files.get(0).fileName()).stream()
+                    fileHandler.hashIndex(message.partition(), 
message.bucket())
+                            .readList(files.get(0).fileName()).stream()
                             .mapToInt(Integer::intValue)
                             .toArray();
             index.computeIfAbsent(message.partition(), k -> new HashMap<>())
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
index 6ece642aa7..01eca82c2e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java
@@ -45,13 +45,11 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 public class HashBucketAssignerTest extends PrimaryKeyTableTestBase {
 
     private IndexFileHandler fileHandler;
-    private HashIndexFile indexFile;
     private StreamTableCommit commit;
 
     @BeforeEach
     public void beforeEach() throws Exception {
         fileHandler = table.store().newIndexFileHandler();
-        indexFile = fileHandler.hashIndex();
         commit = 
table.newStreamWriteBuilder().withCommitUser(commitUser).newCommit();
     }
 
@@ -226,13 +224,19 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
 
     @Test
     public void testAssignRestore() throws IOException {
-        IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5});
-        IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7});
         commit.commit(
                 0,
                 Arrays.asList(
-                        createCommitMessage(row(1), 0, 3, bucket0),
-                        createCommitMessage(row(1), 2, 3, bucket2)));
+                        createCommitMessage(
+                                row(1),
+                                0,
+                                3,
+                                fileHandler.hashIndex(row(1), 0).write(new 
int[] {2, 5})),
+                        createCommitMessage(
+                                row(1),
+                                2,
+                                3,
+                                fileHandler.hashIndex(row(1), 2).write(new 
int[] {4, 7}))));
 
         HashBucketAssigner assigner0 = createAssigner(3, 3, 0);
         HashBucketAssigner assigner2 = createAssigner(3, 3, 2);
@@ -252,13 +256,19 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
 
     @Test
     public void testAssignRestoreWithUpperBound() throws IOException {
-        IndexFileMeta bucket0 = indexFile.write(new int[] {2, 5});
-        IndexFileMeta bucket2 = indexFile.write(new int[] {4, 7});
         commit.commit(
                 0,
                 Arrays.asList(
-                        createCommitMessage(row(1), 0, 3, bucket0),
-                        createCommitMessage(row(1), 2, 3, bucket2)));
+                        createCommitMessage(
+                                row(1),
+                                0,
+                                3,
+                                fileHandler.hashIndex(row(1), 0).write(new 
int[] {2, 5})),
+                        createCommitMessage(
+                                row(1),
+                                2,
+                                3,
+                                fileHandler.hashIndex(row(1), 2).write(new 
int[] {4, 7}))));
 
         HashBucketAssigner assigner0 = createAssigner(3, 3, 0, 1);
         HashBucketAssigner assigner2 = createAssigner(3, 3, 2, 1);
@@ -316,8 +326,17 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
         commit.commit(
                 0,
                 Arrays.asList(
-                        createCommitMessage(row(1), 0, 1, indexFile.write(new 
int[] {0})),
-                        createCommitMessage(row(2), 0, 1, indexFile.write(new 
int[] {0}))));
+                        createCommitMessage(
+                                row(1),
+                                0,
+                                1,
+                                fileHandler.hashIndex(row(1), 0).write(new 
int[] {0})),
+                        createCommitMessage(
+                                row(2),
+                                0,
+                                1,
+                                fileHandler.hashIndex(row(2), 0).write(new 
int[] {0}))));
+
         
assertThat(assigner.currentPartitions()).containsExactlyInAnyOrder(row(1), 
row(2));
 
         // checkpoint 1, but no commit
@@ -333,7 +352,12 @@ public class HashBucketAssignerTest extends 
PrimaryKeyTableTestBase {
         commit.commit(
                 1,
                 Collections.singletonList(
-                        createCommitMessage(row(1), 0, 1, indexFile.write(new 
int[] {1}))));
+                        createCommitMessage(
+                                row(1),
+                                0,
+                                1,
+                                fileHandler.hashIndex(row(1), 0).write(new 
int[] {1}))));
+
         assigner.prepareCommit(3);
         assertThat(assigner.currentPartitions()).isEmpty();
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index f9a584b508..f8a9309a09 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -29,7 +29,6 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.index.HashIndexFile;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.FileKind;
@@ -715,7 +714,6 @@ public class FileStoreCommitTest {
     public void testIndexFiles() throws Exception {
         TestFileStore store = createStore(false, 2);
         IndexFileHandler indexFileHandler = store.newIndexFileHandler();
-        HashIndexFile hashIndex = indexFileHandler.hashIndex();
 
         KeyValue record1 = gen.next();
         BinaryRow part1 = gen.getPartition(record1);
@@ -727,9 +725,23 @@ public class FileStoreCommitTest {
         }
 
         // init write
-        store.commitDataIndex(record1, gen::getPartition, 0, 
hashIndex.write(new int[] {1, 2, 5}));
-        store.commitDataIndex(record1, gen::getPartition, 1, 
hashIndex.write(new int[] {6, 8}));
-        store.commitDataIndex(record2, gen::getPartition, 2, 
hashIndex.write(new int[] {3, 5}));
+        store.commitDataIndex(
+                record1,
+                gen::getPartition,
+                0,
+                indexFileHandler
+                        .hashIndex(gen.getPartition(record1), 0)
+                        .write(new int[] {1, 2, 5}));
+        store.commitDataIndex(
+                record1,
+                gen::getPartition,
+                1,
+                indexFileHandler.hashIndex(gen.getPartition(record1), 
1).write(new int[] {6, 8}));
+        store.commitDataIndex(
+                record2,
+                gen::getPartition,
+                2,
+                indexFileHandler.hashIndex(gen.getPartition(record2), 
2).write(new int[] {3, 5}));
 
         Snapshot snapshot = store.snapshotManager().latestSnapshot();
 
@@ -740,12 +752,18 @@ public class FileStoreCommitTest {
 
         IndexManifestEntry indexManifestEntry =
                 part1Index.stream().filter(entry -> entry.bucket() == 
0).findAny().get();
-        
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
+        assertThat(
+                        indexFileHandler
+                                .hashIndex(part1, 0)
+                                
.readList(indexManifestEntry.indexFile().fileName()))
                 .containsExactlyInAnyOrder(1, 2, 5);
 
         indexManifestEntry =
                 part1Index.stream().filter(entry -> entry.bucket() == 
1).findAny().get();
-        
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
+        assertThat(
+                        indexFileHandler
+                                .hashIndex(part1, 1)
+                                
.readList(indexManifestEntry.indexFile().fileName()))
                 .containsExactlyInAnyOrder(6, 8);
 
         // assert part2
@@ -753,11 +771,18 @@ public class FileStoreCommitTest {
                 indexFileHandler.scanEntries(snapshot, HASH_INDEX, part2);
         assertThat(part2Index.size()).isEqualTo(1);
         assertThat(part2Index.get(0).bucket()).isEqualTo(2);
-        
assertThat(hashIndex.readList(part2Index.get(0).indexFile().fileName()))
+        assertThat(
+                        indexFileHandler
+                                .hashIndex(part2, 2)
+                                
.readList(part2Index.get(0).indexFile().fileName()))
                 .containsExactlyInAnyOrder(3, 5);
 
         // update part1
-        store.commitDataIndex(record1, gen::getPartition, 0, 
hashIndex.write(new int[] {1, 4}));
+        store.commitDataIndex(
+                record1,
+                gen::getPartition,
+                0,
+                indexFileHandler.hashIndex(gen.getPartition(record1), 
0).write(new int[] {1, 4}));
         snapshot = store.snapshotManager().latestSnapshot();
 
         // assert update part1
@@ -766,18 +791,25 @@ public class FileStoreCommitTest {
 
         indexManifestEntry =
                 part1Index.stream().filter(entry -> entry.bucket() == 
0).findAny().get();
-        
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
+        assertThat(
+                        indexFileHandler
+                                .hashIndex(part1, 0)
+                                
.readList(indexManifestEntry.indexFile().fileName()))
                 .containsExactlyInAnyOrder(1, 4);
 
         indexManifestEntry =
                 part1Index.stream().filter(entry -> entry.bucket() == 
1).findAny().get();
-        
assertThat(hashIndex.readList(indexManifestEntry.indexFile().fileName()))
+        assertThat(
+                        indexFileHandler
+                                .hashIndex(part1, 1)
+                                
.readList(indexManifestEntry.indexFile().fileName()))
                 .containsExactlyInAnyOrder(6, 8);
 
         // assert scan one bucket
         Optional<IndexFileMeta> file = 
indexFileHandler.scanHashIndex(snapshot, part1, 0);
         assertThat(file).isPresent();
-        
assertThat(hashIndex.readList(file.get().fileName())).containsExactlyInAnyOrder(1,
 4);
+        assertThat(indexFileHandler.hashIndex(part1, 
0).readList(file.get().fileName()))
+                .containsExactlyInAnyOrder(1, 4);
 
         // overwrite one partition
         
store.options().toConfiguration().set(CoreOptions.DYNAMIC_PARTITION_OVERWRITE, 
true);
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 44c7ebc754..f61f1fa1d7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -23,7 +23,6 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.flink.util.AbstractTestBase;
-import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -121,10 +120,9 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         List<IndexManifestEntry> indexManifestEntries =
                 table.indexManifestFileReader().read(snapshot.indexManifest());
         assertThat(indexManifestEntries.size()).isEqualTo(1);
-        IndexFileMeta indexFileMeta = indexManifestEntries.get(0).indexFile();
         return table.store()
                 .newIndexFileHandler()
-                .readAllDeletionVectors(singletonList(indexFileMeta));
+                .readAllDeletionVectors(indexManifestEntries.get(0));
     }
 
     @Test


Reply via email to