This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 546112c  feat: Migrate bitmap file index implementation (#49)
546112c is described below

commit 546112c36a545710f0bb551107b063ae1dc788ce
Author: lxy <[email protected]>
AuthorDate: Fri Jun 5 09:54:36 2026 +0800

    feat: Migrate bitmap file index implementation (#49)
---
 .../bitmap/apply_bitmap_index_batch_reader.h       | 113 +++
 .../apply_bitmap_index_batch_reader_test.cpp       | 196 +++++
 .../common/file_index/bitmap/bitmap_file_index.cpp | 360 +++++++++
 .../common/file_index/bitmap/bitmap_file_index.h   | 135 ++++
 .../bitmap/bitmap_file_index_factory.cpp           |  37 +
 .../file_index/bitmap/bitmap_file_index_factory.h  |  42 ++
 .../file_index/bitmap/bitmap_file_index_meta.cpp   | 183 +++++
 .../file_index/bitmap/bitmap_file_index_meta.h     |  97 +++
 .../bitmap/bitmap_file_index_meta_test.cpp         | 186 +++++
 .../bitmap/bitmap_file_index_meta_v1.cpp           | 129 ++++
 .../file_index/bitmap/bitmap_file_index_meta_v1.h  |  92 +++
 .../bitmap/bitmap_file_index_meta_v2.cpp           | 275 +++++++
 .../file_index/bitmap/bitmap_file_index_meta_v2.h  | 149 ++++
 .../file_index/bitmap/bitmap_file_index_test.cpp   | 819 +++++++++++++++++++++
 .../file_index/bitmap/bitmap_index_result.cpp      |  83 +++
 .../file_index/bitmap/bitmap_index_result_test.cpp |  95 +++
 16 files changed, 2991 insertions(+)

diff --git 
a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h 
b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h
new file mode 100644
index 0000000..2895751
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cassert>
+#include <cstdint>
+#include <memory>
+#include <utility>
+
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "paimon/common/reader/reader_utils.h"
+#include "paimon/file_index/bitmap_index_result.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon {
+class Metrics;
+
+/// FileBatchReader decorator that restricts every batch of a wrapped reader to the rows
+/// selected by a bitmap index result.
+///
+/// For each batch the bitmap is translated into batch-relative positions (using the first
+/// row number reported by the wrapped reader) and intersected with the bitmap the wrapped
+/// reader returned for that batch. Batches left without any selected row are released and
+/// skipped.
+class ApplyBitmapIndexBatchReader : public FileBatchReader {
+ public:
+    /// @param reader wrapped reader that supplies the batches; asserted to be non-null.
+    /// @param bitmap row numbers to keep, expressed in the numbering returned by the wrapped
+    ///               reader's GetPreviousBatchFirstRowNumber().
+    ApplyBitmapIndexBatchReader(std::unique_ptr<FileBatchReader>&& reader, RoaringBitmap32&& bitmap)
+        : reader_(std::move(reader)), bitmap_(std::move(bitmap)) {
+        assert(reader_);
+    }
+
+    /// Not supported: callers have to use NextBatchWithBitmap().
+    Result<ReadBatch> NextBatch() override {
+        return Status::Invalid(
+            "paimon inner reader ApplyBitmapIndexBatchReader should use NextBatchWithBitmap");
+    }
+
+    /// Returns the next batch that still has at least one selected row, or the EOF batch of
+    /// the wrapped reader once it is exhausted.
+    Result<ReadBatchWithBitmap> NextBatchWithBitmap() override {
+        while (true) {
+            PAIMON_ASSIGN_OR_RAISE(ReadBatchWithBitmap batch_with_bitmap,
+                                   reader_->NextBatchWithBitmap());
+            if (BatchReader::IsEofBatch(batch_with_bitmap)) {
+                return batch_with_bitmap;
+            }
+            auto& [batch, bitmap] = batch_with_bitmap;
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 valid_bitmap, Filter(batch.first->length));
+            bitmap &= valid_bitmap;
+            if (bitmap.IsEmpty()) {
+                // Nothing selected in this batch: free it and fetch the next one.
+                ReaderUtils::ReleaseReadBatch(std::move(batch));
+                continue;
+            }
+            return batch_with_bitmap;
+        }
+    }
+
+    void Close() override {
+        return reader_->Close();
+    }
+
+    std::shared_ptr<Metrics> GetReaderMetrics() const override {
+        return reader_->GetReaderMetrics();
+    }
+
+    Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override {
+        return reader_->GetFileSchema();
+    }
+
+    /// Not supported by this reader; always returns an Invalid status.
+    Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
+                         const std::optional<RoaringBitmap32>& selection_bitmap) override {
+        return Status::Invalid("ApplyBitmapIndexBatchReader does not support SetReadSchema");
+    }
+
+    Result<uint64_t> GetPreviousBatchFirstRowNumber() const override {
+        return reader_->GetPreviousBatchFirstRowNumber();
+    }
+
+    Result<uint64_t> GetNumberOfRows() const override {
+        return reader_->GetNumberOfRows();
+    }
+
+    bool SupportPreciseBitmapSelection() const override {
+        return reader_->SupportPreciseBitmapSelection();
+    }
+
+ private:
+    /// Builds the batch-relative selection for the batch most recently returned by the
+    /// wrapped reader.
+    ///
+    /// @param batch_size number of rows in that batch.
+    /// @return bitmap containing `row - first_row` for every selected row that falls inside
+    ///         [first_row, first_row + batch_size).
+    Result<RoaringBitmap32> Filter(int64_t batch_size) const {
+        RoaringBitmap32 is_valid;
+        PAIMON_ASSIGN_OR_RAISE(uint64_t first_row, reader_->GetPreviousBatchFirstRowNumber());
+        // bitmap_ is addressed with int32_t positions here, so a batch that starts beyond
+        // that range cannot contain a selected row. Checking first also avoids silently
+        // truncating the 64-bit row number.
+        if (first_row > static_cast<uint64_t>(INT32_MAX)) {
+            return is_valid;
+        }
+        const auto start_pos = static_cast<int32_t>(first_row);
+        // Exclusive end computed in 64 bits so that start_pos + batch_size cannot overflow.
+        const int64_t end_pos = static_cast<int64_t>(start_pos) + batch_size;
+        for (auto iter = bitmap_.EqualOrLarger(start_pos);
+             iter != bitmap_.End() && static_cast<int64_t>(*iter) < end_pos; ++iter) {
+            is_valid.Add(*iter - start_pos);
+        }
+        return is_valid;
+    }
+
+    std::unique_ptr<FileBatchReader> reader_;
+    RoaringBitmap32 bitmap_;
+};
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp 
b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp
new file mode 100644
index 0000000..1168a35
--- /dev/null
+++ 
b/src/paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader_test.cpp
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bitmap/apply_bitmap_index_batch_reader.h"
+
+#include <cstdlib>
+#include <string>
+#include <vector>
+
+#include "arrow/api.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/ipc/json_simple.h"
+#include "fmt/format.h"
+#include "fmt/ranges.h"
+#include "gtest/gtest.h"
+#include "paimon/common/reader/prefetch_file_batch_reader_impl.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/executor.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/mock/mock_file_batch_reader.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/mock/mock_format_reader_builder.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/read_ahead_cache.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+namespace paimon {
+class FileSystem;
+}  // namespace paimon
+
+namespace paimon::test {
+/// Parameterized fixture for ApplyBitmapIndexBatchReader. The bool parameter selects the
+/// wrapped reader: PrefetchFileBatchReaderImpl when true, MockFileBatchReader when false.
+class ApplyBitmapIndexBatchReaderTest : public ::testing::Test,
+                                        public ::testing::WithParamInterface<bool> {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        fs_ = std::make_shared<MockFileSystem>();
+        executor_ = CreateDefaultExecutor(/*thread_count=*/2);
+
+        int_type_ = arrow::int32();
+        target_type_ = arrow::struct_({arrow::field("f1", int_type_)});
+    }
+    void TearDown() override {}
+
+    /// Collects everything `apply_bitmap_batch_reader` produces and compares it with
+    /// `expected_chunk_array`; a null expectation means nothing may be collected.
+    void CheckResult(BatchReader* apply_bitmap_batch_reader,
+                     const std::shared_ptr<arrow::ChunkedArray>& expected_chunk_array) {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<arrow::ChunkedArray> actual,
+                             ReadResultCollector::CollectResult(apply_bitmap_batch_reader));
+        if (!expected_chunk_array) {
+            ASSERT_FALSE(actual);
+            return;
+        }
+        ASSERT_TRUE(actual);
+        ASSERT_EQ(expected_chunk_array->length(), actual->length());
+        ASSERT_TRUE(expected_chunk_array->Equals(actual));
+    }
+
+    /// Wraps the int32 JSON array `data_str` into a single-field struct array, applies the
+    /// row selection `bitmap_data` through ApplyBitmapIndexBatchReader and checks that the
+    /// collected rows equal `expected_str` (an empty string means no result at all).
+    /// Runs once per batch size in {1, 2, 4, 10} unless `specified_batch_size` is given.
+    void CheckResult(const std::string& data_str, const std::vector<int32_t>& bitmap_data,
+                     const std::string& expected_str, int32_t specified_batch_size = -1) {
+        std::shared_ptr<arrow::Array> data = MakeStructArray(data_str);
+
+        // The expectation does not depend on the batch size, so build it once.
+        std::shared_ptr<arrow::ChunkedArray> expected_chunk_array;
+        if (!expected_str.empty()) {
+            expected_chunk_array =
+                std::make_shared<arrow::ChunkedArray>(MakeStructArray(expected_str));
+        }
+
+        const std::vector<int32_t> batch_sizes = specified_batch_size == -1
+                                                     ? std::vector<int32_t>{1, 2, 4, 10}
+                                                     : std::vector<int32_t>{specified_batch_size};
+        const int32_t prefetch_batch_count = 3;
+        const bool enable_prefetch = GetParam();
+        for (int32_t batch_size : batch_sizes) {
+            std::unique_ptr<FileBatchReader> file_batch_reader;
+            if (enable_prefetch) {
+                MockFormatReaderBuilder reader_builder(data, target_type_, batch_size);
+                ASSERT_OK_AND_ASSIGN(
+                    file_batch_reader,
+                    PrefetchFileBatchReaderImpl::Create(
+                        /*data_file_path=*/"DUMMY", &reader_builder, fs_, prefetch_batch_count,
+                        batch_size, prefetch_batch_count * 2,
+                        /*enable_adaptive_prefetch_strategy=*/false, executor_,
+                        /*initialize_read_ranges=*/true,
+                        /*prefetch_cache_mode=*/PrefetchCacheMode::ALWAYS, CacheConfig(), pool_));
+            } else {
+                file_batch_reader =
+                    std::make_unique<MockFileBatchReader>(data, target_type_, batch_size);
+            }
+            auto apply_bitmap_batch_reader = std::make_unique<ApplyBitmapIndexBatchReader>(
+                std::move(file_batch_reader), RoaringBitmap32::From(bitmap_data));
+            CheckResult(apply_bitmap_batch_reader.get(), expected_chunk_array);
+        }
+    }
+
+ private:
+    /// Parses `json` as an int32 array and wraps it as the single field "f1" of a struct array.
+    std::shared_ptr<arrow::Array> MakeStructArray(const std::string& json) const {
+        auto values = arrow::ipc::internal::json::ArrayFromJSON(int_type_, json).ValueOrDie();
+        return arrow::StructArray::Make({values}, target_type_->fields()).ValueOrDie();
+    }
+
+    std::shared_ptr<arrow::DataType> int_type_;
+    std::shared_ptr<arrow::DataType> target_type_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<FileSystem> fs_;
+    std::shared_ptr<Executor> executor_;
+};
+
+// Four input rows, checked against several hand-picked row selections.
+TEST_P(ApplyBitmapIndexBatchReaderTest, TestSimple) {
+    struct Case {
+        std::vector<int32_t> selected_rows;
+        std::string expected_json;
+    };
+    const std::string data_str = "[10, 11, 12, 13]";
+    const std::vector<Case> cases = {
+        {{0, 3}, "[10, 13]"},
+        {{1, 2}, "[11, 12]"},
+        // empty selection -> empty result
+        {{}, ""},
+        {{0, 1, 2, 3}, "[10, 11, 12, 13]"},
+    };
+    for (const Case& test_case : cases) {
+        CheckResult(data_str, test_case.selected_rows, test_case.expected_json);
+    }
+}
+
+// Seven input rows, so the default batch sizes (1, 2, 4, 10) produce a short trailing batch
+// for sizes 2 and 4.
+TEST_P(ApplyBitmapIndexBatchReaderTest, TestSimple2) {
+    std::string data_str = "[10, 11, 12, 13, 14, 15, 16]";
+    {
+        std::vector<int32_t> bitmap_data = {0, 3, 5, 6};
+        CheckResult(data_str, bitmap_data, "[10, 13, 15, 16]");
+    }
+    {
+        std::vector<int32_t> bitmap_data = {0, 3, 4, 5};
+        CheckResult(data_str, bitmap_data, "[10, 13, 14, 15]");
+    }
+    {
+        std::vector<int32_t> bitmap_data = {4, 5};
+        CheckResult(data_str, bitmap_data, "[14, 15]");
+    }
+
+    {
+        std::vector<int32_t> bitmap_data = {0, 1, 2, 3, 4, 5, 6};
+        CheckResult(data_str, bitmap_data, "[10, 11, 12, 13, 14, 15, 16]");
+    }
+    {
+        // empty selection -> empty result
+        std::vector<int32_t> bitmap_data = {};
+        CheckResult(data_str, bitmap_data, "");
+    }
+}
+
+// 10000 rows whose value equals their row number, filtered by a random selection of roughly
+// one third of the rows and read in batches of 1024.
+TEST_P(ApplyBitmapIndexBatchReaderTest, TestBulkData) {
+    constexpr int32_t kRowCount = 10000;
+    std::vector<int32_t> data;
+    data.reserve(kRowCount);
+    for (int32_t i = 0; i < kRowCount; i++) {
+        data.push_back(i);
+    }
+    std::vector<int32_t> bitmap_data;
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    // Attach the seed to any failure below so a failing selection can be reproduced.
+    SCOPED_TRACE(fmt::format("random seed: {}", seed));
+    std::srand(static_cast<unsigned int>(seed));
+    for (int32_t i = 0; i < kRowCount; i++) {
+        if (std::rand() % 3 == 0) {
+            bitmap_data.push_back(i);
+        }
+    }
+    std::string data_str = fmt::format("[{}]", fmt::join(data, ","));
+    // Row i holds value i, so the selected row numbers are also the expected values.
+    std::string result_str = fmt::format("[{}]", fmt::join(bitmap_data, ","));
+    CheckResult(data_str, bitmap_data, result_str, /*specified_batch_size=*/1024);
+}
+// Run every test twice: without prefetch (false) and with prefetch (true).
+INSTANTIATE_TEST_SUITE_P(EnablePrefetch, ApplyBitmapIndexBatchReaderTest, ::testing::Bool());
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_file_index.cpp
new file mode 100644
index 0000000..a9542a0
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index.cpp
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index.h"
+
+#include <utility>
+
+#include "arrow/c/bridge.h"
+#include "fmt/format.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/common/utils/options_utils.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/file_index/bitmap_index_result.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Keeps the index options; CreateWriter() hands them on to the writers it creates.
+BitmapFileIndex::BitmapFileIndex(const std::map<std::string, std::string>& options)
+    : options_(options) {
+}
+
+/// Maps a literal to the form used as a key in the bitmap index.
+///
+/// Non-TIMESTAMP literals are returned unchanged. A null TIMESTAMP becomes a null BIGINT;
+/// a non-null TIMESTAMP becomes its millisecond value when the precision of `arrow_type` is
+/// at most Timestamp::MILLIS_PRECISION and its microsecond value otherwise.
+///
+/// @return Invalid status if the literal is a TIMESTAMP but `arrow_type` is not an
+///         arrow::TimestampType.
+Result<Literal> BitmapFileIndex::ConvertLiteral(
+    const Literal& literal, const std::shared_ptr<arrow::DataType>& arrow_type) {
+    if (literal.GetType() != FieldType::TIMESTAMP) {
+        return literal;
+    }
+    // convert timestamp literal to long
+    if (literal.IsNull()) {
+        return Literal(FieldType::BIGINT);
+    }
+    auto timestamp_type = std::dynamic_pointer_cast<arrow::TimestampType>(arrow_type);
+    if (timestamp_type == nullptr) {
+        return Status::Invalid(fmt::format("literal type TIMESTAMP mismatch arrow type {}",
+                                           arrow_type->ToString()));
+    }
+    int64_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
+    int64_t value = precision <= Timestamp::MILLIS_PRECISION
+                        ? literal.GetValue<Timestamp>().GetMillisecond()
+                        : literal.GetValue<Timestamp>().ToMicrosecond();
+    return Literal(value);
+}
+
+/// Returns the field type used inside the index: TIMESTAMP is stored as BIGINT, every other
+/// type is kept as is.
+FieldType BitmapFileIndex::ConvertType(const FieldType& data_type) {
+    return data_type == FieldType::TIMESTAMP ? FieldType::BIGINT : data_type;
+}
+
+/// Creates a reader for a bitmap index stored in `input_stream`.
+///
+/// @param c_arrow_schema C-ABI schema of the indexed column; it is imported via
+///        arrow::ImportSchema and must contain exactly one field.
+/// @param start,length forwarded unchanged to the BitmapFileIndexReader constructor.
+/// @return Invalid status if the schema does not have exactly one field, or the error from
+///         the schema import / field type conversion.
+Result<std::shared_ptr<FileIndexReader>> BitmapFileIndex::CreateReader(
+    ::ArrowSchema* c_arrow_schema, int32_t start, int32_t length,
+    const std::shared_ptr<InputStream>& input_stream,
+    const std::shared_ptr<MemoryPool>& pool) const {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> schema,
+                                      arrow::ImportSchema(c_arrow_schema));
+    const bool single_field = schema->num_fields() == 1;
+    if (!single_field) {
+        return Status::Invalid(
+            "invalid schema for BitmapFileIndexReader, supposed to have single "
+            "field.");
+    }
+    std::shared_ptr<arrow::DataType> column_type = schema->field(0)->type();
+    PAIMON_ASSIGN_OR_RAISE(FieldType field_type,
+                           FieldTypeUtils::ConvertToFieldType(column_type->id()));
+    return std::make_shared<BitmapFileIndexReader>(column_type, field_type, start, length,
+                                                   input_stream, pool);
+}
+
+/// Creates a writer for the single column described by `c_arrow_schema`.
+///
+/// The schema is imported via arrow::ImportSchema and must contain exactly one field; the
+/// writer is created for that field with the options given to this BitmapFileIndex.
+Result<std::shared_ptr<FileIndexWriter>> BitmapFileIndex::CreateWriter(
+    ::ArrowSchema* c_arrow_schema, const std::shared_ptr<MemoryPool>& pool) const {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> schema,
+                                      arrow::ImportSchema(c_arrow_schema));
+    const bool single_field = schema->num_fields() == 1;
+    if (!single_field) {
+        return Status::Invalid(
+            "invalid schema for BitmapFileIndexWriter, supposed to have single "
+            "field.");
+    }
+    return BitmapFileIndexWriter::Create(schema, schema->field(0)->name(), options_, pool);
+}
+
+/// Builds a writer that indexes the column `field_name` of `arrow_schema`.
+///
+/// The index version is read from `options` under BitmapFileIndex::VERSION and defaults to
+/// BitmapFileIndex::VERSION_2.
+///
+/// @return Invalid status if `field_name` is not a field of `arrow_schema`, or the error
+///         from reading the version option.
+Result<std::shared_ptr<BitmapFileIndexWriter>> BitmapFileIndexWriter::Create(
+    const std::shared_ptr<arrow::Schema>& arrow_schema, const std::string& field_name,
+    const std::map<std::string, std::string>& options, const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_ASSIGN_OR_RAISE(int8_t version,
+                           OptionsUtils::GetValueFromMap<int8_t>(options, BitmapFileIndex::VERSION,
+                                                                 BitmapFileIndex::VERSION_2));
+    auto indexed_field = arrow_schema->GetFieldByName(field_name);
+    if (indexed_field == nullptr) {
+        return Status::Invalid(
+            fmt::format("field {} not in arrow_schema for BitmapFileIndexWriter", field_name));
+    }
+    // Incoming batches are imported as a struct array holding just the indexed field.
+    auto single_field_struct = arrow::struct_({indexed_field});
+    auto* writer = new BitmapFileIndexWriter(version, single_field_struct, indexed_field->type(),
+                                             options, pool);
+    return std::shared_ptr<BitmapFileIndexWriter>(writer);
+}
+
+/// Stores the construction parameters.
+///
+/// @param version     index format version written by SerializedBytes().
+/// @param struct_type type used to import the batches passed to AddBatch().
+/// @param arrow_type  Arrow type of the indexed column.
+/// @param options     index options (forwarded to the V2 meta on serialization).
+/// @param pool        memory pool used when serializing.
+BitmapFileIndexWriter::BitmapFileIndexWriter(
+    int8_t version, const std::shared_ptr<arrow::DataType>& struct_type,
+    const std::shared_ptr<arrow::DataType>& arrow_type,
+    const std::map<std::string, std::string>& options, const std::shared_ptr<MemoryPool>& pool)
+    : version_(version),
+      struct_type_(struct_type),
+      arrow_type_(arrow_type),
+      options_(options),
+      pool_(pool) {
+}
+
+/// Adds every row of `batch` to the in-memory index.
+///
+/// `batch` is imported with arrow::ImportArray as the writer's single-field struct type.
+/// Null values are recorded in the null bitmap; every other value is converted with
+/// BitmapFileIndex::ConvertLiteral and its row number added to the bitmap of that value.
+///
+/// @return Invalid status if the imported array is not a struct array with one field, or the
+///         error from the import / literal conversion.
+Status BitmapFileIndexWriter::AddBatch(::ArrowArray* batch) {
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Array> imported,
+                                      arrow::ImportArray(batch, struct_type_));
+    auto rows = std::dynamic_pointer_cast<arrow::StructArray>(imported);
+    const bool well_formed = rows != nullptr && rows->num_fields() == 1;
+    if (!well_formed) {
+        return Status::Invalid(
+            "invalid batch for BitmapFileIndexWriter, supposed to be struct array with single "
+            "field.");
+    }
+    PAIMON_ASSIGN_OR_RAISE(
+        std::vector<Literal> column_values,
+        LiteralConverter::ConvertLiteralsFromArray(*(rows->field(0)), /*own_data=*/true));
+    for (const Literal& value : column_values) {
+        if (!value.IsNull()) {
+            PAIMON_ASSIGN_OR_RAISE(Literal key,
+                                   BitmapFileIndex::ConvertLiteral(value, arrow_type_));
+            id_to_bitmap_[key].Add(row_number_);
+        } else {
+            null_bitmap_.Add(row_number_);
+        }
+        // The row counter advances only after the row has been recorded successfully.
+        row_number_++;
+    }
+    return Status::OK();
+}
+
+/// Serializes the index built so far.
+///
+/// Output layout, in write order: the version byte, the serialized meta (V1 or V2 depending
+/// on the version), then the body. The body holds the null bitmap first (only when it has
+/// more than one row) followed by every value bitmap with more than one row. Bitmaps with
+/// exactly one row are not written to the body: their row number is inlined in the meta
+/// entry as offset `-1 - row`.
+///
+/// @return the serialized bytes, or Invalid status for an unknown version, or the error from
+///         the field type conversion / meta serialization.
+Result<PAIMON_UNIQUE_PTR<Bytes>> BitmapFileIndexWriter::SerializedBytes() const {
+    PAIMON_ASSIGN_OR_RAISE(FieldType field_type,
+                           FieldTypeUtils::ConvertToFieldType(arrow_type_->id()));
+    FieldType converted_data_type = BitmapFileIndex::ConvertType(field_type);
+    auto data_output_stream = std::make_shared<MemorySegmentOutputStream>(
+        MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool_);
+    data_output_stream->SetOrder(ByteOrder::PAIMON_BIG_ENDIAN);
+
+    data_output_stream->WriteValue<int8_t>(version_);
+
+    // 1. serialize the null bitmap; its size is recorded in the meta in every case, but the
+    //    bytes only go into the body when the bitmap has more than one row.
+    std::shared_ptr<Bytes> null_bitmap_bytes = null_bitmap_.Serialize(pool_.get());
+    const bool null_bitmap_in_body = null_bitmap_.Cardinality() > 1;
+
+    // 2. build the meta entries and serialize the bitmaps that go into the body.
+    //    Single-row bitmaps are inlined into their entry, so they are not serialized at all.
+    std::vector<BitmapFileIndexMeta::Entry> write_entries;
+    std::vector<std::shared_ptr<Bytes>> serialize_bitmaps;
+    write_entries.reserve(id_to_bitmap_.size());
+    serialize_bitmaps.reserve(id_to_bitmap_.size());
+
+    // If the null bitmap is part of the body, it is placed at the beginning.
+    // offset_ref is the body offset of the next non-null bitmap.
+    int32_t offset_ref = null_bitmap_in_body ? null_bitmap_bytes->size() : 0;
+    for (const auto& [literal, bitmap] : id_to_bitmap_) {
+        if (bitmap.Cardinality() == 1) {
+            // If bitmap has only one element, inline element value to offset.
+            write_entries.emplace_back(literal,
+                                       /*offset=*/-1 - *(bitmap.Begin()),
+                                       /*length=*/-1);
+            continue;
+        }
+        std::shared_ptr<Bytes> bytes = bitmap.Serialize(pool_.get());
+        assert(bytes);
+        write_entries.emplace_back(literal, /*offset=*/offset_ref,
+                                   /*length=*/bytes->size());
+        offset_ref += bytes->size();
+        serialize_bitmaps.push_back(std::move(bytes));
+    }
+
+    // If the null bitmap has only one element, inline element value to offset.
+    int32_t null_value_offset =
+        null_bitmap_.Cardinality() == 1 ? (-1 - *(null_bitmap_.Begin())) : 0;
+    BitmapFileIndexMeta::Entry null_value_entry(Literal(converted_data_type),
+                                                /*offset=*/null_value_offset,
+                                                /*length=*/null_bitmap_bytes->size());
+
+    // prepare bitmap file index meta
+    std::shared_ptr<BitmapFileIndexMeta> bitmap_file_index_meta;
+    if (version_ == BitmapFileIndex::VERSION_1) {
+        bitmap_file_index_meta = std::make_shared<BitmapFileIndexMetaV1>(
+            converted_data_type, row_number_, !null_bitmap_.IsEmpty(), null_value_entry,
+            std::move(write_entries), pool_);
+    } else if (version_ == BitmapFileIndex::VERSION_2) {
+        bitmap_file_index_meta = std::make_shared<BitmapFileIndexMetaV2>(
+            converted_data_type, row_number_, !null_bitmap_.IsEmpty(), null_value_entry,
+            std::move(write_entries), options_, pool_);
+    } else {
+        return Status::Invalid(fmt::format("invalid version: {} for bitmap index", version_));
+    }
+
+    // 3. serialize meta
+    PAIMON_RETURN_NOT_OK(bitmap_file_index_meta->Serialize(data_output_stream));
+
+    // 4. serialize body, in the same order the offsets were assigned above
+    if (null_bitmap_in_body) {
+        data_output_stream->WriteBytes(null_bitmap_bytes);
+    }
+    for (const auto& bytes : serialize_bitmaps) {
+        data_output_stream->WriteBytes(bytes);
+    }
+    return MemorySegmentUtils::CopyToBytes(data_output_stream->Segments(), /*offset=*/0,
+                                           /*num_bytes=*/data_output_stream->CurrentSize(),
+                                           pool_.get());
+}
+
+/// Stores the construction parameters; nothing is read from `input_stream` here. The meta is
+/// loaded later by ReadInternalMeta().
+///
+/// @param arrow_type   Arrow type of the indexed column.
+/// @param data_type    field type of the indexed column.
+/// @param start        stream position ReadInternalMeta() seeks to before reading.
+/// @param length       length value handed to the meta object.
+/// @param input_stream stream holding the index.
+/// @param pool         memory pool used for buffers.
+BitmapFileIndexReader::BitmapFileIndexReader(
+    const std::shared_ptr<arrow::DataType>& arrow_type, const FieldType& data_type, int32_t start,
+    int32_t length, const std::shared_ptr<InputStream>& input_stream,
+    const std::shared_ptr<MemoryPool>& pool)
+    : head_start_(start),
+      length_(length),
+      data_type_(data_type),
+      arrow_type_(arrow_type),
+      pool_(pool),
+      input_stream_(input_stream) {
+}
+
+/// `= literal` is evaluated as an IN predicate with a single element.
+Result<std::shared_ptr<FileIndexResult>> BitmapFileIndexReader::VisitEqual(
+    const Literal& literal) {
+    const std::vector<Literal> single_literal{literal};
+    return VisitIn(single_literal);
+}
+
+/// `!= literal` is evaluated as a NOT IN predicate with a single element.
+Result<std::shared_ptr<FileIndexResult>> BitmapFileIndexReader::VisitNotEqual(
+    const Literal& literal) {
+    const std::vector<Literal> single_literal{literal};
+    return VisitNotIn(single_literal);
+}
+
+/// Returns a BitmapIndexResult for `IN (literals)`.
+///
+/// The result is given a callback instead of a computed bitmap: when invoked, the callback
+/// loads the index meta and returns the union of the bitmaps of all `literals`. The callback
+/// holds a shared_ptr to this reader.
+///
+/// @return Invalid status if `literals` is empty.
+Result<std::shared_ptr<FileIndexResult>> BitmapFileIndexReader::VisitIn(
+    const std::vector<Literal>& literals) {
+    if (literals.empty()) {
+        return Status::Invalid("literals cannot be empty in In predicate");
+    }
+    auto compute_bitmap = [literals = literals,
+                           reader = shared_from_this()]() -> Result<RoaringBitmap32> {
+        PAIMON_RETURN_NOT_OK(reader->ReadInternalMeta());
+        return reader->GetInListResultBitmap(literals);
+    };
+    return std::make_shared<BitmapIndexResult>(std::move(compute_bitmap));
+}
+
+/// Returns a BitmapIndexResult for `NOT IN (literals)`.
+///
+/// The result is given a callback instead of a computed bitmap: when invoked, the callback
+/// loads the index meta, takes the union of the bitmaps of all `literals`, flips it over
+/// [0, row count) and finally removes the rows whose value is null. The callback holds a
+/// shared_ptr to this reader.
+///
+/// @return Invalid status if `literals` is empty.
+Result<std::shared_ptr<FileIndexResult>> BitmapFileIndexReader::VisitNotIn(
+    const std::vector<Literal>& literals) {
+    if (literals.empty()) {
+        return Status::Invalid("literals cannot be empty in NotIn predicate");
+    }
+    return std::make_shared<BitmapIndexResult>(
+        [literals = literals, reader = shared_from_this()]() -> Result<RoaringBitmap32> {
+            PAIMON_RETURN_NOT_OK(reader->ReadInternalMeta());
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 bitmap, reader->GetInListResultBitmap(literals));
+            bitmap.Flip(/*min=*/0, /*max=*/reader->bitmap_file_index_meta_->GetRowCount());
+            // not in does not contain null
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 null,
+                                   reader->GetInListResultBitmap({Literal(reader->data_type_)}));
+            bitmap -= null;
+            return bitmap;
+        });
+}
+
+/// `IS NULL` is evaluated as an IN predicate whose only element is the null literal of the
+/// column type.
+Result<std::shared_ptr<FileIndexResult>> BitmapFileIndexReader::VisitIsNull() {
+    const std::vector<Literal> null_only{Literal(data_type_)};
+    return VisitIn(null_only);
+}
+
+/// `IS NOT NULL` is evaluated as a NOT IN predicate whose only element is the null literal of
+/// the column type.
+Result<std::shared_ptr<FileIndexResult>> BitmapFileIndexReader::VisitIsNotNull() {
+    const std::vector<Literal> null_only{Literal(data_type_)};
+    return VisitNotIn(null_only);
+}
+
+/// Returns the union of the bitmaps of all `literals`.
+///
+/// Each literal is first converted with BitmapFileIndex::ConvertLiteral. Its bitmap is taken
+/// from the bitmaps_ cache when present; otherwise it is read with ReadBitmap() and stored
+/// in the cache.
+Result<RoaringBitmap32> BitmapFileIndexReader::GetInListResultBitmap(
+    const std::vector<Literal>& literals) {
+    std::vector<const RoaringBitmap32*> matched;
+    matched.reserve(literals.size());
+    for (const Literal& literal : literals) {
+        PAIMON_ASSIGN_OR_RAISE(Literal key, BitmapFileIndex::ConvertLiteral(literal, arrow_type_));
+        auto cached = bitmaps_.find(key);
+        if (cached == bitmaps_.end()) {
+            // Cache miss: load the bitmap once and keep it for later predicates.
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 loaded, ReadBitmap(key));
+            cached = bitmaps_.emplace(key, std::move(loaded)).first;
+        }
+        matched.push_back(&(cached->second));
+    }
+    return RoaringBitmap32::FastUnion(matched);
+}
+
+/// Reads the bitmap stored for `literal`.
+///
+/// Returns an empty bitmap when the meta has no entry for the literal. A negative entry
+/// offset encodes a single-row bitmap inline (row = -1 - offset); otherwise `entry->length`
+/// bytes are read at body start + offset and deserialized.
+Result<RoaringBitmap32> BitmapFileIndexReader::ReadBitmap(const Literal& literal) {
+    PAIMON_ASSIGN_OR_RAISE(const BitmapFileIndexMeta::Entry* entry,
+                           bitmap_file_index_meta_->FindEntry(literal));
+    if (entry == nullptr) {
+        return RoaringBitmap32();
+    }
+    int32_t offset = entry->offset;
+    if (offset < 0) {
+        // offset < 0, indicates only one value in bitmap, and the value is (-1 - offset)
+        return RoaringBitmap32::From({-1 - offset});
+    }
+    PAIMON_RETURN_NOT_OK(input_stream_->Seek(bitmap_file_index_meta_->GetBodyStart() + offset,
+                                             SeekOrigin::FS_SEEK_SET));
+    auto serialized = std::make_unique<Bytes>(entry->length, pool_.get());
+    DataInputStream body_input(input_stream_);
+    PAIMON_RETURN_NOT_OK(body_input.ReadBytes(serialized.get()));
+    RoaringBitmap32 result;
+    PAIMON_RETURN_NOT_OK(result.Deserialize(serialized->data(), serialized->size()));
+    return result;
+}
+
+/// Loads the index meta on first use; later calls return immediately.
+///
+/// Seeks to head_start_, reads the version byte, creates the matching meta object (V1 or V2)
+/// and deserializes it from the stream. The member is only set once deserialization has
+/// succeeded, so a failed attempt is retried by the next call instead of leaving a
+/// half-initialized meta behind.
+///
+/// @return Invalid status for an unknown version, or the error from seeking / reading /
+///         deserializing.
+Status BitmapFileIndexReader::ReadInternalMeta() {
+    if (bitmap_file_index_meta_) {
+        return Status::OK();
+    }
+    PAIMON_RETURN_NOT_OK(input_stream_->Seek(head_start_, SeekOrigin::FS_SEEK_SET));
+    DataInputStream data_input_stream(input_stream_);
+    PAIMON_ASSIGN_OR_RAISE(int8_t version, data_input_stream.ReadValue<int8_t>());
+    FieldType converted_type = BitmapFileIndex::ConvertType(data_type_);
+    std::shared_ptr<BitmapFileIndexMeta> meta;
+    if (version == BitmapFileIndex::VERSION_1) {
+        meta = std::make_shared<BitmapFileIndexMetaV1>(converted_type, head_start_, length_,
+                                                       pool_);
+    } else if (version == BitmapFileIndex::VERSION_2) {
+        meta = std::make_shared<BitmapFileIndexMetaV2>(converted_type, length_, pool_);
+    } else {
+        return Status::Invalid(fmt::format("unknown bitmap file index version {}", version));
+    }
+    // The meta continues reading right after the version byte consumed above.
+    PAIMON_RETURN_NOT_OK(meta->Deserialize(input_stream_));
+    bitmap_file_index_meta_ = std::move(meta);
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index.h 
b/src/paimon/common/file_index/bitmap/bitmap_file_index.h
new file mode 100644
index 0000000..9cf15dd
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index.h
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta.h"
+#include "paimon/common/predicate/literal_converter.h"
+#include "paimon/file_index/file_index_reader.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class BitmapFileIndexMeta;
+class InputStream;
+class MemoryPool;
+
+/// FileIndexer implementation of the bitmap file index.
+///
+/// Produces the reader and writer for a bitmap index and offers static helpers that map a
+/// literal / field type onto the representation used inside the index.
+class PAIMON_EXPORT BitmapFileIndex : public FileIndexer {
+ public:
+    /// Format version tags; the reader compares them with the version byte of a serialized index.
+    static constexpr int8_t VERSION_1 = 1;
+    static constexpr int8_t VERSION_2 = 2;
+    /// String constants used as keys of the options map.
+    /// NOTE(review): VERSION presumably selects the format version to write - confirm in the .cpp.
+    static constexpr char VERSION[] = "version";
+    static constexpr char INDEX_BLOCK_SIZE[] = "index-block-size";
+
+    /// @param options index options, kept as a copy in `options_`.
+    explicit BitmapFileIndex(const std::map<std::string, std::string>& options);
+    ~BitmapFileIndex() override = default;
+
+    /// Creates a reader for the index data reachable through `input_stream`.
+    /// NOTE(review): `start` / `length` look like the position and size of the serialized index
+    /// inside the stream - confirm against the implementation.
+    Result<std::shared_ptr<FileIndexReader>> CreateReader(
+        ::ArrowSchema* arrow_schema, int32_t start, int32_t length,
+        const std::shared_ptr<InputStream>& input_stream,
+        const std::shared_ptr<MemoryPool>& pool) const override;
+
+    /// Creates a writer for the field described by `arrow_schema`.
+    Result<std::shared_ptr<FileIndexWriter>> CreateWriter(
+        ::ArrowSchema* arrow_schema, const std::shared_ptr<MemoryPool>& pool) const override;
+
+    /// Currently, it is mainly used to convert timestamps to long
+    static Result<Literal> ConvertLiteral(const Literal& literal,
+                                          const std::shared_ptr<arrow::DataType>& arrow_type);
+    /// Maps the field type of a column to the field type used by the index.
+    static FieldType ConvertType(const FieldType& data_type);
+
+ private:
+    std::map<std::string, std::string> options_;
+};
+
+/// FileIndexWriter of the bitmap file index.
+///
+/// Instances are obtained through Create(); the constructor is private.
+class BitmapFileIndexWriter : public FileIndexWriter {
+ public:
+    /// Factory: builds a writer for the field `field_name` of `arrow_schema`.
+    static Result<std::shared_ptr<BitmapFileIndexWriter>> Create(
+        const std::shared_ptr<arrow::Schema>& arrow_schema, const std::string& field_name,
+        const std::map<std::string, std::string>& options,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Feeds one batch, handed over as a C ArrowArray, into the index.
+    Status AddBatch(::ArrowArray* batch) override;
+
+    /// Returns the serialized form of the index.
+    Result<PAIMON_UNIQUE_PTR<Bytes>> SerializedBytes() const override;
+
+ private:
+    BitmapFileIndexWriter(int8_t version, const std::shared_ptr<arrow::DataType>& struct_type,
+                          const std::shared_ptr<arrow::DataType>& arrow_type,
+                          const std::map<std::string, std::string>& options,
+                          const std::shared_ptr<MemoryPool>& pool);
+
+    // format version of the index to produce
+    int8_t version_;
+    /// @note struct_type_ contains only one field with arrow_type_, used for import from C
+    /// ArrowArray
+    std::shared_ptr<arrow::DataType> struct_type_;
+    std::shared_ptr<arrow::DataType> arrow_type_;
+    // one bitmap per distinct literal
+    std::unordered_map<Literal, RoaringBitmap32> id_to_bitmap_;
+    // NOTE(review): presumably the positions of null values - confirm in AddBatch
+    RoaringBitmap32 null_bitmap_;
+    int32_t row_number_ = 0;
+    std::map<std::string, std::string> options_;
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+/// FileIndexReader of the bitmap file index.
+///
+/// Answers equality, membership and null predicates. The index metadata is loaded on demand by
+/// ReadInternalMeta().
+class BitmapFileIndexReader : public FileIndexReader,
+                              public std::enable_shared_from_this<BitmapFileIndexReader> {
+ public:
+    BitmapFileIndexReader(const std::shared_ptr<arrow::DataType>& arrow_type,
+                          const FieldType& data_type, int32_t start, int32_t length,
+                          const std::shared_ptr<InputStream>& input_stream,
+                          const std::shared_ptr<MemoryPool>& pool);
+
+    // TODO(xinyu.lxy): may override VisitGreaterThan... like VisitIsNotNull
+
+    // value predicates
+    Result<std::shared_ptr<FileIndexResult>> VisitEqual(const Literal& literal) override;
+    Result<std::shared_ptr<FileIndexResult>> VisitNotEqual(const Literal& literal) override;
+
+    // membership predicates
+    Result<std::shared_ptr<FileIndexResult>> VisitIn(const std::vector<Literal>& literals) override;
+    Result<std::shared_ptr<FileIndexResult>> VisitNotIn(
+        const std::vector<Literal>& literals) override;
+
+    // null predicates
+    Result<std::shared_ptr<FileIndexResult>> VisitIsNull() override;
+    Result<std::shared_ptr<FileIndexResult>> VisitIsNotNull() override;
+
+ private:
+    /// Bitmap for a list of literals. NOTE(review): presumably the union of the per-literal
+    /// bitmaps - confirm in the .cpp.
+    Result<RoaringBitmap32> GetInListResultBitmap(const std::vector<Literal>& literals);
+    /// Bitmap stored for a single literal.
+    Result<RoaringBitmap32> ReadBitmap(const Literal& literal);
+    /// Loads `bitmap_file_index_meta_` if it has not been loaded yet.
+    Status ReadInternalMeta();
+
+    // stream position where the serialized index starts (the version byte is read from here)
+    int32_t head_start_;
+    int32_t length_;
+    FieldType data_type_;
+    std::shared_ptr<arrow::DataType> arrow_type_;
+    std::shared_ptr<MemoryPool> pool_;
+    std::shared_ptr<InputStream> input_stream_;
+    std::unordered_map<Literal, RoaringBitmap32> bitmaps_;
+    // empty until ReadInternalMeta() ran
+    std::shared_ptr<BitmapFileIndexMeta> bitmap_file_index_meta_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_factory.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_factory.cpp
new file mode 100644
index 0000000..829ae93
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_factory.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index_factory.h"
+
+#include <utility>
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index.h"
+#include "paimon/factories/factory.h"
+
+namespace paimon {
+
+const char BitmapFileIndexFactory::IDENTIFIER[] = "bitmap";
+
+/// Creates a BitmapFileIndex configured with `options` and hands it out as a FileIndexer.
+Result<std::unique_ptr<FileIndexer>> BitmapFileIndexFactory::Create(
+    const std::map<std::string, std::string>& options) const {
+    // upcast explicitly so the returned pointer already has the interface type
+    return std::unique_ptr<FileIndexer>(std::make_unique<BitmapFileIndex>(options));
+}
+
+REGISTER_PAIMON_FACTORY(BitmapFileIndexFactory);
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_factory.h 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_factory.h
new file mode 100644
index 0000000..7310603
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_factory.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/file_index/file_indexer_factory.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// FileIndexerFactory that creates BitmapFileIndex instances.
+class BitmapFileIndexFactory : public FileIndexerFactory {
+ public:
+    /// Identifier reported by Identifier(); defined in the .cpp file.
+    static const char IDENTIFIER[];
+
+    /// Builds an indexer from the given options.
+    Result<std::unique_ptr<FileIndexer>> Create(
+        const std::map<std::string, std::string>& options) const override;
+
+    /// @return IDENTIFIER
+    const char* Identifier() const override {
+        return IDENTIFIER;
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_meta.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta.cpp
new file mode 100644
index 0000000..64e05c6
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta.cpp
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta.h"
+
+#include <string>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/defs.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Builds an entry that associates `key` with the given `offset` and `length`.
+BitmapFileIndexMeta::Entry::Entry(const Literal& key, int32_t offset, int32_t 
length)
+    : key(key), offset(offset), length(length) {}
+
+/// Read-side constructor: records the field type, the total length and the pool.
+/// The null value entry starts as a null literal of `type` with offset and length of -1; the
+/// remaining members keep their in-class defaults.
+BitmapFileIndexMeta::BitmapFileIndexMeta(const FieldType& type, int32_t 
total_length,
+                                         const std::shared_ptr<MemoryPool>& 
pool)
+    : data_type_(type),
+      total_length_(total_length),
+      null_value_entry_(Literal(type), -1, -1),
+      pool_(pool) {}
+
+/// Write-side constructor: stores everything that is to be serialized.
+/// `write_entries` is moved into the object; `total_length_` and `body_start_` keep their
+/// in-class defaults.
+BitmapFileIndexMeta::BitmapFileIndexMeta(const FieldType& type, int32_t 
row_count,
+                                         bool has_null_value, const Entry& 
null_value_entry,
+                                         std::vector<Entry>&& write_entries,
+                                         const std::shared_ptr<MemoryPool>& 
pool)
+    : data_type_(type),
+      row_count_(row_count),
+      has_null_value_(has_null_value),
+      null_value_entry_(null_value_entry),
+      write_entries_(std::move(write_entries)),
+      pool_(pool) {}
+
+/// Returns a callable that writes one literal of `data_type_` to `output_stream`.
+///
+/// Fixed-width types are written with WriteValue<T>; a STRING is written as its size (uint32_t)
+/// followed by its bytes. The callable keeps its own reference to `output_stream`.
+///
+/// @return the writer, or Status::Invalid when `data_type_` cannot be indexed.
+Result<std::function<void(const Literal&)>> BitmapFileIndexMeta::GetValueWriter(
+    const std::shared_ptr<MemorySegmentOutputStream>& output_stream) const {
+    using ValueWriter = std::function<void(const Literal&)>;
+    switch (data_type_) {
+        case FieldType::BOOLEAN:
+            return ValueWriter([output_stream](const Literal& item) {
+                output_stream->WriteValue<bool>(item.GetValue<bool>());
+            });
+        case FieldType::TINYINT:
+            return ValueWriter([output_stream](const Literal& item) {
+                output_stream->WriteValue<int8_t>(item.GetValue<int8_t>());
+            });
+        case FieldType::SMALLINT:
+            return ValueWriter([output_stream](const Literal& item) {
+                output_stream->WriteValue<int16_t>(item.GetValue<int16_t>());
+            });
+        // DATE is written exactly like INT
+        case FieldType::DATE:
+        case FieldType::INT:
+            return ValueWriter([output_stream](const Literal& item) {
+                output_stream->WriteValue<int32_t>(item.GetValue<int32_t>());
+            });
+        case FieldType::BIGINT:
+            return ValueWriter([output_stream](const Literal& item) {
+                output_stream->WriteValue<int64_t>(item.GetValue<int64_t>());
+            });
+        case FieldType::STRING:
+            return ValueWriter([output_stream](const Literal& item) {
+                // length prefix first, then the raw characters
+                auto text = item.GetValue<std::string>();
+                output_stream->WriteValue<uint32_t>(text.size());
+                output_stream->Write(text.data(), text.size());
+            });
+        default:
+            return Status::Invalid(fmt::format("invalid index field type {}",
+                                               FieldTypeUtils::FieldTypeToString(data_type_)));
+    }
+}
+
+/// Returns a callable that reads one literal of `data_type_` from `in`.
+///
+/// Every call of the returned reader consumes one value from the stream. When `move_body_start`
+/// is true, `body_start_` is advanced by the number of bytes consumed (for STRING: the 4-byte
+/// length prefix plus the string bytes).
+///
+/// The reader captures `this` and therefore must not outlive this object. The stream is captured
+/// by value (shared ownership), so the reader stays valid even if the caller's shared_ptr that
+/// was passed as `in` goes away first.
+///
+/// @param in stream positioned at the first value to read.
+/// @param move_body_start whether reading also advances `body_start_`.
+/// @return the reader, or Status::Invalid when `data_type_` is not supported. The reader itself
+///         returns Status::Invalid for a negative string length and propagates stream errors.
+Result<std::function<Result<Literal>()>> BitmapFileIndexMeta::GetValueReader(
+    const std::shared_ptr<DataInputStream>& in, bool move_body_start) {
+    using ValueReader = std::function<Result<Literal>()>;
+    const auto& field_type = data_type_;
+    switch (field_type) {
+        case FieldType::STRING: {
+            ValueReader func = [in, move_body_start, field_type, this]() -> Result<Literal> {
+                PAIMON_ASSIGN_OR_RAISE(int32_t length,
+                                       ReadAndMoveBodyStart<int32_t>(in, move_body_start));
+                // The length is stored as a signed 32-bit value; a negative one can only come
+                // from corrupt data and must not be reinterpreted as a huge unsigned size.
+                if (length < 0) {
+                    return Status::Invalid(
+                        fmt::format("invalid string length {} in BitmapIndex", length));
+                }
+                if (move_body_start) {
+                    body_start_ += length;
+                }
+                auto bytes = std::make_unique<Bytes>(static_cast<uint32_t>(length), pool_.get());
+                PAIMON_RETURN_NOT_OK(in->ReadBytes(bytes.get()));
+                return Literal(field_type, bytes->data(), bytes->size());
+            };
+            return func;
+        }
+        case FieldType::BOOLEAN: {
+            ValueReader func = [in, move_body_start, this]() -> Result<Literal> {
+                PAIMON_ASSIGN_OR_RAISE(bool value, ReadAndMoveBodyStart<bool>(in, move_body_start));
+                return Literal(value);
+            };
+            return func;
+        }
+        case FieldType::TINYINT: {
+            ValueReader func = [in, move_body_start, this]() -> Result<Literal> {
+                PAIMON_ASSIGN_OR_RAISE(int8_t value,
+                                       ReadAndMoveBodyStart<int8_t>(in, move_body_start));
+                return Literal(value);
+            };
+            return func;
+        }
+        case FieldType::SMALLINT: {
+            ValueReader func = [in, move_body_start, this]() -> Result<Literal> {
+                PAIMON_ASSIGN_OR_RAISE(int16_t value,
+                                       ReadAndMoveBodyStart<int16_t>(in, move_body_start));
+                return Literal(value);
+            };
+            return func;
+        }
+        case FieldType::INT: {
+            ValueReader func = [in, move_body_start, this]() -> Result<Literal> {
+                PAIMON_ASSIGN_OR_RAISE(int32_t value,
+                                       ReadAndMoveBodyStart<int32_t>(in, move_body_start));
+                return Literal(value);
+            };
+            return func;
+        }
+        case FieldType::DATE: {
+            ValueReader func = [in, move_body_start, this]() -> Result<Literal> {
+                PAIMON_ASSIGN_OR_RAISE(int32_t value,
+                                       ReadAndMoveBodyStart<int32_t>(in, move_body_start));
+                return Literal(FieldType::DATE, value);
+            };
+            return func;
+        }
+        // a timestamp is converted to bigint, so both are read as a plain 64-bit literal
+        case FieldType::BIGINT:
+        case FieldType::TIMESTAMP: {
+            ValueReader func = [in, move_body_start, this]() -> Result<Literal> {
+                PAIMON_ASSIGN_OR_RAISE(int64_t value,
+                                       ReadAndMoveBodyStart<int64_t>(in, move_body_start));
+                return Literal(value);
+            };
+            return func;
+        }
+        default:
+            return Status::Invalid(fmt::format("not support field type {} in BitmapIndex",
+                                               FieldTypeUtils::FieldTypeToString(field_type)));
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_meta.h 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta.h
new file mode 100644
index 0000000..e05e8a3
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta.h
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <vector>
+
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class DataInputStream;
+class InputStream;
+class MemoryPool;
+enum class FieldType;
+
+/// Base class of the versioned bitmap index metadata.
+///
+/// It maps a literal to the Entry that locates its bitmap. Concrete versions implement lookup,
+/// serialization and deserialization.
+class BitmapFileIndexMeta {
+ public:
+    /// Locates the bitmap of one key.
+    /// As interpreted by the index reader, a negative `offset` means the bitmap holds the single
+    /// value `-1 - offset`; otherwise `length` bytes are read at body start + `offset`.
+    struct Entry {
+        Entry(const Literal& key, int32_t offset, int32_t length);
+        Literal key;
+        int32_t offset;
+        int32_t length;
+    };
+
+    // used for read
+    BitmapFileIndexMeta(const FieldType& type, int32_t total_length,
+                        const std::shared_ptr<MemoryPool>& pool);
+    // used for write
+    BitmapFileIndexMeta(const FieldType& type, int32_t row_count, bool has_null_value,
+                        const Entry& null_value_entry, std::vector<Entry>&& write_entries,
+                        const std::shared_ptr<MemoryPool>& pool);
+    virtual ~BitmapFileIndexMeta() = default;
+
+    /// @return `row_count_` (-1 until it has been set).
+    int32_t GetRowCount() const {
+        return row_count_;
+    }
+    /// @return `body_start_` (-1 until it has been set).
+    int64_t GetBodyStart() const {
+        return body_start_;
+    }
+
+    /// Looks up the entry stored for `bitmap_id`.
+    virtual Result<const Entry*> FindEntry(const Literal& bitmap_id) = 0;
+    /// Loads the meta from `input_stream`.
+    virtual Status Deserialize(const std::shared_ptr<InputStream>& input_stream) = 0;
+    /// Writes the meta to `output_stream`.
+    virtual Status Serialize(const std::shared_ptr<MemorySegmentOutputStream>& output_stream) = 0;
+
+ protected:
+    /// Reader callable for literals of `data_type_`.
+    Result<std::function<Result<Literal>()>> GetValueReader(
+        const std::shared_ptr<DataInputStream>& in, bool move_body_start);
+
+    /// Writer callable for literals of `data_type_`.
+    Result<std::function<void(const Literal&)>> GetValueWriter(
+        const std::shared_ptr<MemorySegmentOutputStream>& output_stream) const;
+
+    /// Reads one value of type T from `in`; when `move_body_start` is set, `body_start_` grows by
+    /// sizeof(T).
+    template <typename T>
+    Result<T> ReadAndMoveBodyStart(const std::shared_ptr<DataInputStream>& in,
+                                   bool move_body_start = true) {
+        PAIMON_ASSIGN_OR_RAISE(T value, in->ReadValue<T>());
+        if (!move_body_start) {
+            return value;
+        }
+        body_start_ += sizeof(T);
+        return value;
+    }
+
+    FieldType data_type_;
+    int32_t row_count_ = -1;
+    bool has_null_value_ = false;
+    int64_t body_start_ = -1;
+    int32_t total_length_ = -1;
+    Entry null_value_entry_;
+    // @note use only for serialize
+    std::vector<Entry> write_entries_;
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_test.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_test.cpp
new file mode 100644
index 0000000..66fe4b8
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_test.cpp
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta.h"
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/defs.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Deserializes hand-written V1 and V2 index images of a STRING column and checks the entries
+// found for "a", "b", the null value and two keys that are not in the index.
+TEST(BitmapFileIndexMetaTest, TestStringType) {
+    // 0xFF / 0xFD bytes of the big-endian negative integers in the images below. They are spelled
+    // with an explicit cast: a plain -1 / -3 inside the braced initializer of a std::vector<char>
+    // is a narrowing conversion (ill-formed) on platforms where char is unsigned.
+    constexpr char kFF = static_cast<char>(-1);
+    constexpr char kFD = static_cast<char>(-3);
+
+    // expectations shared by both versions
+    auto check_result = [](BitmapFileIndexMeta* index_meta) {
+        Literal lit_a(FieldType::STRING, "a", 1);
+        Literal lit_b(FieldType::STRING, "b", 1);
+        Literal lit_null(FieldType::STRING);
+
+        ASSERT_OK_AND_ASSIGN(auto entry_a, index_meta->FindEntry(lit_a));
+        ASSERT_EQ(entry_a->offset, 20);
+        ASSERT_EQ(entry_a->length, 20);
+
+        ASSERT_OK_AND_ASSIGN(auto entry_b, index_meta->FindEntry(lit_b));
+        ASSERT_EQ(entry_b->offset, -3);
+        ASSERT_EQ(entry_b->length, -1);
+
+        ASSERT_OK_AND_ASSIGN(auto entry_null, index_meta->FindEntry(lit_null));
+        ASSERT_EQ(entry_null->offset, 0);
+        ASSERT_EQ(entry_null->length, 20);
+
+        // test non-exist field
+        Literal lit_non_exist(FieldType::STRING, "non", 3);
+        ASSERT_OK_AND_ASSIGN(auto* entry_non, index_meta->FindEntry(lit_non_exist));
+        ASSERT_FALSE(entry_non);
+
+        // lookup is case sensitive: "A" is not "a"
+        Literal lit_non_exist2(FieldType::STRING, "A", 1);
+        ASSERT_OK_AND_ASSIGN(auto* entry_non2, index_meta->FindEntry(lit_non_exist2));
+        ASSERT_FALSE(entry_non2);
+    };
+    {
+        // test v1 version
+        std::vector<char> index_bytes = {
+            1, 0, 0, 0, 5, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 1, 97, 0, 0, 0, 20, 0,
+            0, 0, 1, 98, kFF, kFF, kFF, kFD, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0,
+            1, 0, 3, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 4, 0};
+
+        auto input_stream =
+            std::make_shared<ByteArrayInputStream>(index_bytes.data(), index_bytes.size());
+        // skip version
+        ASSERT_OK(input_stream->Seek(1, SeekOrigin::FS_SEEK_SET));
+        BitmapFileIndexMetaV1 index_meta(FieldType::STRING, 0, index_bytes.size(),
+                                         GetDefaultPool());
+        ASSERT_OK(index_meta.Deserialize(input_stream));
+        check_result(&index_meta);
+    }
+    {
+        // test v2 version
+        std::vector<char> index_bytes = {
+            2, 0, 0, 0, 5, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0,
+            1, 0, 0, 0, 1, 97, 0, 0, 0, 0, 0, 0, 0, 30, 0, 0, 0, 2, 0, 0, 0,
+            1, 97, 0, 0, 0, 20, 0, 0, 0, 20, 0, 0, 0, 1, 98, kFF, kFF, kFF, kFD, kFF, kFF,
+            kFF, kFF, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 1, 0, 3,
+            0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 4, 0};
+
+        auto input_stream =
+            std::make_shared<ByteArrayInputStream>(index_bytes.data(), index_bytes.size());
+        // skip version
+        ASSERT_OK(input_stream->Seek(1, SeekOrigin::FS_SEEK_SET));
+        BitmapFileIndexMetaV2 index_meta(FieldType::STRING, index_bytes.size(), GetDefaultPool());
+        ASSERT_OK(index_meta.Deserialize(input_stream));
+        check_result(&index_meta);
+    }
+}
+
+// Deserializing with a field type that the bitmap index does not support (DECIMAL) must fail
+// with a descriptive error.
+TEST(BitmapFileIndexMetaTest, TestInvalidType) {
+    // 0xFF / 0xFD bytes of the image below. They are spelled with an explicit cast: a plain
+    // -1 / -3 inside the braced initializer of a std::vector<char> is a narrowing conversion
+    // (ill-formed) on platforms where char is unsigned.
+    constexpr char kFF = static_cast<char>(-1);
+    constexpr char kFD = static_cast<char>(-3);
+
+    std::vector<char> index_bytes = {
+        2, 0, 0, 0, 5, 0, 0, 0, 2, 1, 0, 0, 0, 0, 0, 0, 0, 20, 0, 0, 0,
+        1, 0, 0, 0, 1, 97, 0, 0, 0, 0, 0, 0, 0, 30, 0, 0, 0, 2, 0, 0, 0,
+        1, 97, 0, 0, 0, 20, 0, 0, 0, 20, 0, 0, 0, 1, 98, kFF, kFF, kFF, kFD, kFF, kFF,
+        kFF, kFF, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 1, 0, 3,
+        0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 4, 0};
+
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(index_bytes.data(), index_bytes.size());
+    // skip version
+    ASSERT_OK(input_stream->Seek(1, SeekOrigin::FS_SEEK_SET));
+    BitmapFileIndexMetaV2 index_meta(FieldType::DECIMAL, index_bytes.size(), GetDefaultPool());
+    ASSERT_NOK_WITH_MSG(index_meta.Deserialize(input_stream),
+                        "not support field type DECIMAL in BitmapIndex");
+}
+
+// Round trip with a tiny index block size: the entries do not fit into a single block, so
+// serialization has to open new blocks. All entries must still be found after deserialization.
+TEST(BitmapFileIndexMetaTest, TestSmallBlockSizeForcesMultipleBlocks) {
+    const auto pool = GetDefaultPool();
+
+    constexpr int32_t kEntryCount = 5;
+    constexpr int32_t kRowCount = 100;
+
+    // key i -> (offset i * 10, length 10)
+    std::vector<BitmapFileIndexMeta::Entry> entries_to_write;
+    entries_to_write.reserve(kEntryCount);
+    for (int32_t idx = 0; idx < kEntryCount; idx++) {
+        Literal key(FieldType::INT, idx);
+        entries_to_write.emplace_back(key, /*offset=*/idx * 10, /*length=*/10);
+    }
+
+    // Set a very small block size to force block overflow
+    std::map<std::string, std::string> options;
+    options.emplace(BitmapFileIndex::INDEX_BLOCK_SIZE, "20");
+
+    BitmapFileIndexMeta::Entry null_entry(Literal(FieldType::INT), /*offset=*/0, /*length=*/5);
+
+    BitmapFileIndexMetaV2 write_meta(FieldType::INT, kRowCount, /*has_null_value=*/true,
+                                     null_entry, std::move(entries_to_write), options, pool);
+
+    // Serialize
+    auto output_stream = std::make_shared<MemorySegmentOutputStream>(/*segment_size=*/1024, pool);
+    ASSERT_OK(write_meta.Serialize(output_stream));
+
+    // Copy serialized data to a byte buffer
+    auto serialized_bytes = MemorySegmentUtils::CopyToBytes(
+        output_stream->Segments(), 0, output_stream->CurrentSize(), pool.get());
+    ASSERT_TRUE(serialized_bytes != nullptr);
+
+    // Deserialize from the serialized bytes
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(serialized_bytes->data(), serialized_bytes->size());
+    BitmapFileIndexMetaV2 read_meta(FieldType::INT, serialized_bytes->size(), pool);
+    ASSERT_OK(read_meta.Deserialize(input_stream));
+
+    ASSERT_EQ(read_meta.GetRowCount(), kRowCount);
+
+    // Verify null entry lookup
+    Literal null_literal(FieldType::INT);
+    ASSERT_OK_AND_ASSIGN(const BitmapFileIndexMeta::Entry* found_null,
+                         read_meta.FindEntry(null_literal));
+    ASSERT_TRUE(found_null != nullptr);
+    ASSERT_EQ(found_null->offset, 0);
+    ASSERT_EQ(found_null->length, 5);
+
+    // Verify all non-null entries can be found with correct offset/length
+    for (int32_t idx = 0; idx < kEntryCount; idx++) {
+        Literal key(FieldType::INT, idx);
+        ASSERT_OK_AND_ASSIGN(const BitmapFileIndexMeta::Entry* found_entry,
+                             read_meta.FindEntry(key));
+        ASSERT_TRUE(found_entry != nullptr);
+        ASSERT_EQ(found_entry->offset, idx * 10);
+        ASSERT_EQ(found_entry->length, 10);
+    }
+
+    // Verify non-existent key returns nullptr
+    Literal missing_key(FieldType::INT, 999);
+    ASSERT_OK_AND_ASSIGN(const BitmapFileIndexMeta::Entry* missing_entry,
+                         read_meta.FindEntry(missing_key));
+    ASSERT_FALSE(missing_entry);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.cpp
new file mode 100644
index 0000000..1dc91cd
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.cpp
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.h"
+
+#include <cassert>
+#include <functional>
+#include <utility>
+
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/buffered_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/predicate/literal.h"
+
+namespace paimon {
+// Read-side constructor: forwards `type`, `total_length` and `pool` to the base class and keeps
+// `start` in `start_`, which Deserialize() uses when computing the last bitmap's length.
+BitmapFileIndexMetaV1::BitmapFileIndexMetaV1(const FieldType& type, int32_t 
start,
+                                             int32_t total_length,
+                                             const 
std::shared_ptr<MemoryPool>& pool)
+    : BitmapFileIndexMeta(type, total_length, pool), start_(start) {}
+
+// Write-side constructor: hands every argument to the base class (`write_entries` is moved).
+// `start_` keeps its in-class default because it is only consulted by Deserialize().
+BitmapFileIndexMetaV1::BitmapFileIndexMetaV1(const FieldType& type, int32_t 
row_count,
+                                             bool has_null_value, const Entry& 
null_value_entry,
+                                             std::vector<Entry>&& 
write_entries,
+                                             const 
std::shared_ptr<MemoryPool>& pool)
+    : BitmapFileIndexMeta(type, row_count, has_null_value, null_value_entry,
+                          std::move(write_entries), pool) {}
+
+// Resolves `bitmap_id` to its entry.
+//
+// A null literal is answered from the dedicated null entry when one was recorded. Every other
+// request (including a null literal when no null entry was recorded) is looked up in
+// `entries_`. Returns nullptr when nothing matches.
+Result<const BitmapFileIndexMeta::Entry*> BitmapFileIndexMetaV1::FindEntry(
+    const Literal& bitmap_id) {
+    if (bitmap_id.IsNull() && has_null_value_) {
+        return &null_value_entry_;
+    }
+    const auto hit = entries_.find(bitmap_id);
+    if (hit == entries_.end()) {
+        return nullptr;
+    }
+    return &hit->second;
+}
+
+// Writes the V1 head to `output_stream`: row count, number of non-null bitmaps, the has-null
+// flag, the null bitmap offset (only when the flag is set), then one (value, offset) pair per
+// write entry. Returns the error from GetValueWriter() if the value writer cannot be built.
+Status BitmapFileIndexMetaV1::Serialize(
+    const std::shared_ptr<MemorySegmentOutputStream>& output_stream) {
+    // The key writer is requested first, so a failure is reported before anything is written.
+    PAIMON_ASSIGN_OR_RAISE(std::function<void(const Literal&)> key_writer,
+                           GetValueWriter(output_stream));
+    const auto non_null_bitmap_number = static_cast<int32_t>(write_entries_.size());
+
+    output_stream->WriteValue<int32_t>(row_count_);
+    output_stream->WriteValue<int32_t>(non_null_bitmap_number);
+    output_stream->WriteValue<bool>(has_null_value_);
+    if (has_null_value_) {
+        output_stream->WriteValue<int32_t>(null_value_entry_.offset);
+    }
+
+    for (const Entry& item : write_entries_) {
+        key_writer(item.key);
+        output_stream->WriteValue<int32_t>(item.offset);
+    }
+    return Status::OK();
+}
+
+// Parses the V1 head from `input_stream` and fills `row_count_`, `has_null_value_`,
+// `entries_` and `null_value_entry_`.
+//
+// The V1 head stores only offsets. For an entry with a non-negative offset the length is
+// derived as the distance to the next non-negative offset; the last such entry gets whatever
+// remains up to `total_length_`. Entries with a negative offset are stored with length -1.
+//
+// Returns the first error reported by the stream or by the value reader.
+Status BitmapFileIndexMetaV1::Deserialize(const std::shared_ptr<InputStream>& 
input_stream) {
+    // Remember where the head starts; the helpers below are asked to move body_start_ as they
+    // read (presumably so that it ends up at the first byte after the head - confirm in
+    // ReadAndMoveBodyStart / GetValueReader).
+    PAIMON_ASSIGN_OR_RAISE(body_start_, input_stream->GetPos());
+    auto buffered_input_stream = std::make_shared<BufferedInputStream>(
+        input_stream, BufferedInputStream::DEFAULT_BUFFER_SIZE, pool_.get());
+    auto in = std::make_shared<DataInputStream>(buffered_input_stream);
+
+    PAIMON_ASSIGN_OR_RAISE(std::function<Result<Literal>()> value_reader,
+                           GetValueReader(in, /*move_body_start=*/true));
+
+    PAIMON_ASSIGN_OR_RAISE(row_count_, ReadAndMoveBodyStart<int32_t>(in));
+    PAIMON_ASSIGN_OR_RAISE(int32_t non_null_bitmap_number, 
ReadAndMoveBodyStart<int32_t>(in));
+    PAIMON_ASSIGN_OR_RAISE(has_null_value_, ReadAndMoveBodyStart<bool>(in));
+
+    // -1 means "no null bitmap offset was read".
+    int32_t null_value_offset = -1;
+    if (has_null_value_) {
+        PAIMON_ASSIGN_OR_RAISE(null_value_offset, 
ReadAndMoveBodyStart<int32_t>(in));
+    }
+
+    // The null bitmap (when its offset is non-negative) is treated as the entry preceding the
+    // first non-null one, so its length is derived by the same "distance to next offset" rule.
+    Literal null_value(data_type_);
+    Literal last_value = null_value;
+    int32_t last_offset = null_value_offset;
+    for (int32_t i = 0; i < non_null_bitmap_number; i++) {
+        PAIMON_ASSIGN_OR_RAISE(Literal value, value_reader());
+        PAIMON_ASSIGN_OR_RAISE(int32_t offset, 
ReadAndMoveBodyStart<int32_t>(in));
+        if (offset >= 0) {
+            // The previous pending entry can be finalized now that the next offset is known.
+            if (last_offset >= 0) {
+                int32_t length = offset - last_offset;
+                entries_.emplace(last_value, Entry(last_value, last_offset, 
length));
+            }
+            last_offset = offset;
+            last_value = value;
+        } else {
+            // offset is negative indicates offset is the inline bitmap
+            entries_.emplace(value, Entry(value, offset, -1));
+        }
+    }
+    // Finalize the last pending entry: it extends to the end of the index data.
+    if (last_offset >= 0) {
+        entries_.emplace(last_value, Entry(last_value, last_offset,
+                                           total_length_ - (body_start_ - 
start_) - last_offset));
+    }
+
+    if (has_null_value_) {
+        // A null entry is present in entries_ only if its offset was non-negative; otherwise
+        // build it directly from the negative offset with length -1.
+        auto null_iter = entries_.find(null_value);
+        if (null_iter == entries_.end()) {
+            assert(null_value_offset < 0);
+            null_value_entry_ = Entry(null_value, null_value_offset, -1);
+        } else {
+            null_value_entry_ = null_iter->second;
+        }
+    }
+    return Status::OK();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.h 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.h
new file mode 100644
index 0000000..1dbc19e
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v1.h
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class InputStream;
+class Literal;
+class MemoryPool;
+enum class FieldType;
+///
+///
+/// <pre>
+/// Bitmap file index format (V1)
+/// +-------------------------------------------------+-----------------
+/// | version (1 byte)                                |
+/// +-------------------------------------------------+
+/// | row count (4 bytes int)                         |
+/// +-------------------------------------------------+
+/// | non-null value bitmap number (4 bytes int)      |
+/// +-------------------------------------------------+
+/// | has null value (1 byte)                         |
+/// +-------------------------------------------------+
+/// | null value offset (4 bytes if has null value)   |       HEAD
+/// +-------------------------------------------------+
+/// | value 1 | offset 1                              |
+/// +-------------------------------------------------+
+/// | value 2 | offset 2                              |
+/// +-------------------------------------------------+
+/// | value 3 | offset 3                              |
+/// +-------------------------------------------------+
+/// | ...                                             |
+/// +-------------------------------------------------+-----------------
+/// | serialized bitmap 1                             |
+/// +-------------------------------------------------+
+/// | serialized bitmap 2                             |
+/// +-------------------------------------------------+       BODY
+/// | serialized bitmap 3                             |
+/// +-------------------------------------------------+
+/// | ...                                             |
+/// +-------------------------------------------------+-----------------
+///
+/// value x:                       var bytes for any data type (as bitmap 
identifier)
+/// offset:                        4 bytes int (when it is negative, it 
represents that there is
+/// only one value and its position is the inverse of the negative value)
+/// </pre>
+
+/// Metadata (head) of a V1 bitmap file index: reads or writes the dictionary that maps each
+/// indexed value to the offset of its bitmap.
+class BitmapFileIndexMetaV1 : public BitmapFileIndexMeta {
+ public:
+    // used for read
+    // `start` is stored in `start_`; `total_length` and `pool` are passed to the base class.
+    BitmapFileIndexMetaV1(const FieldType& type, int32_t start, int32_t 
total_length,
+                          const std::shared_ptr<MemoryPool>& pool);
+    // used for write
+    // All arguments are passed to the base class; `write_entries` is consumed (moved from).
+    BitmapFileIndexMetaV1(const FieldType& type, int32_t row_count, bool 
has_null_value,
+                          const Entry& null_value_entry, std::vector<Entry>&& 
write_entries,
+                          const std::shared_ptr<MemoryPool>& pool);
+
+    /// Returns the entry for `bitmap_id`, or nullptr when the value is not in the index.
+    Result<const BitmapFileIndexMeta::Entry*> FindEntry(const Literal& 
bitmap_id) override;
+
+    /// Reads the whole V1 head from `input_stream` and populates the in-memory dictionary.
+    Status Deserialize(const std::shared_ptr<InputStream>& input_stream) 
override;
+    /// Writes the V1 head (counts, null offset, value/offset pairs) to `output_stream`.
+    Status Serialize(const std::shared_ptr<MemorySegmentOutputStream>& 
output_stream) override;
+
+ private:
+    // Value passed as `start` to the read constructor; -1 when constructed for writing.
+    int32_t start_ = -1;
+    // Dictionary filled by Deserialize(): indexed value -> entry (offset, length).
+    std::unordered_map<Literal, BitmapFileIndexMeta::Entry> entries_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.cpp
new file mode 100644
index 0000000..304e139
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.cpp
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.h"
+
+#include <algorithm>
+#include <cassert>
+#include <functional>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/file_index/bitmap/bitmap_file_index.h"
+#include "paimon/common/options/memory_size.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/buffered_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+
+namespace paimon {
+// Read-side constructor: forwards all arguments to the base class. No options are stored.
+BitmapFileIndexMetaV2::BitmapFileIndexMetaV2(const FieldType& type, int32_t 
total_length,
+                                             const 
std::shared_ptr<MemoryPool>& pool)
+    : BitmapFileIndexMeta(type, total_length, pool) {}
+
+// Write-side constructor: forwards the index content to the base class (`write_entries` is
+// moved) and keeps a copy of `options`, which Serialize() consults for the index block size.
+BitmapFileIndexMetaV2::BitmapFileIndexMetaV2(const FieldType& type, int32_t 
row_count,
+                                             bool has_null_value, const Entry& 
null_value_entry,
+                                             std::vector<Entry>&& 
write_entries,
+                                             const std::map<std::string, 
std::string>& options,
+                                             const 
std::shared_ptr<MemoryPool>& pool)
+    : BitmapFileIndexMeta(type, row_count, has_null_value, null_value_entry,
+                          std::move(write_entries), pool),
+      options_(options) {}
+
+// Serializes the V2 head followed by the index blocks into `output_stream`.
+//
+// Bytes written, in order: row count, non-null bitmap number, has-null flag, null bitmap
+// offset and length (only when the flag is set), index block count, one (first key, block
+// offset) pair per index block, the total size of all index blocks, and finally every index
+// block as an entry count followed by (key, offset, length) triples.
+//
+// All steps that can fail (option parsing, value-writer creation, block building) run before
+// the first byte is written, so an error does not leave a partial head in the stream.
+// The method sorts `write_entries_` by key and rebuilds `index_blocks_` on every call.
+//
+// Returns an error if the configured index block size cannot be parsed, if the value writer
+// or a key size cannot be obtained, or if an entry does not fit into an empty index block.
+Status BitmapFileIndexMetaV2::Serialize(
+    const std::shared_ptr<MemorySegmentOutputStream>& output_stream) {
+    auto iter = options_.find(BitmapFileIndex::INDEX_BLOCK_SIZE);
+    if (iter != options_.end()) {
+        PAIMON_ASSIGN_OR_RAISE(block_size_limit_, MemorySize::ParseBytes(iter->second));
+    }
+    PAIMON_ASSIGN_OR_RAISE(std::function<void(const Literal&)> write_value,
+                           GetValueWriter(output_stream));
+
+    // Start from a clean slate so that calling Serialize() again does not append the same
+    // blocks a second time.
+    index_blocks_.clear();
+    // Keys are non-null and share one type here, so CompareTo() is expected to succeed.
+    std::sort(write_entries_.begin(), write_entries_.end(), [](const Entry& e1, const Entry& e2) {
+        return e1.key.CompareTo(e2.key).value() < 0;
+    });
+    if (!write_entries_.empty()) {
+        index_blocks_.push_back(
+            std::make_unique<BitmapIndexBlock>(this, /*offset=*/0, pool_.get()));
+    }
+    for (const auto& entry : write_entries_) {
+        // Raw pointer: stays valid even if the push_back below reallocates the vector.
+        BitmapIndexBlock* last_block = index_blocks_.back().get();
+        PAIMON_ASSIGN_OR_RAISE(bool added, last_block->TryAdd(entry));
+        if (added) {
+            continue;
+        }
+        // The current block is full: open the next one directly behind it.
+        index_blocks_.push_back(std::make_unique<BitmapIndexBlock>(
+            this, /*offset=*/last_block->offset + last_block->serialized_bytes, pool_.get()));
+        PAIMON_ASSIGN_OR_RAISE(bool new_added, index_blocks_.back()->TryAdd(entry));
+        if (!new_added) {
+            return Status::Invalid("add entry to BitmapIndexBlock failed");
+        }
+    }
+
+    output_stream->WriteValue<int32_t>(row_count_);
+    // non-null bitmap number
+    output_stream->WriteValue<int32_t>(static_cast<int32_t>(write_entries_.size()));
+    output_stream->WriteValue<bool>(has_null_value_);
+    if (has_null_value_) {
+        output_stream->WriteValue<int32_t>(null_value_entry_.offset);
+        output_stream->WriteValue<int32_t>(null_value_entry_.length);
+    }
+
+    output_stream->WriteValue<int32_t>(static_cast<int32_t>(index_blocks_.size()));
+
+    int32_t bitmap_body_offset = 0;
+    for (const auto& block : index_blocks_) {
+        // secondary entry
+        write_value(block->key);
+        output_stream->WriteValue<int32_t>(block->offset);
+        bitmap_body_offset += block->serialized_bytes;
+    }
+    // bitmap body offset
+    output_stream->WriteValue<int32_t>(bitmap_body_offset);
+
+    // bitmap index blocks
+    for (const auto& block : index_blocks_) {
+        output_stream->WriteValue<int32_t>(static_cast<int32_t>(block->entry_list.size()));
+        for (const auto& entry : block->entry_list) {
+            write_value(entry.key);
+            output_stream->WriteValue<int32_t>(entry.offset);
+            output_stream->WriteValue<int32_t>(entry.length);
+        }
+    }
+    return Status::OK();
+}
+
+// Looks up the entry for `bitmap_id`.
+//
+// A null literal maps to the dedicated null entry (or nullptr when the index holds no nulls).
+// Any other literal is first narrowed to one index block via FindBlock() and then searched
+// inside that block, which may load the block from the stream. Returns nullptr when the value
+// is not in the index.
+Result<const BitmapFileIndexMeta::Entry*> BitmapFileIndexMetaV2::FindEntry(
+    const Literal& bitmap_id) {
+    if (bitmap_id.IsNull()) {
+        if (!has_null_value_) {
+            return nullptr;
+        }
+        return &null_value_entry_;
+    }
+    BitmapIndexBlock* candidate = FindBlock(bitmap_id);
+    if (!candidate) {
+        return nullptr;
+    }
+    return candidate->FindEntry(bitmap_id);
+}
+
+// Picks the index block that could contain `bitmap_id`: the block whose first key equals it,
+// or otherwise the last block whose first key is smaller. Returns nullptr when there are no
+// blocks or when `bitmap_id` is smaller than the first key of the first block.
+BitmapFileIndexMetaV2::BitmapIndexBlock* BitmapFileIndexMetaV2::FindBlock(
+    const Literal& bitmap_id) {
+    if (index_blocks_.empty()) {
+        return nullptr;
+    }
+    // First block whose key is not smaller than bitmap_id.
+    auto pos = std::lower_bound(
+        index_blocks_.begin(), index_blocks_.end(), bitmap_id,
+        [](const std::unique_ptr<BitmapIndexBlock>& block, const Literal& literal) {
+            return block->key.CompareTo(literal).value() < 0;
+        });
+    const bool exact_match = pos != index_blocks_.end() && !((*pos)->key != bitmap_id);
+    if (exact_match) {
+        return pos->get();
+    }
+    // bitmap_id < all data in index
+    if (pos == index_blocks_.begin()) {
+        return nullptr;
+    }
+    // Otherwise the value, if present, lives in the preceding block.
+    --pos;
+    return pos->get();
+}
+
+// Parses the V2 head from `input_stream`: row count, non-null bitmap number (read but not
+// used), the null entry when present, and one (first key, offset) pair per index block.
+// The index blocks themselves are NOT read here; each BitmapIndexBlock keeps `input_stream`
+// and loads its entries on first lookup.
+//
+// Returns the first error reported by the stream or by the value reader.
+Status BitmapFileIndexMetaV2::Deserialize(const std::shared_ptr<InputStream>& 
input_stream) {
+    // Start position of the head; the helpers below are asked to move body_start_ as they read
+    // (presumably leaving it at the first byte after the head - confirm in
+    // ReadAndMoveBodyStart / GetValueReader).
+    PAIMON_ASSIGN_OR_RAISE(body_start_, input_stream->GetPos());
+    auto buffered_input_stream = std::make_shared<BufferedInputStream>(
+        input_stream, BufferedInputStream::DEFAULT_BUFFER_SIZE, pool_.get());
+    auto in = std::make_shared<DataInputStream>(buffered_input_stream);
+
+    PAIMON_ASSIGN_OR_RAISE(std::function<Result<Literal>()> value_reader,
+                           GetValueReader(in, /*move_body_start=*/true));
+
+    PAIMON_ASSIGN_OR_RAISE(row_count_, ReadAndMoveBodyStart<int32_t>(in));
+    PAIMON_ASSIGN_OR_RAISE([[maybe_unused]] int32_t non_null_bitmap_number,
+                           ReadAndMoveBodyStart<int32_t>(in));
+    PAIMON_ASSIGN_OR_RAISE(has_null_value_, ReadAndMoveBodyStart<bool>(in));
+
+    // Unlike V1, V2 stores both offset and length of the null bitmap explicitly.
+    if (has_null_value_) {
+        PAIMON_ASSIGN_OR_RAISE(int32_t offset, 
ReadAndMoveBodyStart<int32_t>(in));
+        PAIMON_ASSIGN_OR_RAISE(int32_t length, 
ReadAndMoveBodyStart<int32_t>(in));
+        null_value_entry_ = Entry(Literal(data_type_), offset, length);
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(int32_t bitmap_block_number, 
ReadAndMoveBodyStart<int32_t>(in));
+
+    // Secondary index: only the first key and the offset of every block are materialized.
+    for (int32_t i = 0; i < bitmap_block_number; i++) {
+        PAIMON_ASSIGN_OR_RAISE(Literal key, value_reader());
+        PAIMON_ASSIGN_OR_RAISE(int32_t offset, 
ReadAndMoveBodyStart<int32_t>(in));
+        index_blocks_.push_back(
+            std::make_unique<BitmapIndexBlock>(this, key, offset, 
input_stream, pool_.get()));
+    }
+
+    PAIMON_ASSIGN_OR_RAISE(int32_t bitmap_body_offset, 
ReadAndMoveBodyStart<int32_t>(in));
+
+    // Block offsets are resolved against index_block_start_ (see TryDeserialize); body_start_
+    // is then advanced by the value just read, which Serialize() writes as the total size of
+    // all index blocks.
+    index_block_start_ = body_start_;
+    body_start_ += bitmap_body_offset;
+    return Status::OK();
+}
+
+// Read-side constructor: records the block's first key, its offset and the stream its entries
+// are later loaded from. `outer` and `pool` are stored as raw pointers and must be non-null
+// (checked by assert only).
+BitmapFileIndexMetaV2::BitmapIndexBlock::BitmapIndexBlock(
+    BitmapFileIndexMetaV2* outer, const Literal& _key, int32_t _offset,
+    const std::shared_ptr<InputStream>& input_stream, MemoryPool* pool)
+    : key(_key), offset(_offset), input_stream_(input_stream), outer_(outer), 
pool_(pool) {
+    assert(outer_);
+    assert(pool_);
+}
+
+// Write-side constructor: the key starts as a null literal of the outer meta's data type and
+// is replaced by the first entry offered to TryAdd(). No input stream is attached.
+// NOTE(review): `outer` is dereferenced in the initializer list, i.e. before assert(outer_)
+// runs, so the assertion cannot catch a null `outer`.
+BitmapFileIndexMetaV2::BitmapIndexBlock::BitmapIndexBlock(BitmapFileIndexMetaV2*
 outer,
+                                                          int32_t _offset, 
MemoryPool* pool)
+    : key(outer->data_type_), offset(_offset), outer_(outer), pool_(pool) {
+    assert(outer_);
+    assert(pool_);
+}
+
+// Lazily loads this block's entries from the underlying stream.
+//
+// The first successful call seeks to the block (index block start of the owning meta plus
+// this block's offset), reads the entry count and then that many (key, offset, length)
+// triples into `entry_list`. Later calls are no-ops.
+//
+// Entries are collected in a temporary vector and committed only after the whole block has
+// been read, so a failed attempt leaves `entry_list` untouched and can safely be retried.
+//
+// Returns an error if the block has no input stream (write-side block), if seeking or reading
+// fails, or if the stored entry count is negative.
+Status BitmapFileIndexMetaV2::BitmapIndexBlock::TryDeserialize() {
+    if (is_deserialized_) {
+        return Status::OK();
+    }
+    if (!input_stream_) {
+        // Blocks created through the write constructor have nothing to read from.
+        return Status::Invalid("BitmapIndexBlock has no input stream to deserialize from");
+    }
+    PAIMON_RETURN_NOT_OK(
+        input_stream_->Seek(outer_->index_block_start_ + offset, SeekOrigin::FS_SEEK_SET));
+    auto buffered_input_stream = std::make_shared<BufferedInputStream>(
+        input_stream_, BufferedInputStream::DEFAULT_BUFFER_SIZE, pool_);
+    auto in = std::make_shared<DataInputStream>(buffered_input_stream);
+    PAIMON_ASSIGN_OR_RAISE(std::function<Result<Literal>()> value_reader,
+                           outer_->GetValueReader(in, /*move_body_start=*/false));
+    PAIMON_ASSIGN_OR_RAISE(int32_t entry_num, in->ReadValue<int32_t>());
+    if (entry_num < 0) {
+        // A negative count would be converted to a huge size_t by reserve().
+        return Status::Invalid(
+            fmt::format("invalid entry number {} in bitmap index block", entry_num));
+    }
+
+    std::vector<Entry> loaded_entries;
+    loaded_entries.reserve(entry_num);
+    for (int32_t i = 0; i < entry_num; i++) {
+        // Distinct names: `key` and `offset` are also data members of this block.
+        PAIMON_ASSIGN_OR_RAISE(Literal entry_key, value_reader());
+        PAIMON_ASSIGN_OR_RAISE(int32_t entry_offset, in->ReadValue<int32_t>());
+        PAIMON_ASSIGN_OR_RAISE(int32_t entry_length, in->ReadValue<int32_t>());
+        loaded_entries.emplace_back(entry_key, entry_offset, entry_length);
+    }
+    entry_list = std::move(loaded_entries);
+    is_deserialized_ = true;
+    return Status::OK();
+}
+
+// Searches this block for `bitmap_id`, loading the block from the stream on first use.
+//
+// `entry_list` is binary-searched, so it must be ordered by key (Serialize() sorts the entries
+// before writing them). Returns a pointer into `entry_list` on a hit, nullptr when the key is
+// not in this block, or the error from TryDeserialize().
+Result<const BitmapFileIndexMeta::Entry*> BitmapFileIndexMetaV2::BitmapIndexBlock::FindEntry(
+    const Literal& bitmap_id) {
+    PAIMON_RETURN_NOT_OK(TryDeserialize());
+    auto iter = std::lower_bound(entry_list.begin(), entry_list.end(), bitmap_id,
+                                 [](const Entry& entry, const Literal& literal) {
+                                     return entry.key.CompareTo(literal).value() < 0;
+                                 });
+    if (iter != entry_list.end() && iter->key == bitmap_id) {
+        // `&*iter` is the portable way to get the element address; `iterator::base()` is not
+        // part of the standard std::vector iterator interface.
+        return &*iter;
+    }
+    return nullptr;
+}
+
+// Tries to append `entry` to this block.
+//
+// Returns true when the entry was stored, false when storing it would push the block's
+// serialized size beyond the owning meta's block size limit, or an error when the key's
+// serialized size cannot be determined.
+Result<bool> BitmapFileIndexMetaV2::BitmapIndexBlock::TryAdd(const Entry& entry) {
+    // null literal will not be added to block
+    // (so a null key means "no key yet": adopt the key of the first entry offered)
+    if (key.IsNull()) {
+        key = entry.key;
+    }
+    PAIMON_ASSIGN_OR_RAISE(int32_t key_width, GetKeyBytes(entry.key));
+    // Serialized entry = key + offset (int32) + length (int32).
+    int32_t needed_bytes = 2 * sizeof(int32_t) + key_width;
+    const bool fits = !(serialized_bytes + needed_bytes > outer_->block_size_limit_);
+    if (fits) {
+        serialized_bytes += needed_bytes;
+        entry_list.push_back(entry);
+        return true;
+    }
+    return false;
+}
+
+// Returns the number of bytes used to account for `literal` as a key in an index block:
+// the size of the matching C++ type for fixed-width types, or a 4-byte length prefix plus the
+// string's byte count for STRING. Any other field type yields an Invalid status.
+Result<int32_t> BitmapFileIndexMetaV2::BitmapIndexBlock::GetKeyBytes(const Literal& literal) {
+    const auto type = literal.GetType();
+    size_t width = 0;
+    switch (type) {
+        case FieldType::BOOLEAN:
+            width = sizeof(bool);
+            break;
+        case FieldType::TINYINT:
+            width = sizeof(int8_t);
+            break;
+        case FieldType::SMALLINT:
+            width = sizeof(int16_t);
+            break;
+        case FieldType::DATE:
+        case FieldType::INT:
+            width = sizeof(int32_t);
+            break;
+        case FieldType::BIGINT:
+            width = sizeof(int64_t);
+            break;
+        case FieldType::FLOAT:
+            width = sizeof(float);
+            break;
+        case FieldType::DOUBLE:
+            width = sizeof(double);
+            break;
+        case FieldType::STRING:
+            width = sizeof(int32_t) + literal.GetValue<std::string>().size();
+            break;
+        default:
+            return Status::Invalid(fmt::format("invalid index field type {}",
+                                               FieldTypeUtils::FieldTypeToString(type)));
+    }
+    return static_cast<int32_t>(width);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.h 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.h
new file mode 100644
index 0000000..860b389
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_meta_v2.h
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index_meta.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class InputStream;
+class MemoryPool;
+enum class FieldType;
+/// When the bitmap-indexed column cardinality is high, using the first 
version of the bitmap index
+/// format will take a lot of time to read the entire dictionary. But in fact 
we don't need a full
+/// dictionary when dealing with a small number of predicates, the performance 
of predicate hits on
+/// the bitmap can be improved by creating a secondary index on the dictionary.
+///
+/// <pre>
+/// Bitmap file index format (V2)
+/// +-------------------------------------------------+-----------------
+/// | version (1 byte) = 2                           |
+/// +-------------------------------------------------+
+/// | row count (4 bytes int)                        |
+/// +-------------------------------------------------+
+/// | non-null value bitmap number (4 bytes int)     |
+/// +-------------------------------------------------+
+/// | has null value (1 byte)                        |
+/// +-------------------------------------------------+
+/// | null value offset (4 bytes if has null value)  |       HEAD
+/// +-------------------------------------------------+
+/// | null bitmap length (4 bytes if has null value) |
+/// +-------------------------------------------------+
+/// | bitmap index block number (4 bytes int)        |
+/// +-------------------------------------------------+
+/// | value 1 | offset 1                             |
+/// +-------------------------------------------------+
+/// | value 2 | offset 2                             |
+/// +-------------------------------------------------+
+/// | ...                                            |
+/// +-------------------------------------------------+
+/// | bitmap blocks offset (4 bytes int)             |
+/// +-------------------------------------------------+-----------------
+/// | bitmap index block 1                           |
+/// +-------------------------------------------------+
+/// | bitmap index block 2                           |  INDEX BLOCKS
+/// +-------------------------------------------------+
+/// | ...                                            |
+/// +-------------------------------------------------+-----------------
+/// | serialized bitmap 1                            |
+/// +-------------------------------------------------+
+/// | serialized bitmap 2                            |
+/// +-------------------------------------------------+  BITMAP BLOCKS
+/// | serialized bitmap 3                            |
+/// +-------------------------------------------------+
+/// | ...                                            |
+/// +-------------------------------------------------+-----------------
+///
+/// index block format:
+/// +-------------------------------------------------+
+/// | entry number (4 bytes int)                     |
+/// +-------------------------------------------------+
+/// | value 1 | offset 1 | length 1                  |
+/// +-------------------------------------------------+
+/// | value 2 | offset 2 | length 2                  |
+/// +-------------------------------------------------+
+/// | ...                                            |
+/// +-------------------------------------------------+
+/// </pre>
+/// Metadata (head + index blocks) of a V2 bitmap file index. Only the per-block first keys are
+/// read eagerly; the entries of a block are loaded when a lookup first touches that block.
+class BitmapFileIndexMetaV2 : public BitmapFileIndexMeta {
+ public:
+    // used for read
+    BitmapFileIndexMetaV2(const FieldType& type, int32_t total_length,
+                          const std::shared_ptr<MemoryPool>& pool);
+    // used for write
+    // `options` may carry BitmapFileIndex::INDEX_BLOCK_SIZE to override the default block size.
+    BitmapFileIndexMetaV2(const FieldType& type, int32_t row_count, bool 
has_null_value,
+                          const Entry& null_value_entry, std::vector<Entry>&& 
write_entries,
+                          const std::map<std::string, std::string>& options,
+                          const std::shared_ptr<MemoryPool>& pool);
+
+    /// Returns the entry for `bitmap_id`, or nullptr when the value is not in the index.
+    Result<const BitmapFileIndexMeta::Entry*> FindEntry(const Literal& 
bitmap_id) override;
+    /// Reads the V2 head from `input_stream`; index blocks are loaded later on demand.
+    Status Deserialize(const std::shared_ptr<InputStream>& input_stream) 
override;
+    /// Sorts the write entries by key, groups them into index blocks and writes head + blocks.
+    Status Serialize(const std::shared_ptr<MemorySegmentOutputStream>& 
output_stream) override;
+
+ private:
+    class BitmapIndexBlock;
+    // Returns the block that may contain `bitmap_id`, or nullptr if none can.
+    BitmapIndexBlock* FindBlock(const Literal& bitmap_id);
+    static constexpr int64_t DEFAULT_INDEX_BLOCK_SIZE = 16 * 1024;  // 16KB
+
+ private:
+    // Copy of the options given to the write constructor (empty on the read side).
+    std::map<std::string, std::string> options_;
+    // Stream position that block offsets are relative to; set by Deserialize(), -1 before.
+    int64_t index_block_start_ = -1;
+    // Upper bound for the serialized size of one index block, in bytes.
+    int64_t block_size_limit_ = DEFAULT_INDEX_BLOCK_SIZE;
+    // Index blocks ordered by their first key.
+    std::vector<std::unique_ptr<BitmapIndexBlock>> index_blocks_;
+};
+
+/// One block of the V2 dictionary: a run of (key, offset, length) entries identified by the
+/// key of its first entry. Holds raw pointers to the owning meta and to the memory pool.
+class BitmapFileIndexMetaV2::BitmapIndexBlock {
+ public:
+    // used for read
+    BitmapIndexBlock(BitmapFileIndexMetaV2* outer, const Literal& key, int32_t 
offset,
+                     const std::shared_ptr<InputStream>& input_stream, 
MemoryPool* pool);
+    // used for write
+    BitmapIndexBlock(BitmapFileIndexMetaV2* outer, int32_t offset, MemoryPool* 
pool);
+
+    /// Returns the entry for `bitmap_id` inside this block (loading the block first if
+    /// necessary), or nullptr when the block does not contain it.
+    Result<const Entry*> FindEntry(const Literal& bitmap_id);
+
+    /// Appends `entry` if the block stays within the size limit; returns whether it was added.
+    Result<bool> TryAdd(const Entry& entry);
+
+ private:
+    // Loads `entry_list` from `input_stream_` unless that already happened.
+    Status TryDeserialize();
+
+    // Serialized size in bytes of `literal` when used as a key.
+    static Result<int32_t> GetKeyBytes(const Literal& literal);
+
+ public:
+    // Key of the first entry in the block (a null literal until the first TryAdd on write).
+    Literal key;
+    // Offset of this block relative to the start of the index block area.
+    int32_t offset = -1;
+    // Running serialized size; starts at 4 bytes for the entry-count field.
+    int32_t serialized_bytes = sizeof(int32_t);
+    std::vector<Entry> entry_list;
+
+ private:
+    // Set once TryDeserialize() has filled entry_list.
+    bool is_deserialized_ = false;
+    // Stream the block is read from; left empty by the write constructor.
+    std::shared_ptr<InputStream> input_stream_;
+    BitmapFileIndexMetaV2* outer_ = nullptr;
+    MemoryPool* pool_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_file_index_test.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_file_index_test.cpp
new file mode 100644
index 0000000..cbe92a8
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_file_index_test.cpp
@@ -0,0 +1,819 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bitmap/bitmap_file_index.h"
+
+#include <utility>
+
+#include "arrow/api.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/file_index/bitmap_index_result.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+namespace paimon::test {
+// Fixture shared by the bitmap file index tests: owns the memory pool and
+// provides helpers to export a schema, write an index and compare results.
+class BitmapIndexTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+    void TearDown() override {
+        pool_.reset();
+    }
+
+    // Exports a schema with the single field "f0" of `data_type` into a
+    // heap-allocated Arrow C schema struct.
+    std::unique_ptr<::ArrowSchema> CreateArrowSchema(
+        const std::shared_ptr<arrow::DataType>& data_type) const {
+        auto schema = arrow::schema({arrow::field("f0", data_type)});
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        EXPECT_TRUE(arrow::ExportSchema(*schema, c_schema.get()).ok());
+        return c_schema;
+    }
+
+    // Asserts that `result` is a BitmapIndexResult whose bitmap equals the
+    // bitmap built from `expected`.
+    void CheckResult(const std::shared_ptr<FileIndexResult>& result,
+                     const std::vector<int32_t>& expected) const {
+        auto typed_result =
+            std::dynamic_pointer_cast<BitmapIndexResult>(result);
+        ASSERT_TRUE(typed_result);
+        ASSERT_OK_AND_ASSIGN(const RoaringBitmap32* bitmap,
+                             typed_result->GetBitmap());
+        ASSERT_TRUE(bitmap);
+        // reuse the checked pointer and build the expected bitmap only once
+        auto expected_bitmap = RoaringBitmap32::From(expected);
+        ASSERT_EQ(*bitmap, expected_bitmap)
+            << "result=" << bitmap->ToString()
+            << ", expected=" << expected_bitmap.ToString();
+    }
+
+    // Creates a writer for a one-field schema ("f0" of `type`) with the
+    // "version" option set to `version`, feeds it `array` as a single batch
+    // and returns the serialized index bytes. Any failing step is returned
+    // as an error instead of asserting.
+    Result<PAIMON_UNIQUE_PTR<Bytes>> WriteIndex(
+        const std::shared_ptr<arrow::DataType>& type, int32_t version,
+        const std::shared_ptr<arrow::Array>& array) const {
+        auto arrow_schema = arrow::schema({arrow::field("f0", type)});
+        BitmapFileIndex file_index({{"version", std::to_string(version)}});
+        ArrowSchema c_schema;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(
+            arrow::ExportSchema(*arrow_schema, &c_schema));
+        PAIMON_ASSIGN_OR_RAISE(auto writer,
+                               file_index.CreateWriter(&c_schema, pool_));
+        ArrowArray c_array;
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &c_array));
+        PAIMON_RETURN_NOT_OK(writer->AddBatch(&c_array));
+        return writer->SerializedBytes();
+    }
+
+ private:
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+TEST_F(BitmapIndexTest, TestStringType) {
+    // f0 (utf8), rows 0..4: a, null, b, null, a; expectations below are row ids
+    auto type = arrow::utf8();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        ["a"],
+        [null],
+        ["b"],
+        [null],
+        ["a"]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+
+        Literal lit_a(FieldType::STRING, "a", 1);
+        CheckResult(reader->VisitEqual(lit_a).value(), {0, 4});
+
+        Literal lit_b(FieldType::STRING, "b", 1);
+        CheckResult(reader->VisitEqual(lit_b).value(), {2});
+
+        CheckResult(reader->VisitIsNull().value(), {1, 3});
+
+        CheckResult(reader->VisitIn({lit_a, lit_b}).value(), {0, 2, 4});
+        CheckResult(reader->VisitNotIn({lit_a, lit_b}).value(), {});
+
+        // a value absent from the data: the result must report IsRemain() == false
+        Literal lit_c(FieldType::STRING, "c", 1);
+        ASSERT_FALSE(reader->VisitEqual(lit_c).value()->IsRemain().value());
+    };
+    {
+        // round trip: write the rows above as a v1 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // round trip: write the rows above as a v2 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // query pre-serialized v1 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            1, 0, 0, 0,  5,  0,  0,  0,  2,  1,  0, 0, 0, 0, 0, 0, 0,  1, 97, 
0, 0,  0, 20, 0,
+            0, 0, 1, 98, -1, -1, -1, -3, 58, 48, 0, 0, 1, 0, 0, 0, 0,  0, 1,  
0, 16, 0, 0,  0,
+            1, 0, 3, 0,  58, 48, 0,  0,  1,  0,  0, 0, 0, 0, 1, 0, 16, 0, 0,  
0, 0,  0, 4,  0};
+
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+    {
+        // query pre-serialized v2 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            2,  0,  0,  0,  5, 0,  0, 0, 2, 1,  0, 0, 0, 0,  0,  0,  0,  20, 
0,  0,  0,
+            1,  0,  0,  0,  1, 97, 0, 0, 0, 0,  0, 0, 0, 30, 0,  0,  0,  2,  
0,  0,  0,
+            1,  97, 0,  0,  0, 20, 0, 0, 0, 20, 0, 0, 0, 1,  98, -1, -1, -1, 
-3, -1, -1,
+            -1, -1, 58, 48, 0, 0,  1, 0, 0, 0,  0, 0, 1, 0,  16, 0,  0,  0,  
1,  0,  3,
+            0,  58, 48, 0,  0, 1,  0, 0, 0, 0,  0, 1, 0, 16, 0,  0,  0,  0,  
0,  4,  0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+}
+
+TEST_F(BitmapIndexTest, TestBooleanType) {
+    // f0 (boolean), rows 0..4: true, false, true, false, null; expectations are row ids
+    auto type = arrow::boolean();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        [true],
+        [false],
+        [true],
+        [false],
+        [null]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+        Literal lit_true(true);
+        Literal lit_false(false);
+        CheckResult(reader->VisitEqual(lit_true).value(), {0, 2});
+        CheckResult(reader->VisitEqual(lit_false).value(), {1, 3});
+        CheckResult(reader->VisitIsNull().value(), {4});
+        CheckResult(reader->VisitIn({lit_true, lit_false}).value(), {0, 1, 2, 
3});
+        CheckResult(reader->VisitNotIn({lit_true, lit_false}).value(), {});
+    };
+    {
+        // round trip: write the rows above as a v1 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // round trip: write the rows above as a v2 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // query pre-serialized v1 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {1, 0, 0, 0, 5,  0, 0, 0,  2,  1,  -1, 
-1, -1, -5, 0, 0,
+                                         0, 0, 0, 1, 0,  0, 0, 20, 58, 48, 0,  
0,  1,  0,  0, 0,
+                                         0, 0, 1, 0, 16, 0, 0, 0,  1,  0,  3,  
0,  58, 48, 0, 0,
+                                         1, 0, 0, 0, 0,  0, 1, 0,  16, 0,  0,  
0,  0,  0,  2, 0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+    {
+        // query pre-serialized v2 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            2,  0,  0,  0, 5,  0,  0,  0, 2, 1, -1, -1, -1, -5, 0, 0, 0, 18, 
0, 0,  0, 1, 0, 0,
+            0,  0,  0,  0, 0,  0,  22, 0, 0, 0, 2,  0,  0,  0,  0, 0, 0, 0,  
0, 20, 1, 0, 0, 0,
+            20, 0,  0,  0, 20, 58, 48, 0, 0, 1, 0,  0,  0,  0,  0, 1, 0, 16, 
0, 0,  0, 1, 0, 3,
+            0,  58, 48, 0, 0,  1,  0,  0, 0, 0, 0,  1,  0,  16, 0, 0, 0, 0,  
0, 2,  0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+}
+
+TEST_F(BitmapIndexTest, TestTinyIntType) {
+    // f0 (int8), rows 0..4: null, null, null, 1, 1; expectations below are row ids
+    auto type = arrow::int8();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        [null],
+        [null],
+        [null],
+        [1],
+        [1]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+        Literal lit1(static_cast<int8_t>(1));
+        Literal lit2(static_cast<int8_t>(2));
+        CheckResult(reader->VisitEqual(lit1).value(), {3, 4});
+        CheckResult(reader->VisitIsNull().value(), {0, 1, 2});
+        CheckResult(reader->VisitEqual(lit2).value(), {});
+    };
+    {
+        // round trip: write the rows above as a v1 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // round trip: write the rows above as a v2 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // query pre-serialized v1 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {1, 0, 0,  0,  5,  0,  0, 0, 1, 1,  0, 
 0, 0, 0, 1, 0,
+                                         0, 0, 22, 58, 48, 0,  0, 1, 0, 0,  0, 
 0, 0, 2, 0, 16,
+                                         0, 0, 0,  0,  0,  1,  0, 2, 0, 58, 
48, 0, 0, 1, 0, 0,
+                                         0, 0, 0,  1,  0,  16, 0, 0, 0, 3,  0, 
 4, 0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+    {
+        // query pre-serialized v2 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            2,  0,  0, 0, 5, 0, 0, 0, 1,  1, 0, 0, 0,  0, 0, 0, 0, 22, 0, 0, 
0, 1,
+            1,  0,  0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 1,  1, 0, 0, 0, 22, 0, 0, 
0, 20,
+            58, 48, 0, 0, 1, 0, 0, 0, 0,  0, 2, 0, 16, 0, 0, 0, 0, 0,  1, 0, 
2, 0,
+            58, 48, 0, 0, 1, 0, 0, 0, 0,  0, 1, 0, 16, 0, 0, 0, 3, 0,  4, 0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+}
+
+TEST_F(BitmapIndexTest, TestSmallIntType) {
+    // f0 (int16), rows 0..4: null, 1, 1, 1, 1; expectations below are row ids
+    auto type = arrow::int16();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        [null],
+        [1],
+        [1],
+        [1],
+        [1]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+        Literal lit1(static_cast<int16_t>(1));
+        Literal lit2(static_cast<int16_t>(2));
+        CheckResult(reader->VisitEqual(lit1).value(), {1, 2, 3, 4});
+        CheckResult(reader->VisitIsNull().value(), {0});
+        CheckResult(reader->VisitEqual(lit2).value(), {});
+    };
+    {
+        // round trip: write the rows above as a v1 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // round trip: write the rows above as a v2 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    // only one distinct non-null value here, so the written bytes can be compared with java's
+    {
+        // query pre-serialized v1 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {1, 0, 0,  0,  5, 0, 0, 0, 1, 1, -1, 
-1, -1, -1, 0, 1, 0, 0,
+                                         0, 0, 59, 48, 0, 0, 1, 0, 0, 3, 0,  
1,  0,  1,  0, 3, 0};
+        check_result(index_bytes.data(), index_bytes.size());
+
+        // java compatibility: bytes written here must equal the v1 fixture byte for byte
+        ASSERT_OK_AND_ASSIGN(auto index_bytes2, write_data(/*version=*/1));
+        ASSERT_EQ(index_bytes, std::vector<char>(index_bytes2->data(),
+                                                 index_bytes2->data() + 
index_bytes2->size()));
+    }
+    {
+        // query pre-serialized v2 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {2, 0,  0, 0, 5, 0, 0, 0, 1, 1, -1, 
-1, -1, -1, 0,  0,
+                                         0, 18, 0, 0, 0, 1, 0, 1, 0, 0, 0,  0, 
 0,  0,  0,  14,
+                                         0, 0,  0, 1, 0, 1, 0, 0, 0, 0, 0,  0, 
 0,  15, 59, 48,
+                                         0, 0,  1, 0, 0, 3, 0, 1, 0, 1, 0,  3, 
 0};
+        check_result(index_bytes.data(), index_bytes.size());
+
+        // java compatibility: bytes written here must equal the v2 fixture byte for byte
+        ASSERT_OK_AND_ASSIGN(auto index_bytes2, write_data(/*version=*/2));
+        ASSERT_EQ(index_bytes, std::vector<char>(index_bytes2->data(),
+                                                 index_bytes2->data() + 
index_bytes2->size()));
+    }
+}
+
+TEST_F(BitmapIndexTest, TestBigIntType) {
+    // f0 (int64), rows 0..2: 1, 2, 3 (no nulls); expectations below are row ids
+    auto type = arrow::int64();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        [1],
+        [2],
+        [3]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+        Literal lit1(static_cast<int64_t>(1));
+        Literal lit3(static_cast<int64_t>(3));
+        Literal lit4(static_cast<int64_t>(4));
+        CheckResult(reader->VisitEqual(lit1).value(), {0});
+        CheckResult(reader->VisitEqual(lit3).value(), {2});
+        CheckResult(reader->VisitEqual(lit4).value(), {});
+    };
+    {
+        // round trip: write the rows above as a v1 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // round trip: write the rows above as a v2 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // query pre-serialized v1 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {1,  0,  0,  0,  3,  0,  0, 0, 3, 0, 
0,  0,  0,  0, 0,  0,
+                                         0,  1,  -1, -1, -1, -1, 0, 0, 0, 0, 
0,  0,  0,  2, -1, -1,
+                                         -1, -2, 0,  0,  0,  0,  0, 0, 0, 3, 
-1, -1, -1, -3};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+    {
+        // query pre-serialized v2 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            2,  0,  0,  0,  3,  0,  0,  0,  3,  0, 0, 0,  0,  1,  0,  0,  0,  
0,  0,  0,  0,
+            1,  0,  0,  0,  0,  0,  0,  0,  52, 0, 0, 0,  3,  0,  0,  0,  0,  
0,  0,  0,  1,
+            -1, -1, -1, -1, -1, -1, -1, -1, 0,  0, 0, 0,  0,  0,  0,  2,  -1, 
-1, -1, -2, -1,
+            -1, -1, -1, 0,  0,  0,  0,  0,  0,  0, 3, -1, -1, -1, -3, -1, -1, 
-1, -1};
+        check_result(index_bytes.data(), index_bytes.size());
+
+        // java compatibility: bytes written here must equal the v2 fixture byte for byte
+        ASSERT_OK_AND_ASSIGN(auto index_bytes2, write_data(/*version=*/2));
+        ASSERT_EQ(index_bytes, std::vector<char>(index_bytes2->data(),
+                                                 index_bytes2->data() + 
index_bytes2->size()));
+    }
+}
+
+TEST_F(BitmapIndexTest, TestDateType) {
+    // f0 (date32) raw ints, rows 0..5: 20200220, 20200220, null, null, 20220222, 20220222
+    auto type = arrow::date32();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        [20200220],
+        [20200220],
+        [null],
+        [null],
+        [20220222],
+        [20220222]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+        Literal lit1(FieldType::DATE, static_cast<int32_t>(20200220));
+        Literal lit2(FieldType::DATE, static_cast<int32_t>(20220222));
+        Literal lit3(FieldType::DATE, static_cast<int32_t>(20250222));
+        CheckResult(reader->VisitEqual(lit1).value(), {0, 1});
+        CheckResult(reader->VisitEqual(lit2).value(), {4, 5});
+        CheckResult(reader->VisitIsNull().value(), {2, 3});
+        CheckResult(reader->VisitEqual(lit3).value(), {});
+    };
+    {
+        // round trip: write the rows above as a v1 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // round trip: write the rows above as a v2 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // query pre-serialized v1 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            1,  0,    0,  0, 6,  0,  0,  0,  2,  1, 0, 0, 0, 0,  1, 52, 59, 
28, 0, 0,  0, 20, 1,
+            52, -119, 62, 0, 0,  0,  40, 58, 48, 0, 0, 1, 0, 0,  0, 0,  0,  1, 
 0, 16, 0, 0,  0,
+            2,  0,    3,  0, 58, 48, 0,  0,  1,  0, 0, 0, 0, 0,  1, 0,  16, 0, 
 0, 0,  0, 0,  1,
+            0,  58,   48, 0, 0,  1,  0,  0,  0,  0, 0, 1, 0, 16, 0, 0,  0,  4, 
 0, 5,  0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+    {
+        // query pre-serialized v2 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            2,  0, 0,  0,  6,  0, 0, 0,  2, 1,  0,    0,  0,  0, 0, 0,  0, 20, 
0,  0,  0,
+            1,  1, 52, 59, 28, 0, 0, 0,  0, 0,  0,    0,  28, 0, 0, 0,  2, 1,  
52, 59, 28,
+            0,  0, 0,  20, 0,  0, 0, 20, 1, 52, -119, 62, 0,  0, 0, 40, 0, 0,  
0,  20, 58,
+            48, 0, 0,  1,  0,  0, 0, 0,  0, 1,  0,    16, 0,  0, 0, 2,  0, 3,  
0,  58, 48,
+            0,  0, 1,  0,  0,  0, 0, 0,  1, 0,  16,   0,  0,  0, 0, 0,  1, 0,  
58, 48, 0,
+            0,  1, 0,  0,  0,  0, 0, 1,  0, 16, 0,    0,  0,  4, 0, 5,  0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+}
+
+TEST_F(BitmapIndexTest, TestIntType) {
+    // f0 (int32), rows 0..2: 0, 1, null; expectations below are row ids
+    auto type = arrow::int32();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        [0],
+        [1],
+        [null]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+        Literal lit_0(static_cast<int32_t>(0));
+        Literal lit_1(static_cast<int32_t>(1));
+        Literal lit_2(static_cast<int32_t>(2));
+        CheckResult(reader->VisitEqual(lit_0).value(), {0});
+        CheckResult(reader->VisitEqual(lit_1).value(), {1});
+        CheckResult(reader->VisitIsNull().value(), {2});
+        CheckResult(reader->VisitIn({lit_0, lit_1, lit_2}).value(), {0, 1});
+        CheckResult(reader->VisitNotIn({lit_0, lit_1, lit_2}).value(), {});
+    };
+    {
+        // round trip: write the rows above as a v1 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // round trip: write the rows above as a v2 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // query pre-serialized v1 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {1, 0, 0, 0,  3,  0,  0,  0, 2, 1, -1, 
-1, -1, -3, 0,
+                                         0, 0, 0, -1, -1, -1, -1, 0, 0, 0, 1,  
-1, -1, -1, -2};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+    {
+        // query pre-serialized v2 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {2,  0,  0, 0, 3, 0, 0,  0,  2,  1,  
-1, -1, -1, -3, 0,  0,
+                                         0,  18, 0, 0, 0, 1, 0,  0,  0,  0,  
0,  0,  0,  0,  0,  0,
+                                         0,  28, 0, 0, 0, 2, 0,  0,  0,  0,  
-1, -1, -1, -1, -1, -1,
+                                         -1, -1, 0, 0, 0, 1, -1, -1, -1, -2, 
-1, -1, -1, -1};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+}
+
+TEST_F(BitmapIndexTest, TestTimestampType) {
+    // f0 (timestamp, NANO unit), rows 0..7, each written as <millis>ms, <extra nanos>ns:
+    // 1745542802000lms, 123000ns
+    // 1745542902000lms, 123000ns
+    // 1745542602000lms, 123000ns
+    // -1745lms, 123000ns
+    // -1765lms, 123000ns
+    // null
+    // 1745542802000lms, 123001ns
+    // -1725lms, 123000ns
+    auto type = arrow::timestamp(arrow::TimeUnit::NANO);
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        [1745542802000123000],
+        [1745542902000123000],
+        [1745542602000123000],
+        [-1744877000],
+        [-1764877000],
+        [null],
+        [1745542802000123001],
+        [-1724877000]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+
+        CheckResult(reader->VisitIsNull().value(), {5});
+        CheckResult(reader->VisitIsNotNull().value(), {0, 1, 2, 3, 4, 6, 7});
+        CheckResult(reader->VisitEqual(Literal(Timestamp(1745542502000l, 
123000))).value(), {});
+        // as timestamp is normalized by micro seconds, there is a loss of 
precision in the
+        // nanosecond part
+        CheckResult(reader->VisitEqual(Literal(Timestamp(1745542802000l, 
123000))).value(), {0, 6});
+        CheckResult(reader->VisitNotEqual(Literal(Timestamp(1745542802000l, 
123000))).value(),
+                    {1, 2, 3, 4, 7});
+        CheckResult(reader
+                        ->VisitIn({Literal(Timestamp(1745542802000l, 123000)),
+                                   Literal(Timestamp(-1745, 123000)),
+                                   Literal(Timestamp(1745542602000, 123000))})
+                        .value(),
+                    {0, 2, 3, 6});
+        CheckResult(reader
+                        ->VisitNotIn({Literal(Timestamp(1745542802000l, 
123000)),
+                                      Literal(Timestamp(-1745, 123000)),
+                                      Literal(Timestamp(1745542602000, 
123000))})
+                        .value(),
+                    {1, 4, 7});
+    };
+    {
+        // round trip: write the rows above as a v1 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // round trip: write the rows above as a v2 index, then query it
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // query pre-serialized v1 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            1,   0,   0,  0,  8,  0,    0,   0,   6,  1,   -1, -1,   -1,  -6,  
0,    6,  51, -113,
+            -38, -89, 72, -5, 0,  0,    0,   0,   -1, -1,  -1, -1,   -1,  -27, 
-82,  51, -1, -1,
+            -1,  -8,  -1, -1, -1, -1,   -1,  -27, 17, -13, -1, -1,   -1,  -5,  
-1,   -1, -1, -1,
+            -1,  -27, 96, 19, -1, -1,   -1,  -4,  0,  6,   51, -113, -50, -69, 
-122, -5, -1, -1,
+            -1,  -3,  0,  6,  51, -113, -32, -99, 41, -5,  -1, -1,   -1,  -2,  
58,   48, 0,  0,
+            1,   0,   0,  0,  0,  0,    1,   0,   16, 0,   0,  0,    0,   0,   
6,    0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+    {
+        // query pre-serialized v2 index bytes (hard-coded fixture)
+        std::vector<char> index_bytes = {
+            2,   0,    0,   0,    8,   0,   0,    0,   6,   1,   -1, -1,  -1, 
-6,  0,  0,  0,  18,
+            0,   0,    0,   1,    -1,  -1,  -1,   -1,  -1,  -27, 17, -13, 0,  
0,   0,  0,  0,  0,
+            0,   100,  0,   0,    0,   6,   -1,   -1,  -1,  -1,  -1, -27, 17, 
-13, -1, -1, -1, -5,
+            -1,  -1,   -1,  -1,   -1,  -1,  -1,   -1,  -1,  -27, 96, 19,  -1, 
-1,  -1, -4, -1, -1,
+            -1,  -1,   -1,  -1,   -1,  -1,  -1,   -27, -82, 51,  -1, -1,  -1, 
-8,  -1, -1, -1, -1,
+            0,   6,    51,  -113, -50, -69, -122, -5,  -1,  -1,  -1, -3,  -1, 
-1,  -1, -1, 0,  6,
+            51,  -113, -38, -89,  72,  -5,  0,    0,   0,   0,   0,  0,   0,  
20,  0,  6,  51, -113,
+            -32, -99,  41,  -5,   -1,  -1,  -1,   -2,  -1,  -1,  -1, -1,  58, 
48,  0,  0,  1,  0,
+            0,   0,    0,   0,    1,   0,   16,   0,   0,   0,   0,  0,   6,  
0};
+        check_result(index_bytes.data(), index_bytes.size());
+    }
+}
+
+TEST_F(BitmapIndexTest, TestHighCardinalityForCompatibility) {
+    auto type = arrow::utf8();  // the index files below are read with a utf8 schema
+    auto check_result = [&](const std::string& index_file_name) {
+        auto file_system = std::make_unique<LocalFileSystem>();
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr<InputStream> input_stream,
+                             file_system->Open(index_file_name));
+        ASSERT_OK_AND_ASSIGN(uint64_t length, input_stream->Length());
+
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(auto reader, 
file_index.CreateReader(CreateArrowSchema(type).get(),
+                                                                  /*start=*/0, 
/*length=*/length,
+                                                                  
input_stream, pool_));
+        ASSERT_TRUE(reader);
+        Literal lit_s1(FieldType::STRING, "asdfghjkl123_2500", 17);
+        Literal lit_s2(FieldType::STRING, "asdfghjkl123_5000", 17);
+        Literal lit_s3(FieldType::STRING, "asdfghjkl123_7500", 17);
+
+        CheckResult(reader->VisitEqual(lit_s1).value(),
+                    {3127, 18654, 23615, 43768, 62555, 63261, 66284, 74708, 
99268});
+        CheckResult(reader->VisitEqual(lit_s2).value(),
+                    {2269, 21292, 23667, 35066, 42377, 46085, 64811, 79062, 
96161});
+        CheckResult(reader->VisitEqual(lit_s3).value(),
+                    {14856, 50768, 60230, 68792, 72208, 73800, 78261, 81346, 
86520, 87420, 88501,
+                     95131, 99401});
+    };
+
+    // v1 index file from the test data directory; same expectations as for v2
+    check_result(paimon::test::GetDataDir() + "/file_index/bitmap-index-v1");
+
+    // v2 index file from the test data directory; same expectations as for v1
+    check_result(paimon::test::GetDataDir() + "/file_index/bitmap-index-v2");
+}
+
+TEST_F(BitmapIndexTest, TestHighCardinalityForWriteAndRead) {
+    // Writes 100000 rows drawn at random from ten distinct strings and checks
+    // that every value's bitmap equals the row ids recorded while writing.
+    // The seed is time based, so attach it to every failure message; without
+    // it a failing run cannot be reproduced.
+    int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs();
+    SCOPED_TRACE("seed=" + std::to_string(seed));
+    // std::srand takes an unsigned int; make the narrowing explicit
+    // NOTE(review): assumes RandomNumber() is driven by std::rand - confirm
+    std::srand(static_cast<unsigned int>(seed));
+    auto type = arrow::utf8();
+    std::vector<std::string> unique_values = {
+        "asdfghjkl123_0", "asdfghjkl123_2500", "asdfghjkl123_300",
+        "asdfghjkl123_20", "asdfghjkl123_1", "asdfghjkl123_2",
+        "asdfghjkl123_3", "asdfghjkl123_4", "asdfghjkl123_5",
+        "asdfghjkl123_6"};
+    // expected_bitmaps[k] collects the row ids holding unique_values[k]
+    std::vector<std::vector<int32_t>> expected_bitmaps(unique_values.size());
+
+    auto write_data =
+        [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> {
+        // every call draws fresh random rows, so start from empty bitmaps
+        for (auto& bitmap : expected_bitmaps) {
+            bitmap.clear();
+        }
+
+        arrow::StructBuilder struct_builder(
+            arrow::struct_({arrow::field("f0", type)}),
+            arrow::default_memory_pool(),
+            {std::make_shared<arrow::StringBuilder>()});
+        auto string_builder = static_cast<arrow::StringBuilder*>(
+            struct_builder.field_builder(0));
+
+        for (int32_t i = 0; i < 100000; i++) {
+            EXPECT_TRUE(struct_builder.Append().ok());
+            int32_t idx =
+                paimon::test::RandomNumber(0, unique_values.size() - 1);
+            EXPECT_TRUE(
+                string_builder->Append(unique_values[idx].data()).ok());
+            expected_bitmaps[idx].push_back(i);
+        }
+        std::shared_ptr<arrow::Array> array;
+        EXPECT_TRUE(struct_builder.Finish(&array).ok());
+        return WriteIndex(type, version, array);
+    };
+
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = std::make_shared<ByteArrayInputStream>(
+            index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length,
+                                    input_stream, pool_));
+        ASSERT_TRUE(reader);
+
+        for (size_t i = 0; i < unique_values.size(); i++) {
+            Literal lit(FieldType::STRING, unique_values[i].data(),
+                        unique_values[i].size());
+            CheckResult(reader->VisitEqual(lit).value(), expected_bitmaps[i]);
+        }
+        // no null row is ever appended above
+        CheckResult(reader->VisitIsNull().value(), {});
+    };
+    {
+        // test write and read, v1 version
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // test write and read, v2 version
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+}
+
+TEST_F(BitmapIndexTest, TestCompatibleWithJava) {  // written bytes vs. fixed expected bytes
+    // data (5 rows): apple, null, apple, null, apple
+    // If and only if non-null elements only contain one value (e.g., apple), 
index bytes can be
+    // compared for compatibility.
+    auto type = arrow::utf8();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        ["apple"],
+        [null],
+        ["apple"],
+        [null],
+        ["apple"]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    {
+        // v1: the written index must equal java_index_bytes byte for byte
+        std::vector<char> java_index_bytes = {
+            1, 0,  0,  0,  5,  0,  0, 0, 1, 1, 0, 0, 0, 0,  0, 0, 0,  5, 97, 
112, 112, 108, 101,
+            0, 0,  0,  20, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0,  1, 0, 16, 0, 0,  
0,   1,   0,   3,
+            0, 58, 48, 0,  0,  1,  0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0,  0, 0,  
2,   0,   4,   0};
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        ASSERT_EQ(java_index_bytes, std::vector<char>(index_bytes->data(),
+                                                      index_bytes->data() + 
index_bytes->size()));
+    }
+    {
+        // v2: the written index must equal java_index_bytes byte for byte
+        std::vector<char> java_index_bytes = {
+            2, 0, 0, 0, 5, 0,  0,   0,   1,   1,   0, 0, 0, 0,  0, 0, 0, 20, 
0,  0,  0,
+            1, 0, 0, 0, 5, 97, 112, 112, 108, 101, 0, 0, 0, 0,  0, 0, 0, 21, 
0,  0,  0,
+            1, 0, 0, 0, 5, 97, 112, 112, 108, 101, 0, 0, 0, 20, 0, 0, 0, 22, 
58, 48, 0,
+            0, 1, 0, 0, 0, 0,  0,   1,   0,   16,  0, 0, 0, 1,  0, 3, 0, 58, 
48, 0,  0,
+            1, 0, 0, 0, 0, 0,  2,   0,   16,  0,   0, 0, 0, 0,  2, 0, 4, 0};
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        ASSERT_EQ(java_index_bytes, std::vector<char>(index_bytes->data(),
+                                                      index_bytes->data() + 
index_bytes->size()));
+    }
+}
+
+TEST_F(BitmapIndexTest, TestAllNull) {  // int32 column in which every row is null
+    // data (4 rows): null, null, null, null
+    auto type = arrow::int32();
+    auto write_data = [&](int32_t version) -> Result<PAIMON_UNIQUE_PTR<Bytes>> 
{
+        auto array = std::dynamic_pointer_cast<arrow::StructArray>(
+            
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_({arrow::field("f0", 
type)}),
+                                                      R"([
+        [null],
+        [null],
+        [null],
+        [null]
+    ])")
+                .ValueOrDie());
+        return WriteIndex(type, version, array);
+    };
+    auto check_result = [&](const char* index_bytes, int32_t index_length) {
+        auto input_stream = 
std::make_shared<ByteArrayInputStream>(index_bytes, index_length);
+        BitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(
+            auto reader,
+            file_index.CreateReader(CreateArrowSchema(type).get(),
+                                    /*start=*/0, /*length=*/index_length, 
input_stream, pool_));
+        ASSERT_TRUE(reader);
+        Literal lit_0(static_cast<int32_t>(0));
+        CheckResult(reader->VisitEqual(lit_0).value(), {});  // no row holds the value 0
+        CheckResult(reader->VisitIsNull().value(), {0, 1, 2, 3});  // all four rows are null
+        CheckResult(reader->VisitIsNotNull().value(), {});
+    };
+    {
+        // write with index version 1, then read the produced bytes back
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/1));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // write with index version 2, then read the produced bytes back
+        ASSERT_OK_AND_ASSIGN(auto index_bytes, write_data(/*version=*/2));
+        check_result(index_bytes->data(), index_bytes->size());
+    }
+    {
+        // v1: read from hard-coded index bytes
+        std::vector<char> index_bytes = {1,  0, 0, 0, 4, 0, 0, 0, 0, 1, 0, 0, 
0, 0, 59,
+                                         48, 0, 0, 1, 0, 0, 3, 0, 1, 0, 0, 0, 
3, 0};
+        check_result(index_bytes.data(), index_bytes.size());
+
+        // the v1 writer must reproduce the hard-coded bytes exactly
+        ASSERT_OK_AND_ASSIGN(auto index_bytes2, write_data(/*version=*/1));
+        ASSERT_EQ(index_bytes, std::vector<char>(index_bytes2->data(),
+                                                 index_bytes2->data() + 
index_bytes2->size()));
+    }
+    {
+        // v2: read from hard-coded index bytes
+        std::vector<char> index_bytes = {2, 0, 0, 0,  4, 0, 0, 0, 0, 1, 0, 0, 
0,  0,
+                                         0, 0, 0, 15, 0, 0, 0, 0, 0, 0, 0, 0, 
59, 48,
+                                         0, 0, 1, 0,  0, 3, 0, 1, 0, 0, 0, 3, 
0};
+        check_result(index_bytes.data(), index_bytes.size());
+
+        // the v2 writer must reproduce the hard-coded bytes exactly
+        ASSERT_OK_AND_ASSIGN(auto index_bytes2, write_data(/*version=*/2));
+        ASSERT_EQ(index_bytes, std::vector<char>(index_bytes2->data(),
+                                                 index_bytes2->data() + 
index_bytes2->size()));
+    }
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/file_index/bitmap/bitmap_index_result.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_index_result.cpp
new file mode 100644
index 0000000..1027ce5
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_index_result.cpp
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/bitmap_index_result.h"
+
+#include <utility>
+
+#include "paimon/status.h"
+
+namespace paimon {
+BitmapIndexResult::BitmapIndexResult(BitmapSupplier bitmap_supplier)
+    : bitmap_supplier_(std::move(bitmap_supplier)) {}  // only stored here; GetBitmap() calls it
+
+BitmapIndexResult::~BitmapIndexResult() = default;  // defined out of line in this .cpp
+
+// Returns a pointer to the bitmap owned by this result.
+//
+// The supplier is called the first time the bitmap is requested; after a successful call the
+// stored bitmap is handed out directly. NOTE(review): PAIMON_ASSIGN_OR_RAISE presumably
+// returns the supplier's error early, leaving initialized_ false so a later call retries.
+Result<const RoaringBitmap32*> BitmapIndexResult::GetBitmap() const {
+    if (initialized_) {
+        return &bitmap_;
+    }
+    PAIMON_ASSIGN_OR_RAISE(bitmap_, bitmap_supplier_());
+    initialized_ = true;
+    return &bitmap_;
+}
+
+// Reports whether the bitmap of this result is non-empty. Obtaining the bitmap goes through
+// GetBitmap(), so an error from there is returned instead of a boolean.
+Result<bool> BitmapIndexResult::IsRemain() const {
+    PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* rows, GetBitmap());
+    const bool no_rows = rows->IsEmpty();
+    return !no_rows;
+}
+
+// Combines this result with `other` using a bitmap intersection.
+//
+// If `other` is not a BitmapIndexResult, the call is forwarded to FileIndexResult::And.
+// Otherwise a new BitmapIndexResult is returned whose supplier fetches both bitmaps and
+// applies RoaringBitmap32::And; nothing is computed until that supplier is invoked.
+// The supplier captures shared_ptr copies of both operands, so it does not depend on the
+// caller keeping its own handles alive.
+Result<std::shared_ptr<FileIndexResult>> BitmapIndexResult::And(
+    const std::shared_ptr<FileIndexResult>& other) {
+    auto typed_other = std::dynamic_pointer_cast<BitmapIndexResult>(other);
+    if (!typed_other) {
+        return FileIndexResult::And(other);
+    }
+    return std::make_shared<BitmapIndexResult>(
+        [result = std::dynamic_pointer_cast<BitmapIndexResult>(shared_from_this()),
+         typed_other]() -> Result<RoaringBitmap32> {
+            PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* bitmap, result->GetBitmap());
+            // Terminated with ';' like every other use of the macro in this file.
+            PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* other_bitmap,
+                                   typed_other->GetBitmap());
+            return RoaringBitmap32::And(*bitmap, *other_bitmap);
+        });
+}
+
+// Combines this result with `other` using a bitmap union.
+//
+// If `other` is not a BitmapIndexResult, the call is forwarded to FileIndexResult::Or.
+// Otherwise a new BitmapIndexResult is returned whose supplier fetches both bitmaps and
+// applies RoaringBitmap32::Or; nothing is computed until that supplier is invoked.
+// The supplier captures shared_ptr copies of both operands, so it does not depend on the
+// caller keeping its own handles alive.
+Result<std::shared_ptr<FileIndexResult>> BitmapIndexResult::Or(
+    const std::shared_ptr<FileIndexResult>& other) {
+    auto typed_other = std::dynamic_pointer_cast<BitmapIndexResult>(other);
+    if (!typed_other) {
+        return FileIndexResult::Or(other);
+    }
+    return std::make_shared<BitmapIndexResult>(
+        [result = std::dynamic_pointer_cast<BitmapIndexResult>(shared_from_this()),
+         typed_other]() -> Result<RoaringBitmap32> {
+            PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* bitmap, result->GetBitmap());
+            // Terminated with ';' like every other use of the macro in this file.
+            PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* other_bitmap,
+                                   typed_other->GetBitmap());
+            return RoaringBitmap32::Or(*bitmap, *other_bitmap);
+        });
+}
+
+std::string BitmapIndexResult::ToString() const {
+    auto bitmap = GetBitmap();
+    if (!bitmap.ok()) {
+        return bitmap.status().ToString();
+    }
+    return bitmap.value()->ToString();
+}
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bitmap/bitmap_index_result_test.cpp 
b/src/paimon/common/file_index/bitmap/bitmap_index_result_test.cpp
new file mode 100644
index 0000000..e1c5882
--- /dev/null
+++ b/src/paimon/common/file_index/bitmap/bitmap_index_result_test.cpp
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/file_index/bitmap_index_result.h"
+
+#include <cstdint>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+// Fixture providing helpers to build BitmapIndexResult objects and to compare their bitmaps.
+class BitmapIndexResultTest : public ::testing::Test {
+ public:
+    void SetUp() override {}
+    void TearDown() override {}
+
+    // Builds a result whose supplier creates the bitmap from its own copy of `values`.
+    std::shared_ptr<BitmapIndexResult> CreateResult(const std::vector<int32_t>& values) const {
+        auto supplier = [values]() -> Result<RoaringBitmap32> {
+            return RoaringBitmap32::From(values);
+        };
+        return std::make_shared<BitmapIndexResult>(std::move(supplier));
+    }
+
+    // Asserts that `result` is a BitmapIndexResult and that its bitmap equals the bitmap
+    // built from `expected`.
+    void CheckResult(const std::shared_ptr<FileIndexResult>& result,
+                     const std::vector<int32_t>& expected) const {
+        const auto bitmap_result = std::dynamic_pointer_cast<BitmapIndexResult>(result);
+        ASSERT_TRUE(bitmap_result);
+        const auto expected_bitmap = RoaringBitmap32::From(expected);
+        ASSERT_EQ(*(bitmap_result->GetBitmap().value()), expected_bitmap);
+    }
+};
+
+// Exercises ToString, IsRemain and And/Or between bitmap-backed results.
+TEST_F(BitmapIndexResultTest, TestSimple) {
+    auto first = CreateResult({10, 100});
+    auto second = CreateResult({10, 20, 200});
+    auto empty = CreateResult({});
+
+    // string form
+    ASSERT_EQ(first->ToString(), "{10,100}");
+    ASSERT_EQ(empty->ToString(), "{}");
+
+    // emptiness
+    ASSERT_TRUE(first->IsRemain().value());
+    ASSERT_FALSE(empty->IsRemain().value());
+
+    // two non-empty operands
+    CheckResult(first->And(second).value(), {10});
+    CheckResult(first->Or(second).value(), {10, 20, 100, 200});
+
+    // one empty operand
+    CheckResult(first->And(empty).value(), {});
+    CheckResult(first->Or(empty).value(), {10, 100});
+}
+
+// The combined result must stay usable after the test drops its own handles to the operands.
+TEST_F(BitmapIndexResultTest, TestCapture) {
+    auto lhs = CreateResult({10, 100});
+    auto rhs = CreateResult({10, 20, 200});
+
+    ASSERT_OK_AND_ASSIGN(auto combined, lhs->And(rhs));
+    // Release the local shared_ptrs before the combined bitmap is requested.
+    lhs.reset();
+    rhs.reset();
+    CheckResult(combined, {10});
+}
+
+// Combines bitmap results with the FileIndexResult::Remain() / Skip() results and checks
+// IsRemain() on every combination.
+TEST_F(BitmapIndexResultTest, TestCompoundIndexResult) {
+    auto non_empty = CreateResult({1, 3, 5});
+    auto empty = CreateResult({});
+
+    // the operands on their own
+    ASSERT_TRUE(FileIndexResult::Remain()->IsRemain().value());
+    ASSERT_FALSE(FileIndexResult::Skip()->IsRemain().value());
+    ASSERT_TRUE(non_empty->IsRemain().value());
+    ASSERT_FALSE(empty->IsRemain().value());
+
+    // non-empty bitmap combined with Skip / Remain
+    ASSERT_FALSE(non_empty->And(FileIndexResult::Skip()).value()->IsRemain().value());
+    ASSERT_TRUE(non_empty->And(FileIndexResult::Remain()).value()->IsRemain().value());
+    ASSERT_TRUE(non_empty->Or(FileIndexResult::Skip()).value()->IsRemain().value());
+    ASSERT_TRUE(non_empty->Or(FileIndexResult::Remain()).value()->IsRemain().value());
+
+    // empty bitmap combined with Skip / Remain
+    ASSERT_FALSE(empty->And(FileIndexResult::Skip()).value()->IsRemain().value());
+    ASSERT_FALSE(empty->And(FileIndexResult::Remain()).value()->IsRemain().value());
+    ASSERT_FALSE(empty->Or(FileIndexResult::Skip()).value()->IsRemain().value());
+    ASSERT_TRUE(empty->Or(FileIndexResult::Remain()).value()->IsRemain().value());
+}
+
+}  // namespace paimon::test


Reply via email to