This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c269968c2 [core] Simplify AppendDeleteFileMaintainer to maintainer 
free (#6123)
1c269968c2 is described below

commit 1c269968c2b32d2eb4987435b98dc143f76df1ca
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Aug 22 19:57:32 2025 +0800

    [core] Simplify AppendDeleteFileMaintainer to maintainer free (#6123)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |  2 +-
 .../java/org/apache/paimon/AbstractFileStore.java  |  6 +-
 .../org/apache/paimon/AppendOnlyFileStore.java     | 11 ++-
 .../java/org/apache/paimon/KeyValueFileStore.java  |  9 +--
 .../append/BucketedAppendCompactManager.java       | 16 ++--
 .../apache/paimon/compact/CompactDeletionFile.java | 21 ++---
 ...rsMaintainer.java => BucketedDvMaintainer.java} | 27 +++----
 .../paimon/deletionvectors/DeletionVector.java     |  2 +-
 .../DeletionVectorIndexFileWriter.java             | 38 +++++----
 .../deletionvectors/DeletionVectorsIndexFile.java  | 32 ++++----
 .../append/AppendDeleteFileMaintainer.java         | 90 ++++++++--------------
 .../append/BaseAppendDeleteFileMaintainer.java     | 36 +++++++--
 .../append/BucketedAppendDeleteFileMaintainer.java | 15 ++--
 .../org/apache/paimon/index/IndexFileHandler.java  | 55 +------------
 .../paimon/mergetree/compact/CompactStrategy.java  |  4 +-
 .../LookupChangelogMergeFunctionWrapper.java       |  6 +-
 .../compact/LookupMergeTreeCompactRewriter.java    | 12 +--
 .../mergetree/compact/MergeTreeCompactManager.java |  6 +-
 .../paimon/operation/AbstractFileStoreWrite.java   | 14 ++--
 .../paimon/operation/AppendFileStoreWrite.java     |  7 +-
 .../paimon/operation/BaseAppendFileStoreWrite.java |  8 +-
 .../operation/BucketedAppendFileStoreWrite.java    |  6 +-
 .../apache/paimon/operation/FileStoreWrite.java    |  6 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   | 10 +--
 .../paimon/operation/MemoryFileStoreWrite.java     |  4 +-
 .../postpone/PostponeBucketFileStoreWrite.java     |  4 +-
 .../org/apache/paimon/TestAppendFileStore.java     | 14 ++--
 ...inerTest.java => BucketedDvMaintainerTest.java} | 58 ++++++--------
 .../DeletionVectorsIndexFileTest.java              | 16 ++--
 .../append/AppendDeletionFileMaintainerHelper.java | 19 ++++-
 .../paimon/operation/FileStoreCommitTest.java      |  5 +-
 .../paimon/spark/sql/DeletionVectorTest.scala      | 20 ++---
 32 files changed, 266 insertions(+), 313 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e0f9af088d..8d86219cfb 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2863,7 +2863,7 @@ public class CoreOptions implements Serializable {
         return deletionVectorsEnabled() || mergeEngine() == FIRST_ROW;
     }
 
-    public MemorySize deletionVectorIndexFileTargetSize() {
+    public MemorySize dvIndexFileTargetSize() {
         return options.get(DELETION_VECTOR_INDEX_FILE_TARGET_SIZE);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 11f67a2c10..4d2b7726de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -43,14 +43,12 @@ import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
-import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.partition.PartitionExpireStrategy;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFile;
 import org.apache.paimon.stats.StatsFileHandler;
-import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.PartitionHandler;
@@ -230,9 +228,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 new DeletionVectorsIndexFile(
                         fileIO,
                         pathFactory().indexFileFactory(),
-                        bucketMode() == BucketMode.BUCKET_UNAWARE
-                                ? options.deletionVectorIndexFileTargetSize()
-                                : MemorySize.ofBytes(Long.MAX_VALUE),
+                        options.dvIndexFileTargetSize(),
                         options.deletionVectorBitmap64()));
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 0d9ad3b121..2f552d799b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -19,7 +19,7 @@
 package org.apache.paimon;
 
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.operation.AppendFileStoreWrite;
@@ -107,10 +107,6 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
 
     @Override
     public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable 
Integer writeId) {
-        DeletionVectorsMaintainer.Factory dvMaintainerFactory =
-                options.deletionVectorsEnabled()
-                        ? 
DeletionVectorsMaintainer.factory(newIndexFileHandler())
-                        : null;
         if (bucketMode() == BucketMode.BUCKET_UNAWARE) {
             RawFileSplitRead readForCompact = newRead();
             if (options.rowTrackingEnabled()) {
@@ -126,9 +122,12 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                     snapshotManager(),
                     newScan(),
                     options,
-                    dvMaintainerFactory,
                     tableName);
         } else {
+            BucketedDvMaintainer.Factory dvMaintainerFactory =
+                    options.deletionVectorsEnabled()
+                            ? 
BucketedDvMaintainer.factory(newIndexFileHandler())
+                            : null;
             return new BucketedAppendFileStoreWrite(
                     fileIO,
                     newRead(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index f7a743ace1..5e5b354e65 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -20,7 +20,7 @@ package org.apache.paimon;
 
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -177,10 +177,9 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
         if (bucketMode() == BucketMode.HASH_DYNAMIC) {
             indexFactory = new 
DynamicBucketIndexMaintainer.Factory(newIndexFileHandler());
         }
-        DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory = 
null;
+        BucketedDvMaintainer.Factory dvMaintainerFactory = null;
         if (options.deletionVectorsEnabled()) {
-            deletionVectorsMaintainerFactory =
-                    new 
DeletionVectorsMaintainer.Factory(newIndexFileHandler());
+            dvMaintainerFactory = 
BucketedDvMaintainer.factory(newIndexFileHandler());
         }
         return new KeyValueFileStoreWrite(
                 fileIO,
@@ -199,7 +198,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 snapshotManager(),
                 newScan(),
                 indexFactory,
-                deletionVectorsMaintainerFactory,
+                dvMaintainerFactory,
                 options,
                 keyValueFieldsExtractor,
                 tableName);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index 666704b836..4a057f9576 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -24,7 +24,7 @@ import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.compact.CompactFutureManager;
 import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compact.CompactTask;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
 import org.apache.paimon.operation.metrics.MetricUtils;
@@ -56,7 +56,7 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
     private static final int FULL_COMPACT_MIN_FILE = 3;
 
     private final ExecutorService executor;
-    private final DeletionVectorsMaintainer dvMaintainer;
+    private final BucketedDvMaintainer dvMaintainer;
     private final PriorityQueue<DataFileMeta> toCompact;
     private final int minFileNum;
     private final long targetFileSize;
@@ -70,7 +70,7 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
     public BucketedAppendCompactManager(
             ExecutorService executor,
             List<DataFileMeta> restored,
-            @Nullable DeletionVectorsMaintainer dvMaintainer,
+            @Nullable BucketedDvMaintainer dvMaintainer,
             int minFileNum,
             long targetFileSize,
             boolean forceRewriteAllFiles,
@@ -241,14 +241,14 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
     /** A {@link CompactTask} impl for full compaction of append-only table. */
     public static class FullCompactTask extends CompactTask {
 
-        private final DeletionVectorsMaintainer dvMaintainer;
+        private final BucketedDvMaintainer dvMaintainer;
         private final LinkedList<DataFileMeta> toCompact;
         private final long targetFileSize;
         private final boolean forceRewriteAllFiles;
         private final CompactRewriter rewriter;
 
         public FullCompactTask(
-                DeletionVectorsMaintainer dvMaintainer,
+                BucketedDvMaintainer dvMaintainer,
                 Collection<DataFileMeta> inputs,
                 long targetFileSize,
                 boolean forceRewriteAllFiles,
@@ -314,12 +314,12 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
      */
     public static class AutoCompactTask extends CompactTask {
 
-        private final DeletionVectorsMaintainer dvMaintainer;
+        private final BucketedDvMaintainer dvMaintainer;
         private final List<DataFileMeta> toCompact;
         private final CompactRewriter rewriter;
 
         public AutoCompactTask(
-                DeletionVectorsMaintainer dvMaintainer,
+                BucketedDvMaintainer dvMaintainer,
                 List<DataFileMeta> toCompact,
                 CompactRewriter rewriter,
                 @Nullable CompactionMetrics.Reporter metricsReporter) {
@@ -336,7 +336,7 @@ public class BucketedAppendCompactManager extends 
CompactFutureManager {
     }
 
     private static CompactResult compact(
-            @Nullable DeletionVectorsMaintainer dvMaintainer,
+            @Nullable BucketedDvMaintainer dvMaintainer,
             List<DataFileMeta> toCompact,
             CompactRewriter rewriter)
             throws Exception {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
index 390ab7af90..f52c3a41d8 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
@@ -18,13 +18,12 @@
 
 package org.apache.paimon.compact;
 
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 
 import javax.annotation.Nullable;
 
-import java.util.List;
 import java.util.Optional;
 
 /** Deletion File from compaction. */
@@ -41,19 +40,13 @@ public interface CompactDeletionFile {
      * immediately, so when updateCompactResult, we need to merge old deletion 
files (just delete
      * them).
      */
-    static CompactDeletionFile generateFiles(DeletionVectorsMaintainer 
maintainer) {
-        List<IndexFileMeta> files = maintainer.writeDeletionVectorsIndex();
-        if (files.size() > 1) {
-            throw new IllegalStateException(
-                    "Should only generate one compact deletion file, this is a 
bug.");
-        }
-
-        return new GeneratedDeletionFile(
-                files.isEmpty() ? null : files.get(0), 
maintainer.indexFileHandler());
+    static CompactDeletionFile generateFiles(BucketedDvMaintainer maintainer) {
+        Optional<IndexFileMeta> file = maintainer.writeDeletionVectorsIndex();
+        return new GeneratedDeletionFile(file.orElse(null), 
maintainer.indexFileHandler());
     }
 
     /** For sync compaction, only create deletion files when prepareCommit. */
-    static CompactDeletionFile lazyGeneration(DeletionVectorsMaintainer 
maintainer) {
+    static CompactDeletionFile lazyGeneration(BucketedDvMaintainer maintainer) 
{
         return new LazyCompactDeletionFile(maintainer);
     }
 
@@ -107,11 +100,11 @@ public interface CompactDeletionFile {
     /** A lazy generation implementation of {@link CompactDeletionFile}. */
     class LazyCompactDeletionFile implements CompactDeletionFile {
 
-        private final DeletionVectorsMaintainer maintainer;
+        private final BucketedDvMaintainer maintainer;
 
         private boolean generated = false;
 
-        public LazyCompactDeletionFile(DeletionVectorsMaintainer maintainer) {
+        public LazyCompactDeletionFile(BucketedDvMaintainer maintainer) {
             this.maintainer = maintainer;
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
similarity index 86%
rename from 
paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
rename to 
paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
index f19a236a14..1feb135efc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
@@ -31,14 +31,14 @@ import java.util.Map;
 import java.util.Optional;
 
 /** Maintainer of deletionVectors index. */
-public class DeletionVectorsMaintainer {
+public class BucketedDvMaintainer {
 
     private final IndexFileHandler indexFileHandler;
     private final Map<String, DeletionVector> deletionVectors;
     protected final boolean bitmap64;
     private boolean modified;
 
-    private DeletionVectorsMaintainer(
+    private BucketedDvMaintainer(
             IndexFileHandler fileHandler, Map<String, DeletionVector> 
deletionVectors) {
         this.indexFileHandler = fileHandler;
         this.deletionVectors = deletionVectors;
@@ -108,15 +108,16 @@ public class DeletionVectorsMaintainer {
     /**
      * Write new deletion vectors index file if any modifications have been 
made.
      *
-     * @return A list containing the metadata of the deletion vectors index 
file, or an empty list
-     *     if no changes need to be committed.
+     * @return None if no modifications have been made, otherwise the new 
deletion vectors index
+     *     file.
      */
-    public List<IndexFileMeta> writeDeletionVectorsIndex() {
+    public Optional<IndexFileMeta> writeDeletionVectorsIndex() {
         if (modified) {
             modified = false;
-            return indexFileHandler.writeDeletionVectorsIndex(deletionVectors);
+            return Optional.of(
+                    
indexFileHandler.deletionVectorsIndex().writeSingleFile(deletionVectors));
         }
-        return Collections.emptyList();
+        return Optional.empty();
     }
 
     /**
@@ -147,12 +148,12 @@ public class DeletionVectorsMaintainer {
         return new Factory(handler);
     }
 
-    /** Factory to restore {@link DeletionVectorsMaintainer}. */
+    /** Factory to restore {@link BucketedDvMaintainer}. */
     public static class Factory {
 
         private final IndexFileHandler handler;
 
-        public Factory(IndexFileHandler handler) {
+        private Factory(IndexFileHandler handler) {
             this.handler = handler;
         }
 
@@ -160,7 +161,7 @@ public class DeletionVectorsMaintainer {
             return handler;
         }
 
-        public DeletionVectorsMaintainer create(@Nullable List<IndexFileMeta> 
restoredFiles) {
+        public BucketedDvMaintainer create(@Nullable List<IndexFileMeta> 
restoredFiles) {
             if (restoredFiles == null) {
                 restoredFiles = Collections.emptyList();
             }
@@ -169,12 +170,12 @@ public class DeletionVectorsMaintainer {
             return create(deletionVectors);
         }
 
-        public DeletionVectorsMaintainer create() {
+        public BucketedDvMaintainer create() {
             return create(new HashMap<>());
         }
 
-        public DeletionVectorsMaintainer create(Map<String, DeletionVector> 
deletionVectors) {
-            return new DeletionVectorsMaintainer(handler, deletionVectors);
+        public BucketedDvMaintainer create(Map<String, DeletionVector> 
deletionVectors) {
+            return new BucketedDvMaintainer(handler, deletionVectors);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
index ff48c7d218..e06dc767a9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -149,7 +149,7 @@ public interface DeletionVector extends 
DeletionVectorJudger {
         return fileName -> Optional.empty();
     }
 
-    static Factory factory(@Nullable DeletionVectorsMaintainer dvMaintainer) {
+    static Factory factory(@Nullable BucketedDvMaintainer dvMaintainer) {
         if (dvMaintainer == null) {
             return emptyFactory();
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index 4647b55fb1..4104305e05 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -45,12 +45,29 @@ public class DeletionVectorIndexFileWriter {
     }
 
     /**
-     * For unaware-bucket mode, this method will write out multiple index 
files, else, it will write
-     * out only one index file.
+     * The deletion file of the bucketed table is updated according to the 
bucket. If a compaction
+     * occurs and there is no longer a deletion file, an empty deletion file 
needs to be generated
+     * to overwrite the old file.
+     *
+     * <p>TODO: We can consider sending a message to delete the deletion file 
in the future.
      */
-    public List<IndexFileMeta> write(Map<String, DeletionVector> input) throws 
IOException {
+    public IndexFileMeta writeSingleFile(Map<String, DeletionVector> input) 
throws IOException {
+
+        DeletionFileWriter writer = new 
DeletionFileWriter(indexPathFactory.newPath(), fileIO);
+        try {
+            for (Map.Entry<String, DeletionVector> entry : input.entrySet()) {
+                writer.write(entry.getKey(), entry.getValue());
+            }
+        } finally {
+            writer.close();
+        }
+        return writer.result();
+    }
+
+    public List<IndexFileMeta> writeWithRolling(Map<String, DeletionVector> 
input)
+            throws IOException {
         if (input.isEmpty()) {
-            return emptyIndexFile();
+            return Collections.emptyList();
         }
         List<IndexFileMeta> result = new ArrayList<>();
         Iterator<Map.Entry<String, DeletionVector>> iterator = 
input.entrySet().iterator();
@@ -76,17 +93,4 @@ public class DeletionVectorIndexFileWriter {
         }
         return writer.result();
     }
-
-    /**
-     * The deletion file of the bucketed table is updated according to the 
bucket. If a compaction
-     * occurs and there is no longer a deletion file, an empty deletion file 
needs to be generated
-     * to overwrite the old file.
-     *
-     * <p>TODO: We can consider sending a message to delete the deletion file 
in the future.
-     */
-    private List<IndexFileMeta> emptyIndexFile() throws IOException {
-        DeletionFileWriter writer = new 
DeletionFileWriter(indexPathFactory.newPath(), fileIO);
-        writer.close();
-        return Collections.singletonList(writer.result());
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index fa9ed56417..cc554b816a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -141,29 +141,27 @@ public class DeletionVectorsIndexFile extends IndexFile {
         }
     }
 
-    /**
-     * Write deletion vectors to a new file, the format of this file can be 
referenced at: <a
-     * href="https://cwiki.apache.org/confluence/x/Tws4EQ";>PIP-16</a>.
-     *
-     * @param input A map where the key represents which file the 
DeletionVector belongs to, and the
-     *     value is the corresponding DeletionVector object.
-     * @return A Pair object specifying the name of the written new file and a 
map where the key
-     *     represents which file the DeletionVector belongs to and the value 
is a Pair object
-     *     specifying the range (start position and size) within the file 
where the deletion vector
-     *     data is located.
-     * @throws UncheckedIOException If an I/O error occurs while writing to 
the file.
-     */
-    public List<IndexFileMeta> write(Map<String, DeletionVector> input) {
+    public IndexFileMeta writeSingleFile(Map<String, DeletionVector> input) {
+        try {
+            return createWriter().writeSingleFile(input);
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to write deletion vectors.", e);
+        }
+    }
+
+    public List<IndexFileMeta> writeWithRolling(Map<String, DeletionVector> 
input) {
         try {
-            DeletionVectorIndexFileWriter writer =
-                    new DeletionVectorIndexFileWriter(
-                            this.fileIO, this.pathFactory, 
this.targetSizePerIndexFile);
-            return writer.write(input);
+            return createWriter().writeWithRolling(input);
         } catch (IOException e) {
             throw new RuntimeException("Failed to write deletion vectors.", e);
         }
     }
 
+    private DeletionVectorIndexFileWriter createWriter() {
+        return new DeletionVectorIndexFileWriter(
+                this.fileIO, this.pathFactory, this.targetSizePerIndexFile);
+    }
+
     private void checkVersion(InputStream in) throws IOException {
         int version = in.read();
         if (version != VERSION_ID_V1) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java
index 38e1176de7..70e00bc4ad 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java
@@ -22,9 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -36,58 +34,48 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
 
 /** A {@link BaseAppendDeleteFileMaintainer} of unaware bucket append table. */
 public class AppendDeleteFileMaintainer implements 
BaseAppendDeleteFileMaintainer {
 
-    private final IndexFileHandler indexFileHandler;
+    private final DeletionVectorsIndexFile dvIndexFile;
 
     private final BinaryRow partition;
     private final Map<String, DeletionFile> dataFileToDeletionFile;
-    private final Map<String, IndexManifestEntry> indexNameToEntry = new 
HashMap<>();
-
-    private final Map<String, Map<String, DeletionFile>> 
indexFileToDeletionFiles = new HashMap<>();
-    private final Map<String, String> dataFileToIndexFile = new HashMap<>();
-
-    private final Set<String> touchedIndexFiles = new HashSet<>();
-
-    private final DeletionVectorsMaintainer maintainer;
+    private final Map<String, IndexManifestEntry> indexNameToEntry;
+    private final Map<String, Map<String, DeletionFile>> 
indexFileToDeletionFiles;
+    private final Map<String, String> dataFileToIndexFile;
+    private final Set<String> touchedIndexFiles;
+    private final Map<String, DeletionVector> deletionVectors;
 
     AppendDeleteFileMaintainer(
-            IndexFileHandler indexFileHandler,
+            DeletionVectorsIndexFile dvIndexFile,
             BinaryRow partition,
+            List<IndexManifestEntry> manifestEntries,
             Map<String, DeletionFile> deletionFiles) {
-        this.indexFileHandler = indexFileHandler;
+        this.dvIndexFile = dvIndexFile;
         this.partition = partition;
         this.dataFileToDeletionFile = new HashMap<>(deletionFiles);
-        // the deletion of data files is independent
-        // just create an empty maintainer
-        this.maintainer = new 
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
-
-        List<String> touchedIndexFileNames =
-                deletionFiles.values().stream()
-                        .map(deletionFile -> new 
Path(deletionFile.path()).getName())
-                        .distinct()
-                        .collect(Collectors.toList());
-        indexFileHandler.scanEntries().stream()
-                .filter(
-                        indexManifestEntry ->
-                                touchedIndexFileNames.contains(
-                                        
indexManifestEntry.indexFile().fileName()))
-                .forEach(entry -> 
indexNameToEntry.put(entry.indexFile().fileName(), entry));
+        this.deletionVectors = new HashMap<>();
+
+        this.indexNameToEntry = new HashMap<>();
+        for (IndexManifestEntry entry : manifestEntries) {
+            indexNameToEntry.put(entry.indexFile().fileName(), entry);
+        }
 
+        this.indexFileToDeletionFiles = new HashMap<>();
+        this.dataFileToIndexFile = new HashMap<>();
         for (String dataFile : deletionFiles.keySet()) {
             DeletionFile deletionFile = deletionFiles.get(dataFile);
             String indexFileName = new Path(deletionFile.path()).getName();
-            if (!indexFileToDeletionFiles.containsKey(indexFileName)) {
-                indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
-            }
-            indexFileToDeletionFiles.get(indexFileName).put(dataFile, 
deletionFile);
+            indexFileToDeletionFiles
+                    .computeIfAbsent(indexFileName, k -> new HashMap<>())
+                    .put(dataFile, deletionFile);
             dataFileToIndexFile.put(dataFile, indexFileName);
         }
+        this.touchedIndexFiles = new HashSet<>();
     }
 
     @Override
@@ -111,7 +99,7 @@ public class AppendDeleteFileMaintainer implements 
BaseAppendDeleteFileMaintaine
     public DeletionVector getDeletionVector(String dataFile) {
         DeletionFile deletionFile = getDeletionFile(dataFile);
         if (deletionFile != null) {
-            return 
indexFileHandler.deletionVectorsIndex().readDeletionVector(deletionFile);
+            return dvIndexFile.readDeletionVector(deletionFile);
         }
         return null;
     }
@@ -129,28 +117,26 @@ public class AppendDeleteFileMaintainer implements 
BaseAppendDeleteFileMaintaine
 
     @Override
     public void notifyNewDeletionVector(String dataFile, DeletionVector 
deletionVector) {
-        DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
         DeletionFile previous = notifyRemovedDeletionVector(dataFile);
         if (previous != null) {
-            
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(previous));
+            deletionVector.merge(dvIndexFile.readDeletionVector(previous));
         }
-        maintainer.notifyNewDeletion(dataFile, deletionVector);
+        deletionVectors.put(dataFile, deletionVector);
     }
 
     @Override
     public List<IndexManifestEntry> persist() {
         List<IndexManifestEntry> result = writeUnchangedDeletionVector();
-        List<IndexManifestEntry> newIndexFileEntries =
-                maintainer.writeDeletionVectorsIndex().stream()
-                        .map(
-                                fileMeta ->
-                                        new IndexManifestEntry(
-                                                FileKind.ADD, partition, 
UNAWARE_BUCKET, fileMeta))
-                        .collect(Collectors.toList());
-        result.addAll(newIndexFileEntries);
+        dvIndexFile.writeWithRolling(deletionVectors).stream()
+                .map(this::toAddEntry)
+                .forEach(result::add);
         return result;
     }
 
+    private IndexManifestEntry toAddEntry(IndexFileMeta file) {
+        return new IndexManifestEntry(FileKind.ADD, partition, UNAWARE_BUCKET, 
file);
+    }
+
     public String getIndexFilePath(String dataFile) {
         DeletionFile deletionFile = getDeletionFile(dataFile);
         return deletionFile == null ? null : deletionFile.path();
@@ -158,7 +144,6 @@ public class AppendDeleteFileMaintainer implements 
BaseAppendDeleteFileMaintaine
 
     @VisibleForTesting
     List<IndexManifestEntry> writeUnchangedDeletionVector() {
-        DeletionVectorsIndexFile deletionVectorsIndexFile = 
indexFileHandler.deletionVectorsIndex();
         List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
         for (String indexFile : indexFileToDeletionFiles.keySet()) {
             if (touchedIndexFiles.contains(indexFile)) {
@@ -169,17 +154,10 @@ public class AppendDeleteFileMaintainer implements 
BaseAppendDeleteFileMaintaine
                         indexFileToDeletionFiles.get(indexFile);
                 if (!dataFileToDeletionFiles.isEmpty()) {
                     List<IndexFileMeta> newIndexFiles =
-                            indexFileHandler.writeDeletionVectorsIndex(
-                                    
deletionVectorsIndexFile.readDeletionVector(
-                                            dataFileToDeletionFiles));
+                            dvIndexFile.writeWithRolling(
+                                    
dvIndexFile.readDeletionVector(dataFileToDeletionFiles));
                     newIndexFiles.forEach(
-                            newIndexFile ->
-                                    newIndexEntries.add(
-                                            new IndexManifestEntry(
-                                                    FileKind.ADD,
-                                                    oldEntry.partition(),
-                                                    oldEntry.bucket(),
-                                                    newIndexFile)));
+                            newIndexFile -> 
newIndexEntries.add(toAddEntry(newIndexFile)));
                 }
 
                 // mark the touched index file as removed.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
index 6afffda970..ae3d3e8a97 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
@@ -20,8 +20,9 @@ package org.apache.paimon.deletionvectors.append;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -29,11 +30,14 @@ import org.apache.paimon.table.source.DeletionFile;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
  * A maintainer to maintain deletion files for append table, the core methods:
@@ -63,15 +67,33 @@ public interface BaseAppendDeleteFileMaintainer {
         // overwrite the entire deletion file of the bucket when writing 
deletes.
         List<IndexFileMeta> indexFiles =
                 indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX, 
partition, bucket);
-        DeletionVectorsMaintainer maintainer =
-                new 
DeletionVectorsMaintainer.Factory(indexFileHandler).create(indexFiles);
+        BucketedDvMaintainer maintainer =
+                
BucketedDvMaintainer.factory(indexFileHandler).create(indexFiles);
         return new BucketedAppendDeleteFileMaintainer(partition, bucket, 
maintainer);
     }
 
     static AppendDeleteFileMaintainer forUnawareAppend(
             IndexFileHandler indexFileHandler, @Nullable Snapshot snapshot, 
BinaryRow partition) {
-        Map<String, DeletionFile> deletionFiles =
-                indexFileHandler.scanDVIndex(snapshot, partition, 
UNAWARE_BUCKET);
-        return new AppendDeleteFileMaintainer(indexFileHandler, partition, 
deletionFiles);
+        List<IndexManifestEntry> manifestEntries =
+                indexFileHandler.scan(snapshot, 
DELETION_VECTORS_INDEX).stream()
+                        .filter(e -> e.partition().equals(partition))
+                        .collect(Collectors.toList());
+        Map<String, DeletionFile> deletionFiles = new HashMap<>();
+        for (IndexManifestEntry file : manifestEntries) {
+            IndexFileMeta meta = file.indexFile();
+            LinkedHashMap<String, DeletionVectorMeta> dvMetas = 
meta.deletionVectorMetas();
+            checkNotNull(dvMetas);
+            for (DeletionVectorMeta dvMeta : dvMetas.values()) {
+                deletionFiles.put(
+                        dvMeta.dataFileName(),
+                        new DeletionFile(
+                                indexFileHandler.filePath(meta).toString(),
+                                dvMeta.offset(),
+                                dvMeta.length(),
+                                dvMeta.cardinality()));
+            }
+        }
+        return new AppendDeleteFileMaintainer(
+                indexFileHandler.deletionVectorsIndex(), partition, 
manifestEntries, deletionFiles);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java
index 847caae499..7245ebfe12 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java
@@ -19,23 +19,23 @@
 package org.apache.paimon.deletionvectors.append;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.stream.Collectors;
 
 /** A {@link BaseAppendDeleteFileMaintainer} of bucketed append table. */
 public class BucketedAppendDeleteFileMaintainer implements 
BaseAppendDeleteFileMaintainer {
 
     private final BinaryRow partition;
     private final int bucket;
-    private final DeletionVectorsMaintainer maintainer;
+    private final BucketedDvMaintainer maintainer;
 
     BucketedAppendDeleteFileMaintainer(
-            BinaryRow partition, int bucket, DeletionVectorsMaintainer 
maintainer) {
+            BinaryRow partition, int bucket, BucketedDvMaintainer maintainer) {
         this.partition = partition;
         this.bucket = bucket;
         this.maintainer = maintainer;
@@ -58,8 +58,11 @@ public class BucketedAppendDeleteFileMaintainer implements 
BaseAppendDeleteFileM
 
     @Override
     public List<IndexManifestEntry> persist() {
-        return maintainer.writeDeletionVectorsIndex().stream()
+        List<IndexManifestEntry> result = new ArrayList<>();
+        maintainer
+                .writeDeletionVectorsIndex()
                 .map(fileMeta -> new IndexManifestEntry(FileKind.ADD, 
partition, bucket, fileMeta))
-                .collect(Collectors.toList());
+                .ifPresent(result::add);
+        return result;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 1d18bde05f..38c4df0ae9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -25,20 +25,16 @@ import 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.utils.IntIterator;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -47,7 +43,6 @@ import java.util.Set;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Handle index files. */
 public class IndexFileHandler {
@@ -85,37 +80,6 @@ public class IndexFileHandler {
         return result.isEmpty() ? Optional.empty() : 
Optional.of(result.get(0));
     }
 
-    public Map<String, DeletionFile> scanDVIndex(
-            @Nullable Snapshot snapshot, BinaryRow partition, int bucket) {
-        if (snapshot == null) {
-            return Collections.emptyMap();
-        }
-        String indexManifest = snapshot.indexManifest();
-        if (indexManifest == null) {
-            return Collections.emptyMap();
-        }
-        Map<String, DeletionFile> result = new HashMap<>();
-        for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
-            IndexFileMeta meta = file.indexFile();
-            if (meta.indexType().equals(DELETION_VECTORS_INDEX)
-                    && file.partition().equals(partition)
-                    && file.bucket() == bucket) {
-                LinkedHashMap<String, DeletionVectorMeta> dvMetas = 
meta.deletionVectorMetas();
-                checkNotNull(dvMetas);
-                for (DeletionVectorMeta dvMeta : dvMetas.values()) {
-                    result.put(
-                            dvMeta.dataFileName(),
-                            new DeletionFile(
-                                    filePath(meta).toString(),
-                                    dvMeta.offset(),
-                                    dvMeta.length(),
-                                    dvMeta.cardinality()));
-                }
-            }
-        }
-        return result;
-    }
-
     public List<IndexManifestEntry> scan(String indexType) {
         return scan(snapshotManager.latestSnapshot(), indexType);
     }
@@ -189,18 +153,10 @@ public class IndexFileHandler {
 
     public List<IndexManifestEntry> scanEntries(
             Snapshot snapshot, String indexType, Set<BinaryRow> partitions) {
-        if (snapshot == null) {
-            return Collections.emptyList();
-        }
-        String indexManifest = snapshot.indexManifest();
-        if (indexManifest == null) {
-            return Collections.emptyList();
-        }
-
+        List<IndexManifestEntry> manifestEntries = scan(snapshot, indexType);
         List<IndexManifestEntry> result = new ArrayList<>();
-        for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
-            if (file.indexFile().indexType().equals(indexType)
-                    && partitions.contains(file.partition())) {
+        for (IndexManifestEntry file : manifestEntries) {
+            if (partitions.contains(file.partition())) {
                 result.add(file);
             }
         }
@@ -289,9 +245,4 @@ public class IndexFileHandler {
         }
         return deletionVectorsIndex.readAllDeletionVectors(fileMetas);
     }
-
-    public List<IndexFileMeta> writeDeletionVectorsIndex(
-            Map<String, DeletionVector> deletionVectors) {
-        return deletionVectorsIndex.write(deletionVectors);
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index 0ab0981963..48d0a0ca24 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.mergetree.compact;
 
 import org.apache.paimon.compact.CompactUnit;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.mergetree.LevelSortedRun;
@@ -54,7 +54,7 @@ public interface CompactStrategy {
             int numLevels,
             List<LevelSortedRun> runs,
             @Nullable RecordLevelExpire recordLevelExpire,
-            @Nullable DeletionVectorsMaintainer dvMaintainer,
+            @Nullable BucketedDvMaintainer dvMaintainer,
             boolean forceRewriteAllFiles) {
         int maxLevel = numLevels - 1;
         if (runs.isEmpty()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index ba427f7e92..6c9376ae33 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -21,7 +21,7 @@ package org.apache.paimon.mergetree.compact;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
 import org.apache.paimon.types.RowKind;
@@ -64,7 +64,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
     private final KeyValue reusedAfter = new KeyValue();
     @Nullable private final RecordEqualiser valueEqualiser;
     private final LookupStrategy lookupStrategy;
-    private final @Nullable DeletionVectorsMaintainer 
deletionVectorsMaintainer;
+    private final @Nullable BucketedDvMaintainer deletionVectorsMaintainer;
     private final Comparator<KeyValue> comparator;
 
     public LookupChangelogMergeFunctionWrapper(
@@ -72,7 +72,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
             Function<InternalRow, T> lookup,
             @Nullable RecordEqualiser valueEqualiser,
             LookupStrategy lookupStrategy,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
+            @Nullable BucketedDvMaintainer deletionVectorsMaintainer,
             @Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
         MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
         checkArgument(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index feb6d0b10b..896fba0d60 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -23,7 +23,7 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.codegen.RecordEqualiser;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
@@ -56,7 +56,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
     private final LookupLevels<T> lookupLevels;
     private final MergeFunctionWrapperFactory<T> wrapperFactory;
     private final boolean noSequenceField;
-    @Nullable private final DeletionVectorsMaintainer dvMaintainer;
+    @Nullable private final BucketedDvMaintainer dvMaintainer;
     private final IntFunction<String> level2FileFormat;
 
     public LookupMergeTreeCompactRewriter(
@@ -71,7 +71,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
             MergeSorter mergeSorter,
             MergeFunctionWrapperFactory<T> wrapperFactory,
             boolean produceChangelog,
-            @Nullable DeletionVectorsMaintainer dvMaintainer,
+            @Nullable BucketedDvMaintainer dvMaintainer,
             CoreOptions options) {
         super(
                 maxLevel,
@@ -156,7 +156,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
                 MergeFunctionFactory<KeyValue> mfFactory,
                 int outputLevel,
                 LookupLevels<T> lookupLevels,
-                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer);
+                @Nullable BucketedDvMaintainer deletionVectorsMaintainer);
     }
 
     /** A normal {@link MergeFunctionWrapperFactory} to create lookup wrapper. 
*/
@@ -181,7 +181,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
                 MergeFunctionFactory<KeyValue> mfFactory,
                 int outputLevel,
                 LookupLevels<T> lookupLevels,
-                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) 
{
+                @Nullable BucketedDvMaintainer deletionVectorsMaintainer) {
             return new LookupChangelogMergeFunctionWrapper<>(
                     mfFactory,
                     key -> {
@@ -207,7 +207,7 @@ public class LookupMergeTreeCompactRewriter<T> extends 
ChangelogMergeTreeRewrite
                 MergeFunctionFactory<KeyValue> mfFactory,
                 int outputLevel,
                 LookupLevels<Boolean> lookupLevels,
-                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) 
{
+                @Nullable BucketedDvMaintainer deletionVectorsMaintainer) {
             return new FirstRowMergeFunctionWrapper(
                     mfFactory,
                     key -> {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 987fa06a30..f0909565ee 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -26,7 +26,7 @@ import org.apache.paimon.compact.CompactResult;
 import org.apache.paimon.compact.CompactTask;
 import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.RecordLevelExpire;
 import org.apache.paimon.mergetree.LevelSortedRun;
@@ -63,7 +63,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
     private final CompactRewriter rewriter;
 
     @Nullable private final CompactionMetrics.Reporter metricsReporter;
-    @Nullable private final DeletionVectorsMaintainer dvMaintainer;
+    @Nullable private final BucketedDvMaintainer dvMaintainer;
     private final boolean lazyGenDeletionFile;
     private final boolean needLookup;
     private final boolean forceRewriteAllFiles;
@@ -79,7 +79,7 @@ public class MergeTreeCompactManager extends 
CompactFutureManager {
             int numSortedRunStopTrigger,
             CompactRewriter rewriter,
             @Nullable CompactionMetrics.Reporter metricsReporter,
-            @Nullable DeletionVectorsMaintainer dvMaintainer,
+            @Nullable BucketedDvMaintainer dvMaintainer,
             boolean lazyGenDeletionFile,
             boolean needLookup,
             @Nullable RecordLevelExpire recordLevelExpire,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 2e61c3d2f1..15af7eb4fd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -24,7 +24,7 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.compact.CompactDeletionFile;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
 import org.apache.paimon.index.IndexFileHandler;
@@ -73,7 +73,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
 
     private final int writerNumberMax;
     @Nullable private final DynamicBucketIndexMaintainer.Factory 
dbMaintainerFactory;
-    @Nullable private final DeletionVectorsMaintainer.Factory 
dvMaintainerFactory;
+    @Nullable private final BucketedDvMaintainer.Factory dvMaintainerFactory;
     private final int numBuckets;
     private final RowType partitionType;
 
@@ -95,7 +95,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             @Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory,
-            @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+            @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
             String tableName,
             CoreOptions options,
             RowType partitionType) {
@@ -430,7 +430,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                 dbMaintainerFactory == null
                         ? null
                         : 
dbMaintainerFactory.create(restored.dynamicBucketIndex());
-        DeletionVectorsMaintainer dvMaintainer =
+        BucketedDvMaintainer dvMaintainer =
                 dvMaintainerFactory == null
                         ? null
                         : 
dvMaintainerFactory.create(restored.deleteVectorsIndex());
@@ -524,7 +524,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
             long restoredMaxSeqNumber,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer);
+            @Nullable BucketedDvMaintainer deletionVectorsMaintainer);
 
     // force buffer spill to avoid out of memory in batch mode
     protected void forceBufferSpill() throws Exception {}
@@ -538,7 +538,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         public final RecordWriter<T> writer;
         public final int totalBuckets;
         @Nullable public final DynamicBucketIndexMaintainer 
dynamicBucketMaintainer;
-        @Nullable public final DeletionVectorsMaintainer 
deletionVectorsMaintainer;
+        @Nullable public final BucketedDvMaintainer deletionVectorsMaintainer;
         protected final long baseSnapshotId;
         protected long lastModifiedCommitIdentifier;
 
@@ -546,7 +546,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                 RecordWriter<T> writer,
                 int totalBuckets,
                 @Nullable DynamicBucketIndexMaintainer dynamicBucketMaintainer,
-                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
+                @Nullable BucketedDvMaintainer deletionVectorsMaintainer,
                 Long baseSnapshotId) {
             this.writer = writer;
             this.totalBuckets = totalBuckets;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java
index 3f1e701255..2df7c36934 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java
@@ -23,7 +23,7 @@ import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.types.RowType;
@@ -51,7 +51,6 @@ public class AppendFileStoreWrite extends 
BaseAppendFileStoreWrite {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
-            @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
             String tableName) {
         super(
                 fileIO,
@@ -63,7 +62,7 @@ public class AppendFileStoreWrite extends 
BaseAppendFileStoreWrite {
                 snapshotManager,
                 scan,
                 options,
-                dvMaintainerFactory,
+                null,
                 tableName);
         super.withIgnorePreviousFiles(true);
     }
@@ -74,7 +73,7 @@ public class AppendFileStoreWrite extends 
BaseAppendFileStoreWrite {
             int bucket,
             List<DataFileMeta> restoredFiles,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+            @Nullable BucketedDvMaintainer dvMaintainer) {
         return new NoopCompactManager();
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index dca05a6522..3bd8264831 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -24,8 +24,8 @@ import org.apache.paimon.append.AppendOnlyWriter;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.fileindex.FileIndexOptions;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
@@ -88,7 +88,7 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
-            @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+            @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
             String tableName) {
         super(snapshotManager, scan, options, partitionType, null, 
dvMaintainerFactory, tableName);
         this.fileIO = fileIO;
@@ -111,7 +111,7 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
             long restoredMaxSeqNumber,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+            @Nullable BucketedDvMaintainer dvMaintainer) {
         return new AppendOnlyWriter(
                 fileIO,
                 ioManager,
@@ -161,7 +161,7 @@ public abstract class BaseAppendFileStoreWrite extends 
MemoryFileStoreWrite<Inte
             int bucket,
             List<DataFileMeta> restoredFiles,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer dvMaintainer);
+            @Nullable BucketedDvMaintainer dvMaintainer);
 
     public List<DataFileMeta> compactRewrite(
             BinaryRow partition,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index 9fffc0ce7b..71f10d4b3d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -24,8 +24,8 @@ import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.types.RowType;
@@ -54,7 +54,7 @@ public class BucketedAppendFileStoreWrite extends 
BaseAppendFileStoreWrite {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             CoreOptions options,
-            @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+            @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
             String tableName) {
         super(
                 fileIO,
@@ -77,7 +77,7 @@ public class BucketedAppendFileStoreWrite extends 
BaseAppendFileStoreWrite {
             int bucket,
             List<DataFileMeta> restoredFiles,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+            @Nullable BucketedDvMaintainer dvMaintainer) {
         if (options.writeOnly()) {
             return new NoopCompactManager();
         } else {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 4c75fdbb73..b268f7b7a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -20,7 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.FileStore;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
 import org.apache.paimon.io.DataFileMeta;
@@ -156,7 +156,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
         protected final List<DataFileMeta> dataFiles;
         protected final long maxSequenceNumber;
         @Nullable protected final DynamicBucketIndexMaintainer indexMaintainer;
-        @Nullable protected final DeletionVectorsMaintainer 
deletionVectorsMaintainer;
+        @Nullable protected final BucketedDvMaintainer 
deletionVectorsMaintainer;
         protected final CommitIncrement commitIncrement;
 
         protected State(
@@ -168,7 +168,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
                 Collection<DataFileMeta> dataFiles,
                 long maxSequenceNumber,
                 @Nullable DynamicBucketIndexMaintainer indexMaintainer,
-                @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
+                @Nullable BucketedDvMaintainer deletionVectorsMaintainer,
                 CommitIncrement commitIncrement) {
             this.partition = partition;
             this.bucket = bucket;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 4e9e558d04..afd3fb906f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -29,8 +29,8 @@ import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -131,7 +131,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             SnapshotManager snapshotManager,
             FileStoreScan scan,
             @Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory,
-            @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+            @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
             CoreOptions options,
             KeyValueFieldsExtractor extractor,
             String tableName) {
@@ -185,7 +185,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             long restoredMaxSeqNumber,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+            @Nullable BucketedDvMaintainer dvMaintainer) {
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Creating merge tree writer for partition {} bucket {} 
from restored files {}",
@@ -260,7 +260,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             CompactStrategy compactStrategy,
             ExecutorService compactExecutor,
             Levels levels,
-            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+            @Nullable BucketedDvMaintainer dvMaintainer) {
         if (options.writeOnly()) {
             return new NoopCompactManager();
         } else {
@@ -299,7 +299,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             Comparator<InternalRow> keyComparator,
             @Nullable FieldsComparator userDefinedSeqComparator,
             Levels levels,
-            @Nullable DeletionVectorsMaintainer dvMaintainer) {
+            @Nullable BucketedDvMaintainer dvMaintainer) {
         DeletionVector.Factory dvFactory = 
DeletionVector.factory(dvMaintainer);
         FileReaderFactory<KeyValue> readerFactory =
                 readerFactoryBuilder.build(partition, bucket, dvFactory);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index c91a664fad..06263d4ec6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.index.DynamicBucketIndexMaintainer;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
@@ -65,7 +65,7 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
             CoreOptions options,
             RowType partitionType,
             @Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory,
-            @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+            @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
             String tableName) {
         super(
                 snapshotManager,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index da7287ad96..ac9a851cb2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -21,7 +21,7 @@ package org.apache.paimon.postpone;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.avro.AvroSchemaConverter;
 import org.apache.paimon.fs.FileIO;
@@ -180,7 +180,7 @@ public class PostponeBucketFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue>
             long restoredMaxSeqNumber,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
-            @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+            @Nullable BucketedDvMaintainer deletionVectorsMaintainer) {
         Preconditions.checkArgument(bucket == BucketMode.POSTPONE_BUCKET);
         Preconditions.checkArgument(
                 restoreFiles.isEmpty(),
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index a67b5c0a97..55ffdcbe53 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -19,7 +19,7 @@
 package org.apache.paimon;
 
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
 import 
org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainerHelper;
 import org.apache.paimon.fs.FileIO;
@@ -44,6 +44,7 @@ import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TraceableFileIO;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -126,10 +127,9 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
                 fileHandler, partition, dataFileToDeletionFiles);
     }
 
-    public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow 
partition, int bucket) {
+    public BucketedDvMaintainer createOrRestoreDVMaintainer(BinaryRow 
partition, int bucket) {
         Snapshot latestSnapshot = snapshotManager().latestSnapshot();
-        DeletionVectorsMaintainer.Factory factory =
-                new DeletionVectorsMaintainer.Factory(fileHandler);
+        BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
         List<IndexFileMeta> indexFiles =
                 fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, 
partition, bucket);
         return factory.create(indexFiles);
@@ -137,19 +137,21 @@ public class TestAppendFileStore extends 
AppendOnlyFileStore {
 
     public CommitMessageImpl writeDVIndexFiles(
             BinaryRow partition, int bucket, Map<String, List<Integer>> 
dataFileToPositions) {
-        DeletionVectorsMaintainer dvMaintainer = 
createOrRestoreDVMaintainer(partition, bucket);
+        BucketedDvMaintainer dvMaintainer = 
createOrRestoreDVMaintainer(partition, bucket);
         for (Map.Entry<String, List<Integer>> entry : 
dataFileToPositions.entrySet()) {
             for (Integer pos : entry.getValue()) {
                 dvMaintainer.notifyNewDeletion(entry.getKey(), pos);
             }
         }
+        List<IndexFileMeta> indexFiles = new ArrayList<>();
+        dvMaintainer.writeDeletionVectorsIndex().ifPresent(indexFiles::add);
         return new CommitMessageImpl(
                 partition,
                 bucket,
                 options().bucket(),
                 DataIncrement.emptyIncrement(),
                 CompactIncrement.emptyIncrement(),
-                new IndexIncrement(dvMaintainer.writeDeletionVectorsIndex()));
+                new IndexIncrement(indexFiles));
     }
 
     public static TestAppendFileStore createAppendStore(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
similarity index 83%
rename from 
paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index 7e82ad8d65..2128ae8714 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -51,8 +51,8 @@ import static java.util.Collections.emptyList;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link DeletionVectorsMaintainer}. */
-public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase {
+/** Test for {@link BucketedDvMaintainer}. */
+public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
     private IndexFileHandler fileHandler;
 
     @ParameterizedTest
@@ -60,9 +60,8 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
     public void test0(boolean bitmap64) {
         initIndexHandler(bitmap64);
 
-        DeletionVectorsMaintainer.Factory factory =
-                new DeletionVectorsMaintainer.Factory(fileHandler);
-        DeletionVectorsMaintainer dvMaintainer = factory.create(emptyList());
+        BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
+        BucketedDvMaintainer dvMaintainer = factory.create(emptyList());
         assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64);
 
         dvMaintainer.notifyNewDeletion("f1", 1);
@@ -72,9 +71,10 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
 
         assertThat(dvMaintainer.deletionVectorOf("f1")).isPresent();
         assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty();
-        List<IndexFileMeta> fileMetas = 
dvMaintainer.writeDeletionVectorsIndex();
+        IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
 
-        Map<String, DeletionVector> deletionVectors = 
fileHandler.readAllDeletionVectors(fileMetas);
+        Map<String, DeletionVector> deletionVectors =
+                
fileHandler.readAllDeletionVectors(Collections.singletonList(file));
         assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
         assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
         assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
@@ -87,10 +87,9 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
     public void test1(boolean bitmap64) {
         initIndexHandler(bitmap64);
 
-        DeletionVectorsMaintainer.Factory factory =
-                new DeletionVectorsMaintainer.Factory(fileHandler);
+        BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
 
-        DeletionVectorsMaintainer dvMaintainer = factory.create();
+        BucketedDvMaintainer dvMaintainer = factory.create();
         DeletionVector deletionVector1 = createDeletionVector(bitmap64);
         deletionVector1.delete(1);
         deletionVector1.delete(3);
@@ -98,8 +97,7 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
         dvMaintainer.notifyNewDeletion("f1", deletionVector1);
         assertThat(dvMaintainer.bitmap64()).isEqualTo(bitmap64);
 
-        List<IndexFileMeta> fileMetas1 = 
dvMaintainer.writeDeletionVectorsIndex();
-        assertThat(fileMetas1.size()).isEqualTo(1);
+        IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage =
                 new CommitMessageImpl(
                         BinaryRow.EMPTY_ROW,
@@ -107,7 +105,7 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
                         1,
                         DataIncrement.emptyIncrement(),
                         CompactIncrement.emptyIncrement(),
-                        new IndexIncrement(fileMetas1));
+                        new IndexIncrement(Collections.singletonList(file)));
         BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
         commit.commit(Collections.singletonList(commitMessage));
 
@@ -122,8 +120,7 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
         deletionVector2.delete(2);
         dvMaintainer.notifyNewDeletion("f1", deletionVector2);
 
-        List<IndexFileMeta> fileMetas2 = 
dvMaintainer.writeDeletionVectorsIndex();
-        assertThat(fileMetas2.size()).isEqualTo(1);
+        file = dvMaintainer.writeDeletionVectorsIndex().get();
         commitMessage =
                 new CommitMessageImpl(
                         BinaryRow.EMPTY_ROW,
@@ -131,7 +128,7 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
                         1,
                         DataIncrement.emptyIncrement(),
                         CompactIncrement.emptyIncrement(),
-                        new IndexIncrement(fileMetas2));
+                        new IndexIncrement(Collections.singletonList(file)));
         commit = table.newBatchWriteBuilder().newCommit();
         commit.commit(Collections.singletonList(commitMessage));
 
@@ -149,9 +146,8 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
     public void testCompactDeletion(boolean bitmap64) throws IOException {
         initIndexHandler(bitmap64);
 
-        DeletionVectorsMaintainer.Factory factory =
-                new DeletionVectorsMaintainer.Factory(fileHandler);
-        DeletionVectorsMaintainer dvMaintainer = factory.create(emptyList());
+        BucketedDvMaintainer.Factory factory = 
BucketedDvMaintainer.factory(fileHandler);
+        BucketedDvMaintainer dvMaintainer = factory.create(emptyList());
 
         File indexDir = new File(tempPath.toFile(), "/default.db/T/index");
 
@@ -191,17 +187,15 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
     public void testReadAndWriteMixedDv(boolean bitmap64) {
         // write first kind dv
         initIndexHandler(bitmap64);
-        DeletionVectorsMaintainer.Factory factory1 =
-                new DeletionVectorsMaintainer.Factory(fileHandler);
-        DeletionVectorsMaintainer dvMaintainer1 = factory1.create();
+        BucketedDvMaintainer.Factory factory1 = 
BucketedDvMaintainer.factory(fileHandler);
+        BucketedDvMaintainer dvMaintainer1 = factory1.create();
         dvMaintainer1.notifyNewDeletion("f1", 1);
         dvMaintainer1.notifyNewDeletion("f1", 3);
         dvMaintainer1.notifyNewDeletion("f2", 1);
         dvMaintainer1.notifyNewDeletion("f2", 3);
         assertThat(dvMaintainer1.bitmap64()).isEqualTo(bitmap64);
 
-        List<IndexFileMeta> fileMetas1 = 
dvMaintainer1.writeDeletionVectorsIndex();
-        assertThat(fileMetas1.size()).isEqualTo(1);
+        IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage1 =
                 new CommitMessageImpl(
                         BinaryRow.EMPTY_ROW,
@@ -209,21 +203,20 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
                         1,
                         DataIncrement.emptyIncrement(),
                         CompactIncrement.emptyIncrement(),
-                        new IndexIncrement(fileMetas1));
+                        new IndexIncrement(Collections.singletonList(file)));
         BatchTableCommit commit1 = table.newBatchWriteBuilder().newCommit();
         commit1.commit(Collections.singletonList(commitMessage1));
 
         // write second kind dv
         initIndexHandler(!bitmap64);
-        DeletionVectorsMaintainer.Factory factory2 =
-                new DeletionVectorsMaintainer.Factory(fileHandler);
+        BucketedDvMaintainer.Factory factory2 = 
BucketedDvMaintainer.factory(fileHandler);
         List<IndexFileMeta> indexFiles =
                 fileHandler.scan(
                         table.latestSnapshot().get(),
                         DELETION_VECTORS_INDEX,
                         BinaryRow.EMPTY_ROW,
                         0);
-        DeletionVectorsMaintainer dvMaintainer2 = factory2.create(indexFiles);
+        BucketedDvMaintainer dvMaintainer2 = factory2.create(indexFiles);
         dvMaintainer2.notifyNewDeletion("f1", 10);
         dvMaintainer2.notifyNewDeletion("f3", 1);
         dvMaintainer2.notifyNewDeletion("f3", 3);
@@ -238,8 +231,7 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
         assertThat(dvs.get("f3"))
                 .isInstanceOf(bitmap64 ? BitmapDeletionVector.class : 
Bitmap64DeletionVector.class);
 
-        List<IndexFileMeta> fileMetas2 = 
dvMaintainer2.writeDeletionVectorsIndex();
-        assertThat(fileMetas2.size()).isEqualTo(1);
+        file = dvMaintainer2.writeDeletionVectorsIndex().get();
         CommitMessage commitMessage2 =
                 new CommitMessageImpl(
                         BinaryRow.EMPTY_ROW,
@@ -247,7 +239,7 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
                         1,
                         DataIncrement.emptyIncrement(),
                         CompactIncrement.emptyIncrement(),
-                        new IndexIncrement(fileMetas2));
+                        new IndexIncrement(Collections.singletonList(file)));
         BatchTableCommit commit2 = table.newBatchWriteBuilder().newCommit();
         commit2.commit(Collections.singletonList(commitMessage2));
 
@@ -278,8 +270,8 @@ public class DeletionVectorsMaintainerTest extends 
PrimaryKeyTableTestBase {
         fileHandler = table.store().newIndexFileHandler();
     }
 
-    public static DeletionVectorsMaintainer createOrRestore(
-            DeletionVectorsMaintainer.Factory factory,
+    public static BucketedDvMaintainer createOrRestore(
+            BucketedDvMaintainer.Factory factory,
             @Nullable Snapshot snapshot,
             BinaryRow partition) {
         IndexFileHandler handler = factory.indexFileHandler();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index 2b55cba306..d30462c25d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -72,7 +72,7 @@ public class DeletionVectorsIndexFileTest {
         index3.delete(3);
         deleteMap.put("file33.parquet", index3);
 
-        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.write(deleteMap);
+        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.writeWithRolling(deleteMap);
         assertThat(indexFiles.size()).isEqualTo(1);
 
         // read
@@ -110,7 +110,7 @@ public class DeletionVectorsIndexFileTest {
         }
 
         // read
-        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.write(deleteMap);
+        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.writeWithRolling(deleteMap);
         assertThat(indexFiles.size()).isEqualTo(1);
         Map<String, DeletionVector> dvs =
                 deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
@@ -142,7 +142,7 @@ public class DeletionVectorsIndexFileTest {
             fileToCardinality.put("f" + i, index.getCardinality());
             fileToDV.put("f" + i, index);
         }
-        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.write(fileToDV);
+        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.writeWithRolling(fileToDV);
 
         // read
         assertThat(indexFiles.size()).isEqualTo(1);
@@ -174,7 +174,7 @@ public class DeletionVectorsIndexFileTest {
             fileToCardinality.put("f" + i, index.getCardinality());
             fileToDV.put("f" + i, index);
         }
-        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.write(fileToDV);
+        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.writeWithRolling(fileToDV);
 
         // assert 1
         assertThat(indexFiles.size()).isEqualTo(3);
@@ -196,7 +196,7 @@ public class DeletionVectorsIndexFileTest {
             fileToCardinality.put("f" + i, index.getCardinality());
             fileToDV.put("f" + i, index);
         }
-        indexFiles = deletionVectorsIndexFile.write(fileToDV);
+        indexFiles = deletionVectorsIndexFile.writeWithRolling(fileToDV);
 
         // assert 2
         assertThat(indexFiles.size()).isGreaterThan(1);
@@ -226,7 +226,7 @@ public class DeletionVectorsIndexFileTest {
             deleteMap1.put(String.format("file%s.parquet", i), index);
             deleteInteger.put(String.format("file%s.parquet", i), num);
         }
-        List<IndexFileMeta> indexFiles1 = 
v1DeletionVectorsIndexFile.write(deleteMap1);
+        List<IndexFileMeta> indexFiles1 = 
v1DeletionVectorsIndexFile.writeWithRolling(deleteMap1);
         assertThat(indexFiles1.size()).isEqualTo(1);
 
         // write v2 dv
@@ -238,7 +238,7 @@ public class DeletionVectorsIndexFileTest {
             deleteMap2.put(String.format("file%s.parquet", i), index);
             deleteInteger.put(String.format("file%s.parquet", i), num);
         }
-        List<IndexFileMeta> indexFiles2 = 
v2DeletionVectorsIndexFile.write(deleteMap2);
+        List<IndexFileMeta> indexFiles2 = 
v2DeletionVectorsIndexFile.writeWithRolling(deleteMap2);
         assertThat(indexFiles2.size()).isEqualTo(1);
 
         List<IndexFileMeta> totalIndexFiles =
@@ -274,7 +274,7 @@ public class DeletionVectorsIndexFileTest {
         index1.delete(100);
         deleteMap.put("file1.parquet", index1);
 
-        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.write(deleteMap);
+        List<IndexFileMeta> indexFiles = 
deletionVectorsIndexFile.writeWithRolling(deleteMap);
         assertThat(indexFiles.size()).isEqualTo(1);
 
         IndexFileMeta indexFileMeta = indexFiles.get(0);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
index c6048b95f4..892d219278 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
@@ -19,10 +19,14 @@
 package org.apache.paimon.deletionvectors.append;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.table.source.DeletionFile;
 
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /** Helper for {@link BaseAppendDeleteFileMaintainer}. */
 public class AppendDeletionFileMaintainerHelper {
@@ -31,6 +35,19 @@ public class AppendDeletionFileMaintainerHelper {
             IndexFileHandler indexFileHandler,
             BinaryRow partition,
             Map<String, DeletionFile> deletionFiles) {
-        return new AppendDeleteFileMaintainer(indexFileHandler, partition, 
deletionFiles);
+        List<String> touchedIndexFileNames =
+                deletionFiles.values().stream()
+                        .map(deletionFile -> new 
Path(deletionFile.path()).getName())
+                        .distinct()
+                        .collect(Collectors.toList());
+        List<IndexManifestEntry> manifests =
+                indexFileHandler.scanEntries().stream()
+                        .filter(
+                                indexManifestEntry ->
+                                        touchedIndexFileNames.contains(
+                                                
indexManifestEntry.indexFile().fileName()))
+                        .collect(Collectors.toList());
+        return new AppendDeleteFileMaintainer(
+                indexFileHandler.deletionVectorsIndex(), partition, manifests, 
deletionFiles);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 23e942d3f4..5701106be4 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -25,8 +25,8 @@ import org.apache.paimon.TestAppendFileStore;
 import org.apache.paimon.TestFileStore;
 import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.index.IndexFileHandler;
@@ -890,8 +890,7 @@ public class FileStoreCommitTest {
 
         // assert 1
         assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 
0).size()).isEqualTo(2);
-        DeletionVectorsMaintainer maintainer =
-                store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+        BucketedDvMaintainer maintainer = 
store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
         Map<String, DeletionVector> dvs = maintainer.deletionVectors();
         assertThat(dvs.size()).isEqualTo(2);
         assertThat(dvs.get("f2").isDeleted(2)).isTrue();
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index f85213da7c..445d1507d9 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.sql
 
 import org.apache.paimon.data.BinaryRow
-import org.apache.paimon.deletionvectors.{DeletionVector, 
DeletionVectorsMaintainer, DeletionVectorsMaintainerTest}
+import org.apache.paimon.deletionvectors.{BucketedDvMaintainer, 
BucketedDvMaintainerTest, DeletionVector}
 import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
@@ -117,7 +117,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe
 
           val table = loadTable("target")
           val dvMaintainerFactory =
-            new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+            BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
           runAndCheckSplitScan(s"""
                                   |MERGE INTO target
                                   |USING source
@@ -167,7 +167,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe
 
           val table = loadTable("T")
           val dvMaintainerFactory =
-            new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+            BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
 
           spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
           val deletionVectors1 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
@@ -239,7 +239,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe
 
           val table = loadTable("T")
           val dvMaintainerFactory =
-            new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+            BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
 
           spark.sql(
             "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', 
'2025'), (4, 'd', '2025')")
@@ -326,7 +326,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe
 
           val table = loadTable("T")
           val dvMaintainerFactory =
-            new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+            BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
 
           spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
           val deletionVectors1 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
@@ -383,7 +383,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe
 
           val table = loadTable("T")
           val dvMaintainerFactory =
-            new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+            BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
 
           def getDeletionVectors(ptValues: Seq[String]): Map[String, 
DeletionVector] = {
             getLatestDeletionVectors(
@@ -467,7 +467,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe
         Row(1, "a_new1") :: Row(2, "b") :: Row(3, "c_new1") :: Nil)
 
       val dvMaintainerFactory =
-        new 
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+        BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
 
       val deletionVectors1 = getAllLatestDeletionVectors(table, 
dvMaintainerFactory)
       // 1, 3 deleted, their row positions are 0, 2
@@ -705,17 +705,17 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe
 
   private def getAllLatestDeletionVectors(
       table: FileStoreTable,
-      dvMaintainerFactory: DeletionVectorsMaintainer.Factory): Map[String, 
DeletionVector] = {
+      dvMaintainerFactory: BucketedDvMaintainer.Factory): Map[String, 
DeletionVector] = {
     getLatestDeletionVectors(table, dvMaintainerFactory, 
Seq(BinaryRow.EMPTY_ROW))
   }
 
   private def getLatestDeletionVectors(
       table: FileStoreTable,
-      dvMaintainerFactory: DeletionVectorsMaintainer.Factory,
+      dvMaintainerFactory: BucketedDvMaintainer.Factory,
       partitions: Seq[BinaryRow]): Map[String, DeletionVector] = {
     partitions.flatMap {
       partition =>
-        DeletionVectorsMaintainerTest
+        BucketedDvMaintainerTest
           .createOrRestore(
             dvMaintainerFactory,
             table.snapshotManager().latestSnapshot(),


Reply via email to