This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 958c1ecbb8 [common] Introduce BitmapFileIndexMetaV2. (#5028)
958c1ecbb8 is described below

commit 958c1ecbb8ffb0f1e8351faa5781c73797efea5e
Author: Zhonghang Liu <[email protected]>
AuthorDate: Mon Feb 24 20:30:27 2025 +0800

    [common] Introduce BitmapFileIndexMetaV2. (#5028)
---
 docs/content/append-table/query-performance.md     |   3 +-
 docs/content/concepts/spec/fileindex.md            | 159 +++++++-
 .../content/primary-key-table/query-performance.md |   3 +-
 .../benchmark/bitmap/BitmapIndexBenchmark.java     | 188 ++++++++++
 .../benchmark/bitmap/RoaringBitmapBenchmark.java   |  16 +
 .../paimon/fileindex/bitmap/BitmapFileIndex.java   | 109 ++++--
 .../fileindex/bitmap/BitmapFileIndexMeta.java      | 210 ++++++++---
 .../fileindex/bitmap/BitmapFileIndexMetaV2.java    | 415 +++++++++++++++++++++
 .../fileindex/bitmapindex/TestBitmapFileIndex.java | 289 ++++++++------
 .../apache/paimon/spark/SparkFileIndexITCase.java  |  27 +-
 10 files changed, 1231 insertions(+), 188 deletions(-)

diff --git a/docs/content/append-table/query-performance.md 
b/docs/content/append-table/query-performance.md
index d150c95bbc..e8911b5e58 100644
--- a/docs/content/append-table/query-performance.md
+++ b/docs/content/append-table/query-performance.md
@@ -62,8 +62,7 @@ scenario. Using a bitmap may consume more space but can 
result in greater accura
 * `file-index.bloom-filter.<column_name>.fpp` to config false positive 
probability.
 * `file-index.bloom-filter.<column_name>.items` to config the expected 
distinct items in one data file.
 
-`Bitmap`:
-* `file-index.bitmap.columns`: specify the columns that need bitmap index.
+For the bitmap index, see [bitmap]({{< ref 
"concepts/spec/fileindex#index-bitmap" >}}) for more information.
 
 `Bit-Slice Index Bitmap`
 * `file-index.bsi.columns`: specify the columns that need bsi index.
diff --git a/docs/content/concepts/spec/fileindex.md 
b/docs/content/concepts/spec/fileindex.md
index f8cdd97505..f8bb4a4f09 100644
--- a/docs/content/concepts/spec/fileindex.md
+++ b/docs/content/concepts/spec/fileindex.md
@@ -96,11 +96,76 @@ This class use (64-bits) long hash. Store the num hash 
function (one integer) an
 
 ## Index: Bitmap
 
-Define `'file-index.bitmap.columns'`.
+* `file-index.bitmap.columns`: specify the columns that need bitmap index.
+* `file-index.bitmap.version`: specify the bitmap index format version, 
default version is 1, latest version is 2.
+* `file-index.bitmap.<column_name>.index-block-size`: to config secondary 
index block size, default value is 16kb.
+
+
+Bitmap file index format (V2):
+
+<pre>
+
+Bitmap file index format (V2)
++-------------------------------------------------+-----------------
+| version (1 byte) = 2                           |
++-------------------------------------------------+
+| row count (4 bytes int)                        |
++-------------------------------------------------+
+| non-null value bitmap number (4 bytes int)     |
++-------------------------------------------------+
+| has null value (1 byte)                        |
++-------------------------------------------------+
+| null value offset (4 bytes if has null value)  |       HEAD
++-------------------------------------------------+
+| null bitmap length (4 bytes if has null value) |
++-------------------------------------------------+
+| bitmap index block number (4 bytes int)        |
++-------------------------------------------------+
+| value 1 | offset 1                             |
++-------------------------------------------------+
+| value 2 | offset 2                             |
++-------------------------------------------------+
+| ...                                            |
++-------------------------------------------------+
+| bitmap body offset (4 bytes int)               |
++-------------------------------------------------+-----------------
+| bitmap index block 1                           |
++-------------------------------------------------+
+| bitmap index block 2                           |  INDEX BLOCKS
++-------------------------------------------------+
+| ...                                            |
++-------------------------------------------------+-----------------
+| serialized bitmap 1                            |
++-------------------------------------------------+
+| serialized bitmap 2                            |
++-------------------------------------------------+  BITMAP BLOCKS
+| serialized bitmap 3                            |
++-------------------------------------------------+
+| ...                                            |
++-------------------------------------------------+-----------------
+
+index block format:
++-------------------------------------------------+
+| entry number (4 bytes int)                     |
++-------------------------------------------------+
+| value 1 | offset 1 | length 1                  |
++-------------------------------------------------+
+| value 2 | offset 2 | length 2                  |
++-------------------------------------------------+
+| ...                                            |
++-------------------------------------------------+
+
+value x:                       var bytes for any data type (as bitmap 
identifier)
+offset:                        4 bytes int (when it is negative, it means the 
bitmap contains only one row
+                                 and that row's position is 
(-1 - offset))
+length:                        4 bytes int
+  
+</pre>
 
 Bitmap file index format (V1):
 
 <pre>
+
 Bitmap file index format (V1)
 +-------------------------------------------------+-----------------
 | version (1 byte)                               |
@@ -135,7 +200,97 @@ offset:                        4 bytes int (when it is 
negative, it represents t
                                  and its position is the inverse of the 
negative value)
 </pre>
 
-Integer are all BIG_ENDIAN.
+Integers are all BIG_ENDIAN. The bitmap index version defaults to v1; v2 must be 
enabled explicitly via the `file-index.bitmap.version` option described above.
+
+Bitmap only supports the following data types:
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 10%">Paimon Data Type</th>
+      <th class="text-left" style="width: 5%">Supported</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code>TinyIntType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>SmallIntType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>IntType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>BigIntType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>DateType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>TimeType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>LocalZonedTimestampType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>TimestampType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>CharType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>VarCharType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>StringType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>BooleanType</code></td>
+      <td>true</td>
+    </tr>
+    <tr>
+      <td><code>DecimalType(precision, scale)</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>FloatType</code></td>
+      <td>Not recommended</td>
+    </tr>
+    <tr>
+      <td><code>DoubleType</code></td>
+      <td>Not recommended</td>
+    </tr>
+    <tr>
+      <td><code>VarBinaryType</code>, <code>BinaryType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>RowType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>MapType</code></td>
+      <td>false</td>
+    </tr>
+    <tr>
+      <td><code>ArrayType</code></td>
+      <td>false</td>
+    </tr>
+    </tbody>
+</table>
+
 
 ## Index: Bit-Slice Index Bitmap
 
diff --git a/docs/content/primary-key-table/query-performance.md 
b/docs/content/primary-key-table/query-performance.md
index fe30285299..60e80b70e5 100644
--- a/docs/content/primary-key-table/query-performance.md
+++ b/docs/content/primary-key-table/query-performance.md
@@ -60,8 +60,7 @@ Supported filter types:
 * `file-index.bloom-filter.<column_name>.fpp` to config false positive 
probability.
 * `file-index.bloom-filter.<column_name>.items` to config the expected 
distinct items in one data file.
 
-`Bitmap`:
-* `file-index.bitmap.columns`: specify the columns that need bitmap index.
+For the bitmap index, see [bitmap]({{< ref 
"concepts/spec/fileindex#index-bitmap" >}}) for more information.
 
 `Bit-Slice Index Bitmap`
 * `file-index.bsi.columns`: specify the columns that need bsi index.
diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/BitmapIndexBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/BitmapIndexBenchmark.java
new file mode 100644
index 0000000000..34a8d1469f
--- /dev/null
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/BitmapIndexBenchmark.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.benchmark.bitmap;
+
+import org.apache.paimon.benchmark.Benchmark;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexResult;
+import org.apache.paimon.fileindex.FileIndexWriter;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
+import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
+import org.junit.jupiter.api.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+/** Benchmark for {@link BitmapFileIndex}. */
+public class BitmapIndexBenchmark {
+
+    public static final int ROW_COUNT = 1000000;
+
+    private static final String prefix = "asdfghjkl";
+
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+    @Test
+    public void testQuery10() throws Exception {
+        testQuery(10);
+    }
+
+    @Test
+    public void testQuery100() throws Exception {
+        testQuery(100);
+    }
+
+    @Test
+    public void testQuery1000() throws Exception {
+        testQuery(1000);
+    }
+
+    @Test
+    public void testQuery10000() throws Exception {
+        testQuery(10000);
+    }
+
+    @Test
+    public void testQuery30000() throws Exception {
+        testQuery(30000);
+    }
+
+    @Test
+    public void testQuery50000() throws Exception {
+        testQuery(50000);
+    }
+
+    @Test
+    public void testQuery80000() throws Exception {
+        testQuery(80000);
+    }
+
+    @Test
+    public void testQuery100000() throws Exception {
+        testQuery(100000);
+    }
+
+    private void testQuery(int approxCardinality) throws Exception {
+
+        RoaringBitmap32 middleBm = new RoaringBitmap32();
+
+        Options writeOptions1 = new Options();
+        writeOptions1.setInteger(BitmapFileIndex.VERSION, 
BitmapFileIndex.VERSION_1);
+        FileIndexWriter writer1 =
+                new BitmapFileIndex(DataTypes.STRING(), 
writeOptions1).createWriter();
+
+        Options writeOptions2 = new Options();
+        writeOptions2.setInteger(BitmapFileIndex.VERSION, 
BitmapFileIndex.VERSION_2);
+        FileIndexWriter writer2 =
+                new BitmapFileIndex(DataTypes.STRING(), 
writeOptions2).createWriter();
+
+        for (int i = 0; i < ROW_COUNT; i++) {
+            int sid = (int) (Math.random() * approxCardinality);
+            if (sid == approxCardinality / 2) {
+                middleBm.add(i);
+            }
+            writer1.write(BinaryString.fromString(prefix + sid));
+            writer2.write(BinaryString.fromString(prefix + sid));
+        }
+
+        folder.create();
+
+        File file1 = folder.newFile("bitmap-index-v1");
+        File file2 = folder.newFile("bitmap-index-v2");
+        FileUtils.writeByteArrayToFile(file1, writer1.serializedBytes());
+        FileUtils.writeByteArrayToFile(file2, writer2.serializedBytes());
+
+        Benchmark benchmark =
+                new Benchmark(
+                                
String.format("bitmap-index-query-benchmark-%d", approxCardinality),
+                                100)
+                        .setNumWarmupIters(1)
+                        .setOutputPerIteration(true);
+
+        benchmark.addCase("formatV1", 10, () -> query(approxCardinality, 
file1, "false", "false"));
+
+        benchmark.addCase(
+                "formatV1-bitmapByteBuffer",
+                10,
+                () -> query(approxCardinality, file1, "false", "true"));
+
+        benchmark.addCase(
+                "formatV1-bufferedInput",
+                10,
+                () -> query(approxCardinality, file1, "true", "false"));
+
+        benchmark.addCase(
+                "formatV1-bufferedInput-bitmapByteBuffer",
+                10,
+                () -> query(approxCardinality, file1, "true", "true"));
+
+        benchmark.addCase("format-v2", 10, () -> query(approxCardinality, 
file2, "false", "false"));
+
+        benchmark.addCase(
+                "format-v2-bitmapByteBuffer",
+                10,
+                () -> query(approxCardinality, file2, "false", "true"));
+
+        benchmark.addCase(
+                "format-v2-bufferedInput",
+                10,
+                () -> query(approxCardinality, file2, "true", "false"));
+
+        benchmark.addCase(
+                "format-v2-bufferedInput-bitmapByteBuffer",
+                10,
+                () -> query(approxCardinality, file2, "true", "true"));
+
+        benchmark.run();
+    }
+
+    private static void query(
+            int approxCardinality,
+            File file1,
+            String enableBufferedInput,
+            String enableNextOffsetToSize) {
+        try {
+            FieldRef fieldRef = new FieldRef(0, "", DataTypes.STRING());
+            Options options = new Options();
+            options.set(BitmapFileIndex.ENABLE_BUFFERED_INPUT, 
enableBufferedInput);
+            options.set(BitmapFileIndex.ENABLE_NEXT_OFFSET_TO_SIZE, 
enableNextOffsetToSize);
+            LocalFileIO.LocalSeekableInputStream localSeekableInputStream =
+                    new LocalFileIO.LocalSeekableInputStream(file1);
+            FileIndexReader reader =
+                    new BitmapFileIndex(DataTypes.STRING(), options)
+                            .createReader(localSeekableInputStream, 0, 0);
+            FileIndexResult result =
+                    reader.visitEqual(
+                            fieldRef, BinaryString.fromString(prefix + 
(approxCardinality / 2)));
+            ((BitmapIndexResult) result).get();
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RoaringBitmapBenchmark.java
 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RoaringBitmapBenchmark.java
index 4b989e96e5..a6e4b59e34 100644
--- 
a/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RoaringBitmapBenchmark.java
+++ 
b/paimon-benchmark/paimon-micro-benchmarks/src/test/java/org/apache/paimon/benchmark/bitmap/RoaringBitmapBenchmark.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.roaringbitmap.RoaringBitmap;
 
+import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -78,6 +79,21 @@ public class RoaringBitmapBenchmark {
                     }
                 });
 
+        benchmark.addCase(
+                "deserialize(DataInputStream(BufferedInputStream))",
+                10,
+                () -> {
+                    try {
+                        LocalFileIO.LocalSeekableInputStream seekableStream =
+                                new LocalFileIO.LocalSeekableInputStream(file);
+                        DataInputStream input =
+                                new DataInputStream(new 
BufferedInputStream(seekableStream));
+                        new RoaringBitmap().deserialize(input);
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
         benchmark.addCase(
                 "deserialize(DataInput, byte[])",
                 10,
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java
index 4020302c56..5f0e3adbf0 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndex.java
@@ -33,9 +33,9 @@ import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.RoaringBitmap32;
 
 import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -49,23 +49,31 @@ import java.util.stream.Collectors;
 public class BitmapFileIndex implements FileIndexer {
 
     public static final int VERSION_1 = 1;
+    public static final int VERSION_2 = 2;
+
+    public static final String VERSION = "version";
+    public static final String INDEX_BLOCK_SIZE = "index-block-size";
+    public static final String ENABLE_BUFFERED_INPUT = "enable-buffered-input";
+    public static final String ENABLE_NEXT_OFFSET_TO_SIZE = 
"enable-next-offset-to-size";
 
     private final DataType dataType;
+    private final Options options;
 
     public BitmapFileIndex(DataType dataType, Options options) {
         this.dataType = dataType;
+        this.options = options;
     }
 
     @Override
     public FileIndexWriter createWriter() {
-        return new Writer(dataType);
+        return new Writer(dataType, options);
     }
 
     @Override
     public FileIndexReader createReader(
             SeekableInputStream seekableInputStream, int start, int length) {
         try {
-            return new Reader(seekableInputStream, start, length);
+            return new Reader(seekableInputStream, start, options);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -73,15 +81,19 @@ public class BitmapFileIndex implements FileIndexer {
 
     private static class Writer extends FileIndexWriter {
 
+        private final int version;
         private final DataType dataType;
         private final Function<Object, Object> valueMapper;
         private final Map<Object, RoaringBitmap32> id2bitmap = new HashMap<>();
         private final RoaringBitmap32 nullBitmap = new RoaringBitmap32();
         private int rowNumber;
+        private final Options options;
 
-        public Writer(DataType dataType) {
+        public Writer(DataType dataType, Options options) {
+            this.version = options.getInteger(VERSION, VERSION_1);
             this.dataType = dataType;
             this.valueMapper = getValueMapper(dataType);
+            this.options = options;
         }
 
         @Override
@@ -103,7 +115,7 @@ public class BitmapFileIndex implements FileIndexer {
                 ByteArrayOutputStream output = new ByteArrayOutputStream();
                 DataOutputStream dos = new DataOutputStream(output);
 
-                dos.writeByte(VERSION_1);
+                dos.writeByte(version);
 
                 // 1.serialize bitmaps to bytes
                 byte[] nullBitmapBytes = nullBitmap.serialize();
@@ -111,7 +123,7 @@ public class BitmapFileIndex implements FileIndexer {
                         id2bitmap.entrySet().stream()
                                 .collect(
                                         Collectors.toMap(
-                                                e -> e.getKey(), e -> 
e.getValue().serialize()));
+                                                Map.Entry::getKey, e -> 
e.getValue().serialize()));
 
                 // 2.build bitmap file index meta
                 LinkedHashMap<Object, Integer> bitmapOffsets = new 
LinkedHashMap<>();
@@ -132,16 +144,36 @@ public class BitmapFileIndex implements FileIndexer {
                                 offsetRef[0] += bytes.length;
                             }
                         });
-                BitmapFileIndexMeta bitmapFileIndexMeta =
-                        new BitmapFileIndexMeta(
-                                dataType,
-                                rowNumber,
-                                id2bitmap.size(),
-                                !nullBitmap.isEmpty(),
-                                nullBitmap.getCardinality() == 1
-                                        ? -1 - nullBitmap.iterator().next()
-                                        : 0,
-                                bitmapOffsets);
+                BitmapFileIndexMeta bitmapFileIndexMeta;
+                if (version == VERSION_1) {
+                    bitmapFileIndexMeta =
+                            new BitmapFileIndexMeta(
+                                    dataType,
+                                    options,
+                                    rowNumber,
+                                    id2bitmap.size(),
+                                    !nullBitmap.isEmpty(),
+                                    nullBitmap.getCardinality() == 1
+                                            ? -1 - nullBitmap.iterator().next()
+                                            : 0,
+                                    bitmapOffsets);
+                } else if (version == VERSION_2) {
+                    bitmapFileIndexMeta =
+                            new BitmapFileIndexMetaV2(
+                                    dataType,
+                                    options,
+                                    rowNumber,
+                                    id2bitmap.size(),
+                                    !nullBitmap.isEmpty(),
+                                    nullBitmap.getCardinality() == 1
+                                            ? -1 - nullBitmap.iterator().next()
+                                            : 0,
+                                    nullBitmapBytes.length,
+                                    bitmapOffsets,
+                                    offsetRef[0]);
+                } else {
+                    throw new RuntimeException("invalid version: " + version);
+                }
 
                 // 3.serialize meta
                 bitmapFileIndexMeta.serialize(dos);
@@ -164,16 +196,20 @@ public class BitmapFileIndex implements FileIndexer {
 
         private final SeekableInputStream seekableInputStream;
         private final int headStart;
-        private int bodyStart;
         private final Map<Object, RoaringBitmap32> bitmaps = new 
LinkedHashMap<>();
+        private final boolean enableNextOffsetToSize;
 
-        private int version;
         private BitmapFileIndexMeta bitmapFileIndexMeta;
         private Function<Object, Object> valueMapper;
 
-        public Reader(SeekableInputStream seekableInputStream, int start, int 
length) {
+        private final Options options;
+
+        public Reader(SeekableInputStream seekableInputStream, int start, 
Options options) {
             this.seekableInputStream = seekableInputStream;
             this.headStart = start;
+            this.options = options;
+            enableNextOffsetToSize =
+                    
options.getBoolean(BitmapFileIndex.ENABLE_NEXT_OFFSET_TO_SIZE, true);
         }
 
         @Override
@@ -222,21 +258,32 @@ public class BitmapFileIndex implements FileIndexer {
                             .map(
                                     it ->
                                             bitmaps.computeIfAbsent(
-                                                    valueMapper.apply(it), k 
-> readBitmap(k)))
+                                                    valueMapper.apply(it), 
this::readBitmap))
                             .iterator());
         }
 
         private RoaringBitmap32 readBitmap(Object bitmapId) {
             try {
-                if (!bitmapFileIndexMeta.contains(bitmapId)) {
+                BitmapFileIndexMeta.Entry entry = 
bitmapFileIndexMeta.findEntry(bitmapId);
+                if (entry == null) {
                     return new RoaringBitmap32();
                 } else {
-                    int offset = bitmapFileIndexMeta.getOffset(bitmapId);
+                    int offset = entry.offset;
                     if (offset < 0) {
                         return RoaringBitmap32.bitmapOf(-1 - offset);
                     } else {
-                        seekableInputStream.seek(bodyStart + offset);
+                        
seekableInputStream.seek(bitmapFileIndexMeta.getBodyStart() + offset);
                         RoaringBitmap32 bitmap = new RoaringBitmap32();
+                        if (enableNextOffsetToSize) {
+                            int length = entry.length;
+                            if (length != -1) {
+                                DataInputStream input = new 
DataInputStream(seekableInputStream);
+                                byte[] bytes = new byte[length];
+                                input.readFully(bytes);
+                                bitmap.deserialize(ByteBuffer.wrap(bytes));
+                                return bitmap;
+                            }
+                        }
                         bitmap.deserialize(new 
DataInputStream(seekableInputStream));
                         return bitmap;
                     }
@@ -251,18 +298,20 @@ public class BitmapFileIndex implements FileIndexer {
                 this.valueMapper = getValueMapper(dataType);
                 try {
                     seekableInputStream.seek(headStart);
-                    this.version = seekableInputStream.read();
-                    if (this.version > VERSION_1) {
+                    int version = seekableInputStream.read();
+                    if (version == VERSION_1) {
+                        this.bitmapFileIndexMeta = new 
BitmapFileIndexMeta(dataType, options);
+                        
this.bitmapFileIndexMeta.deserialize(seekableInputStream);
+                    } else if (version == VERSION_2) {
+                        this.bitmapFileIndexMeta = new 
BitmapFileIndexMetaV2(dataType, options);
+                        
this.bitmapFileIndexMeta.deserialize(seekableInputStream);
+                    } else if (version > VERSION_2) {
                         throw new RuntimeException(
                                 String.format(
                                         "read index file fail, "
                                                 + "your plugin version is 
lower than %d",
-                                        this.version));
+                                        version));
                     }
-                    DataInput input = new DataInputStream(seekableInputStream);
-                    this.bitmapFileIndexMeta = new 
BitmapFileIndexMeta(dataType);
-                    this.bitmapFileIndexMeta.deserialize(input);
-                    bodyStart = (int) seekableInputStream.getPos();
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java
index 91afece8ca..af09aec9cc 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMeta.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.fileindex.bitmap;
 
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BinaryType;
@@ -43,10 +45,15 @@ import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.types.VariantType;
 
+import java.io.BufferedInputStream;
 import java.io.DataInput;
+import java.io.DataInputStream;
 import java.io.DataOutput;
+import java.io.InputStream;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.function.Function;
 
 /**
  *
@@ -88,26 +95,33 @@ import java.util.Map;
  */
 public class BitmapFileIndexMeta {
 
-    private final DataType dataType;
-
-    private int rowCount;
-    private int nonNullBitmapNumber;
-    private boolean hasNullValue;
-    private int nullValueOffset;
-    private LinkedHashMap<Object, Integer> bitmapOffsets;
-
-    public BitmapFileIndexMeta(DataType dataType) {
+    protected final DataType dataType;
+    protected final Options options;
+    protected int rowCount;
+    protected int nonNullBitmapNumber;
+    protected boolean hasNullValue;
+    protected int nullValueOffset;
+    protected LinkedHashMap<Object, Integer> bitmapOffsets;
+    protected Map<Object, Integer> bitmapLengths;
+    protected long bodyStart;
+    protected boolean enableNextOffsetToSize;
+
+    public BitmapFileIndexMeta(DataType dataType, Options options) {
         this.dataType = dataType;
+        this.options = options;
+        enableNextOffsetToSize =
+                options.getBoolean(BitmapFileIndex.ENABLE_NEXT_OFFSET_TO_SIZE, 
true);
     }
 
     public BitmapFileIndexMeta(
             DataType dataType,
+            Options options,
             int rowCount,
             int nonNullBitmapNumber,
             boolean hasNullValue,
             int nullValueOffset,
             LinkedHashMap<Object, Integer> bitmapOffsets) {
-        this(dataType);
+        this(dataType, options);
         this.rowCount = rowCount;
         this.nonNullBitmapNumber = nonNullBitmapNumber;
         this.hasNullValue = hasNullValue;
@@ -119,22 +133,141 @@ public class BitmapFileIndexMeta {
         return rowCount;
     }
 
-    public boolean contains(Object bitmapId) {
-        if (bitmapId == null) {
-            return hasNullValue;
-        }
-        return bitmapOffsets.containsKey(bitmapId);
+    public long getBodyStart() {
+        return bodyStart;
     }
 
-    public int getOffset(Object bitmapId) {
+    /**
+     * Find entry for bitmap.
+     *
+     * @param bitmapId the bitmap identifier to be searched.
+     * @return an {@link Entry} which contains offset and length of bitmap if 
it is contained in the
+     *     index meta; otherwise `null`.
+     */
+    public Entry findEntry(Object bitmapId) {
+        int length = bitmapLengths == null ? -1 : 
bitmapLengths.getOrDefault(bitmapId, -1);
         if (bitmapId == null) {
-            return nullValueOffset;
+            if (hasNullValue) {
+                return new Entry(null, nullValueOffset, length);
+            }
+        } else {
+            if (bitmapOffsets.containsKey(bitmapId)) {
+                return new Entry(bitmapId, bitmapOffsets.get(bitmapId), 
length);
+            }
         }
-        return bitmapOffsets.get(bitmapId);
+        return null;
     }
 
     public void serialize(DataOutput out) throws Exception {
 
+        ThrowableConsumer valueWriter = getValueWriter(out);
+
+        out.writeInt(rowCount);
+        out.writeInt(nonNullBitmapNumber);
+        out.writeBoolean(hasNullValue);
+        if (hasNullValue) {
+            out.writeInt(nullValueOffset);
+        }
+        for (Map.Entry<Object, Integer> entry : bitmapOffsets.entrySet()) {
+            valueWriter.accept(entry.getKey());
+            out.writeInt(entry.getValue());
+        }
+    }
+
+    /**
+     * Reads the index head from the current position of the stream and populates this meta.
+     *
+     * <p>Layout read: row count (int), number of non-null bitmaps (int), has-null flag (boolean),
+     * the null bitmap offset (int, only if the flag is set), then one {@code value | offset} pair
+     * per non-null bitmap. {@code bodyStart} ends up at the start position plus the number of head
+     * bytes accounted for while reading.
+     *
+     * <p>When {@code enableNextOffsetToSize} is set, the length of a bitmap is derived as the
+     * distance to the next non-negative offset; the last such bitmap gets no length here.
+     *
+     * @param seekableInputStream stream positioned at the first byte of the head
+     * @throws Exception if reading from the stream or decoding a value fails
+     */
+    public void deserialize(SeekableInputStream seekableInputStream) throws Exception {
+        long headStart = seekableInputStream.getPos();
+        boolean buffered = options.getBoolean(BitmapFileIndex.ENABLE_BUFFERED_INPUT, true);
+        InputStream source =
+                buffered ? new BufferedInputStream(seekableInputStream) : seekableInputStream;
+        if (enableNextOffsetToSize) {
+            this.bitmapLengths = new HashMap<>();
+        }
+        DataInput in = new DataInputStream(source);
+        ThrowableSupplier valueReader = getValueReader(in);
+        Function<Object, Integer> measure = getSerializeSizeMeasure();
+
+        // Fixed part of the head: two ints and the one-byte flag.
+        rowCount = in.readInt();
+        nonNullBitmapNumber = in.readInt();
+        hasNullValue = in.readBoolean();
+        long consumed = 2L * Integer.BYTES + 1;
+
+        if (hasNullValue) {
+            nullValueOffset = in.readInt();
+            consumed += Integer.BYTES;
+        }
+
+        bitmapOffsets = new LinkedHashMap<>();
+        Object previousKey = null;
+        int previousOffset = nullValueOffset;
+        for (int remaining = nonNullBitmapNumber; remaining > 0; remaining--) {
+            Object key = valueReader.get();
+            int offset = in.readInt();
+            bitmapOffsets.put(key, offset);
+            consumed += measure.apply(key) + Integer.BYTES;
+
+            // Negative offsets take no part in the length derivation
+            // (presumably they do not point into the bitmap body -- confirm against the writer).
+            if (!enableNextOffsetToSize || offset < 0) {
+                continue;
+            }
+            if (previousOffset >= 0) {
+                bitmapLengths.put(previousKey, offset - previousOffset);
+            }
+            previousKey = key;
+            previousOffset = offset;
+        }
+
+        bodyStart = headStart + consumed;
+    }
+
+    /**
+     * Returns a function that maps a dictionary value of this index's data type to the number of
+     * bytes it is counted as when advancing over the head during deserialization.
+     *
+     * <p>Numeric and boolean types have a fixed width; a {@link BinaryString} is counted as four
+     * bytes plus its size in bytes (presumably a length prefix followed by the payload -- confirm
+     * against the value writer).
+     */
+    protected Function<Object, Integer> getSerializeSizeMeasure() {
+        return dataType.accept(
+                new DataTypeVisitorAdapter<Function<Object, Integer>>() {
+                    @Override
+                    public Function<Object, Integer> visitBinaryString() {
+                        return value -> {
+                            int payloadBytes = ((BinaryString) value).getSizeInBytes();
+                            return Integer.BYTES + payloadBytes;
+                        };
+                    }
+
+                    @Override
+                    public Function<Object, Integer> visitByte() {
+                        return fixedWidth(Byte.BYTES);
+                    }
+
+                    @Override
+                    public Function<Object, Integer> visitShort() {
+                        return fixedWidth(Short.BYTES);
+                    }
+
+                    @Override
+                    public Function<Object, Integer> visitInt() {
+                        return fixedWidth(Integer.BYTES);
+                    }
+
+                    @Override
+                    public Function<Object, Integer> visitLong() {
+                        return fixedWidth(Long.BYTES);
+                    }
+
+                    @Override
+                    public Function<Object, Integer> visitFloat() {
+                        return fixedWidth(Float.BYTES);
+                    }
+
+                    @Override
+                    public Function<Object, Integer> visitDouble() {
+                        return fixedWidth(Double.BYTES);
+                    }
+
+                    @Override
+                    public Function<Object, Integer> visitBoolean() {
+                        return fixedWidth(1);
+                    }
+                });
+    }
+
+    /** Size measure for a type whose serialized width does not depend on the value. */
+    private static Function<Object, Integer> fixedWidth(int bytes) {
+        return ignored -> bytes;
+    }
+
+    protected ThrowableConsumer getValueWriter(DataOutput out) {
         ThrowableConsumer valueWriter =
                 dataType.accept(
                         new DataTypeVisitorAdapter<ThrowableConsumer>() {
@@ -182,21 +315,10 @@ public class BitmapFileIndexMeta {
                                 return o -> out.writeBoolean((Boolean) o);
                             }
                         });
-
-        out.writeInt(rowCount);
-        out.writeInt(nonNullBitmapNumber);
-        out.writeBoolean(hasNullValue);
-        if (hasNullValue) {
-            out.writeInt(nullValueOffset);
-        }
-        for (Map.Entry<Object, Integer> entry : bitmapOffsets.entrySet()) {
-            valueWriter.accept(entry.getKey());
-            out.writeInt(entry.getValue());
-        }
+        return valueWriter;
     }
 
-    public void deserialize(DataInput in) throws Exception {
-
+    protected ThrowableSupplier getValueReader(DataInput in) {
         ThrowableSupplier valueReader =
                 dataType.accept(
                         new DataTypeVisitorAdapter<ThrowableSupplier>() {
@@ -245,17 +367,7 @@ public class BitmapFileIndexMeta {
                                 return in::readBoolean;
                             }
                         });
-
-        rowCount = in.readInt();
-        nonNullBitmapNumber = in.readInt();
-        hasNullValue = in.readBoolean();
-        if (hasNullValue) {
-            nullValueOffset = in.readInt();
-        }
-        bitmapOffsets = new LinkedHashMap<>();
-        for (int i = 0; i < nonNullBitmapNumber; i++) {
-            bitmapOffsets.put(valueReader.get(), in.readInt());
-        }
+        return valueReader;
     }
 
     /** functional interface. */
@@ -392,4 +504,18 @@ public class BitmapFileIndexMeta {
             throw new UnsupportedOperationException("Does not support type 
variant");
         }
     }
+
+    /**
+     * Bitmap entry: the location of one serialized bitmap as recorded in the index meta.
+     *
+     * <p>Instances are returned by {@code findEntry}; the entry for the null bitmap carries a
+     * {@code null} key.
+     */
+    public static class Entry {
+
+        // Dictionary value the bitmap belongs to; null for the null-value bitmap.
+        Object key;
+        // Offset of the bitmap as stored in the index meta.
+        int offset;
+        // Length of the serialized bitmap; callers pass -1 when no length is known.
+        int length;
+
+        public Entry(Object key, int offset, int length) {
+            this.key = key;
+            this.offset = offset;
+            this.length = length;
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMetaV2.java
 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMetaV2.java
new file mode 100644
index 0000000000..a6974ecd30
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/fileindex/bitmap/BitmapFileIndexMetaV2.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fileindex.bitmap;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.types.DataType;
+
+import java.io.BufferedInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * When the cardinality of a bitmap-indexed column is high, reading the entire dictionary of the
+ * first version of the bitmap index format takes a long time. However, a full dictionary is not
+ * needed when only a small number of predicates are evaluated, so the performance of predicate
+ * lookups on the bitmap can be improved by building a secondary index on the dictionary.
+ *
+ * <pre>
+ * Bitmap file index format (V2)
+ * +-------------------------------------------------+-----------------
+ * | version (1 byte) = 2                           |
+ * +-------------------------------------------------+
+ * | row count (4 bytes int)                        |
+ * +-------------------------------------------------+
+ * | non-null value bitmap number (4 bytes int)     |
+ * +-------------------------------------------------+
+ * | has null value (1 byte)                        |
+ * +-------------------------------------------------+
+ * | null value offset (4 bytes if has null value)  |       HEAD
+ * +-------------------------------------------------+
+ * | null bitmap length (4 bytes if has null value) |
+ * +-------------------------------------------------+
+ * | bitmap index block number (4 bytes int)        |
+ * +-------------------------------------------------+
+ * | value 1 | offset 1                             |
+ * +-------------------------------------------------+
+ * | value 2 | offset 2                             |
+ * +-------------------------------------------------+
+ * | ...                                            |
+ * +-------------------------------------------------+
+ * | bitmap blocks offset (4 bytes int)             |
+ * +-------------------------------------------------+-----------------
+ * | bitmap index block 1                           |
+ * +-------------------------------------------------+
+ * | bitmap index block 2                           |  INDEX BLOCKS
+ * +-------------------------------------------------+
+ * | ...                                            |
+ * +-------------------------------------------------+-----------------
+ * | serialized bitmap 1                            |
+ * +-------------------------------------------------+
+ * | serialized bitmap 2                            |
+ * +-------------------------------------------------+  BITMAP BLOCKS
+ * | serialized bitmap 3                            |
+ * +-------------------------------------------------+
+ * | ...                                            |
+ * +-------------------------------------------------+-----------------
+ *
+ * index block format:
+ * +-------------------------------------------------+
+ * | entry number (4 bytes int)                     |
+ * +-------------------------------------------------+
+ * | value 1 | offset 1 | length 1                  |
+ * +-------------------------------------------------+
+ * | value 2 | offset 2 | length 2                  |
+ * +-------------------------------------------------+
+ * | ...                                            |
+ * +-------------------------------------------------+
+ * </pre>
+ */
+public class BitmapFileIndexMetaV2 extends BitmapFileIndexMeta {
+
+    private long blockSizeLimit;
+
+    private List<BitmapIndexBlock> indexBlocks;
+    private long indexBlockStart;
+    private int nullBitmapLength;
+
+    /**
+     * Creates an empty meta for the given data type and options.
+     *
+     * <p>The null bitmap length starts at -1 and is only overwritten by {@code deserialize} when
+     * the index has a null bitmap. Note that {@code blockSizeLimit} is not initialised by this
+     * constructor; it is only set by the full constructor.
+     */
+    public BitmapFileIndexMetaV2(DataType dataType, Options options) {
+        super(dataType, options);
+        this.nullBitmapLength = -1;
+    }
+
+    /**
+     * Creates a fully populated meta and reads the index block size limit from the options
+     * (default {@code "16kb"}).
+     *
+     * <p>When {@code enableNextOffsetToSize} is set, bitmap lengths are derived from consecutive
+     * non-negative offsets: each bitmap's length is the distance to the next non-negative offset,
+     * and the last one is measured against {@code finalOffset}. The walk is seeded with {@code
+     * nullValueOffset}, so the first derived length is stored under the {@code null} key.
+     *
+     * @param nullBitmapLength length of the null bitmap
+     * @param bitmapOffsets dictionary value to bitmap offset, in the order the lengths are derived
+     * @param finalOffset offset used as the end of the last bitmap
+     */
+    public BitmapFileIndexMetaV2(
+            DataType dataType,
+            Options options,
+            int rowCount,
+            int nonNullBitmapNumber,
+            boolean hasNullValue,
+            int nullValueOffset,
+            int nullBitmapLength,
+            LinkedHashMap<Object, Integer> bitmapOffsets,
+            int finalOffset) {
+        super(
+                dataType,
+                options,
+                rowCount,
+                nonNullBitmapNumber,
+                hasNullValue,
+                nullValueOffset,
+                bitmapOffsets);
+        this.nullBitmapLength = nullBitmapLength;
+
+        String configuredBlockSize = options.getString(BitmapFileIndex.INDEX_BLOCK_SIZE, "16kb");
+        this.blockSizeLimit = MemorySize.parse(configuredBlockSize).getBytes();
+
+        if (!enableNextOffsetToSize) {
+            return;
+        }
+
+        bitmapLengths = new HashMap<>();
+        Object previousKey = null;
+        int previousOffset = nullValueOffset;
+        for (Map.Entry<Object, Integer> dictEntry : bitmapOffsets.entrySet()) {
+            int offset = dictEntry.getValue();
+            if (offset < 0) {
+                // negative offsets take no part in the length derivation
+                continue;
+            }
+            if (previousOffset >= 0) {
+                bitmapLengths.put(previousKey, offset - previousOffset);
+            }
+            previousKey = dictEntry.getKey();
+            previousOffset = offset;
+        }
+        bitmapLengths.put(previousKey, finalOffset - previousOffset);
+    }
+
+    /**
+     * Returns a comparator that orders dictionary values of the given data type by their natural
+     * order. Both operands are cast to the Java class backing the type before being compared.
+     *
+     * @param dataType data type of the indexed column
+     * @return comparator over values of that type
+     */
+    public static Comparator<Object> getComparator(DataType dataType) {
+        return dataType.accept(
+                new DataTypeVisitorAdapter<Comparator<Object>>() {
+                    @Override
+                    public Comparator<Object> visitBinaryString() {
+                        return naturalOrderOf(BinaryString.class);
+                    }
+
+                    @Override
+                    public Comparator<Object> visitByte() {
+                        return naturalOrderOf(Byte.class);
+                    }
+
+                    @Override
+                    public Comparator<Object> visitShort() {
+                        return naturalOrderOf(Short.class);
+                    }
+
+                    @Override
+                    public Comparator<Object> visitInt() {
+                        return naturalOrderOf(Integer.class);
+                    }
+
+                    @Override
+                    public Comparator<Object> visitLong() {
+                        return naturalOrderOf(Long.class);
+                    }
+
+                    @Override
+                    public Comparator<Object> visitFloat() {
+                        return naturalOrderOf(Float.class);
+                    }
+
+                    @Override
+                    public Comparator<Object> visitDouble() {
+                        return naturalOrderOf(Double.class);
+                    }
+
+                    @Override
+                    public Comparator<Object> visitBoolean() {
+                        return naturalOrderOf(Boolean.class);
+                    }
+                });
+    }
+
+    /** Comparator that casts both operands to {@code type} and compares them naturally. */
+    private static <T extends Comparable<? super T>> Comparator<Object> naturalOrderOf(
+            Class<T> type) {
+        return Comparator.comparing(type::cast);
+    }
+
+    /**
+     * Looks up the entry of a bitmap through the secondary index.
+     *
+     * @param bitmapId dictionary value to look up, or {@code null} for the null bitmap
+     * @return the matching {@link Entry}, or {@code null} if the index holds no such bitmap
+     */
+    @Override
+    public Entry findEntry(Object bitmapId) {
+        if (bitmapId == null) {
+            // the null bitmap is described by the head itself, not by an index block
+            return hasNullValue ? new Entry(null, nullValueOffset, nullBitmapLength) : null;
+        }
+        BitmapIndexBlock candidate = findBlock(bitmapId);
+        return candidate == null ? null : candidate.findEntry(bitmapId);
+    }
+
+    /**
+     * Finds the index block that could contain {@code bitmapId}: the block whose key equals the
+     * id or, failing that, the last block whose key is smaller than the id.
+     *
+     * @return the candidate block, or {@code null} if the id sorts before every block key
+     */
+    private BitmapIndexBlock findBlock(Object bitmapId) {
+        Comparator<Object> keyOrder = getComparator(dataType);
+        // The search key is unused; the comparator closes over bitmapId instead.
+        int found =
+                Collections.binarySearch(
+                        indexBlocks,
+                        null,
+                        (block, unused) -> keyOrder.compare(block.key, bitmapId));
+        if (found >= 0) {
+            return indexBlocks.get(found);
+        }
+        // A miss is encoded as -(insertionPoint) - 1; the candidate is the block just before it.
+        int insertionPoint = -(found + 1);
+        int preceding = insertionPoint - 1;
+        return preceding < 0 ? null : indexBlocks.get(preceding);
+    }
+
+    /**
+     * Writes the head and the index blocks of the V2 format.
+     *
+     * <p>The dictionary entries are sorted by key and packed greedily into index blocks. The head
+     * then lists, per block, its first key and its offset inside the index block section, followed
+     * by the total size of that section; after the head each block is written as an entry count
+     * followed by {@code value | offset | length} triples.
+     *
+     * <p>An empty dictionary (for example a column that only contains nulls) produces zero index
+     * blocks. Previously a single key-less block was emitted and its {@code null} key was handed
+     * to the value writer, which made serialization fail.
+     *
+     * @param out destination of the serialized meta
+     * @throws Exception if writing fails
+     * @throws RuntimeException if an entry cannot be placed even in a fresh block
+     */
+    @Override
+    public void serialize(DataOutput out) throws Exception {
+
+        ThrowableConsumer valueWriter = getValueWriter(out);
+
+        out.writeInt(rowCount);
+        out.writeInt(nonNullBitmapNumber);
+        out.writeBoolean(hasNullValue);
+        if (hasNullValue) {
+            out.writeInt(nullValueOffset);
+            out.writeInt(nullBitmapLength);
+        }
+
+        // Collect the dictionary as entries sorted by key; -1 marks an unknown length.
+        Comparator<Object> comparator = getComparator(dataType);
+        List<Entry> sortedEntries = new ArrayList<>(bitmapOffsets.size());
+        for (Map.Entry<Object, Integer> it : bitmapOffsets.entrySet()) {
+            int length =
+                    bitmapLengths == null ? -1 : bitmapLengths.getOrDefault(it.getKey(), -1);
+            sortedEntries.add(new Entry(it.getKey(), it.getValue(), length));
+        }
+        sortedEntries.sort((e1, e2) -> comparator.compare(e1.key, e2.key));
+
+        // Pack the entries into blocks; a block is only created once there is an entry for it.
+        List<BitmapIndexBlock> blocks = new ArrayList<>();
+        BitmapIndexBlock current = null;
+        for (Entry entry : sortedEntries) {
+            if (current == null || !current.tryAdd(entry)) {
+                int nextOffset = current == null ? 0 : current.offset + current.serializedBytes;
+                current = new BitmapIndexBlock(nextOffset);
+                blocks.add(current);
+                if (!current.tryAdd(entry)) {
+                    throw new RuntimeException("index fail");
+                }
+            }
+        }
+        this.indexBlocks = blocks;
+
+        out.writeInt(blocks.size());
+
+        int bitmapBodyOffset = 0;
+        for (BitmapIndexBlock block : blocks) {
+            // secondary entry
+            valueWriter.accept(block.key);
+            out.writeInt(block.offset);
+            bitmapBodyOffset += block.serializedBytes;
+        }
+
+        // bitmap body offset
+        out.writeInt(bitmapBodyOffset);
+
+        // bitmap index blocks
+        for (BitmapIndexBlock block : blocks) {
+            out.writeInt(block.entryList.size());
+            for (Entry e : block.entryList) {
+                valueWriter.accept(e.key);
+                out.writeInt(e.offset);
+                out.writeInt(e.length);
+            }
+        }
+    }
+
+    /**
+     * Reads the V2 head from the current position of the stream.
+     *
+     * <p>Only the head is decoded here: the fixed fields, then one {@code key | offset} pair per
+     * index block, then the size of the index block section. Each block keeps a reference to the
+     * stream and the meta records where the index block section and the bitmap body start:
+     * {@code indexBlockStart} is the position right after the head and {@code bodyStart} is that
+     * position plus the index block section size.
+     *
+     * @param seekableInputStream stream positioned at the first byte of the head
+     * @throws Exception if reading from the stream or decoding a key fails
+     */
+    @Override
+    public void deserialize(SeekableInputStream seekableInputStream) throws Exception {
+
+        long headStart = seekableInputStream.getPos();
+
+        boolean buffered = options.getBoolean(BitmapFileIndex.ENABLE_BUFFERED_INPUT, true);
+        InputStream source =
+                buffered ? new BufferedInputStream(seekableInputStream) : seekableInputStream;
+        DataInput in = new DataInputStream(source);
+        ThrowableSupplier valueReader = getValueReader(in);
+        Function<Object, Integer> measure = getSerializeSizeMeasure();
+
+        // Fixed part of the head: two ints and the one-byte flag.
+        rowCount = in.readInt();
+        nonNullBitmapNumber = in.readInt();
+        hasNullValue = in.readBoolean();
+        long consumed = 2L * Integer.BYTES + 1;
+
+        if (hasNullValue) {
+            nullValueOffset = in.readInt();
+            nullBitmapLength = in.readInt();
+            consumed += 2L * Integer.BYTES;
+        }
+
+        bitmapOffsets = new LinkedHashMap<>();
+
+        int blockCount = in.readInt();
+        consumed += Integer.BYTES;
+
+        List<BitmapIndexBlock> blocks = new ArrayList<>(blockCount);
+        for (int i = 0; i < blockCount; i++) {
+            Object firstKey = valueReader.get();
+            int blockOffset = in.readInt();
+            blocks.add(
+                    new BitmapIndexBlock(
+                            dataType, options, firstKey, blockOffset, seekableInputStream));
+            consumed += measure.apply(firstKey) + Integer.BYTES;
+        }
+        indexBlocks = blocks;
+
+        // bitmap body offset
+        int bitmapBodyOffset = in.readInt();
+        consumed += Integer.BYTES;
+
+        indexBlockStart = headStart + consumed;
+        bodyStart = indexBlockStart + bitmapBodyOffset;
+    }
+
+    /**
+     * Split of all bitmap entries: one block of the secondary index.
+     *
+     * <p>A block is used in one of two modes. When writing, it is created with an offset only and
+     * filled through {@link #tryAdd(Entry)}. When reading, it is created from the head with its
+     * key and offset, and its entries are loaded from the stream on the first lookup.
+     */
+    class BitmapIndexBlock {
+
+        // Key of the first entry of the block (read from the head when deserializing).
+        Object key;
+        // Offset of the block relative to indexBlockStart.
+        int offset;
+        // Serialized size of the block so far; starts with the 4-byte entry count.
+        int serializedBytes = Integer.BYTES;
+        // Entries of the block; null until loaded when the block was created for reading.
+        List<Entry> entryList;
+        Function<Object, Integer> keyBytesMapper;
+        DataType dataType;
+        SeekableInputStream seekableInputStream;
+        Options options;
+
+        /** Loads the entries of this block from the stream unless they are already present. */
+        void tryDeserialize() {
+            if (entryList == null) {
+                try {
+                    seekableInputStream.seek(indexBlockStart + offset);
+                    InputStream inputStream = seekableInputStream;
+                    if (options.getBoolean(BitmapFileIndex.ENABLE_BUFFERED_INPUT, true)) {
+                        inputStream = new BufferedInputStream(inputStream);
+                    }
+                    DataInputStream in = new DataInputStream(inputStream);
+                    ThrowableSupplier valueReader = getValueReader(in);
+                    int entryNum = in.readInt();
+                    entryList = new ArrayList<>(entryNum);
+                    for (int i = 0; i < entryNum; i++) {
+                        entryList.add(new Entry(valueReader.get(), in.readInt(), in.readInt()));
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        /**
+         * Binary-searches the entries of this block for {@code bitmapId}.
+         *
+         * @return the matching entry, or {@code null} if the block does not contain the id
+         */
+        Entry findEntry(Object bitmapId) {
+            tryDeserialize();
+            Comparator<Object> comparator = getComparator(dataType);
+            // The search key is unused; the comparator closes over bitmapId instead.
+            int idx =
+                    Collections.binarySearch(
+                            entryList, null, (e1, ignore) -> comparator.compare(e1.key, bitmapId));
+            if (idx >= 0) {
+                return entryList.get(idx);
+            }
+            return null;
+        }
+
+        /**
+         * Appends the entry if the block stays within the block size limit.
+         *
+         * <p>The first entry of a block is always accepted, even when it alone exceeds the
+         * limit; otherwise a single key larger than the configured block size could never be
+         * indexed. The block key is only set once an entry has actually been accepted.
+         *
+         * @return {@code true} if the entry was added, {@code false} if the block is full
+         */
+        boolean tryAdd(Entry entry) {
+            int entryBytes = 2 * Integer.BYTES + keyBytesMapper.apply(entry.key);
+            if (!entryList.isEmpty() && serializedBytes + entryBytes > blockSizeLimit) {
+                return false;
+            }
+            if (key == null) {
+                key = entry.key;
+            }
+            serializedBytes += entryBytes;
+            entryList.add(entry);
+            return true;
+        }
+
+        // for build and serialize
+        public BitmapIndexBlock(int offset) {
+            this.offset = offset;
+            this.entryList = new LinkedList<>();
+            keyBytesMapper = getSerializeSizeMeasure();
+            // These fields shadow the enclosing meta's; fill them so findEntry also works on
+            // blocks that were built for writing.
+            this.dataType = BitmapFileIndexMetaV2.this.dataType;
+            this.options = BitmapFileIndexMetaV2.this.options;
+        }
+
+        // for deserialize
+        public BitmapIndexBlock(
+                DataType dataType,
+                Options options,
+                Object key,
+                int offset,
+                SeekableInputStream seekableInputStream) {
+            this.dataType = dataType;
+            this.options = options;
+            this.key = key;
+            this.offset = offset;
+            this.seekableInputStream = seekableInputStream;
+        }
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmapindex/TestBitmapFileIndex.java
 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmapindex/TestBitmapFileIndex.java
index 15018e90f1..917c0f194e 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmapindex/TestBitmapFileIndex.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/fileindex/bitmapindex/TestBitmapFileIndex.java
@@ -20,24 +20,35 @@ package org.apache.paimon.fileindex.bitmapindex;
 
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.fileindex.FileIndexReader;
+import org.apache.paimon.fileindex.FileIndexResult;
 import org.apache.paimon.fileindex.FileIndexWriter;
 import org.apache.paimon.fileindex.bitmap.BitmapFileIndex;
+import org.apache.paimon.fileindex.bitmap.BitmapFileIndexMetaV2;
 import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
-import org.apache.paimon.fs.ByteArraySeekableStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.RoaringBitmap32;
 
-import org.assertj.core.api.Assertions;
+import org.apache.commons.io.FileUtils;
+import org.junit.Rule;
 import org.junit.jupiter.api.Test;
+import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.util.Arrays;
+import java.util.function.Consumer;
 
 /** test for {@link BitmapFileIndex}. */
 public class TestBitmapFileIndex {
 
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
+
     @Test
     public void testFlip() {
         RoaringBitmap32 bitmap = RoaringBitmap32.bitmapOf(1, 3, 5);
@@ -46,123 +57,187 @@ public class TestBitmapFileIndex {
     }
 
     @Test
-    public void testBitmapIndex1() {
-        VarCharType dataType = new VarCharType();
-        FieldRef fieldRef = new FieldRef(0, "", dataType);
-        BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(dataType, null);
-        FileIndexWriter writer = bitmapFileIndex.createWriter();
-        Object[] arr = {
-            BinaryString.fromString("a"),
-            null,
-            BinaryString.fromString("b"),
-            null,
-            BinaryString.fromString("a"),
-        };
-        for (Object o : arr) {
-            writer.write(o);
-        }
-        byte[] bytes = writer.serializedBytes();
-        ByteArraySeekableStream seekableStream = new 
ByteArraySeekableStream(bytes);
-        FileIndexReader reader = bitmapFileIndex.createReader(seekableStream, 
0, bytes.length);
-
-        BitmapIndexResult result1 =
-                (BitmapIndexResult) reader.visitEqual(fieldRef, 
BinaryString.fromString("a"));
-        assert result1.get().equals(RoaringBitmap32.bitmapOf(0, 4));
-
-        BitmapIndexResult result2 =
-                (BitmapIndexResult) reader.visitEqual(fieldRef, 
BinaryString.fromString("b"));
-        assert result2.get().equals(RoaringBitmap32.bitmapOf(2));
-
-        BitmapIndexResult result3 = (BitmapIndexResult) 
reader.visitIsNull(fieldRef);
-        assert result3.get().equals(RoaringBitmap32.bitmapOf(1, 3));
-
-        BitmapIndexResult result4 = (BitmapIndexResult) result1.and(result2);
-        assert result4.get().equals(RoaringBitmap32.bitmapOf());
-
-        BitmapIndexResult result5 = (BitmapIndexResult) result1.or(result2);
-        assert result5.get().equals(RoaringBitmap32.bitmapOf(0, 2, 4));
+    public void testComparator() {
+        assert BitmapFileIndexMetaV2.getComparator(new VarCharType())
+                        .compare(BinaryString.fromString("a"), 
BinaryString.fromString("b"))
+                < 0;
+        assert BitmapFileIndexMetaV2.getComparator(new VarCharType())
+                        .compare(BinaryString.fromString("a"), 
BinaryString.fromString("a"))
+                == 0;
+        assert BitmapFileIndexMetaV2.getComparator(new VarCharType())
+                        .compare(BinaryString.fromString("c"), 
BinaryString.fromString("b"))
+                > 0;
+        assert BitmapFileIndexMetaV2.getComparator(new IntType()).compare(1, 
2) < 0;
+        assert BitmapFileIndexMetaV2.getComparator(new IntType()).compare(2, 
1) > 0;
     }
 
     @Test
-    public void testBitmapIndex2() {
-        IntType dataType = new IntType();
-        FieldRef fieldRef = new FieldRef(0, "", dataType);
-        BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(dataType, null);
-        FileIndexWriter writer = bitmapFileIndex.createWriter();
-        Object[] arr = {0, 1, null};
-        for (Object o : arr) {
-            writer.write(o);
-        }
-        byte[] bytes = writer.serializedBytes();
-        ByteArraySeekableStream seekableStream = new 
ByteArraySeekableStream(bytes);
-        FileIndexReader reader = bitmapFileIndex.createReader(seekableStream, 
0, bytes.length);
-
-        BitmapIndexResult result1 = (BitmapIndexResult) 
reader.visitEqual(fieldRef, 1);
-        assert result1.get().equals(RoaringBitmap32.bitmapOf(1));
-
-        BitmapIndexResult result2 = (BitmapIndexResult) 
reader.visitIsNull(fieldRef);
-        assert result2.get().equals(RoaringBitmap32.bitmapOf(2));
-
-        BitmapIndexResult result3 = (BitmapIndexResult) 
reader.visitIsNotNull(fieldRef);
-        assert result3.get().equals(RoaringBitmap32.bitmapOf(0, 1));
-
-        BitmapIndexResult result4 =
-                (BitmapIndexResult) reader.visitNotIn(fieldRef, 
Arrays.asList(1, 2));
-        assert result4.get().equals(RoaringBitmap32.bitmapOf(0, 2));
-
-        BitmapIndexResult result5 =
-                (BitmapIndexResult) reader.visitNotIn(fieldRef, 
Arrays.asList(1, 0));
-        assert result5.get().equals(RoaringBitmap32.bitmapOf(2));
+    public void testMemorySize() {
+        assert MemorySize.parse("16kb").getBytes() == 16 * 1024;
+        assert MemorySize.parse("16KB").getBytes() == 16 * 1024;
+        assert MemorySize.parse("16 kb").getBytes() == 16 * 1024;
+        assert MemorySize.parse("16 KB").getBytes() == 16 * 1024;
+        assert MemorySize.parse("16384").getBytes() == 16 * 1024;
     }
 
     @Test
-    public void testBitmapIndex3() {
-
-        IntType intType = new IntType();
-        FieldRef fieldRef = new FieldRef(0, "", intType);
-        BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(intType, null);
-        FileIndexWriter writer = bitmapFileIndex.createWriter();
-
-        // test only one null-value
-        Object[] arr = {1, 2, 1, 2, 1, 3, null};
-
-        for (Object o : arr) {
-            writer.write(o);
-        }
-        byte[] bytes = writer.serializedBytes();
-        ByteArraySeekableStream seekableStream = new 
ByteArraySeekableStream(bytes);
-        FileIndexReader reader = bitmapFileIndex.createReader(seekableStream, 
0, bytes.length);
-
-        BitmapIndexResult result1 = (BitmapIndexResult) 
reader.visitEqual(fieldRef, 1);
-        assert result1.get().equals(RoaringBitmap32.bitmapOf(0, 2, 4));
-
-        // test read singleton bitmap
-        BitmapIndexResult result2 = (BitmapIndexResult) 
reader.visitIsNull(fieldRef);
-        assert result2.get().equals(RoaringBitmap32.bitmapOf(6));
+    public void testV1() throws Exception {
+        testIntType(BitmapFileIndex.VERSION_1);
+        testStringType(BitmapFileIndex.VERSION_1);
+        testBooleanType(BitmapFileIndex.VERSION_1);
+        testHighCardinality(BitmapFileIndex.VERSION_1, 1000000, 100000, null);
     }
 
     @Test
-    void testBitmapIndexForBooleanType() {
+    public void testV2() throws Exception {
+        testIntType(BitmapFileIndex.VERSION_2);
+        testStringType(BitmapFileIndex.VERSION_2);
+        testBooleanType(BitmapFileIndex.VERSION_2);
+        testHighCardinality(BitmapFileIndex.VERSION_2, 1000000, 100000, null);
+    }
 
-        BooleanType booleanType = new BooleanType();
-        FieldRef fieldRef = new FieldRef(0, "", booleanType);
-        BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(booleanType, 
null);
-        FileIndexWriter writer = bitmapFileIndex.createWriter();
+    private FileIndexReader createTestReaderOnWriter(
+            int writerVersion,
+            Integer indexBlockSize,
+            DataType dataType,
+            Consumer<FileIndexWriter> consumer)
+            throws Exception {
+        Options options = new Options();
+        options.setInteger(BitmapFileIndex.VERSION, writerVersion);
+        if (indexBlockSize != null) {
+            options.setInteger(BitmapFileIndex.INDEX_BLOCK_SIZE, 
indexBlockSize);
+        }
+        BitmapFileIndex bitmapFileIndex = new BitmapFileIndex(dataType, 
options);
+        FileIndexWriter writer;
+        writer = bitmapFileIndex.createWriter();
+        consumer.accept(writer);
+        folder.create();
+        File file = folder.newFile("f1");
+        byte[] data = writer.serializedBytes();
+        FileUtils.writeByteArrayToFile(file, data);
+        LocalFileIO.LocalSeekableInputStream localSeekableInputStream =
+                new LocalFileIO.LocalSeekableInputStream(file);
+        return bitmapFileIndex.createReader(localSeekableInputStream, 0, 0);
+    }
 
-        Object[] arr = {Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, 
Boolean.FALSE, null};
+    private void testStringType(int version) throws Exception {
+        FieldRef fieldRef = new FieldRef(0, "", DataTypes.STRING());
+        BinaryString a = BinaryString.fromString("a");
+        BinaryString b = BinaryString.fromString("b");
+        Object[] dataColumn = {a, null, b, null, a};
+        FileIndexReader reader =
+                createTestReaderOnWriter(
+                        version,
+                        null,
+                        DataTypes.STRING(),
+                        writer -> {
+                            for (Object o : dataColumn) {
+                                writer.write(o);
+                            }
+                        });
+        assert ((BitmapIndexResult) reader.visitEqual(fieldRef, a))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 4));
+        assert ((BitmapIndexResult) reader.visitEqual(fieldRef, b))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(2));
+        assert ((BitmapIndexResult) reader.visitIsNull(fieldRef))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(1, 3));
+        assert ((BitmapIndexResult) reader.visitIn(fieldRef, Arrays.asList(a, 
b)))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 2, 4));
+        assert !reader.visitEqual(fieldRef, 
BinaryString.fromString("c")).remain();
+    }
 
-        for (Object o : arr) {
-            writer.write(o);
-        }
-        byte[] bytes = writer.serializedBytes();
-        ByteArraySeekableStream seekableStream = new 
ByteArraySeekableStream(bytes);
-        FileIndexReader reader = bitmapFileIndex.createReader(seekableStream, 
0, bytes.length);
+    private void testIntType(int version) throws Exception {
+        FieldRef fieldRef = new FieldRef(0, "", DataTypes.INT());
+        Object[] dataColumn = {0, 1, null};
+        FileIndexReader reader =
+                createTestReaderOnWriter(
+                        version,
+                        null,
+                        DataTypes.INT(),
+                        writer -> {
+                            for (Object o : dataColumn) {
+                                writer.write(o);
+                            }
+                        });
+        assert ((BitmapIndexResult) reader.visitEqual(fieldRef, 0))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0));
+        assert ((BitmapIndexResult) reader.visitEqual(fieldRef, 1))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(1));
+        assert ((BitmapIndexResult) reader.visitIsNull(fieldRef))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(2));
+        assert ((BitmapIndexResult) reader.visitIn(fieldRef, Arrays.asList(0, 
1, 2)))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 1));
+        assert !reader.visitEqual(fieldRef, 2).remain();
+    }
 
-        BitmapIndexResult searchTrueResult =
-                (BitmapIndexResult) reader.visitEqual(fieldRef, Boolean.TRUE);
-        
Assertions.assertThat(searchTrueResult.get()).isEqualTo(RoaringBitmap32.bitmapOf(0,
 2));
+    void testBooleanType(int version) throws Exception {
+        FieldRef fieldRef = new FieldRef(0, "", DataTypes.BOOLEAN());
+        Object[] dataColumn = {Boolean.TRUE, Boolean.FALSE, Boolean.TRUE, 
Boolean.FALSE, null};
+        FileIndexReader reader =
+                createTestReaderOnWriter(
+                        version,
+                        null,
+                        DataTypes.BOOLEAN(),
+                        writer -> {
+                            for (Object o : dataColumn) {
+                                writer.write(o);
+                            }
+                        });
+        assert ((BitmapIndexResult) reader.visitEqual(fieldRef, Boolean.TRUE))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(0, 2));
+        assert ((BitmapIndexResult) reader.visitIsNull(fieldRef))
+                .get()
+                .equals(RoaringBitmap32.bitmapOf(4));
+    }
 
-        BitmapIndexResult searchNullResult = (BitmapIndexResult) 
reader.visitIsNull(fieldRef);
-        
Assertions.assertThat(searchNullResult.get()).isEqualTo(RoaringBitmap32.bitmapOf(4));
+    private void testHighCardinality(
+            int version, int rowCount, int approxCardinality, Integer 
secondaryBlockSize)
+            throws Exception {
+        FieldRef fieldRef = new FieldRef(0, "", DataTypes.STRING());
+        RoaringBitmap32 middleBm = new RoaringBitmap32();
+        RoaringBitmap32 nullBm = new RoaringBitmap32();
+        long time1 = System.currentTimeMillis();
+        String prefix = "ssssssssss";
+        FileIndexReader reader =
+                createTestReaderOnWriter(
+                        version,
+                        secondaryBlockSize,
+                        DataTypes.STRING(),
+                        writer -> {
+                            for (int i = 0; i < rowCount; i++) {
+
+                                int sid = (int) (Math.random() * 
approxCardinality);
+                                if (sid == approxCardinality / 2) {
+                                    middleBm.add(i);
+                                } else if (Math.random() < 0.01) {
+                                    nullBm.add(i);
+                                    writer.write(null);
+                                    continue;
+                                }
+                                writer.write(BinaryString.fromString(prefix + 
sid));
+                            }
+                        });
+        System.out.println("write time: " + (System.currentTimeMillis() - 
time1));
+        long time2 = System.currentTimeMillis();
+        FileIndexResult result =
+                reader.visitEqual(
+                        fieldRef, BinaryString.fromString(prefix + 
(approxCardinality / 2)));
+        RoaringBitmap32 resultBm = ((BitmapIndexResult) result).get();
+        System.out.println("read time: " + (System.currentTimeMillis() - 
time2));
+        assert resultBm.equals(middleBm);
+        long time3 = System.currentTimeMillis();
+        FileIndexResult resultNull = reader.visitIsNull(fieldRef);
+        RoaringBitmap32 resultNullBm = ((BitmapIndexResult) resultNull).get();
+        System.out.println("read null bitmap time: " + 
(System.currentTimeMillis() - time3));
+        assert resultNullBm.equals(nullBm);
     }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index cb35fa5070..198cdd8cf4 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -44,6 +44,7 @@ import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.RoaringBitmap32;
 
@@ -100,6 +101,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
         spark.sql(
                 "CREATE TABLE T(a int) TBLPROPERTIES ("
                         + "'file-index.bitmap.columns'='a',"
+                        + "'file-index.bitmap.a.index-block-size'='32kb',"
                         + "'file-index.in-manifest-threshold'='1B');");
         spark.sql("INSERT INTO T VALUES (0),(1),(2),(3),(4),(5);");
 
@@ -112,6 +114,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
 
         // check index reader
         foreachIndexReader(
+                "T",
                 fileIndexReader -> {
                     FileIndexResult fileIndexResult =
                             fileIndexReader.visitEqual(new FieldRef(0, "", new 
IntType()), 3);
@@ -119,6 +122,23 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
                     RoaringBitmap32 roaringBitmap32 = ((BitmapIndexResult) 
fileIndexResult).get();
                     assert roaringBitmap32.equals(RoaringBitmap32.bitmapOf(3));
                 });
+
+        // test string type with null bitmap
+        spark.sql(
+                "CREATE TABLE T2(a string) TBLPROPERTIES ("
+                        + "'file-index.bitmap.columns'='a',"
+                        + "'file-index.in-manifest-threshold'='1B');");
+        spark.sql("INSERT INTO T2 VALUES 
('0'),('1'),('1'),(null),('0'),('1');");
+        foreachIndexReader(
+                "T2",
+                fileIndexReader -> {
+                    FileIndexResult fileIndexResult =
+                            fileIndexReader.visitEqual(
+                                    new FieldRef(0, "", new VarCharType()), 
null);
+                    assert fileIndexResult instanceof BitmapIndexResult;
+                    RoaringBitmap32 roaringBitmap32 = ((BitmapIndexResult) 
fileIndexResult).get();
+                    assert roaringBitmap32.equals(RoaringBitmap32.bitmapOf(3));
+                });
     }
 
     @Test
@@ -136,6 +156,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
 
         // check index reader
         foreachIndexReader(
+                "T",
                 fileIndexReader -> {
                     FileIndexResult fileIndexResult =
                             fileIndexReader.visitGreaterOrEqual(
@@ -146,9 +167,9 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
                 });
     }
 
-    protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
+    protected void foreachIndexReader(String tableName, 
Consumer<FileIndexReader> consumer)
             throws Catalog.TableNotExistException {
-        Path tableRoot = 
fileSystemCatalog.getTableLocation(Identifier.create("db", "T"));
+        Path tableRoot = 
fileSystemCatalog.getTableLocation(Identifier.create("db", tableName));
         SchemaManager schemaManager = new SchemaManager(fileIO, tableRoot);
         FileStorePathFactory pathFactory =
                 new FileStorePathFactory(
@@ -164,7 +185,7 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
                         null,
                         null);
 
-        Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
+        Table table = fileSystemCatalog.getTable(Identifier.create("db", 
tableName));
         ReadBuilder readBuilder = table.newReadBuilder();
         List<Split> splits = readBuilder.newScan().plan().splits();
         for (Split split : splits) {

Reply via email to