This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f9d04c147 [core] Introduce deletion vector (#2923)
f9d04c147 is described below

commit f9d04c14737a17492409d47d683c8c970d52db4a
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Mar 1 11:32:11 2024 +0800

    [core] Introduce deletion vector (#2923)
---
 paimon-core/pom.xml                                |   7 +
 .../java/org/apache/paimon/AbstractFileStore.java  |   4 +-
 .../deletionvectors/BitmapDeletionVector.java      |  94 +++++++++++
 .../paimon/deletionvectors/DeletionVector.java     |  94 +++++++++++
 .../deletionvectors/DeletionVectorsIndexFile.java  | 188 +++++++++++++++++++++
 .../deletionvectors/DeletionVectorsMaintainer.java | 132 +++++++++++++++
 .../org/apache/paimon/index/HashIndexFile.java     |  29 +---
 .../index/{HashIndexFile.java => IndexFile.java}   |  26 +--
 .../org/apache/paimon/index/IndexFileHandler.java  |  47 +++++-
 .../org/apache/paimon/index/IndexFileMeta.java     |  41 ++++-
 .../paimon/index/IndexFileMetaSerializer.java      |  37 +++-
 .../apache/paimon/manifest/IndexManifestEntry.java |  11 ++
 .../manifest/IndexManifestEntrySerializer.java     |  10 +-
 .../paimon/deletionvectors/DeletionVectorTest.java |  65 +++++++
 .../DeletionVectorsIndexFileTest.java              |  97 +++++++++++
 .../DeletionVectorsMaintainerTest.java             |  67 ++++++++
 .../paimon/index/IndexFileMetaSerializerTest.java  |  26 ++-
 .../manifest/IndexManifestEntrySerializerTest.java |   9 +-
 18 files changed, 916 insertions(+), 68 deletions(-)

diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index 0a0effb15..0829c7da2 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -33,6 +33,7 @@ under the License.
 
     <properties>
         <frocksdbjni.version>6.20.3-ververica-2.0</frocksdbjni.version>
+        <RoaringBitmap.version>1.0.1</RoaringBitmap.version>
     </properties>
 
     <dependencies>
@@ -63,6 +64,12 @@ under the License.
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.roaringbitmap</groupId>
+            <artifactId>RoaringBitmap</artifactId>
+            <version>${RoaringBitmap.version}</version>
+        </dependency>
+
         <!-- test dependencies -->
 
         <dependency>
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 896b53794..f0dc46301 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -19,6 +19,7 @@
 package org.apache.paimon;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.index.HashIndexFile;
 import org.apache.paimon.index.IndexFileHandler;
@@ -143,7 +144,8 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
         return new IndexFileHandler(
                 snapshotManager(),
                 indexManifestFileFactory().create(),
-                new HashIndexFile(fileIO, pathFactory().indexFileFactory()));
+                new HashIndexFile(fileIO, pathFactory().indexFileFactory()),
+                new DeletionVectorsIndexFile(fileIO, 
pathFactory().indexFileFactory()));
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
new file mode 100644
index 000000000..9ef2182e1
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * A {@link DeletionVector} based on {@link RoaringBitmap}, it only supports 
files with row count
+ * not exceeding {@link Integer#MAX_VALUE}.
+ */
+public class BitmapDeletionVector implements DeletionVector {
+
+    public static final int MAGIC_NUMBER = 1581511376;
+
+    private final RoaringBitmap roaringBitmap;
+
+    BitmapDeletionVector() {
+        roaringBitmap = new RoaringBitmap();
+    }
+
+    private BitmapDeletionVector(RoaringBitmap roaringBitmap) {
+        this.roaringBitmap = roaringBitmap;
+    }
+
+    @Override
+    public void delete(long position) {
+        checkPosition(position);
+        roaringBitmap.add((int) position);
+    }
+
+    @Override
+    public boolean checkedDelete(long position) {
+        checkPosition(position);
+        return roaringBitmap.checkedAdd((int) position);
+    }
+
+    @Override
+    public boolean isDeleted(long position) {
+        checkPosition(position);
+        return roaringBitmap.contains((int) position);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return roaringBitmap.isEmpty();
+    }
+
+    @Override
+    public byte[] serializeToBytes() {
+        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                DataOutputStream dos = new DataOutputStream(bos)) {
+            dos.writeInt(MAGIC_NUMBER);
+            roaringBitmap.runOptimize();
+            roaringBitmap.serialize(dos);
+            return bos.toByteArray();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to serialize deletion vector", 
e);
+        }
+    }
+
+    public static DeletionVector deserializeFromDataInput(DataInput bis) 
throws IOException {
+        RoaringBitmap roaringBitmap = new RoaringBitmap();
+        roaringBitmap.deserialize(bis);
+        return new BitmapDeletionVector(roaringBitmap);
+    }
+
+    private void checkPosition(long position) {
+        if (position > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException(
+                    "The file has too many rows, RoaringBitmap32 only supports 
files with row count not exceeding 2147483647.");
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
new file mode 100644
index 000000000..c30df42ab
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * The DeletionVector can efficiently record the positions of rows that are 
deleted in a file, which
+ * can then be used to filter out deleted rows when processing the file.
+ */
+public interface DeletionVector {
+
+    /**
+     * Marks the row at the specified position as deleted.
+     *
+     * @param position The position of the row to be marked as deleted.
+     */
+    void delete(long position);
+
+    /**
+     * Marks the row at the specified position as deleted.
+     *
+     * @param position The position of the row to be marked as deleted.
+     * @return true if the added position wasn't already deleted. False 
otherwise.
+     */
+    default boolean checkedDelete(long position) {
+        if (isDeleted(position)) {
+            return false;
+        } else {
+            delete(position);
+            return true;
+        }
+    }
+
+    /**
+     * Checks if the row at the specified position is marked as deleted.
+     *
+     * @param position The position of the row to check.
+     * @return true if the row is marked as deleted, false otherwise.
+     */
+    boolean isDeleted(long position);
+
+    /**
+     * Determines if the deletion vector is empty, indicating no deletions.
+     *
+     * @return true if the deletion vector is empty, false if it contains 
deletions.
+     */
+    boolean isEmpty();
+
+    /**
+     * Serializes the deletion vector to a byte array for storage or 
transmission.
+     *
+     * @return A byte array representing the serialized deletion vector.
+     */
+    byte[] serializeToBytes();
+
+    /**
+     * Deserializes a deletion vector from a byte array.
+     *
+     * @param bytes The byte array containing the serialized deletion vector.
+     * @return A DeletionVector instance that represents the deserialized data.
+     */
+    static DeletionVector deserializeFromBytes(byte[] bytes) {
+        try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+                DataInputStream dis = new DataInputStream(bis)) {
+            int magicNum = dis.readInt();
+            if (magicNum == BitmapDeletionVector.MAGIC_NUMBER) {
+                return BitmapDeletionVector.deserializeFromDataInput(dis);
+            } else {
+                throw new RuntimeException("Invalid magic number: " + 
magicNum);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to deserialize deletion 
vector", e);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
new file mode 100644
index 000000000..a82cc9be8
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.index.IndexFile;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PathFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.zip.CRC32;
+
+/** DeletionVectors index file. */
+public class DeletionVectorsIndexFile extends IndexFile {
+
+    public static final String DELETION_VECTORS_INDEX = "DELETION_VECTORS";
+    public static final byte VERSION_ID_V1 = 1;
+
+    public DeletionVectorsIndexFile(FileIO fileIO, PathFactory pathFactory) {
+        super(fileIO, pathFactory);
+    }
+
+    /**
+     * Reads all deletion vectors from a specified file.
+     *
+     * @param fileName The name of the file from which to read the deletion 
vectors.
+     * @param deletionVectorRanges A map where the key represents which file 
the DeletionVector
+     *     belongs to and the value is a Pair object specifying the range 
(start position and size)
+     *     within the file where the deletion vector data is located.
+     * @return A map where the key represents which file the DeletionVector 
belongs to, and the
+     *     value is the corresponding DeletionVector object.
+     * @throws UncheckedIOException If an I/O error occurs while reading from 
the file.
+     */
+    public Map<String, DeletionVector> readAllDeletionVectors(
+            String fileName, Map<String, Pair<Integer, Integer>> 
deletionVectorRanges) {
+        Map<String, DeletionVector> deletionVectors = new HashMap<>();
+        try (SeekableInputStream inputStream =
+                fileIO.newInputStream(pathFactory.toPath(fileName))) {
+            checkVersion(inputStream);
+            DataInputStream dataInputStream = new DataInputStream(inputStream);
+            for (Map.Entry<String, Pair<Integer, Integer>> entry :
+                    deletionVectorRanges.entrySet()) {
+                deletionVectors.put(
+                        entry.getKey(),
+                        readDeletionVector(dataInputStream, 
entry.getValue().getRight()));
+            }
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "Unable to read deletion vectors from file: " + fileName, 
e);
+        }
+        return deletionVectors;
+    }
+
+    /**
+     * Reads a single deletion vector from the specified file.
+     *
+     * @param fileName The name of the file from which to read the deletion 
vector.
+     * @param deletionVectorRange A Pair specifying the range (start position 
and size) within the
+     *     file where the deletion vector data is located.
+     * @return The DeletionVector object read from the specified range in the 
file.
+     * @throws UncheckedIOException If an I/O error occurs while reading from 
the file.
+     */
+    public DeletionVector readDeletionVector(
+            String fileName, Pair<Integer, Integer> deletionVectorRange) {
+        try (SeekableInputStream inputStream =
+                fileIO.newInputStream(pathFactory.toPath(fileName))) {
+            checkVersion(inputStream);
+            inputStream.seek(deletionVectorRange.getLeft());
+            DataInputStream dataInputStream = new DataInputStream(inputStream);
+            return readDeletionVector(dataInputStream, 
deletionVectorRange.getRight());
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                    "Unable to read deletion vector from file: " + fileName, 
e);
+        }
+    }
+
+    /**
+     * Write deletion vectors to a new file, the format of this file can be 
referenced at: <a
+     * href="https://cwiki.apache.org/confluence/x/Tws4EQ";>PIP-16</a>.
+     *
+     * @param input A map where the key represents which file the 
DeletionVector belongs to, and the
+     *     value is the corresponding DeletionVector object.
+     * @return A Pair object specifying the name of the written new file and a 
map where the key
+     *     represents which file the DeletionVector belongs to and the value 
is a Pair object
+     *     specifying the range (start position and size) within the file 
where the deletion vector
+     *     data is located.
+     * @throws UncheckedIOException If an I/O error occurs while writing to 
the file.
+     */
+    public Pair<String, Map<String, Pair<Integer, Integer>>> write(
+            Map<String, DeletionVector> input) {
+        int size = input.size();
+        // use LinkedHashMap to ensure that the order of DeletionVectorRanges 
and the written
+        // DeletionVectors is consistent.
+        Map<String, Pair<Integer, Integer>> deletionVectorRanges = new 
LinkedHashMap<>(size);
+        Path path = pathFactory.newPath();
+        try (DataOutputStream dataOutputStream =
+                new DataOutputStream(fileIO.newOutputStream(path, true))) {
+            dataOutputStream.writeByte(VERSION_ID_V1);
+            for (Map.Entry<String, DeletionVector> entry : input.entrySet()) {
+                String key = entry.getKey();
+                byte[] valueBytes = entry.getValue().serializeToBytes();
+                deletionVectorRanges.put(key, Pair.of(dataOutputStream.size(), 
valueBytes.length));
+                dataOutputStream.writeInt(valueBytes.length);
+                dataOutputStream.write(valueBytes);
+                dataOutputStream.writeInt(calculateChecksum(valueBytes));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(
+                    "Unable to write deletion vectors to file: " + 
path.getName(), e);
+        }
+        return Pair.of(path.getName(), deletionVectorRanges);
+    }
+
+    private void checkVersion(InputStream in) throws IOException {
+        int version = in.read();
+        if (version != VERSION_ID_V1) {
+            throw new RuntimeException(
+                    "Version not match, actual size: "
+                            + version
+                            + ", expert size: "
+                            + VERSION_ID_V1);
+        }
+    }
+
+    private DeletionVector readDeletionVector(DataInputStream inputStream, int 
size) {
+        try {
+            // check size
+            int actualSize = inputStream.readInt();
+            if (actualSize != size) {
+                throw new RuntimeException(
+                        "Size not match, actual size: " + actualSize + ", 
expert size: " + size);
+            }
+
+            // read DeletionVector bytes
+            byte[] bytes = new byte[size];
+            int readSize = inputStream.read(bytes);
+            if (readSize != size) {
+                throw new RuntimeException(
+                        "Size not match, actual size: " + readSize + ", expert 
size: " + size);
+            }
+
+            // check checksum
+            int checkSum = calculateChecksum(bytes);
+            int actualCheckSum = inputStream.readInt();
+            if (actualCheckSum != checkSum) {
+                throw new RuntimeException(
+                        "Checksum not match, actual checksum: "
+                                + actualCheckSum
+                                + ", expected checksum: "
+                                + checkSum);
+            }
+            return DeletionVector.deserializeFromBytes(bytes);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Unable to read deletion vector", 
e);
+        }
+    }
+
+    private int calculateChecksum(byte[] bytes) {
+        CRC32 crc = new CRC32();
+        crc.update(bytes);
+        return (int) crc.getValue();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
new file mode 100644
index 000000000..7c88edc6c
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/** Maintainer of deletionVectors index. */
+public class DeletionVectorsMaintainer {
+
+    private final IndexFileHandler indexFileHandler;
+    private final Map<String, DeletionVector> deletionVectors;
+    private boolean modified;
+
+    private DeletionVectorsMaintainer(
+            IndexFileHandler fileHandler,
+            @Nullable Long snapshotId,
+            BinaryRow partition,
+            int bucket) {
+        this.indexFileHandler = fileHandler;
+        IndexFileMeta indexFile =
+                snapshotId == null
+                        ? null
+                        : fileHandler
+                                .scan(
+                                        snapshotId,
+                                        
DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
+                                        partition,
+                                        bucket)
+                                .orElse(null);
+        this.deletionVectors =
+                indexFile == null
+                        ? new HashMap<>()
+                        : indexFileHandler.readAllDeletionVectors(indexFile);
+        this.modified = false;
+    }
+
+    /**
+     * Notifies a new deletion which marks the specified row position as 
deleted with the given file
+     * name.
+     *
+     * @param fileName The name of the file where the deletion occurred.
+     * @param position The row position within the file that has been deleted.
+     */
+    public void notifyNewDeletion(String fileName, long position) {
+        DeletionVector deletionVector =
+                deletionVectors.computeIfAbsent(fileName, k -> new 
BitmapDeletionVector());
+        if (deletionVector.checkedDelete(position)) {
+            modified = true;
+        }
+    }
+
+    /**
+     * Removes the specified file's deletion vector, this method is typically 
used for remove before
+     * files' deletion vector in compaction.
+     *
+     * @param fileName The name of the file whose deletion vector should be 
removed.
+     */
+    public void removeDeletionVectorOf(String fileName) {
+        if (deletionVectors.containsKey(fileName)) {
+            deletionVectors.remove(fileName);
+            modified = true;
+        }
+    }
+
+    /**
+     * Prepares to commit: write new deletion vectors index file if any 
modifications have been
+     * made.
+     *
+     * @return A list containing the metadata of the deletion vectors index 
file, or an empty list
+     *     if no changes need to be committed.
+     */
+    public List<IndexFileMeta> prepareCommit() {
+        if (modified) {
+            IndexFileMeta entry = 
indexFileHandler.writeDeletionVectorsIndex(deletionVectors);
+            modified = false;
+            return Collections.singletonList(entry);
+        }
+        return Collections.emptyList();
+    }
+
+    /**
+     * Retrieves the deletion vector associated with the specified file name.
+     *
+     * @param fileName The name of the file for which the deletion vector is 
requested.
+     * @return An {@code Optional} containing the deletion vector if it 
exists, or an empty {@code
+     *     Optional} if not.
+     */
+    public Optional<DeletionVector> deletionVectorOf(String fileName) {
+        return Optional.ofNullable(deletionVectors.get(fileName));
+    }
+
+    /** Factory to restore {@link DeletionVectorsMaintainer}. */
+    public static class DeletionVectorsMaintainerFactory {
+
+        private final IndexFileHandler handler;
+
+        public DeletionVectorsMaintainerFactory(IndexFileHandler handler) {
+            this.handler = handler;
+        }
+
+        public DeletionVectorsMaintainer createOrRestore(
+                @Nullable Long snapshotId, BinaryRow partition, int bucket) {
+            return new DeletionVectorsMaintainer(handler, snapshotId, 
partition, bucket);
+        }
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java 
b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
index f8ecff62d..4919f08f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
@@ -24,30 +24,17 @@ import org.apache.paimon.utils.IntIterator;
 import org.apache.paimon.utils.PathFactory;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
 
 import static org.apache.paimon.utils.IntFileUtils.readInts;
 import static org.apache.paimon.utils.IntFileUtils.writeInts;
 
 /** Hash index file contains ints. */
-public class HashIndexFile {
+public class HashIndexFile extends IndexFile {
 
     public static final String HASH_INDEX = "HASH";
 
-    private final FileIO fileIO;
-    private final PathFactory pathFactory;
-
     public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
-        this.fileIO = fileIO;
-        this.pathFactory = pathFactory;
-    }
-
-    public long fileSize(String fileName) {
-        try {
-            return fileIO.getFileSize(pathFactory.toPath(fileName));
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
+        super(fileIO, pathFactory);
     }
 
     public IntIterator read(String fileName) throws IOException {
@@ -59,16 +46,4 @@ public class HashIndexFile {
         writeInts(fileIO, path, input);
         return path.getName();
     }
-
-    public void delete(String fileName) {
-        fileIO.deleteQuietly(pathFactory.toPath(fileName));
-    }
-
-    public boolean exists(String fileName) {
-        try {
-            return fileIO.exists(pathFactory.toPath(fileName));
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
similarity index 67%
copy from paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
copy to paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
index f8ecff62d..2106113eb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/HashIndexFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFile.java
@@ -19,25 +19,19 @@
 package org.apache.paimon.index;
 
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.utils.IntIterator;
 import org.apache.paimon.utils.PathFactory;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
 
-import static org.apache.paimon.utils.IntFileUtils.readInts;
-import static org.apache.paimon.utils.IntFileUtils.writeInts;
+/** Base index file. */
+public abstract class IndexFile {
 
-/** Hash index file contains ints. */
-public class HashIndexFile {
+    protected final FileIO fileIO;
 
-    public static final String HASH_INDEX = "HASH";
+    protected final PathFactory pathFactory;
 
-    private final FileIO fileIO;
-    private final PathFactory pathFactory;
-
-    public HashIndexFile(FileIO fileIO, PathFactory pathFactory) {
+    public IndexFile(FileIO fileIO, PathFactory pathFactory) {
         this.fileIO = fileIO;
         this.pathFactory = pathFactory;
     }
@@ -50,16 +44,6 @@ public class HashIndexFile {
         }
     }
 
-    public IntIterator read(String fileName) throws IOException {
-        return readInts(fileIO, pathFactory.toPath(fileName));
-    }
-
-    public String write(IntIterator input) throws IOException {
-        Path path = pathFactory.newPath();
-        writeInts(fileIO, path, input);
-        return path.getName();
-    }
-
     public void delete(String fileName) {
         fileIO.deleteQuietly(pathFactory.toPath(fileName));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 73b0d3416..d460097f0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -20,9 +20,12 @@ package org.apache.paimon.index;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.IndexManifestFile;
 import org.apache.paimon.utils.IntIterator;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
 import java.io.IOException;
@@ -30,8 +33,10 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 
 /** Handle index files. */
@@ -40,14 +45,17 @@ public class IndexFileHandler {
     private final SnapshotManager snapshotManager;
     private final IndexManifestFile indexManifestFile;
     private final HashIndexFile hashIndex;
+    private final DeletionVectorsIndexFile deletionVectorsIndex;
 
     public IndexFileHandler(
             SnapshotManager snapshotManager,
             IndexManifestFile indexManifestFile,
-            HashIndexFile hashIndex) {
+            HashIndexFile hashIndex,
+            DeletionVectorsIndexFile deletionVectorsIndex) {
         this.snapshotManager = snapshotManager;
         this.indexManifestFile = indexManifestFile;
         this.hashIndex = hashIndex;
+        this.deletionVectorsIndex = deletionVectorsIndex;
     }
 
     public Optional<IndexFileMeta> scan(
@@ -148,4 +156,41 @@ public class IndexFileHandler {
     public void deleteManifest(String indexManifest) {
         indexManifestFile.delete(indexManifest);
     }
+
+    public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta 
fileMeta) {
+        if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) {
+            throw new IllegalArgumentException(
+                    "Input file is not deletion vectors index " + 
fileMeta.indexType());
+        }
+        Map<String, Pair<Integer, Integer>> deleteIndexRange = 
fileMeta.deletionVectorsRanges();
+        if (deleteIndexRange == null || deleteIndexRange.isEmpty()) {
+            return Collections.emptyMap();
+        }
+        return 
deletionVectorsIndex.readAllDeletionVectors(fileMeta.fileName(), 
deleteIndexRange);
+    }
+
+    public Optional<DeletionVector> readDeletionVector(IndexFileMeta fileMeta, 
String fileName) {
+        if (!fileMeta.indexType().equals(DELETION_VECTORS_INDEX)) {
+            throw new IllegalArgumentException(
+                    "Input file is not deletion vectors index " + 
fileMeta.indexType());
+        }
+        Map<String, Pair<Integer, Integer>> deleteIndexRange = 
fileMeta.deletionVectorsRanges();
+        if (deleteIndexRange == null || 
!deleteIndexRange.containsKey(fileName)) {
+            return Optional.empty();
+        }
+        return Optional.of(
+                deletionVectorsIndex.readDeletionVector(
+                        fileMeta.fileName(), deleteIndexRange.get(fileName)));
+    }
+
+    public IndexFileMeta writeDeletionVectorsIndex(Map<String, DeletionVector> 
deletionVectors) {
+        Pair<String, Map<String, Pair<Integer, Integer>>> pair =
+                deletionVectorsIndex.write(deletionVectors);
+        return new IndexFileMeta(
+                DELETION_VECTORS_INDEX,
+                pair.getLeft(),
+                deletionVectorsIndex.fileSize(pair.getLeft()),
+                deletionVectors.size(),
+                pair.getRight());
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
index 6fb3e176b..1b3f405fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
@@ -18,12 +18,19 @@
 
 package org.apache.paimon.index;
 
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import static org.apache.paimon.utils.SerializationUtils.newStringType;
@@ -36,11 +43,24 @@ public class IndexFileMeta {
     private final long fileSize;
     private final long rowCount;
 
+    /** Metadata only used by {@link DeletionVectorsIndexFile}. */
+    private final @Nullable Map<String, Pair<Integer, Integer>> 
deletionVectorsRanges;
+
     public IndexFileMeta(String indexType, String fileName, long fileSize, 
long rowCount) {
+        this(indexType, fileName, fileSize, rowCount, null);
+    }
+
+    public IndexFileMeta(
+            String indexType,
+            String fileName,
+            long fileSize,
+            long rowCount,
+            @Nullable Map<String, Pair<Integer, Integer>> 
deletionVectorsRanges) {
         this.indexType = indexType;
         this.fileName = fileName;
         this.fileSize = fileSize;
         this.rowCount = rowCount;
+        this.deletionVectorsRanges = deletionVectorsRanges;
     }
 
     public String indexType() {
@@ -59,6 +79,10 @@ public class IndexFileMeta {
         return rowCount;
     }
 
+    public @Nullable Map<String, Pair<Integer, Integer>> 
deletionVectorsRanges() {
+        return deletionVectorsRanges;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -71,12 +95,13 @@ public class IndexFileMeta {
         return Objects.equals(indexType, that.indexType)
                 && Objects.equals(fileName, that.fileName)
                 && fileSize == that.fileSize
-                && rowCount == that.rowCount;
+                && rowCount == that.rowCount
+                && Objects.equals(deletionVectorsRanges, 
that.deletionVectorsRanges);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(indexType, fileName, fileSize, rowCount);
+        return Objects.hash(indexType, fileName, fileSize, rowCount, 
deletionVectorsRanges);
     }
 
     @Override
@@ -91,6 +116,8 @@ public class IndexFileMeta {
                 + fileSize
                 + ", rowCount="
                 + rowCount
+                + ", deletionVectorsRanges="
+                + deletionVectorsRanges
                 + '}';
     }
 
@@ -100,6 +127,16 @@ public class IndexFileMeta {
         fields.add(new DataField(1, "_FILE_NAME", newStringType(false)));
         fields.add(new DataField(2, "_FILE_SIZE", new BigIntType(false)));
         fields.add(new DataField(3, "_ROW_COUNT", new BigIntType(false)));
+        fields.add(
+                new DataField(
+                        4,
+                        "_DELETION_VECTORS_RANGES",
+                        new ArrayType(
+                                true,
+                                RowType.of(
+                                        newStringType(false),
+                                        new IntType(false),
+                                        new IntType(false)))));
         return new RowType(fields);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 2813dd48b..005868540 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -19,11 +19,17 @@
 package org.apache.paimon.index;
 
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.utils.ObjectSerializer;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
+
 /** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */
 public class IndexFileMetaSerializer extends ObjectSerializer<IndexFileMeta> {
 
@@ -37,7 +43,10 @@ public class IndexFileMetaSerializer extends 
ObjectSerializer<IndexFileMeta> {
                 BinaryString.fromString(record.indexType()),
                 BinaryString.fromString(record.fileName()),
                 record.fileSize(),
-                record.rowCount());
+                record.rowCount(),
+                record.deletionVectorsRanges() == null
+                        ? null
+                        : 
dvRangesToRowArrayData(record.deletionVectorsRanges()));
     }
 
     @Override
@@ -46,6 +55,30 @@ public class IndexFileMetaSerializer extends 
ObjectSerializer<IndexFileMeta> {
                 row.getString(0).toString(),
                 row.getString(1).toString(),
                 row.getLong(2),
-                row.getLong(3));
+                row.getLong(3),
+                row.isNullAt(4) ? null : 
rowArrayDataToDvRanges(row.getArray(4)));
+    }
+
+    public static InternalArray dvRangesToRowArrayData(
+            Map<String, Pair<Integer, Integer>> dvRanges) {
+        return new GenericArray(
+                dvRanges.entrySet().stream()
+                        .map(
+                                entry ->
+                                        GenericRow.of(
+                                                
BinaryString.fromString(entry.getKey()),
+                                                entry.getValue().getLeft(),
+                                                entry.getValue().getRight()))
+                        .toArray(GenericRow[]::new));
+    }
+
+    public static Map<String, Pair<Integer, Integer>> rowArrayDataToDvRanges(
+            InternalArray arrayData) {
+        Map<String, Pair<Integer, Integer>> dvRanges = new 
LinkedHashMap<>(arrayData.size());
+        for (int i = 0; i < arrayData.size(); i++) {
+            InternalRow row = arrayData.getRow(i, 3);
+            dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1), 
row.getInt(2)));
+        }
+        return dvRanges;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
index e2b136855..53f3fb82d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
@@ -20,6 +20,7 @@ package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
@@ -80,6 +81,16 @@ public class IndexManifestEntry {
         fields.add(new DataField(4, "_FILE_NAME", newStringType(false)));
         fields.add(new DataField(5, "_FILE_SIZE", new BigIntType(false)));
         fields.add(new DataField(6, "_ROW_COUNT", new BigIntType(false)));
+        fields.add(
+                new DataField(
+                        7,
+                        "_DELETIONS_VECTORS_RANGES",
+                        new ArrayType(
+                                true,
+                                RowType.of(
+                                        newStringType(false),
+                                        new IntType(false),
+                                        new IntType(false)))));
         return new RowType(fields);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
index 40c4cb5a8..bdd711442 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -24,6 +24,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
+import static 
org.apache.paimon.index.IndexFileMetaSerializer.dvRangesToRowArrayData;
+import static 
org.apache.paimon.index.IndexFileMetaSerializer.rowArrayDataToDvRanges;
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 
@@ -49,7 +51,10 @@ public class IndexManifestEntrySerializer extends 
VersionedObjectSerializer<Inde
                 BinaryString.fromString(indexFile.indexType()),
                 BinaryString.fromString(indexFile.fileName()),
                 indexFile.fileSize(),
-                indexFile.rowCount());
+                indexFile.rowCount(),
+                record.indexFile().deletionVectorsRanges() == null
+                        ? null
+                        : 
dvRangesToRowArrayData(record.indexFile().deletionVectorsRanges()));
     }
 
     @Override
@@ -66,6 +71,7 @@ public class IndexManifestEntrySerializer extends 
VersionedObjectSerializer<Inde
                         row.getString(3).toString(),
                         row.getString(4).toString(),
                         row.getLong(5),
-                        row.getLong(6)));
+                        row.getLong(6),
+                        row.isNullAt(7) ? null : 
rowArrayDataToDvRanges(row.getArray(7))));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
new file mode 100644
index 000000000..d2b42b798
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeletionVector}. */
+public class DeletionVectorTest {
+    @Test
+    public void testBitmapDeletionVector() {
+        HashSet<Integer> toDelete = new HashSet<>();
+        Random random = new Random();
+        for (int i = 0; i < 10000; i++) {
+            toDelete.add(random.nextInt());
+        }
+        HashSet<Integer> notDelete = new HashSet<>();
+        for (int i = 0; i < 10000; i++) {
+            if (!toDelete.contains(i)) {
+                notDelete.add(i);
+            }
+        }
+
+        DeletionVector deletionVector = new BitmapDeletionVector();
+        assertThat(deletionVector.isEmpty()).isTrue();
+
+        for (Integer i : toDelete) {
+            assertThat(deletionVector.checkedDelete(i)).isTrue();
+            assertThat(deletionVector.checkedDelete(i)).isFalse();
+        }
+        DeletionVector deserializedDeletionVector =
+                
DeletionVector.deserializeFromBytes(deletionVector.serializeToBytes());
+
+        assertThat(deletionVector.isEmpty()).isFalse();
+        assertThat(deserializedDeletionVector.isEmpty()).isFalse();
+        for (Integer i : toDelete) {
+            assertThat(deletionVector.isDeleted(i)).isTrue();
+            assertThat(deserializedDeletionVector.isDeleted(i)).isTrue();
+        }
+        for (Integer i : notDelete) {
+            assertThat(deletionVector.isDeleted(i)).isFalse();
+            assertThat(deserializedDeletionVector.isDeleted(i)).isFalse();
+        }
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
new file mode 100644
index 000000000..228508f33
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.PathFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeletionVectorsIndexFile}. */
+public class DeletionVectorsIndexFileTest {
+    @TempDir java.nio.file.Path tempPath;
+
+    @Test
+    public void test0() {
+        Path dir = new Path(tempPath.toUri());
+        PathFactory pathFactory =
+                new PathFactory() {
+                    @Override
+                    public Path newPath() {
+                        return new Path(dir, UUID.randomUUID().toString());
+                    }
+
+                    @Override
+                    public Path toPath(String fileName) {
+                        return new Path(dir, fileName);
+                    }
+                };
+
+        DeletionVectorsIndexFile deletionVectorsIndexFile =
+                new DeletionVectorsIndexFile(LocalFileIO.create(), 
pathFactory);
+
+        // write
+        HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+        BitmapDeletionVector index1 = new BitmapDeletionVector();
+        index1.delete(1);
+        deleteMap.put("file1.parquet", index1);
+
+        BitmapDeletionVector index2 = new BitmapDeletionVector();
+        index2.delete(2);
+        index2.delete(3);
+        deleteMap.put("file2.parquet", index2);
+
+        BitmapDeletionVector index3 = new BitmapDeletionVector();
+        index3.delete(3);
+        deleteMap.put("file33.parquet", index3);
+
+        Pair<String, Map<String, Pair<Integer, Integer>>> pair =
+                deletionVectorsIndexFile.write(deleteMap);
+        String fileName = pair.getLeft();
+        Map<String, Pair<Integer, Integer>> deletionVectorRanges = 
pair.getRight();
+
+        // read
+        Map<String, DeletionVector> actualDeleteMap =
+                deletionVectorsIndexFile.readAllDeletionVectors(fileName, 
deletionVectorRanges);
+        assertThat(actualDeleteMap.get("file1.parquet").isDeleted(1)).isTrue();
+        
assertThat(actualDeleteMap.get("file1.parquet").isDeleted(2)).isFalse();
+        assertThat(actualDeleteMap.get("file2.parquet").isDeleted(2)).isTrue();
+        assertThat(actualDeleteMap.get("file2.parquet").isDeleted(3)).isTrue();
+        
assertThat(actualDeleteMap.get("file33.parquet").isDeleted(3)).isTrue();
+
+        DeletionVector file1DeletionVector =
+                deletionVectorsIndexFile.readDeletionVector(
+                        fileName, deletionVectorRanges.get("file1.parquet"));
+        assertThat(file1DeletionVector.isDeleted(1)).isTrue();
+        assertThat(file1DeletionVector.isDeleted(2)).isFalse();
+
+        // delete
+        deletionVectorsIndexFile.delete(fileName);
+        assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
new file mode 100644
index 000000000..6749c6a93
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link DeletionVectorsMaintainer}. */
+public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase {
+    private IndexFileHandler fileHandler;
+
+    @BeforeEach
+    public void beforeEach() throws Exception {
+        fileHandler = table.store().newIndexFileHandler();
+    }
+
+    @Test
+    public void test0() {
+        DeletionVectorsMaintainer.DeletionVectorsMaintainerFactory factory =
+                new 
DeletionVectorsMaintainer.DeletionVectorsMaintainerFactory(fileHandler);
+        DeletionVectorsMaintainer dvMaintainer =
+                factory.createOrRestore(null, BinaryRow.EMPTY_ROW, 0);
+
+        dvMaintainer.notifyNewDeletion("f1", 1);
+        dvMaintainer.notifyNewDeletion("f2", 2);
+        dvMaintainer.notifyNewDeletion("f3", 3);
+        dvMaintainer.removeDeletionVectorOf("f3");
+
+        assertThat(dvMaintainer.deletionVectorOf("f1")).isPresent();
+        assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty();
+        List<IndexFileMeta> fileMetas = dvMaintainer.prepareCommit();
+
+        Map<String, DeletionVector> deletionVectors =
+                fileHandler.readAllDeletionVectors(fileMetas.get(0));
+        assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
+        assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
+        assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
+        assertThat(deletionVectors.get("f2").isDeleted(2)).isTrue();
+        assertThat(deletionVectors.containsKey("f3")).isFalse();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index 8f1ef392a..0e8cfc430 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -18,9 +18,13 @@
 
 package org.apache.paimon.index;
 
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.utils.ObjectSerializer;
 import org.apache.paimon.utils.ObjectSerializerTestBase;
+import org.apache.paimon.utils.Pair;
 
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Random;
 
 /** Test for {@link org.apache.paimon.index.IndexFileMetaSerializer}. */
@@ -38,10 +42,22 @@ public class IndexFileMetaSerializerTest extends 
ObjectSerializerTestBase<IndexF
 
     public static IndexFileMeta randomIndexFile() {
         Random rnd = new Random();
-        return new IndexFileMeta(
-                HashIndexFile.HASH_INDEX,
-                "my_file_name" + rnd.nextLong(),
-                rnd.nextInt(),
-                rnd.nextInt());
+        if (rnd.nextBoolean()) {
+            return new IndexFileMeta(
+                    HashIndexFile.HASH_INDEX,
+                    "my_file_name" + rnd.nextLong(),
+                    rnd.nextInt(),
+                    rnd.nextInt());
+        } else {
+            Map<String, Pair<Integer, Integer>> deletionVectorsRanges = new 
LinkedHashMap<>();
+            deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
+            deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
+            return new IndexFileMeta(
+                    DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
+                    "deletion_vectors_index_file_name" + rnd.nextLong(),
+                    rnd.nextInt(),
+                    rnd.nextInt(),
+                    deletionVectorsRanges);
+        }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
index 1ee3d6b5d..0429b8dae 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/IndexManifestEntrySerializerTest.java
@@ -18,13 +18,12 @@
 
 package org.apache.paimon.manifest;
 
-import org.apache.paimon.index.HashIndexFile;
-import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.utils.ObjectSerializer;
 import org.apache.paimon.utils.ObjectSerializerTestBase;
 
 import java.util.Random;
 
+import static 
org.apache.paimon.index.IndexFileMetaSerializerTest.randomIndexFile;
 import static org.apache.paimon.io.DataFileTestUtils.row;
 
 /** Test for {@link IndexManifestEntrySerializer}. */
@@ -46,10 +45,6 @@ public class IndexManifestEntrySerializerTest extends 
ObjectSerializerTestBase<I
                 rnd.nextBoolean() ? FileKind.ADD : FileKind.DELETE,
                 row(rnd.nextInt()),
                 rnd.nextInt(),
-                new IndexFileMeta(
-                        HashIndexFile.HASH_INDEX,
-                        "my_file_name" + rnd.nextLong(),
-                        rnd.nextInt(),
-                        rnd.nextInt()));
+                randomIndexFile());
     }
 }

Reply via email to