This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 47a8871aeb [core] Store `cardinality` in the deletion vector meta 
(#4699)
47a8871aeb is described below

commit 47a8871aeb5cba3267192d4971ac82e340a30956
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Dec 13 22:15:46 2024 +0800

    [core] Store `cardinality` in the deletion vector meta (#4699)
---
 docs/content/concepts/spec/manifest.md             |   8 +-
 .../deletionvectors/BitmapDeletionVector.java      |   5 +
 .../DeletionVectorIndexFileWriter.java             |  15 +--
 .../deletionvectors/DeletionVectorsIndexFile.java  |  19 ++--
 .../apache/paimon/index/DeletionVectorMeta.java    | 103 ++++++++++++++++++++
 .../org/apache/paimon/index/IndexFileHandler.java  |  17 ++--
 .../org/apache/paimon/index/IndexFileMeta.java     |  29 +++---
 .../paimon/index/IndexFileMeta09Serializer.java    | 104 +++++++++++++++++++++
 .../paimon/index/IndexFileMetaSerializer.java      |  39 ++++----
 .../apache/paimon/manifest/IndexManifestEntry.java |   8 +-
 .../manifest/IndexManifestEntrySerializer.java     |  12 ++-
 .../paimon/table/sink/CommitMessageSerializer.java |  78 ++++++++--------
 .../org/apache/paimon/table/source/DataSplit.java  |  29 ++++--
 .../apache/paimon/table/source/DeletionFile.java   |  57 +++++++++--
 .../table/source/snapshot/SnapshotReaderImpl.java  |  18 ++--
 .../append/AppendDeletionFileMaintainerTest.java   |  15 +--
 .../paimon/index/IndexFileMetaSerializerTest.java  |  15 ++-
 ...festCommittableSerializerCompatibilityTest.java |  96 ++++++++++++++++---
 .../table/sink/CommitMessageSerializerTest.java    |   2 +-
 .../org/apache/paimon/table/source/SplitTest.java  |  68 ++++++++++++++
 .../src/test/resources/compatibility/datasplit-v3  | Bin 0 -> 886 bytes
 .../compatibility/manifest-committable-v4          | Bin 0 -> 3145 bytes
 .../paimon/spark/sql/DeletionVectorTest.scala      |  21 +++++
 23 files changed, 595 insertions(+), 163 deletions(-)

diff --git a/docs/content/concepts/spec/manifest.md 
b/docs/content/concepts/spec/manifest.md
index bc7318331a..8460febf78 100644
--- a/docs/content/concepts/spec/manifest.md
+++ b/docs/content/concepts/spec/manifest.md
@@ -111,5 +111,9 @@ The index file meta is:
 2. fileName: file name.
 3. fileSize: file size.
 4. rowCount: total number of rows.
-5. deletionVectorsRanges: Metadata only used by "DELETION_VECTORS", Stores 
offset and length of each data file,
-   The schema is `ARRAY<ROW<f0: STRING, f1: INT, f2: INT>>`.
+5. deletionVectorsRanges: Metadata only used by "DELETION_VECTORS", is an 
array of deletion vector meta, the schema of each deletion vector meta is:
+   1. f0: the data file name corresponding to this deletion vector.
+   2. f1: the starting offset of this deletion vector in the index file.
+   3. f2: the length of this deletion vector in the index file.
+   4. cardinality: the number of deleted rows.
+
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
index a2c5925966..51ae729c21 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -117,4 +117,9 @@ public class BitmapDeletionVector implements DeletionVector 
{
         BitmapDeletionVector that = (BitmapDeletionVector) o;
         return Objects.equals(this.roaringBitmap, that.roaringBitmap);
     }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(roaringBitmap);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index f8c8330f19..5246d35d4b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -20,9 +20,9 @@ package org.apache.paimon.deletionvectors;
 
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 import org.apache.paimon.utils.Preconditions;
 
@@ -104,13 +104,13 @@ public class DeletionVectorIndexFileWriter {
 
         private final Path path;
         private final DataOutputStream dataOutputStream;
-        private final LinkedHashMap<String, Pair<Integer, Integer>> dvRanges;
+        private final LinkedHashMap<String, DeletionVectorMeta> dvMetas;
 
         private SingleIndexFileWriter() throws IOException {
             this.path = indexPathFactory.newPath();
             this.dataOutputStream = new 
DataOutputStream(fileIO.newOutputStream(path, true));
             dataOutputStream.writeByte(VERSION_ID_V1);
-            this.dvRanges = new LinkedHashMap<>();
+            this.dvMetas = new LinkedHashMap<>();
         }
 
         private long writtenSizeInBytes() {
@@ -121,7 +121,10 @@ public class DeletionVectorIndexFileWriter {
             Preconditions.checkNotNull(dataOutputStream);
             byte[] data = deletionVector.serializeToBytes();
             int size = data.length;
-            dvRanges.put(key, Pair.of(dataOutputStream.size(), size));
+            dvMetas.put(
+                    key,
+                    new DeletionVectorMeta(
+                            key, dataOutputStream.size(), size, 
deletionVector.getCardinality()));
             dataOutputStream.writeInt(size);
             dataOutputStream.write(data);
             dataOutputStream.writeInt(calculateChecksum(data));
@@ -132,8 +135,8 @@ public class DeletionVectorIndexFileWriter {
                     DELETION_VECTORS_INDEX,
                     path.getName(),
                     writtenSizeInBytes(),
-                    dvRanges.size(),
-                    dvRanges);
+                    dvMetas.size(),
+                    dvMetas);
         }
 
         @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index 798404e001..77abb2d729 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -21,11 +21,11 @@ package org.apache.paimon.deletionvectors;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFile;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.table.source.DeletionFile;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 
 import java.io.DataInputStream;
@@ -63,9 +63,9 @@ public class DeletionVectorsIndexFile extends IndexFile {
      * @throws UncheckedIOException If an I/O error occurs while reading from 
the file.
      */
     public Map<String, DeletionVector> readAllDeletionVectors(IndexFileMeta 
fileMeta) {
-        LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorRanges =
-                fileMeta.deletionVectorsRanges();
-        checkNotNull(deletionVectorRanges);
+        LinkedHashMap<String, DeletionVectorMeta> deletionVectorMetas =
+                fileMeta.deletionVectorMetas();
+        checkNotNull(deletionVectorMetas);
 
         String indexFileName = fileMeta.fileName();
         Map<String, DeletionVector> deletionVectors = new HashMap<>();
@@ -73,18 +73,17 @@ public class DeletionVectorsIndexFile extends IndexFile {
         try (SeekableInputStream inputStream = 
fileIO.newInputStream(filePath)) {
             checkVersion(inputStream);
             DataInputStream dataInputStream = new DataInputStream(inputStream);
-            for (Map.Entry<String, Pair<Integer, Integer>> entry :
-                    deletionVectorRanges.entrySet()) {
+            for (DeletionVectorMeta deletionVectorMeta : 
deletionVectorMetas.values()) {
                 deletionVectors.put(
-                        entry.getKey(),
-                        readDeletionVector(dataInputStream, 
entry.getValue().getRight()));
+                        deletionVectorMeta.dataFileName(),
+                        readDeletionVector(dataInputStream, 
deletionVectorMeta.length()));
             }
         } catch (Exception e) {
             throw new RuntimeException(
                     "Unable to read deletion vectors from file: "
                             + filePath
-                            + ", deletionVectorRanges: "
-                            + deletionVectorRanges,
+                            + ", deletionVectorMetas: "
+                            + deletionVectorMetas,
                     e);
         }
         return deletionVectors;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
new file mode 100644
index 0000000000..9eb38818f6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/index/DeletionVectorMeta.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/** Metadata of deletion vector. */
+public class DeletionVectorMeta {
+
+    public static final RowType SCHEMA =
+            RowType.of(
+                    new DataField(0, "f0", newStringType(false)),
+                    new DataField(1, "f1", new IntType(false)),
+                    new DataField(2, "f2", new IntType(false)),
+                    new DataField(3, "_CARDINALITY", new BigIntType(true)));
+
+    private final String dataFileName;
+    private final int offset;
+    private final int length;
+    @Nullable private final Long cardinality;
+
+    public DeletionVectorMeta(
+            String dataFileName, int start, int size, @Nullable Long 
cardinality) {
+        this.dataFileName = dataFileName;
+        this.offset = start;
+        this.length = size;
+        this.cardinality = cardinality;
+    }
+
+    public String dataFileName() {
+        return dataFileName;
+    }
+
+    public int offset() {
+        return offset;
+    }
+
+    public int length() {
+        return length;
+    }
+
+    @Nullable
+    public Long cardinality() {
+        return cardinality;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DeletionVectorMeta that = (DeletionVectorMeta) o;
+        return offset == that.offset
+                && length == that.length
+                && Objects.equals(dataFileName, that.dataFileName)
+                && Objects.equals(cardinality, that.cardinality);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(dataFileName, offset, length, cardinality);
+    }
+
+    @Override
+    public String toString() {
+        return "DeletionVectorMeta{"
+                + "dataFileName='"
+                + dataFileName
+                + '\''
+                + ", offset="
+                + offset
+                + ", length="
+                + length
+                + ", cardinality="
+                + cardinality
+                + '}';
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 7e5efccdd8..8b0e5c5021 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -100,15 +100,16 @@ public class IndexFileHandler {
             if (meta.indexType().equals(DELETION_VECTORS_INDEX)
                     && file.partition().equals(partition)
                     && file.bucket() == bucket) {
-                LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
-                        meta.deletionVectorsRanges();
-                checkNotNull(dvRanges);
-                for (String dataFile : dvRanges.keySet()) {
-                    Pair<Integer, Integer> pair = dvRanges.get(dataFile);
-                    DeletionFile deletionFile =
+                LinkedHashMap<String, DeletionVectorMeta> dvMetas = 
meta.deletionVectorMetas();
+                checkNotNull(dvMetas);
+                for (DeletionVectorMeta dvMeta : dvMetas.values()) {
+                    result.put(
+                            dvMeta.dataFileName(),
                             new DeletionFile(
-                                    filePath(meta).toString(), pair.getLeft(), 
pair.getRight());
-                    result.put(dataFile, deletionFile);
+                                    filePath(meta).toString(),
+                                    dvMeta.offset(),
+                                    dvMeta.length(),
+                                    dvMeta.cardinality()));
                 }
             }
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
index 24ba6992a5..aae4f8c473 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta.java
@@ -23,9 +23,7 @@ import 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Pair;
 
 import javax.annotation.Nullable;
 
@@ -54,12 +52,7 @@ public class IndexFileMeta {
                             new DataField(
                                     4,
                                     "_DELETIONS_VECTORS_RANGES",
-                                    new ArrayType(
-                                            true,
-                                            RowType.of(
-                                                    newStringType(false),
-                                                    new IntType(false),
-                                                    new IntType(false))))));
+                                    new ArrayType(true, 
DeletionVectorMeta.SCHEMA))));
 
     private final String indexType;
     private final String fileName;
@@ -68,9 +61,9 @@ public class IndexFileMeta {
 
     /**
      * Metadata only used by {@link DeletionVectorsIndexFile}, use 
LinkedHashMap to ensure that the
-     * order of DeletionVectorRanges and the written DeletionVectors is 
consistent.
+     * order of DeletionVectorMetas and the written DeletionVectors is 
consistent.
      */
-    private final @Nullable LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorsRanges;
+    private final @Nullable LinkedHashMap<String, DeletionVectorMeta> 
deletionVectorMetas;
 
     public IndexFileMeta(String indexType, String fileName, long fileSize, 
long rowCount) {
         this(indexType, fileName, fileSize, rowCount, null);
@@ -81,12 +74,12 @@ public class IndexFileMeta {
             String fileName,
             long fileSize,
             long rowCount,
-            @Nullable LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorsRanges) {
+            @Nullable LinkedHashMap<String, DeletionVectorMeta> 
deletionVectorMetas) {
         this.indexType = indexType;
         this.fileName = fileName;
         this.fileSize = fileSize;
         this.rowCount = rowCount;
-        this.deletionVectorsRanges = deletionVectorsRanges;
+        this.deletionVectorMetas = deletionVectorMetas;
     }
 
     public String indexType() {
@@ -105,8 +98,8 @@ public class IndexFileMeta {
         return rowCount;
     }
 
-    public @Nullable LinkedHashMap<String, Pair<Integer, Integer>> 
deletionVectorsRanges() {
-        return deletionVectorsRanges;
+    public @Nullable LinkedHashMap<String, DeletionVectorMeta> 
deletionVectorMetas() {
+        return deletionVectorMetas;
     }
 
     @Override
@@ -122,12 +115,12 @@ public class IndexFileMeta {
                 && Objects.equals(fileName, that.fileName)
                 && fileSize == that.fileSize
                 && rowCount == that.rowCount
-                && Objects.equals(deletionVectorsRanges, 
that.deletionVectorsRanges);
+                && Objects.equals(deletionVectorMetas, 
that.deletionVectorMetas);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(indexType, fileName, fileSize, rowCount, 
deletionVectorsRanges);
+        return Objects.hash(indexType, fileName, fileSize, rowCount, 
deletionVectorMetas);
     }
 
     @Override
@@ -142,8 +135,8 @@ public class IndexFileMeta {
                 + fileSize
                 + ", rowCount="
                 + rowCount
-                + ", deletionVectorsRanges="
-                + deletionVectorsRanges
+                + ", deletionVectorMetas="
+                + deletionVectorMetas
                 + '}';
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java
new file mode 100644
index 0000000000..915d904569
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMeta09Serializer.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.index;
+
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.paimon.utils.SerializationUtils.newStringType;
+
+/** Serializer for {@link IndexFileMeta} with 0.9 version. */
+public class IndexFileMeta09Serializer implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final RowType SCHEMA =
+            new RowType(
+                    false,
+                    Arrays.asList(
+                            new DataField(0, "_INDEX_TYPE", 
newStringType(false)),
+                            new DataField(1, "_FILE_NAME", 
newStringType(false)),
+                            new DataField(2, "_FILE_SIZE", new 
BigIntType(false)),
+                            new DataField(3, "_ROW_COUNT", new 
BigIntType(false)),
+                            new DataField(
+                                    4,
+                                    "_DELETIONS_VECTORS_RANGES",
+                                    new ArrayType(
+                                            true,
+                                            RowType.of(
+                                                    newStringType(false),
+                                                    new IntType(false),
+                                                    new IntType(false))))));
+
+    protected final InternalRowSerializer rowSerializer;
+
+    public IndexFileMeta09Serializer() {
+        this.rowSerializer = InternalSerializers.create(SCHEMA);
+    }
+
+    public IndexFileMeta fromRow(InternalRow row) {
+        return new IndexFileMeta(
+                row.getString(0).toString(),
+                row.getString(1).toString(),
+                row.getLong(2),
+                row.getLong(3),
+                row.isNullAt(4) ? null : 
rowArrayDataToDvMetas(row.getArray(4)));
+    }
+
+    public final List<IndexFileMeta> deserializeList(DataInputView source) 
throws IOException {
+        int size = source.readInt();
+        List<IndexFileMeta> records = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            records.add(deserialize(source));
+        }
+        return records;
+    }
+
+    public IndexFileMeta deserialize(DataInputView in) throws IOException {
+        return fromRow(rowSerializer.deserialize(in));
+    }
+
+    public static LinkedHashMap<String, DeletionVectorMeta> 
rowArrayDataToDvMetas(
+            InternalArray arrayData) {
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new 
LinkedHashMap<>(arrayData.size());
+        for (int i = 0; i < arrayData.size(); i++) {
+            InternalRow row = arrayData.getRow(i, 3);
+            dvMetas.put(
+                    row.getString(0).toString(),
+                    new DeletionVectorMeta(
+                            row.getString(0).toString(), row.getInt(1), 
row.getInt(2), null));
+        }
+        return dvMetas;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
index 4b52932623..db4a44838f 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileMetaSerializer.java
@@ -24,9 +24,9 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.utils.ObjectSerializer;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
+import java.util.Collection;
 import java.util.LinkedHashMap;
 
 /** A {@link VersionedObjectSerializer} for {@link IndexFileMeta}. */
@@ -43,9 +43,9 @@ public class IndexFileMetaSerializer extends 
ObjectSerializer<IndexFileMeta> {
                 BinaryString.fromString(record.fileName()),
                 record.fileSize(),
                 record.rowCount(),
-                record.deletionVectorsRanges() == null
+                record.deletionVectorMetas() == null
                         ? null
-                        : 
dvRangesToRowArrayData(record.deletionVectorsRanges()));
+                        : 
dvMetasToRowArrayData(record.deletionVectorMetas().values()));
     }
 
     @Override
@@ -55,30 +55,35 @@ public class IndexFileMetaSerializer extends 
ObjectSerializer<IndexFileMeta> {
                 row.getString(1).toString(),
                 row.getLong(2),
                 row.getLong(3),
-                row.isNullAt(4) ? null : 
rowArrayDataToDvRanges(row.getArray(4)));
+                row.isNullAt(4) ? null : 
rowArrayDataToDvMetas(row.getArray(4)));
     }
 
-    public static InternalArray dvRangesToRowArrayData(
-            LinkedHashMap<String, Pair<Integer, Integer>> dvRanges) {
+    public static InternalArray 
dvMetasToRowArrayData(Collection<DeletionVectorMeta> dvMetas) {
         return new GenericArray(
-                dvRanges.entrySet().stream()
+                dvMetas.stream()
                         .map(
-                                entry ->
+                                dvMeta ->
                                         GenericRow.of(
-                                                
BinaryString.fromString(entry.getKey()),
-                                                entry.getValue().getLeft(),
-                                                entry.getValue().getRight()))
+                                                
BinaryString.fromString(dvMeta.dataFileName()),
+                                                dvMeta.offset(),
+                                                dvMeta.length(),
+                                                dvMeta.cardinality()))
                         .toArray(GenericRow[]::new));
     }
 
-    public static LinkedHashMap<String, Pair<Integer, Integer>> 
rowArrayDataToDvRanges(
+    public static LinkedHashMap<String, DeletionVectorMeta> 
rowArrayDataToDvMetas(
             InternalArray arrayData) {
-        LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
-                new LinkedHashMap<>(arrayData.size());
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new 
LinkedHashMap<>(arrayData.size());
         for (int i = 0; i < arrayData.size(); i++) {
-            InternalRow row = arrayData.getRow(i, 3);
-            dvRanges.put(row.getString(0).toString(), Pair.of(row.getInt(1), 
row.getInt(2)));
+            InternalRow row = arrayData.getRow(i, 
DeletionVectorMeta.SCHEMA.getFieldCount());
+            dvMetas.put(
+                    row.getString(0).toString(),
+                    new DeletionVectorMeta(
+                            row.getString(0).toString(),
+                            row.getInt(1),
+                            row.getInt(2),
+                            row.isNullAt(3) ? null : row.getLong(3)));
         }
-        return dvRanges;
+        return dvMetas;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
index a52d9e8af4..2431a1c264 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntry.java
@@ -20,6 +20,7 @@ package org.apache.paimon.manifest;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
@@ -57,12 +58,7 @@ public class IndexManifestEntry {
                             new DataField(
                                     7,
                                     "_DELETIONS_VECTORS_RANGES",
-                                    new ArrayType(
-                                            true,
-                                            RowType.of(
-                                                    newStringType(false),
-                                                    new IntType(false),
-                                                    new IntType(false))))));
+                                    new ArrayType(true, 
DeletionVectorMeta.SCHEMA))));
 
     private final FileKind kind;
     private final BinaryRow partition;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
index 574e935550..6f2ec17dda 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestEntrySerializer.java
@@ -22,10 +22,9 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMetaSerializer;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
-import static 
org.apache.paimon.index.IndexFileMetaSerializer.dvRangesToRowArrayData;
-import static 
org.apache.paimon.index.IndexFileMetaSerializer.rowArrayDataToDvRanges;
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 
@@ -52,9 +51,10 @@ public class IndexManifestEntrySerializer extends 
VersionedObjectSerializer<Inde
                 BinaryString.fromString(indexFile.fileName()),
                 indexFile.fileSize(),
                 indexFile.rowCount(),
-                record.indexFile().deletionVectorsRanges() == null
+                record.indexFile().deletionVectorMetas() == null
                         ? null
-                        : 
dvRangesToRowArrayData(record.indexFile().deletionVectorsRanges()));
+                        : IndexFileMetaSerializer.dvMetasToRowArrayData(
+                                
record.indexFile().deletionVectorMetas().values()));
     }
 
     @Override
@@ -72,6 +72,8 @@ public class IndexManifestEntrySerializer extends 
VersionedObjectSerializer<Inde
                         row.getString(4).toString(),
                         row.getLong(5),
                         row.getLong(6),
-                        row.isNullAt(7) ? null : 
rowArrayDataToDvRanges(row.getArray(7))));
+                        row.isNullAt(7)
+                                ? null
+                                : 
IndexFileMetaSerializer.rowArrayDataToDvMetas(row.getArray(7))));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
index 7918914b2c..9fc251c366 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.data.serializer.VersionedSerializer;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.index.IndexFileMeta09Serializer;
 import org.apache.paimon.index.IndexFileMetaSerializer;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
@@ -45,12 +47,14 @@ import static 
org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 /** {@link VersionedSerializer} for {@link CommitMessage}. */
 public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessage> {
 
-    private static final int CURRENT_VERSION = 4;
+    private static final int CURRENT_VERSION = 5;
 
     private final DataFileMetaSerializer dataFileSerializer;
     private final IndexFileMetaSerializer indexEntrySerializer;
 
+    private DataFileMeta09Serializer dataFile09Serializer;
     private DataFileMeta08Serializer dataFile08Serializer;
+    private IndexFileMeta09Serializer indexEntry09Serializer;
 
     public CommitMessageSerializer() {
         this.dataFileSerializer = new DataFileMetaSerializer();
@@ -107,48 +111,48 @@ public class CommitMessageSerializer implements 
VersionedSerializer<CommitMessag
     }
 
     private CommitMessage deserialize(int version, DataInputView view) throws 
IOException {
-        if (version >= 3) {
-            IOExceptionSupplier<List<DataFileMeta>> fileDeserializer =
-                    () -> dataFileSerializer.deserializeList(view);
-            if (version == 3) {
-                DataFileMeta09Serializer serializer = new 
DataFileMeta09Serializer();
-                fileDeserializer = () -> serializer.deserializeList(view);
-            }
-            return new CommitMessageImpl(
-                    deserializeBinaryRow(view),
-                    view.readInt(),
-                    new DataIncrement(
-                            fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get()),
-                    new CompactIncrement(
-                            fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get()),
-                    new IndexIncrement(
-                            indexEntrySerializer.deserializeList(view),
-                            indexEntrySerializer.deserializeList(view)));
-        } else {
-            return deserialize08(version, view);
-        }
-    }
-
-    private CommitMessage deserialize08(int version, DataInputView view) 
throws IOException {
-        if (dataFile08Serializer == null) {
-            dataFile08Serializer = new DataFileMeta08Serializer();
-        }
+        IOExceptionSupplier<List<DataFileMeta>> fileDeserializer = 
fileDeserializer(version, view);
+        IOExceptionSupplier<List<IndexFileMeta>> indexEntryDeserializer =
+                indexEntryDeserializer(version, view);
 
         return new CommitMessageImpl(
                 deserializeBinaryRow(view),
                 view.readInt(),
                 new DataIncrement(
-                        dataFile08Serializer.deserializeList(view),
-                        dataFile08Serializer.deserializeList(view),
-                        dataFile08Serializer.deserializeList(view)),
+                        fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get()),
                 new CompactIncrement(
-                        dataFile08Serializer.deserializeList(view),
-                        dataFile08Serializer.deserializeList(view),
-                        dataFile08Serializer.deserializeList(view)),
+                        fileDeserializer.get(), fileDeserializer.get(), 
fileDeserializer.get()),
                 new IndexIncrement(
-                        indexEntrySerializer.deserializeList(view),
-                        version <= 2
-                                ? Collections.emptyList()
-                                : indexEntrySerializer.deserializeList(view)));
+                        indexEntryDeserializer.get(),
+                        version <= 2 ? Collections.emptyList() : 
indexEntryDeserializer.get()));
+    }
+
+    private IOExceptionSupplier<List<DataFileMeta>> fileDeserializer(
+            int version, DataInputView view) {
+        if (version >= 4) {
+            return () -> dataFileSerializer.deserializeList(view);
+        } else if (version == 3) {
+            if (dataFile09Serializer == null) {
+                dataFile09Serializer = new DataFileMeta09Serializer();
+            }
+            return () -> dataFile09Serializer.deserializeList(view);
+        } else {
+            if (dataFile08Serializer == null) {
+                dataFile08Serializer = new DataFileMeta08Serializer();
+            }
+            return () -> dataFile08Serializer.deserializeList(view);
+        }
+    }
+
+    private IOExceptionSupplier<List<IndexFileMeta>> indexEntryDeserializer(
+            int version, DataInputView view) {
+        if (version >= 5) {
+            return () -> indexEntrySerializer.deserializeList(view);
+        } else {
+            if (indexEntry09Serializer == null) {
+                indexEntry09Serializer = new IndexFileMeta09Serializer();
+            }
+            return () -> indexEntry09Serializer.deserializeList(view);
+        }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 1dac6584d6..29405466b9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -50,7 +50,7 @@ public class DataSplit implements Split {
 
     private static final long serialVersionUID = 7L;
     private static final long MAGIC = -2394839472490812314L;
-    private static final int VERSION = 3;
+    private static final int VERSION = 4;
 
     private long snapshotId = 0;
     private BinaryRow partition;
@@ -272,13 +272,16 @@ public class DataSplit implements Split {
 
         FunctionWithIOException<DataInputView, DataFileMeta> dataFileSer =
                 getFileMetaSerde(version);
+        FunctionWithIOException<DataInputView, DeletionFile> deletionFileSerde 
=
+                getDeletionFileSerde(version);
         int beforeNumber = in.readInt();
         List<DataFileMeta> beforeFiles = new ArrayList<>(beforeNumber);
         for (int i = 0; i < beforeNumber; i++) {
             beforeFiles.add(dataFileSer.apply(in));
         }
 
-        List<DeletionFile> beforeDeletionFiles = 
DeletionFile.deserializeList(in);
+        List<DeletionFile> beforeDeletionFiles =
+                DeletionFile.deserializeList(in, deletionFileSerde);
 
         int fileNumber = in.readInt();
         List<DataFileMeta> dataFiles = new ArrayList<>(fileNumber);
@@ -286,7 +289,7 @@ public class DataSplit implements Split {
             dataFiles.add(dataFileSer.apply(in));
         }
 
-        List<DeletionFile> dataDeletionFiles = 
DeletionFile.deserializeList(in);
+        List<DeletionFile> dataDeletionFiles = 
DeletionFile.deserializeList(in, deletionFileSerde);
 
         boolean isStreaming = in.readBoolean();
         boolean rawConvertible = in.readBoolean();
@@ -319,16 +322,22 @@ public class DataSplit implements Split {
         } else if (version == 2) {
             DataFileMeta09Serializer serializer = new 
DataFileMeta09Serializer();
             return serializer::deserialize;
-        } else if (version == 3) {
+        } else if (version >= 3) {
             DataFileMetaSerializer serializer = new DataFileMetaSerializer();
             return serializer::deserialize;
         } else {
-            throw new UnsupportedOperationException(
-                    "Expecting DataSplit version to be smaller or equal than "
-                            + VERSION
-                            + ", but found "
-                            + version
-                            + ".");
+            throw new UnsupportedOperationException("Unsupported version: " + 
version);
+        }
+    }
+
+    private static FunctionWithIOException<DataInputView, DeletionFile> 
getDeletionFileSerde(
+            int version) {
+        if (version >= 1 && version <= 3) {
+            return DeletionFile::deserializeV3;
+        } else if (version >= 4) {
+            return DeletionFile::deserialize;
+        } else {
+            throw new UnsupportedOperationException("Unsupported version: " + 
version);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
index 94dfc61572..5bcf6898ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.Public;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.utils.FunctionWithIOException;
 
 import javax.annotation.Nullable;
 
@@ -52,11 +53,13 @@ public class DeletionFile implements Serializable {
     private final String path;
     private final long offset;
     private final long length;
+    @Nullable private final Long cardinality;
 
-    public DeletionFile(String path, long offset, long length) {
+    public DeletionFile(String path, long offset, long length, @Nullable Long 
cardinality) {
         this.path = path;
         this.offset = offset;
         this.length = length;
+        this.cardinality = cardinality;
     }
 
     /** Path of the file. */
@@ -74,6 +77,12 @@ public class DeletionFile implements Serializable {
         return length;
     }
 
+    /** the number of deleted rows. */
+    @Nullable
+    public Long cardinality() {
+        return cardinality;
+    }
+
     public static void serialize(DataOutputView out, @Nullable DeletionFile 
file)
             throws IOException {
         if (file == null) {
@@ -83,6 +92,7 @@ public class DeletionFile implements Serializable {
             out.writeUTF(file.path);
             out.writeLong(file.offset);
             out.writeLong(file.length);
+            out.writeLong(file.cardinality == null ? -1 : file.cardinality);
         }
     }
 
@@ -108,17 +118,32 @@ public class DeletionFile implements Serializable {
         String path = in.readUTF();
         long offset = in.readLong();
         long length = in.readLong();
-        return new DeletionFile(path, offset, length);
+        long cardinality = in.readLong();
+        return new DeletionFile(path, offset, length, cardinality == -1 ? null 
: cardinality);
     }
 
     @Nullable
-    public static List<DeletionFile> deserializeList(DataInputView in) throws 
IOException {
+    public static DeletionFile deserializeV3(DataInputView in) throws 
IOException {
+        if (in.readByte() == 0) {
+            return null;
+        }
+
+        String path = in.readUTF();
+        long offset = in.readLong();
+        long length = in.readLong();
+        return new DeletionFile(path, offset, length, null);
+    }
+
+    @Nullable
+    public static List<DeletionFile> deserializeList(
+            DataInputView in, FunctionWithIOException<DataInputView, 
DeletionFile> deserialize)
+            throws IOException {
         List<DeletionFile> files = null;
         if (in.readByte() == 1) {
             int size = in.readInt();
             files = new ArrayList<>(size);
             for (int i = 0; i < size; i++) {
-                files.add(DeletionFile.deserialize(in));
+                files.add(deserialize.apply(in));
             }
         }
         return files;
@@ -126,22 +151,34 @@ public class DeletionFile implements Serializable {
 
     @Override
     public boolean equals(Object o) {
-        if (!(o instanceof DeletionFile)) {
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
-
-        DeletionFile other = (DeletionFile) o;
-        return Objects.equals(path, other.path) && offset == other.offset && 
length == other.length;
+        DeletionFile that = (DeletionFile) o;
+        return offset == that.offset
+                && length == that.length
+                && Objects.equals(path, that.path)
+                && Objects.equals(cardinality, that.cardinality);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(path, offset, length);
+        return Objects.hash(path, offset, length, cardinality);
     }
 
     @Override
     public String toString() {
-        return String.format("{path = %s, offset = %d, length = %d}", path, 
offset, length);
+        return "DeletionFile{"
+                + "path='"
+                + path
+                + '\''
+                + ", offset="
+                + offset
+                + ", length="
+                + length
+                + ", cardinality="
+                + cardinality
+                + '}';
     }
 
     static Factory emptyFactory() {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index ce01bdba94..bf19ba10c6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -24,6 +24,7 @@ import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.RecordComparator;
 import org.apache.paimon.consumer.ConsumerManager;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
@@ -492,23 +493,24 @@ public class SnapshotReaderImpl implements SnapshotReader 
{
         List<DeletionFile> deletionFiles = new ArrayList<>(dataFiles.size());
         Map<String, IndexFileMeta> dataFileToIndexFileMeta = new HashMap<>();
         for (IndexFileMeta indexFileMeta : indexFileMetas) {
-            if (indexFileMeta.deletionVectorsRanges() != null) {
-                for (String dataFileName : 
indexFileMeta.deletionVectorsRanges().keySet()) {
-                    dataFileToIndexFileMeta.put(dataFileName, indexFileMeta);
+            if (indexFileMeta.deletionVectorMetas() != null) {
+                for (DeletionVectorMeta dvMeta : 
indexFileMeta.deletionVectorMetas().values()) {
+                    dataFileToIndexFileMeta.put(dvMeta.dataFileName(), 
indexFileMeta);
                 }
             }
         }
         for (DataFileMeta file : dataFiles) {
             IndexFileMeta indexFileMeta = 
dataFileToIndexFileMeta.get(file.fileName());
             if (indexFileMeta != null) {
-                Map<String, Pair<Integer, Integer>> ranges = 
indexFileMeta.deletionVectorsRanges();
-                if (ranges != null && ranges.containsKey(file.fileName())) {
-                    Pair<Integer, Integer> range = ranges.get(file.fileName());
+                LinkedHashMap<String, DeletionVectorMeta> dvMetas =
+                        indexFileMeta.deletionVectorMetas();
+                if (dvMetas != null && dvMetas.containsKey(file.fileName())) {
                     deletionFiles.add(
                             new DeletionFile(
                                     
indexFileHandler.filePath(indexFileMeta).toString(),
-                                    range.getKey(),
-                                    range.getValue()));
+                                    dvMetas.get(file.fileName()).offset(),
+                                    dvMetas.get(file.fileName()).length(),
+                                    
dvMetas.get(file.fileName()).cardinality()));
                     continue;
                 }
             }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 6c674352b8..a52819c805 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -23,12 +23,12 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DeletionFile;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.PathFactory;
 
 import org.junit.jupiter.api.Test;
@@ -94,7 +94,7 @@ class AppendDeletionFileMaintainerTest {
         assertThat(res.size()).isEqualTo(3);
         IndexManifestEntry entry =
                 res.stream().filter(file -> file.kind() == 
FileKind.ADD).findAny().get();
-        
assertThat(entry.indexFile().deletionVectorsRanges().containsKey("f2")).isTrue();
+        
assertThat(entry.indexFile().deletionVectorMetas().containsKey("f2")).isTrue();
         entry =
                 res.stream()
                         .filter(file -> file.kind() == FileKind.DELETE)
@@ -117,14 +117,15 @@ class AppendDeletionFileMaintainerTest {
             PathFactory indexPathFactory, List<IndexFileMeta> fileMetas) {
         Map<String, DeletionFile> dataFileToDeletionFiles = new HashMap<>();
         for (IndexFileMeta indexFileMeta : fileMetas) {
-            for (Map.Entry<String, Pair<Integer, Integer>> range :
-                    indexFileMeta.deletionVectorsRanges().entrySet()) {
+            for (Map.Entry<String, DeletionVectorMeta> dvMeta :
+                    indexFileMeta.deletionVectorMetas().entrySet()) {
                 dataFileToDeletionFiles.put(
-                        range.getKey(),
+                        dvMeta.getKey(),
                         new DeletionFile(
                                 
indexPathFactory.toPath(indexFileMeta.fileName()).toString(),
-                                range.getValue().getLeft(),
-                                range.getValue().getRight()));
+                                dvMeta.getValue().offset(),
+                                dvMeta.getValue().length(),
+                                dvMeta.getValue().cardinality()));
             }
         }
         return dataFileToDeletionFiles;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
index 724d5b4163..a7e692d2e5 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/index/IndexFileMetaSerializerTest.java
@@ -21,7 +21,6 @@ package org.apache.paimon.index;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.utils.ObjectSerializer;
 import org.apache.paimon.utils.ObjectSerializerTestBase;
-import org.apache.paimon.utils.Pair;
 
 import java.util.LinkedHashMap;
 import java.util.Random;
@@ -59,14 +58,20 @@ public class IndexFileMetaSerializerTest extends 
ObjectSerializerTestBase<IndexF
 
     public static IndexFileMeta randomDeletionVectorIndexFile() {
         Random rnd = new Random();
-        LinkedHashMap<String, Pair<Integer, Integer>> deletionVectorsRanges = 
new LinkedHashMap<>();
-        deletionVectorsRanges.put("my_file_name1", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
-        deletionVectorsRanges.put("my_file_name2", Pair.of(rnd.nextInt(), 
rnd.nextInt()));
+        LinkedHashMap<String, DeletionVectorMeta> deletionVectorMetas = new 
LinkedHashMap<>();
+        deletionVectorMetas.put(
+                "my_file_name1",
+                new DeletionVectorMeta(
+                        "my_file_name1", rnd.nextInt(), rnd.nextInt(), 
rnd.nextLong()));
+        deletionVectorMetas.put(
+                "my_file_name2",
+                new DeletionVectorMeta(
+                        "my_file_name2", rnd.nextInt(), rnd.nextInt(), 
rnd.nextLong()));
         return new IndexFileMeta(
                 DeletionVectorsIndexFile.DELETION_VECTORS_INDEX,
                 "deletion_vectors_index_file_name" + rnd.nextLong(),
                 rnd.nextInt(),
                 rnd.nextInt(),
-                deletionVectorsRanges);
+                deletionVectorMetas);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
index bd272b745d..fbc02b2d73 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
@@ -27,7 +28,6 @@ import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.IOUtils;
-import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Test;
 
@@ -78,11 +78,11 @@ public class ManifestCommittableSerializerCompatibilityTest 
{
                         Arrays.asList("field1", "field2", "field3"));
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
-        LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new 
LinkedHashMap<>();
-        dvRanges.put("dv_key1", Pair.of(1, 2));
-        dvRanges.put("dv_key2", Pair.of(3, 4));
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new 
LinkedHashMap<>();
+        dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, 3L));
+        dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, 5L));
         IndexFileMeta indexFile =
-                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvRanges);
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvMetas);
         List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
 
         CommitMessageImpl commitMessage =
@@ -106,6 +106,76 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
         assertThat(deserialized).isEqualTo(manifestCommittable);
     }
 
+    @Test
+    public void testCompatibilityToVersion4() throws IOException {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"));
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new 
LinkedHashMap<>();
+        dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
+        dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
+        IndexFileMeta indexFile =
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvMetas);
+        List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
+
+        CommitMessageImpl commitMessage =
+                new CommitMessageImpl(
+                        singleColumn("my_partition"),
+                        11,
+                        new DataIncrement(dataFiles, dataFiles, dataFiles),
+                        new CompactIncrement(dataFiles, dataFiles, dataFiles),
+                        new IndexIncrement(indexFiles));
+
+        ManifestCommittable manifestCommittable =
+                new ManifestCommittable(
+                        5,
+                        202020L,
+                        Collections.singletonMap(5, 555L),
+                        Collections.singletonList(commitMessage));
+
+        ManifestCommittableSerializer serializer = new 
ManifestCommittableSerializer();
+        byte[] bytes = serializer.serialize(manifestCommittable);
+        ManifestCommittable deserialized = serializer.deserialize(3, bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+
+        byte[] v2Bytes =
+                IOUtils.readFully(
+                        ManifestCommittableSerializerCompatibilityTest.class
+                                .getClassLoader()
+                                
.getResourceAsStream("compatibility/manifest-committable-v4"),
+                        true);
+        deserialized = serializer.deserialize(2, v2Bytes);
+        assertThat(deserialized).isEqualTo(manifestCommittable);
+    }
+
     @Test
     public void testCompatibilityToVersion3() throws IOException {
         SimpleStats keyStats =
@@ -139,11 +209,11 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
-        LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new 
LinkedHashMap<>();
-        dvRanges.put("dv_key1", Pair.of(1, 2));
-        dvRanges.put("dv_key2", Pair.of(3, 4));
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new 
LinkedHashMap<>();
+        dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
+        dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
         IndexFileMeta indexFile =
-                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvRanges);
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvMetas);
         List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
 
         CommitMessageImpl commitMessage =
@@ -209,11 +279,11 @@ public class 
ManifestCommittableSerializerCompatibilityTest {
                         null);
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
-        LinkedHashMap<String, Pair<Integer, Integer>> dvRanges = new 
LinkedHashMap<>();
-        dvRanges.put("dv_key1", Pair.of(1, 2));
-        dvRanges.put("dv_key2", Pair.of(3, 4));
+        LinkedHashMap<String, DeletionVectorMeta> dvMetas = new 
LinkedHashMap<>();
+        dvMetas.put("dv_key1", new DeletionVectorMeta("dv_key1", 1, 2, null));
+        dvMetas.put("dv_key2", new DeletionVectorMeta("dv_key2", 3, 4, null));
         IndexFileMeta indexFile =
-                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvRanges);
+                new IndexFileMeta("my_index_type", "my_index_file", 1024 * 
100, 1002, dvMetas);
         List<IndexFileMeta> indexFiles = Collections.singletonList(indexFile);
 
         CommitMessageImpl commitMessage =
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
index eb9105189b..1f87838aea 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java
@@ -48,7 +48,7 @@ public class CommitMessageSerializerTest {
         CommitMessageImpl committable =
                 new CommitMessageImpl(row(0), 1, dataIncrement, 
compactIncrement, indexIncrement);
         CommitMessageImpl newCommittable =
-                (CommitMessageImpl) serializer.deserialize(3, 
serializer.serialize(committable));
+                (CommitMessageImpl) serializer.deserialize(5, 
serializer.serialize(committable));
         
assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement());
         
assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement());
         
assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
index c64a12ffae..359d38c973 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java
@@ -107,6 +107,9 @@ public class SplitTest {
                         Arrays.asList("field1", "field2", "field3"));
         List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
 
+        DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 
33L);
+        List<DeletionFile> deletionFiles = 
Collections.singletonList(deletionFile);
+
         BinaryRow partition = new BinaryRow(1);
         BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
         binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
@@ -118,6 +121,7 @@ public class SplitTest {
                         .withPartition(partition)
                         .withBucket(20)
                         .withDataFiles(dataFiles)
+                        .withDataDeletionFiles(deletionFiles)
                         .withBucketPath("my path")
                         .build();
 
@@ -243,4 +247,68 @@ public class SplitTest {
                 InstantiationUtil.deserializeObject(v2Bytes, 
DataSplit.class.getClassLoader());
         assertThat(actual).isEqualTo(split);
     }
+
+    @Test
+    public void testSerializerCompatibleV3() throws Exception {
+        SimpleStats keyStats =
+                new SimpleStats(
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        fromLongArray(new Long[] {0L}));
+        SimpleStats valueStats =
+                new SimpleStats(
+                        singleColumn("min_value"),
+                        singleColumn("max_value"),
+                        fromLongArray(new Long[] {0L}));
+
+        DataFileMeta dataFile =
+                new DataFileMeta(
+                        "my_file",
+                        1024 * 1024,
+                        1024,
+                        singleColumn("min_key"),
+                        singleColumn("max_key"),
+                        keyStats,
+                        valueStats,
+                        15,
+                        200,
+                        5,
+                        3,
+                        Arrays.asList("extra1", "extra2"),
+                        
Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")),
+                        11L,
+                        new byte[] {1, 2, 4},
+                        FileSource.COMPACT,
+                        Arrays.asList("field1", "field2", "field3"));
+        List<DataFileMeta> dataFiles = Collections.singletonList(dataFile);
+
+        DeletionFile deletionFile = new DeletionFile("deletion_file", 100, 22, 
null);
+        List<DeletionFile> deletionFiles = 
Collections.singletonList(deletionFile);
+
+        BinaryRow partition = new BinaryRow(1);
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(partition);
+        binaryRowWriter.writeString(0, BinaryString.fromString("aaaaa"));
+        binaryRowWriter.complete();
+
+        DataSplit split =
+                DataSplit.builder()
+                        .withSnapshot(18)
+                        .withPartition(partition)
+                        .withBucket(20)
+                        .withDataFiles(dataFiles)
+                        .withDataDeletionFiles(deletionFiles)
+                        .withBucketPath("my path")
+                        .build();
+
+        byte[] v2Bytes =
+                IOUtils.readFully(
+                        SplitTest.class
+                                .getClassLoader()
+                                
.getResourceAsStream("compatibility/datasplit-v3"),
+                        true);
+
+        DataSplit actual =
+                InstantiationUtil.deserializeObject(v2Bytes, 
DataSplit.class.getClassLoader());
+        assertThat(actual).isEqualTo(split);
+    }
 }
diff --git a/paimon-core/src/test/resources/compatibility/datasplit-v3 
b/paimon-core/src/test/resources/compatibility/datasplit-v3
new file mode 100644
index 0000000000..6b19fe2d95
Binary files /dev/null and 
b/paimon-core/src/test/resources/compatibility/datasplit-v3 differ
diff --git 
a/paimon-core/src/test/resources/compatibility/manifest-committable-v4 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v4
new file mode 100644
index 0000000000..9c095669a3
Binary files /dev/null and 
b/paimon-core/src/test/resources/compatibility/manifest-committable-v4 differ
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index ea8309e14f..46a423b9d6 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -631,6 +631,27 @@ class DeletionVectorTest extends PaimonSparkTestBase with 
AdaptiveSparkPlanHelpe
     )
   }
 
+  test("Paimon deletionVector: get cardinality") {
+    sql(s"""
+           |CREATE TABLE T (id INT)
+           |TBLPROPERTIES (
+           | 'deletion-vectors.enabled' = 'true',
+           | 'bucket-key' = 'id',
+           | 'bucket' = '1'
+           |)
+           |""".stripMargin)
+
+    sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id FROM range (1, 50000)")
+    sql("DELETE FROM T WHERE id >= 111 and id <= 444")
+
+    val fileStore = loadTable("T").store()
+    val indexManifest = 
fileStore.snapshotManager().latestSnapshot().indexManifest()
+    val entry = 
fileStore.newIndexFileHandler().readManifest(indexManifest).get(0)
+    val dvMeta = 
entry.indexFile().deletionVectorMetas().values().iterator().next()
+
+    assert(dvMeta.cardinality() == 334)
+  }
+
   private def getPathName(path: String): String = {
     new Path(path).getName
   }

Reply via email to