This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e8acd22b9 [core] Introduce deletedIndexFiles in IndexIncrement and allow multi index files in same bucket (#3368)
e8acd22b9 is described below

commit e8acd22b995d21ac37cc492fa8c1c66abcb5d44d
Author: Yann Byron <[email protected]>
AuthorDate: Wed May 22 21:59:56 2024 +0800

    [core] Introduce deletedIndexFiles in IndexIncrement and allow multi index files in same bucket (#3368)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   3 +-
 .../deletionvectors/DeletionVectorsIndexFile.java  |  13 +-
 .../deletionvectors/DeletionVectorsMaintainer.java |  11 +-
 .../apache/paimon/index/HashIndexMaintainer.java   |   2 +-
 .../org/apache/paimon/index/IndexFileHandler.java  |  55 +++---
 .../org/apache/paimon/io/CompactIncrement.java     |   6 +
 .../java/org/apache/paimon/io/IndexIncrement.java  |  31 ++-
 .../apache/paimon/manifest/IndexManifestEntry.java |  55 ------
 .../apache/paimon/manifest/IndexManifestFile.java  |  33 +---
 .../paimon/manifest/IndexManifestFileHandler.java  | 211 +++++++++++++++++++++
 .../paimon/operation/FileStoreCommitImpl.java      |  29 ++-
 .../paimon/table/sink/CommitMessageSerializer.java |   5 +-
 .../apache/paimon/table/sink/TableCommitImpl.java  |   4 +
 .../table/source/snapshot/SnapshotReaderImpl.java  |  52 ++---
 .../org/apache/paimon/TestAppendFileStore.java     | 170 +++++++++++++++++
 .../DeletionVectorsIndexFileTest.java              |  14 +-
 .../DeletionVectorsMaintainerTest.java             |  62 +++++-
 .../paimon/index/IndexFileMetaSerializerTest.java  |  39 ++--
 .../manifest/IndexManifestFileHandlerTest.java     | 112 +++++++++++
 .../paimon/operation/FileStoreCommitTest.java      |  60 +++++-
 .../table/sink/CommitMessageSerializerTest.java    |   4 +-
 21 files changed, 789 insertions(+), 182 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 8cbf433f6..7528b393d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -191,7 +191,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 partitionType.getFieldCount() > 0 && 
options.dynamicPartitionOverwrite(),
                 newKeyComparator(),
                 options.branch(),
-                newStatsFileHandler());
+                newStatsFileHandler(),
+                bucketMode());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 733fb7825..c0cc09039 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.index.IndexFile;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 
@@ -48,18 +49,16 @@ public class DeletionVectorsIndexFile extends IndexFile {
     /**
      * Reads all deletion vectors from a specified file.
      *
-     * @param fileName The name of the file from which to read the deletion 
vectors.
-     * @param deletionVectorRanges A map where the key represents which file 
the DeletionVector
-     *     belongs to and the value is a Pair object specifying the range 
(start position and size)
-     *     within the file where the deletion vector data is located.
      * @return A map where the key represents which file the DeletionVector 
belongs to, and the
      *     value is the corresponding DeletionVector object.
      * @throws UncheckedIOException If an I/O error occurs while reading from 
the file.
      */
-    public Map<String, DeletionVector> readAllDeletionVectors(
-            String fileName, LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorRanges) {
+    public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta 
fileMeta) {
+        LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
+                fileMeta.deletionVectorsRanges();
+        String indexFileName = fileMeta.fileName();
         Map<String, DeletionVector> deletionVectors = new HashMap<>();
-        Path filePath = pathFactory.toPath(fileName);
+        Path filePath = pathFactory.toPath(indexFileName);
         try (SeekableInputStream inputStream = 
fileIO.newInputStream(filePath)) {
             checkVersion(inputStream);
             DataInputStream dataInputStream = new DataInputStream(inputStream);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 3beb99623..ce9ad40a1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -129,15 +129,12 @@ public class DeletionVectorsMaintainer {
 
         public DeletionVectorsMaintainer createOrRestore(
                 @Nullable Long snapshotId, BinaryRow partition, int bucket) {
-            IndexFileMeta indexFile =
+            List<IndexFileMeta> indexFiles =
                     snapshotId == null
-                            ? null
-                            : handler.scan(snapshotId, DELETION_VECTORS_INDEX, 
partition, bucket)
-                                    .orElse(null);
+                            ? Collections.emptyList()
+                            : handler.scan(snapshotId, DELETION_VECTORS_INDEX, 
partition, bucket);
             Map<String, DeletionVector> deletionVectors =
-                    indexFile == null
-                            ? new HashMap<>()
-                            : new 
HashMap<>(handler.readAllDeletionVectors(indexFile));
+                    new HashMap<>(handler.readAllDeletionVectors(indexFiles));
             return createOrRestore(deletionVectors);
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java 
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
index 66a8d6409..41e486525 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexMaintainer.java
@@ -51,7 +51,7 @@ public class HashIndexMaintainer implements 
IndexMaintainer<KeyValue> {
         IntHashSet hashcode = new IntHashSet();
         if (snapshotId != null) {
             Optional<IndexFileMeta> indexFile =
-                    fileHandler.scan(snapshotId, HashIndexFile.HASH_INDEX, 
partition, bucket);
+                    fileHandler.scanHashIndex(snapshotId, partition, bucket);
             if (indexFile.isPresent()) {
                 IndexFileMeta file = indexFile.get();
                 hashcode = new IntHashSet((int) file.rowCount());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index fdc2fe754..f3138cb02 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,20 +65,25 @@ public class IndexFileHandler {
         this.deletionVectorsIndex = deletionVectorsIndex;
     }
 
-    public Optional<IndexFileMeta> scan(
+    public Optional<IndexFileMeta> scanHashIndex(long snapshotId, BinaryRow 
partition, int bucket) {
+        List<IndexFileMeta> result = scan(snapshotId, HASH_INDEX, partition, 
bucket);
+        if (result.size() > 1) {
+            throw new IllegalArgumentException(
+                    "Find multiple hash index files for one bucket: " + 
result);
+        }
+        return result.isEmpty() ? Optional.empty() : 
Optional.of(result.get(0));
+    }
+
+    public List<IndexFileMeta> scan(
             long snapshotId, String indexType, BinaryRow partition, int 
bucket) {
         List<IndexManifestEntry> entries = scan(snapshotId, indexType, 
partition);
-        List<IndexManifestEntry> result = new ArrayList<>();
+        List<IndexFileMeta> result = new ArrayList<>();
         for (IndexManifestEntry file : entries) {
             if (file.bucket() == bucket) {
-                result.add(file);
+                result.add(file.indexFile());
             }
         }
-        if (result.size() > 1) {
-            throw new IllegalArgumentException(
-                    "Find multiple index files for one bucket: " + result);
-        }
-        return result.isEmpty() ? Optional.empty() : 
Optional.of(result.get(0).indexFile());
+        return result;
     }
 
     public List<IndexManifestEntry> scan(String indexType, BinaryRow 
partition) {
@@ -167,31 +173,16 @@ public class IndexFileHandler {
         indexManifestFile.delete(indexManifest);
     }
 
-    public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta 
fileMeta) {
-        if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) {
-            throw new IllegalArgumentException(
-                    "Input file is not deletion vectors index " + 
fileMeta.indexType());
-        }
-        LinkedHashMap<String, Pair<Integer, Integer>> deleteIndexRange =
-                fileMeta.deletionVectorsRanges();
-        if (deleteIndexRange == null || deleteIndexRange.isEmpty()) {
-            return Collections.emptyMap();
-        }
-        return 
deletionVectorsIndex.readAllDeletionVectors(fileMeta.fileName(), 
deleteIndexRange);
-    }
-
-    public Optional<DeletionVector> readDeletionVector(IndexFileMeta fileMeta, 
String fileName) {
-        if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) {
-            throw new IllegalArgumentException(
-                    "Input file is not deletion vectors index " + 
fileMeta.indexType());
-        }
-        Map<String, Pair<Integer, Integer>> deleteIndexRange = 
fileMeta.deletionVectorsRanges();
-        if (deleteIndexRange == null || 
!deleteIndexRange.containsKey(fileName)) {
-            return Optional.empty();
+    public Map<String, DeletionVector> 
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
+        Map<String, DeletionVector> deletionVectors = new HashMap<>();
+        for (IndexFileMeta indexFile : fileMetas) {
+            if (!indexFile.indexType().equals(DELETION_VECTORS_INDEX)) {
+                throw new IllegalArgumentException(
+                        "Input file is not deletion vectors index " + 
indexFile.indexType());
+            }
+            
deletionVectors.putAll(deletionVectorsIndex.readAllDeletionVectors(indexFile));
         }
-        return Optional.of(
-                deletionVectorsIndex.readDeletionVector(
-                        fileMeta.fileName(), deleteIndexRange.get(fileName)));
+        return deletionVectors;
     }
 
     public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector> 
deletionVectors) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java 
b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
index cc3e79f01..23f64b5cd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/CompactIncrement.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.io;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
@@ -88,4 +89,9 @@ public class CompactIncrement {
                         .map(DataFileMeta::fileName)
                         .collect(Collectors.joining(",\n")));
     }
+
+    public static CompactIncrement emptyIncrement() {
+        return new CompactIncrement(
+                Collections.emptyList(), Collections.emptyList(), 
Collections.emptyList());
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java 
b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
index a62d24109..8e53adfbf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/IndexIncrement.java
@@ -20,6 +20,8 @@ package org.apache.paimon.io;
 
 import org.apache.paimon.index.IndexFileMeta;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
@@ -28,16 +30,29 @@ public class IndexIncrement {
 
     private final List<IndexFileMeta> newIndexFiles;
 
+    private final List<IndexFileMeta> deletedIndexFiles;
+
     public IndexIncrement(List<IndexFileMeta> newIndexFiles) {
         this.newIndexFiles = newIndexFiles;
+        this.deletedIndexFiles = Collections.emptyList();
+    }
+
+    public IndexIncrement(
+            List<IndexFileMeta> newIndexFiles, List<IndexFileMeta> 
deletedIndexFiles) {
+        this.newIndexFiles = newIndexFiles;
+        this.deletedIndexFiles = deletedIndexFiles;
     }
 
     public List<IndexFileMeta> newIndexFiles() {
         return newIndexFiles;
     }
 
+    public List<IndexFileMeta> deletedIndexFiles() {
+        return deletedIndexFiles;
+    }
+
     public boolean isEmpty() {
-        return newIndexFiles.isEmpty();
+        return newIndexFiles.isEmpty() && deletedIndexFiles.isEmpty();
     }
 
     @Override
@@ -49,16 +64,24 @@ public class IndexIncrement {
             return false;
         }
         IndexIncrement that = (IndexIncrement) o;
-        return Objects.equals(newIndexFiles, that.newIndexFiles);
+        return Objects.equals(newIndexFiles, that.newIndexFiles)
+                && Objects.equals(deletedIndexFiles, that.deletedIndexFiles);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(newIndexFiles);
+        List<IndexFileMeta> all = new ArrayList<>(newIndexFiles);
+        all.addAll(deletedIndexFiles);
+        return Objects.hash(all);
     }
 
     @Override
     public String toString() {
-        return "IndexIncrement{" + "newIndexFiles=" + newIndexFiles + '}';
+        return "IndexIncrement{"
+                + "newIndexFiles="
+                + newIndexFiles
+                + ",deletedIndexFiles="
+                + deletedIndexFiles
+                + "}";
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
index 53f3fb82d..fc4fd03e4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
@@ -94,10 +94,6 @@ public class IndexManifestEntry {
         return new RowType(fields);
     }
 
-    public Identifier identifier() {
-        return new Identifier(partition, bucket, indexFile.indexType());
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -131,55 +127,4 @@ public class IndexManifestEntry {
                 + indexFile
                 + '}';
     }
-
-    /** The {@link Identifier} of a {@link IndexFileMeta}. */
-    public static class Identifier {
-
-        public final BinaryRow partition;
-        public final int bucket;
-        public final String indexType;
-
-        private Integer hash;
-
-        private Identifier(BinaryRow partition, int bucket, String indexType) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.indexType = indexType;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            Identifier that = (Identifier) o;
-            return bucket == that.bucket
-                    && Objects.equals(partition, that.partition)
-                    && Objects.equals(indexType, that.indexType);
-        }
-
-        @Override
-        public int hashCode() {
-            if (hash == null) {
-                hash = Objects.hash(partition, bucket, indexType);
-            }
-            return hash;
-        }
-
-        @Override
-        public String toString() {
-            return "Identifier{"
-                    + "partition="
-                    + partition
-                    + ", bucket="
-                    + bucket
-                    + ", indexType='"
-                    + indexType
-                    + '\''
-                    + '}';
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
index 235f8c509..8a91f3483 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFile.java
@@ -22,7 +22,7 @@ import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriterFactory;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.manifest.IndexManifestEntry.Identifier;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.ObjectsFile;
@@ -31,10 +31,7 @@ import org.apache.paimon.utils.VersionedObjectSerializer;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.Map;
 
 /** Index manifest file. */
 public class IndexManifestFile extends ObjectsFile<IndexManifestEntry> {
@@ -53,27 +50,17 @@ public class IndexManifestFile extends 
ObjectsFile<IndexManifestEntry> {
                 null);
     }
 
-    /** Merge new index files to index manifest. */
+    /** Write new index files to index manifest. */
     @Nullable
-    public String merge(
-            @Nullable String previousIndexManifest, List<IndexManifestEntry> 
newIndexFiles) {
-        String indexManifest = previousIndexManifest;
-        if (newIndexFiles.size() > 0) {
-            Map<Identifier, IndexManifestEntry> indexEntries = new 
LinkedHashMap<>();
-            List<IndexManifestEntry> entries =
-                    indexManifest == null ? new ArrayList<>() : 
read(indexManifest);
-            entries.addAll(newIndexFiles);
-            for (IndexManifestEntry file : entries) {
-                if (file.kind() == FileKind.ADD) {
-                    indexEntries.put(file.identifier(), file);
-                } else {
-                    indexEntries.remove(file.identifier());
-                }
-            }
-            indexManifest = writeWithoutRolling(indexEntries.values());
+    public String writeIndexFiles(
+            @Nullable String previousIndexManifest,
+            List<IndexManifestEntry> newIndexFiles,
+            BucketMode bucketMode) {
+        if (newIndexFiles.isEmpty()) {
+            return previousIndexManifest;
         }
-
-        return indexManifest;
+        IndexManifestFileHandler handler = new IndexManifestFileHandler(this, 
bucketMode);
+        return handler.write(previousIndexManifest, newIndexFiles);
     }
 
     /** Creator of {@link IndexManifestFile}. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
new file mode 100644
index 000000000..2c62e3ef4
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+
+/** IndexManifestFile Handler. */
+public class IndexManifestFileHandler {
+
+    private final IndexManifestFile indexManifestFile;
+
+    private final BucketMode bucketMode;
+
+    IndexManifestFileHandler(IndexManifestFile indexManifestFile, BucketMode 
bucketMode) {
+        this.indexManifestFile = indexManifestFile;
+        this.bucketMode = bucketMode;
+    }
+
+    String write(@Nullable String previousIndexManifest, 
List<IndexManifestEntry> newIndexFiles) {
+        List<IndexManifestEntry> entries =
+                previousIndexManifest == null
+                        ? new ArrayList<>()
+                        : indexManifestFile.read(previousIndexManifest);
+        for (IndexManifestEntry entry : entries) {
+            Preconditions.checkArgument(entry.kind() == FileKind.ADD);
+        }
+
+        Pair<List<IndexManifestEntry>, List<IndexManifestEntry>> previous =
+                separateIndexEntries(entries);
+        Pair<List<IndexManifestEntry>, List<IndexManifestEntry>> current =
+                separateIndexEntries(newIndexFiles);
+
+        // Step1: get the hash index files;
+        List<IndexManifestEntry> indexEntries =
+                getIndexManifestFileCombine(HASH_INDEX)
+                        .combine(previous.getLeft(), current.getLeft());
+
+        // Step2: get the dv index files;
+        indexEntries.addAll(
+                getIndexManifestFileCombine(DELETION_VECTORS_INDEX)
+                        .combine(previous.getRight(), current.getRight()));
+
+        return indexManifestFile.writeWithoutRolling(indexEntries);
+    }
+
+    private Pair<List<IndexManifestEntry>, List<IndexManifestEntry>> 
separateIndexEntries(
+            List<IndexManifestEntry> indexFiles) {
+        List<IndexManifestEntry> hashEntries = new ArrayList<>();
+        List<IndexManifestEntry> dvEntries = new ArrayList<>();
+        for (IndexManifestEntry entry : indexFiles) {
+            String indexType = entry.indexFile().indexType();
+            if (indexType.equals(DELETION_VECTORS_INDEX)) {
+                dvEntries.add(entry);
+            } else if (indexType.equals(HASH_INDEX)) {
+                hashEntries.add(entry);
+            } else {
+                throw new IllegalArgumentException("Can't recognize this index 
type: " + indexType);
+            }
+        }
+        return Pair.of(hashEntries, dvEntries);
+    }
+
+    private IndexManifestFileCombiner getIndexManifestFileCombine(String 
indexType) {
+        if (DELETION_VECTORS_INDEX.equals(indexType) && 
BucketMode.BUCKET_UNAWARE == bucketMode) {
+            return new UnawareBucketCombiner();
+        } else {
+            return new CommonBucketCombiner();
+        }
+    }
+
+    interface IndexManifestFileCombiner {
+        List<IndexManifestEntry> combine(
+                List<IndexManifestEntry> prevIndexFiles, 
List<IndexManifestEntry> newIndexFiles);
+    }
+
+    /**
+     * We combine the previous and new index files by the file name. This is 
only used for tables
+     * with UnawareBucket.
+     */
+    static class UnawareBucketCombiner implements IndexManifestFileCombiner {
+
+        @Override
+        public List<IndexManifestEntry> combine(
+                List<IndexManifestEntry> prevIndexFiles, 
List<IndexManifestEntry> newIndexFiles) {
+            Map<String, IndexManifestEntry> indexEntries = new HashMap<>();
+            for (IndexManifestEntry entry : prevIndexFiles) {
+                indexEntries.put(entry.indexFile().fileName(), entry);
+            }
+
+            for (IndexManifestEntry entry : newIndexFiles) {
+                if (entry.kind() == FileKind.ADD) {
+                    indexEntries.put(entry.indexFile().fileName(), entry);
+                } else {
+                    indexEntries.remove(entry.indexFile().fileName());
+                }
+            }
+            return new ArrayList<>(indexEntries.values());
+        }
+    }
+
+    /** We combine the previous and new index files by {@link Identifier}. */
+    static class CommonBucketCombiner implements IndexManifestFileCombiner {
+
+        @Override
+        public List<IndexManifestEntry> combine(
+                List<IndexManifestEntry> prevIndexFiles, 
List<IndexManifestEntry> newIndexFiles) {
+            Map<Identifier, IndexManifestEntry> indexEntries = new HashMap<>();
+            for (IndexManifestEntry entry : prevIndexFiles) {
+                indexEntries.put(identifier(entry), entry);
+            }
+
+            for (IndexManifestEntry entry : newIndexFiles) {
+                if (entry.kind() == FileKind.ADD) {
+                    indexEntries.put(identifier(entry), entry);
+                } else {
+                    indexEntries.remove(identifier(entry));
+                }
+            }
+            return new ArrayList<>(indexEntries.values());
+        }
+    }
+
+    private static Identifier identifier(IndexManifestEntry 
indexManifestEntry) {
+        return new Identifier(
+                indexManifestEntry.partition(),
+                indexManifestEntry.bucket(),
+                indexManifestEntry.indexFile().indexType());
+    }
+
+    /** The {@link Identifier} of a {@link IndexFileMeta}. */
+    public static class Identifier {
+
+        public final BinaryRow partition;
+        public final int bucket;
+        public final String indexType;
+
+        private Integer hash;
+
+        private Identifier(BinaryRow partition, int bucket, String indexType) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.indexType = indexType;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            Identifier that = (Identifier) o;
+            return bucket == that.bucket
+                    && Objects.equals(partition, that.partition)
+                    && Objects.equals(indexType, that.indexType);
+        }
+
+        @Override
+        public int hashCode() {
+            if (hash == null) {
+                hash = Objects.hash(partition, bucket, indexType);
+            }
+            return hash;
+        }
+
+        @Override
+        public String toString() {
+            return "Identifier{"
+                    + "partition="
+                    + partition
+                    + ", bucket="
+                    + bucket
+                    + ", indexType='"
+                    + indexType
+                    + '\''
+                    + '}';
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 7ddf8c210..0001d5b19 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -44,6 +44,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.types.RowType;
@@ -125,6 +126,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
     private final StatsFileHandler statsFileHandler;
 
+    private final BucketMode bucketMode;
+
     public FileStoreCommitImpl(
             FileIO fileIO,
             SchemaManager schemaManager,
@@ -143,7 +146,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             boolean dynamicPartitionOverwrite,
             @Nullable Comparator<InternalRow> keyComparator,
             String branchName,
-            StatsFileHandler statsFileHandler) {
+            StatsFileHandler statsFileHandler,
+            BucketMode bucketMode) {
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
         this.commitUser = commitUser;
@@ -166,6 +170,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.ignoreEmptyCommit = true;
         this.commitMetrics = null;
         this.statsFileHandler = statsFileHandler;
+        this.bucketMode = bucketMode;
     }
 
     @Override
@@ -633,6 +638,24 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                                                 "Unknown index type: " + 
f.indexType());
                                 }
                             });
+            commitMessage
+                    .indexIncrement()
+                    .deletedIndexFiles()
+                    .forEach(
+                            f -> {
+                                if 
(f.indexType().equals(DELETION_VECTORS_INDEX)) {
+                                    compactDvIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.DELETE,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    f));
+                                } else {
+                                    throw new RuntimeException(
+                                            "This index type is not supported 
to delete: "
+                                                    + f.indexType());
+                                }
+                            });
         }
     }
 
@@ -834,7 +857,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
 
             // write new index manifest
-            String indexManifest = 
indexManifestFile.merge(previousIndexManifest, indexFiles);
+            String indexManifest =
+                    indexManifestFile.writeIndexFiles(
+                            previousIndexManifest, indexFiles, bucketMode);
             if (!Objects.equals(indexManifest, previousIndexManifest)) {
                 newIndexManifest = indexManifest;
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 24c9fc892..a7b566b32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -81,6 +81,7 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
         
dataFileSerializer.serializeList(message.compactIncrement().compactAfter(), 
view);
         
dataFileSerializer.serializeList(message.compactIncrement().changelogFiles(), 
view);
         
indexEntrySerializer.serializeList(message.indexIncrement().newIndexFiles(), 
view);
+        
indexEntrySerializer.serializeList(message.indexIncrement().deletedIndexFiles(),
 view);
     }
 
     @Override
@@ -124,6 +125,8 @@ public class CommitMessageSerializer implements VersionedSerializer<CommitMessag
                         dataFileSerializer.deserializeList(view),
                         dataFileSerializer.deserializeList(view),
                         dataFileSerializer.deserializeList(view)),
-                new 
IndexIncrement(indexEntrySerializer.deserializeList(view)));
+                new IndexIncrement(
+                        indexEntrySerializer.deserializeList(view),
+                        indexEntrySerializer.deserializeList(view)));
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index eab7a0c6d..d244c2a94 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -281,6 +281,10 @@ public class TableCommitImpl implements InnerTableCommit {
                         .map(IndexFileMeta::fileName)
                         .map(indexFileFactory::toPath)
                         .forEach(files::add);
+                msg.indexIncrement().deletedIndexFiles().stream()
+                        .map(IndexFileMeta::fileName)
+                        .map(indexFileFactory::toPath)
+                        .forEach(files::add);
             }
         }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 2ed258e85..ffb52ce6f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -48,8 +48,6 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TypeUtils;
 
-import javax.annotation.Nullable;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -284,12 +282,11 @@ public class SnapshotReaderImpl implements SnapshotReader {
                                 ? splitGenerator.splitForStreaming(bucketFiles)
                                 : splitGenerator.splitForBatch(bucketFiles);
 
-                IndexFileMeta deletionIndexFile =
+                List<IndexFileMeta> deletionIndexFiles =
                         deletionVectors
-                                ? indexFileHandler
-                                        .scan(snapshotId, 
DELETION_VECTORS_INDEX, partition, bucket)
-                                        .orElse(null)
-                                : null;
+                                ? indexFileHandler.scan(
+                                        snapshotId, DELETION_VECTORS_INDEX, 
partition, bucket)
+                                : Collections.emptyList();
                 for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
                     List<DataFileMeta> dataFiles = splitGroup.files;
                     String bucketPath = pathFactory.bucketPath(partition, 
bucket).toString();
@@ -298,7 +295,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                             .withBucketPath(bucketPath);
                     if (deletionVectors) {
                         builder.withDataDeletionFiles(
-                                getDeletionFiles(dataFiles, 
deletionIndexFile));
+                                getDeletionFiles(dataFiles, 
deletionIndexFiles));
                     }
 
                     splits.add(builder.build());
@@ -373,16 +370,15 @@ public class SnapshotReaderImpl implements SnapshotReader {
                                 .isStreaming(isStreaming)
                                 .withBucketPath(pathFactory.bucketPath(part, 
bucket).toString());
                 if (deletionVectors) {
-                    IndexFileMeta beforeDeletionIndex =
-                            indexFileHandler
-                                    .scan(beforeSnapshotId, 
DELETION_VECTORS_INDEX, part, bucket)
-                                    .orElse(null);
-                    IndexFileMeta deletionIndex =
-                            indexFileHandler
-                                    .scan(plan.snapshotId(), 
DELETION_VECTORS_INDEX, part, bucket)
-                                    .orElse(null);
-                    builder.withBeforeDeletionFiles(getDeletionFiles(before, 
beforeDeletionIndex));
-                    builder.withDataDeletionFiles(getDeletionFiles(data, 
deletionIndex));
+                    List<IndexFileMeta> beforeDeletionIndexes =
+                            indexFileHandler.scan(
+                                    beforeSnapshotId, DELETION_VECTORS_INDEX, 
part, bucket);
+                    List<IndexFileMeta> deletionIndexes =
+                            indexFileHandler.scan(
+                                    plan.snapshotId(), DELETION_VECTORS_INDEX, 
part, bucket);
+                    builder.withBeforeDeletionFiles(
+                            getDeletionFiles(before, beforeDeletionIndexes));
+                    builder.withDataDeletionFiles(getDeletionFiles(data, 
deletionIndexes));
                 }
                 splits.add(builder.build());
             }
@@ -412,14 +408,22 @@ public class SnapshotReaderImpl implements SnapshotReader {
     }
 
     private List<DeletionFile> getDeletionFiles(
-            List<DataFileMeta> dataFiles, @Nullable IndexFileMeta 
indexFileMeta) {
+            List<DataFileMeta> dataFiles, List<IndexFileMeta> indexFileMetas) {
         List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
-        Map<String, Pair<Integer, Integer>> deletionRanges =
-                indexFileMeta == null ? null : 
indexFileMeta.deletionVectorsRanges();
+        Map<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<>();
+        for (IndexFileMeta indexFileMeta : indexFileMetas) {
+            if (indexFileMeta.deletionVectorsRanges() != null) {
+                for (String dataFileName : 
indexFileMeta.deletionVectorsRanges().keySet()) {
+                    dataFileToIndexFileMeta.put(dataFileName, indexFileMeta);
+                }
+            }
+        }
         for (DataFileMeta file : dataFiles) {
-            if (deletionRanges != null) {
-                Pair<Integer, Integer> range = 
deletionRanges.get(file.fileName());
-                if (range != null) {
+            IndexFileMeta indexFileMeta = 
dataFileToIndexFileMeta.get(file.fileName());
+            if (indexFileMeta != null) {
+                Map<String, Pair<Integer, Integer>> ranges = 
indexFileMeta.deletionVectorsRanges();
+                if (ranges != null && ranges.containsKey(file.fileName())) {
+                    Pair<Integer, Integer> range = ranges.get(file.fileName());
                     deletionFiles.add(
                             new DeletionFile(
                                     
indexFileHandler.filePath(indexFileMeta).toString(),
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
new file mode 100644
index 000000000..11612ac1b
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOFinder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.operation.FileStoreCommitImpl;
+import org.apache.paimon.operation.Lock;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+
+/** Wrapper of AppendOnlyFileStore. */
+public class TestAppendFileStore extends AppendOnlyFileStore {
+
+    private final String commitUser;
+
+    private final IndexFileHandler fileHandler;
+
+    private long commitIdentifier;
+
+    private FileIO fileIO;
+
+    public TestAppendFileStore(
+            FileIO fileIO,
+            SchemaManager schemaManage,
+            CoreOptions options,
+            TableSchema tableSchema,
+            RowType partitionType,
+            RowType bucketType,
+            RowType rowType,
+            String tableName) {
+        super(
+                fileIO,
+                schemaManage,
+                tableSchema,
+                options,
+                partitionType,
+                bucketType,
+                rowType,
+                tableName,
+                new CatalogEnvironment(Lock.emptyFactory(), null, null));
+
+        this.fileIO = fileIO;
+        this.commitUser = UUID.randomUUID().toString();
+        this.fileHandler = this.newIndexFileHandler();
+        this.commitIdentifier = 0L;
+    }
+
+    public FileIO fileIO() {
+        return this.fileIO;
+    }
+
+    public FileStoreCommitImpl newCommit() {
+        return super.newCommit(commitUser);
+    }
+
+    public void commit(CommitMessage... commitMessages) {
+        ManifestCommittable committable = new 
ManifestCommittable(commitIdentifier++);
+        for (CommitMessage commitMessage : commitMessages) {
+            committable.addFileCommittable(commitMessage);
+        }
+        newCommit().commit(committable, Collections.emptyMap());
+    }
+
+    public CommitMessage removeIndexFiles(
+            BinaryRow partition, int bucket, List<IndexFileMeta> 
indexFileMetas) {
+        return new CommitMessageImpl(
+                partition,
+                bucket,
+                DataIncrement.emptyIncrement(),
+                CompactIncrement.emptyIncrement(),
+                new IndexIncrement(Collections.emptyList(), indexFileMetas));
+    }
+
+    public List<IndexFileMeta> scanDVIndexFiles(BinaryRow partition, int 
bucket) {
+        Long lastSnapshotId = snapshotManager().latestSnapshotId();
+        return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX, 
partition, bucket);
+    }
+
+    public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow 
partition, int bucket) {
+        Long lastSnapshotId = snapshotManager().latestSnapshotId();
+        DeletionVectorsMaintainer.Factory factory =
+                new DeletionVectorsMaintainer.Factory(fileHandler);
+        return factory.createOrRestore(lastSnapshotId, partition, bucket);
+    }
+
+    public CommitMessage writeDVIndexFiles(
+            BinaryRow partition, int bucket, Map<String, List<Integer>> 
dataFileToPositions) {
+        DeletionVectorsMaintainer dvMaintainer = 
createOrRestoreDVMaintainer(partition, bucket);
+        for (Map.Entry<String, List<Integer>> entry : 
dataFileToPositions.entrySet()) {
+            for (Integer pos : entry.getValue()) {
+                dvMaintainer.notifyNewDeletion(entry.getKey(), pos);
+            }
+        }
+        return new CommitMessageImpl(
+                partition,
+                bucket,
+                DataIncrement.emptyIncrement(),
+                CompactIncrement.emptyIncrement(),
+                new IndexIncrement(dvMaintainer.prepareCommit()));
+    }
+
+    public static TestAppendFileStore createAppendStore(
+            java.nio.file.Path tempDir, Map<String, String> options) throws 
Exception {
+        String root = TraceableFileIO.SCHEME + "://" + tempDir.toString();
+        Path path = new Path(tempDir.toUri());
+        FileIO fileIO = FileIOFinder.find(new Path(root));
+        SchemaManager schemaManage = new SchemaManager(new LocalFileIO(), 
path);
+
+        options.put(CoreOptions.PATH.key(), root);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        schemaManage,
+                        new Schema(
+                                
TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(),
+                                
TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(),
+                                Collections.emptyList(),
+                                options,
+                                null));
+        return new TestAppendFileStore(
+                fileIO,
+                schemaManage,
+                new CoreOptions(options),
+                tableSchema,
+                TestKeyValueGenerator.DEFAULT_PART_TYPE,
+                RowType.of(),
+                TestKeyValueGenerator.DEFAULT_ROW_TYPE,
+                (new Path(root)).getName());
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index fbaa407d2..c43950a44 100644
--- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.deletionvectors;
 
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 
@@ -32,6 +33,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link DeletionVectorsIndexFile}. */
@@ -66,8 +68,10 @@ public class DeletionVectorsIndexFileTest {
         LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges = 
pair.getRight();
 
         // read
+        IndexFileMeta indexFileMeta =
+                new IndexFileMeta(DELETION_VECTORS_INDEX, fileName, 0L, 0L, 
deletionVectorRanges);
         Map<String, DeletionVector> actualDeleteMap =
-                deletionVectorsIndexFile.readAllDeletionVectors(fileName, 
deletionVectorRanges);
+                deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
         assertThat(actualDeleteMap.get("file1.parquet").isDeleted(1)).isTrue();
         
assertThat(actualDeleteMap.get("file1.parquet").isDeleted(2)).isFalse();
         assertThat(actualDeleteMap.get("file2.parquet").isDeleted(2)).isTrue();
@@ -104,8 +108,10 @@ public class DeletionVectorsIndexFileTest {
                 deletionVectorsIndexFile.write(deleteMap);
 
         // read
+        IndexFileMeta indexFileMeta =
+                new IndexFileMeta(DELETION_VECTORS_INDEX, pair.getLeft(), 0L, 
0L, pair.getRight());
         Map<String, DeletionVector> dvs =
-                
deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), 
pair.getRight());
+                deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
         assertThat(dvs.size()).isEqualTo(100000);
     }
 
@@ -128,8 +134,10 @@ public class DeletionVectorsIndexFileTest {
                 deletionVectorsIndexFile.write(deleteMap);
 
         // read
+        IndexFileMeta indexFileMeta =
+                new IndexFileMeta(DELETION_VECTORS_INDEX, pair.getLeft(), 0L, 
0L, pair.getRight());
         Map<String, DeletionVector> dvs =
-                
deletionVectorsIndexFile.readAllDeletionVectors(pair.getLeft(), 
pair.getRight());
+                deletionVectorsIndexFile.readAllDeletionVectors(indexFileMeta);
         assertThat(dvs.size()).isEqualTo(1);
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
index 9bfb4f81a..dd6d3102c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -22,10 +22,17 @@ import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -56,12 +63,63 @@ public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase {
         assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty();
         List<IndexFileMeta> fileMetas = dvMaintainer.prepareCommit();
 
-        Map<String, DeletionVector> deletionVectors =
-                fileHandler.readAllDeletionVectors(fileMetas.get(0));
+        Map<String, DeletionVector> deletionVectors = 
fileHandler.readAllDeletionVectors(fileMetas);
         assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
         assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
         assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
         assertThat(deletionVectors.get("f2").isDeleted(2)).isTrue();
         assertThat(deletionVectors.containsKey("f3")).isFalse();
     }
+
+    @Test
+    public void test1() {
+        DeletionVectorsMaintainer.Factory factory =
+                new DeletionVectorsMaintainer.Factory(fileHandler);
+
+        DeletionVectorsMaintainer dvMaintainer = factory.create();
+        BitmapDeletionVector deletionVector1 = new BitmapDeletionVector();
+        deletionVector1.delete(1);
+        deletionVector1.delete(3);
+        deletionVector1.delete(5);
+        dvMaintainer.notifyNewDeletion("f1", deletionVector1);
+
+        List<IndexFileMeta> fileMetas1 = dvMaintainer.prepareCommit();
+        assertThat(fileMetas1.size()).isEqualTo(1);
+        CommitMessage commitMessage =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        DataIncrement.emptyIncrement(),
+                        CompactIncrement.emptyIncrement(),
+                        new IndexIncrement(fileMetas1));
+        BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+        commit.commit(Collections.singletonList(commitMessage));
+
+        Long lastSnapshotId = table.snapshotManager().latestSnapshotId();
+        dvMaintainer = factory.createOrRestore(lastSnapshotId, 
BinaryRow.EMPTY_ROW, 0);
+        DeletionVector deletionVector2 = 
dvMaintainer.deletionVectorOf("f1").get();
+        assertThat(deletionVector2.isDeleted(1)).isTrue();
+        assertThat(deletionVector2.isDeleted(2)).isFalse();
+
+        deletionVector2.delete(2);
+        dvMaintainer.notifyNewDeletion("f1", deletionVector2);
+
+        List<IndexFileMeta> fileMetas2 = dvMaintainer.prepareCommit();
+        assertThat(fileMetas2.size()).isEqualTo(1);
+        commitMessage =
+                new CommitMessageImpl(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        DataIncrement.emptyIncrement(),
+                        CompactIncrement.emptyIncrement(),
+                        new IndexIncrement(fileMetas2));
+        commit = table.newBatchWriteBuilder().newCommit();
+        commit.commit(Collections.singletonList(commitMessage));
+
+        lastSnapshotId = table.snapshotManager().latestSnapshotId();
+        dvMaintainer = factory.createOrRestore(lastSnapshotId, 
BinaryRow.EMPTY_ROW, 0);
+        DeletionVector deletionVector3 = 
dvMaintainer.deletionVectorOf("f1").get();
+        assertThat(deletionVector3.isDeleted(1)).isTrue();
+        assertThat(deletionVector3.isDeleted(2)).isTrue();
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index 53058a05f..724d5b416 100644
--- a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -42,22 +42,31 @@ public class IndexFileMetaSerializerTest extends ObjectSerializerTestBase<IndexF
     public static IndexFileMeta randomIndexFile() {
         Random rnd = new Random();
         if (rnd.nextBoolean()) {
-            return new IndexFileMeta(
-                    HashIndexFile.HASH_INDEX,
-                    "my_file_name" + rnd.nextLong(),
-                    rnd.nextInt(),
-                    rnd.nextInt());
+            return randomHashIndexFile();
         } else {
-            LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorsRanges =
-                    new LinkedHashMap<>();
-            deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
-            deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
-            return new IndexFileMeta(
-                    DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
-                    "deletion_vectors_index_file_name" + rnd.nextLong(),
-                    rnd.nextInt(),
-                    rnd.nextInt(),
-                    deletionVectorsRanges);
+            return randomDeletionVectorIndexFile();
         }
     }
+
+    public static IndexFileMeta randomHashIndexFile() {
+        Random rnd = new Random();
+        return new IndexFileMeta(
+                HashIndexFile.HASH_INDEX,
+                "my_file_name" + rnd.nextLong(),
+                rnd.nextInt(),
+                rnd.nextInt());
+    }
+
+    public static IndexFileMeta randomDeletionVectorIndexFile() {
+        Random rnd = new Random();
+        LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges = 
new LinkedHashMap<>();
+        deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
+        deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
+        return new IndexFileMeta(
+                DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
+                "deletion_vectors_index_file_name" + rnd.nextLong(),
+                rnd.nextInt(),
+                rnd.nextInt(),
+                deletionVectorsRanges);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
new file mode 100644
index 000000000..4ca59a75c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestFileHandlerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import org.apache.paimon.TestAppendFileStore;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.BucketMode;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import static 
org.apache.paimon.index.IndexFileMetaSerializerTest.randomDeletionVectorIndexFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for IndexManifestFileHandler. */
+public class IndexManifestFileHandlerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testUnawareMode() throws Exception {
+        TestAppendFileStore fileStore =
+                TestAppendFileStore.createAppendStore(tempDir, new 
HashMap<>());
+
+        IndexManifestFile indexManifestFile =
+                new IndexManifestFile.Factory(
+                                fileStore.fileIO(),
+                                fileStore.options().manifestFormat(),
+                                fileStore.pathFactory())
+                        .create();
+        IndexManifestFileHandler indexManifestFileHandler =
+                new IndexManifestFileHandler(indexManifestFile, 
BucketMode.BUCKET_UNAWARE);
+
+        IndexManifestEntry entry1 =
+                new IndexManifestEntry(
+                        FileKind.ADD, BinaryRow.EMPTY_ROW, 0, 
randomDeletionVectorIndexFile());
+        String indexManifestFile1 = indexManifestFileHandler.write(null, 
Arrays.asList(entry1));
+
+        IndexManifestEntry entry2 = entry1.toDeleteEntry();
+        IndexManifestEntry entry3 =
+                new IndexManifestEntry(
+                        FileKind.ADD, BinaryRow.EMPTY_ROW, 0, 
randomDeletionVectorIndexFile());
+        String indexManifestFile2 =
+                indexManifestFileHandler.write(indexManifestFile1, 
Arrays.asList(entry2, entry3));
+
+        List<IndexManifestEntry> entries = 
indexManifestFile.read(indexManifestFile2);
+        assertThat(entries.size()).isEqualTo(1);
+        assertThat(entries.contains(entry1)).isFalse();
+        assertThat(entries.contains(entry2)).isFalse();
+        assertThat(entries.contains(entry3)).isTrue();
+    }
+
+    @Test
+    public void testHashFixedBucket() throws Exception {
+        TestAppendFileStore fileStore =
+                TestAppendFileStore.createAppendStore(tempDir, new 
HashMap<>());
+
+        IndexManifestFile indexManifestFile =
+                new IndexManifestFile.Factory(
+                                fileStore.fileIO(),
+                                fileStore.options().manifestFormat(),
+                                fileStore.pathFactory())
+                        .create();
+        IndexManifestFileHandler indexManifestFileHandler =
+                new IndexManifestFileHandler(indexManifestFile, 
BucketMode.HASH_FIXED);
+
+        IndexManifestEntry entry1 =
+                new IndexManifestEntry(
+                        FileKind.ADD, BinaryRow.EMPTY_ROW, 0, 
randomDeletionVectorIndexFile());
+        IndexManifestEntry entry2 =
+                new IndexManifestEntry(
+                        FileKind.ADD, BinaryRow.EMPTY_ROW, 1, 
randomDeletionVectorIndexFile());
+        String indexManifestFile1 =
+                indexManifestFileHandler.write(null, Arrays.asList(entry1, 
entry2));
+
+        IndexManifestEntry entry3 =
+                new IndexManifestEntry(
+                        FileKind.ADD, BinaryRow.EMPTY_ROW, 1, 
randomDeletionVectorIndexFile());
+        IndexManifestEntry entry4 =
+                new IndexManifestEntry(
+                        FileKind.ADD, BinaryRow.EMPTY_ROW, 2, 
randomDeletionVectorIndexFile());
+        String indexManifestFile2 =
+                indexManifestFileHandler.write(indexManifestFile1, 
Arrays.asList(entry3, entry4));
+
+        List<IndexManifestEntry> entries = 
indexManifestFile.read(indexManifestFile2);
+        assertThat(entries.size()).isEqualTo(3);
+        assertThat(entries.contains(entry1)).isTrue();
+        assertThat(entries.contains(entry2)).isFalse();
+        assertThat(entries.contains(entry3)).isTrue();
+        assertThat(entries.contains(entry4)).isTrue();
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index d603eda41..03630e71f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -21,9 +21,12 @@ package org.apache.paimon.operation;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.TestAppendFileStore;
 import org.apache.paimon.TestFileStore;
 import org.apache.paimon.TestKeyValueGenerator;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.index.IndexFileHandler;
@@ -39,6 +42,8 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.ColStats;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.stats.StatsFileHandler;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
@@ -57,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -757,7 +763,7 @@ public class FileStoreCommitTest {
                 .containsExactlyInAnyOrder(6, 8);
 
         // assert scan one bucket
-        Optional<IndexFileMeta> file = indexFileHandler.scan(snapshot.id(), 
HASH_INDEX, part1, 0);
+        Optional<IndexFileMeta> file = 
indexFileHandler.scanHashIndex(snapshot.id(), part1, 0);
         assertThat(file).isPresent();
         
assertThat(indexFileHandler.readHashIndexList(file.get())).containsExactlyInAnyOrder(1,
 4);
 
@@ -766,9 +772,9 @@ public class FileStoreCommitTest {
         store.overwriteData(
                 Collections.singletonList(record1), gen::getPartition, kv -> 
0, new HashMap<>());
         snapshot = store.snapshotManager().latestSnapshot();
-        file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part1, 0);
+        file = indexFileHandler.scanHashIndex(snapshot.id(), part1, 0);
         assertThat(file).isEmpty();
-        file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2);
+        file = indexFileHandler.scanHashIndex(snapshot.id(), part2, 2);
         assertThat(file).isPresent();
 
         // overwrite all partitions
@@ -776,7 +782,7 @@ public class FileStoreCommitTest {
         store.overwriteData(
                 Collections.singletonList(record1), gen::getPartition, kv -> 
0, new HashMap<>());
         snapshot = store.snapshotManager().latestSnapshot();
-        file = indexFileHandler.scan(snapshot.id(), HASH_INDEX, part2, 2);
+        file = indexFileHandler.scanHashIndex(snapshot.id(), part2, 2);
         assertThat(file).isEmpty();
     }
 
@@ -843,6 +849,52 @@ public class FileStoreCommitTest {
         assertThat(readStats.get()).isEqualTo(fakeStats);
     }
 
+    @Test
+    public void testDVIndexFiles() throws Exception {
+        TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, new HashMap<>());
+
+        // commit 1
+        CommitMessage commitMessage1 =
+                store.writeDVIndexFiles(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        Collections.singletonMap("f1", Arrays.asList(1, 3)));
+        CommitMessage commitMessage2 =
+                store.writeDVIndexFiles(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        Collections.singletonMap("f2", Arrays.asList(2, 4)));
+        store.commit(commitMessage1, commitMessage2);
+
+        // assert 1
+        assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2);
+        DeletionVectorsMaintainer maintainer =
+                store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+        Map<String, DeletionVector> dvs = maintainer.deletionVectors();
+        assertThat(dvs.size()).isEqualTo(2);
+        assertThat(dvs.get("f2").isDeleted(2)).isTrue();
+        assertThat(dvs.get("f2").isDeleted(3)).isFalse();
+        assertThat(dvs.get("f2").isDeleted(4)).isTrue();
+
+        // commit 2
+        CommitMessage commitMessage3 =
+                store.writeDVIndexFiles(
+                        BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2", Arrays.asList(3)));
+        CommitMessage commitMessage4 =
+                store.removeIndexFiles(
+                        BinaryRow.EMPTY_ROW,
+                        0,
+                        ((CommitMessageImpl) commitMessage1).indexIncrement().newIndexFiles());
+        store.commit(commitMessage3, commitMessage4);
+
+        // assert 2
+        assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2);
+        maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+        dvs = maintainer.deletionVectors();
+        assertThat(dvs.size()).isEqualTo(2);
+        assertThat(dvs.get("f2").isDeleted(3)).isTrue();
+    }
+
     private TestFileStore createStore(boolean failing) throws Exception {
         return createStore(failing, 1);
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index 190027d36..47a210742 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -42,7 +42,9 @@ public class CommitMessageSerializerTest {
         DataIncrement dataIncrement = randomNewFilesIncrement();
         CompactIncrement compactIncrement = randomCompactIncrement();
         IndexIncrement indexIncrement =
-                new IndexIncrement(Arrays.asList(randomIndexFile(), 
randomIndexFile()));
+                new IndexIncrement(
+                        Arrays.asList(randomIndexFile(), randomIndexFile()),
+                        Arrays.asList(randomIndexFile(), randomIndexFile()));
         CommitMessageImpl committable =
                 new CommitMessageImpl(row(0), 1, dataIncrement, 
compactIncrement, indexIncrement);
         CommitMessageImpl newCommittable =

Reply via email to