This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 8d90623  feat: Migrate bloom filter and BSI file index implementations 
(#50)
8d90623 is described below

commit 8d906230d5eef5ab5e424e1654c6a80fef7d24ed
Author: lxy <[email protected]>
AuthorDate: Fri Jun 5 16:49:06 2026 +0800

    feat: Migrate bloom filter and BSI file index implementations (#50)
---
 .../bloomfilter/bloom_filter_file_index.cpp        |  88 +++++
 .../bloomfilter/bloom_filter_file_index.h          |  79 +++++
 .../bloom_filter_file_index_factory.cpp            |  37 ++
 .../bloomfilter/bloom_filter_file_index_factory.h  |  42 +++
 .../bloomfilter/bloom_filter_file_index_test.cpp   | 179 ++++++++++
 .../common/file_index/bloomfilter/fast_hash.cpp    | 138 ++++++++
 .../common/file_index/bloomfilter/fast_hash.h      |  49 +++
 .../file_index/bloomfilter/fast_hash_test.cpp      | 167 +++++++++
 .../bsi/bit_slice_index_bitmap_file_index.cpp      | 294 ++++++++++++++++
 .../bsi/bit_slice_index_bitmap_file_index.h        | 112 ++++++
 .../bit_slice_index_bitmap_file_index_factory.cpp  |  37 ++
 .../bit_slice_index_bitmap_file_index_factory.h    |  42 +++
 .../bsi/bit_slice_index_bitmap_file_index_test.cpp | 390 +++++++++++++++++++++
 .../bsi/bit_slice_index_roaring_bitmap.cpp         | 300 ++++++++++++++++
 .../bsi/bit_slice_index_roaring_bitmap.h           |  98 ++++++
 .../bsi/bit_slice_index_roaring_bitmap_test.cpp    | 284 +++++++++++++++
 16 files changed, 2336 insertions(+)

diff --git 
a/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index.cpp 
b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index.cpp
new file mode 100644
index 0000000..b0a40ff
--- /dev/null
+++ b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index.cpp
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bloomfilter/bloom_filter_file_index.h"
+
+#include <cstddef>
+#include <functional>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Constructs the indexer. The body is empty: `options` is accepted but never read.
+BloomFilterFileIndex::BloomFilterFileIndex(
+    const std::map<std::string, std::string>& /*options*/) {}
+/// Creates a reader over the serialized bloom filter stored in `input_stream`.
+///
+/// @param c_arrow_schema C schema describing the indexed column; must contain exactly one field.
+/// @param start Offset of the serialized index inside `input_stream`; must not be negative.
+/// @param length Number of bytes occupied by the serialized index; must not be negative.
+/// @param input_stream Stream the index bytes are read from; must not be null.
+/// @param pool Memory pool used to allocate the buffer that holds the index bytes.
+/// @return The reader, or an error status when the schema does not have exactly one field, the
+///         stream or the range is invalid, fewer than `length` bytes could be read, or the
+///         stream / `BloomFilterFileIndexReader::Create` reports an error.
+Result<std::shared_ptr<FileIndexReader>> BloomFilterFileIndex::CreateReader(
+    ::ArrowSchema* c_arrow_schema, int32_t start, int32_t length,
+    const std::shared_ptr<InputStream>& input_stream,
+    const std::shared_ptr<MemoryPool>& pool) const {
+    // The schema is imported before the argument checks below, so `c_arrow_schema` is handed to
+    // Arrow on every path, exactly as before the checks were added.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> arrow_schema,
+                                      arrow::ImportSchema(c_arrow_schema));
+    if (arrow_schema->num_fields() != 1) {
+        return Status::Invalid(
+            "invalid schema for BloomFilterFileIndexReader, supposed to have single "
+            "field.");
+    }
+    auto arrow_type = arrow_schema->field(0)->type();
+
+    // Reject arguments that would otherwise dereference a null stream or allocate a buffer
+    // with a negative size.
+    if (input_stream == nullptr) {
+        return Status::Invalid(
+            "create reader for BloomFilterFileIndex failed, input stream is null");
+    }
+    if (start < 0 || length < 0) {
+        return Status::Invalid(fmt::format(
+            "create reader for BloomFilterFileIndex failed, invalid range: start {}, length {}",
+            start, length));
+    }
+
+    PAIMON_RETURN_NOT_OK(input_stream->Seek(start, SeekOrigin::FS_SEEK_SET));
+    auto bytes = std::make_shared<Bytes>(length, pool.get());
+    PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len,
+                           input_stream->Read(bytes->data(), bytes->size()));
+    // A short read means the index is truncated; a partially filled buffer must not be decoded.
+    if (static_cast<size_t>(actual_read_len) != bytes->size()) {
+        return Status::Invalid(
+            fmt::format("create reader for BloomFilterFileIndex failed, expected read len "
+                        "{}, actual read len {}",
+                        bytes->size(), actual_read_len));
+    }
+    return BloomFilterFileIndexReader::Create(arrow_type, bytes);
+}
+
+/// Builds a reader for a column of `arrow_type` from the serialized index `bytes`.
+///
+/// Layout of `bytes` as decoded here: a 4-byte big-endian integer holding the number of hash
+/// functions, followed by the bytes of the bloom filter bit set.
+///
+/// @return The reader, or an error status when `bytes` is null or shorter than the 4-byte
+///         header, or when no hash function exists for `arrow_type`.
+Result<std::shared_ptr<BloomFilterFileIndexReader>> BloomFilterFileIndexReader::Create(
+    const std::shared_ptr<arrow::DataType>& arrow_type, const std::shared_ptr<Bytes>& bytes) {
+    constexpr size_t kHeaderSize = sizeof(int32_t);
+    // The header is read byte by byte below, so it must be fully present.
+    if (bytes == nullptr || bytes->size() < kHeaderSize) {
+        return Status::Invalid(fmt::format(
+            "create reader for BloomFilterFileIndex failed, index requires at least {} bytes",
+            kHeaderSize));
+    }
+    // compatible with java, big endian: data[0] is the most significant byte. Each byte goes
+    // through uint8_t first so that a negative `char` is not sign-extended.
+    const char* data = bytes->data();
+    auto num_hash_functions =
+        static_cast<int32_t>((static_cast<uint32_t>(static_cast<uint8_t>(data[0])) << 24) |
+                             (static_cast<uint32_t>(static_cast<uint8_t>(data[1])) << 16) |
+                             (static_cast<uint32_t>(static_cast<uint8_t>(data[2])) << 8) |
+                             static_cast<uint32_t>(static_cast<uint8_t>(data[3])));
+    PAIMON_ASSIGN_OR_RAISE(FastHash::HashFunction hash_function,
+                           FastHash::GetHashFunction(arrow_type));
+    // The bit set starts right after the header and shares ownership of `bytes`.
+    auto bit_set = std::make_unique<BloomFilter64::BitSet>(bytes, /*offset=*/kHeaderSize);
+    return std::shared_ptr<BloomFilterFileIndexReader>(new BloomFilterFileIndexReader(
+        hash_function, BloomFilter64(num_hash_functions, std::move(bit_set))));
+}
+
+// Stores a copy of `hash_function` and moves `filter` into the reader.
+BloomFilterFileIndexReader::BloomFilterFileIndexReader(const 
FastHash::HashFunction& hash_function,
+                                                       BloomFilter64&& filter)
+    : hash_function_(hash_function), filter_(std::move(filter)) {}
+
+/// Tests whether a value equal to `literal` may be present according to the bloom filter.
+///
+/// @return `FileIndexResult::Remain()` when `literal` is null or its hash passes the filter
+///         test, `FileIndexResult::Skip()` otherwise.
+Result<std::shared_ptr<FileIndexResult>> BloomFilterFileIndexReader::VisitEqual(
+    const Literal& literal) {
+    // Check for null before hashing: the hash functions read the literal's value and require
+    // a non-null literal.
+    if (literal.IsNull()) {
+        return FileIndexResult::Remain();
+    }
+    const int64_t hash = hash_function_(literal);
+    return filter_.TestHash(hash) ? FileIndexResult::Remain() : FileIndexResult::Skip();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index.h 
b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index.h
new file mode 100644
index 0000000..8152c22
--- /dev/null
+++ b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "paimon/common/file_index/bloomfilter/fast_hash.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/bloom_filter64.h"
+#include "paimon/file_index/file_index_reader.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/result.h"
+namespace paimon {
+class Bytes;
+class InputStream;
+class Literal;
+class MemoryPool;
+enum class FieldType;
+
+/// Bloom filter for file index.
+///
+/// @note This class uses `BloomFilter64` as its base filter and stores only the number of hash
+/// functions (one integer) and the bit set bytes. `HashFunction` is used to hash the objects:
+/// byte types (varchar, binary, etc.) are hashed with xxHash, numeric types with a dedicated
+/// integer hash, see
+/// http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+class BloomFilterFileIndex : public FileIndexer {
+ public:
+    explicit BloomFilterFileIndex(const std::map<std::string, std::string>& 
options);
+    ~BloomFilterFileIndex() override = default;
+
+    /// Creates a reader for the index stored in `input_stream`; `start` and `length` are
+    /// forwarded to the implementation together with the schema and the memory pool.
+    Result<std::shared_ptr<FileIndexReader>> CreateReader(
+        ::ArrowSchema* arrow_schema, int32_t start, int32_t length,
+        const std::shared_ptr<InputStream>& input_stream,
+        const std::shared_ptr<MemoryPool>& pool) const override;
+
+    /// Writing is not supported: after importing the type from `arrow_schema`, this always
+    /// returns a NotImplemented status (or the import error, if the import fails).
+    Result<std::shared_ptr<FileIndexWriter>> CreateWriter(
+        ::ArrowSchema* arrow_schema, const std::shared_ptr<MemoryPool>& pool) 
const override {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::DataType> 
arrow_type,
+                                          arrow::ImportType(arrow_schema));
+        return Status::NotImplemented("do not support index writer in bloom 
filter");
+    }
+};
+
+/// Reader that evaluates equality predicates against a serialized bloom filter.
+class BloomFilterFileIndexReader : public FileIndexReader {
+ public:
+    /// Builds a reader for a column of `arrow_type` from the serialized index `bytes`.
+    static Result<std::shared_ptr<BloomFilterFileIndexReader>> Create(
+        const std::shared_ptr<arrow::DataType>& arrow_type, const 
std::shared_ptr<Bytes>& bytes);
+
+    /// Returns a "remain" or "skip" result for an equality test against `literal`.
+    Result<std::shared_ptr<FileIndexResult>> VisitEqual(const Literal& 
literal) override;
+
+ private:
+    // Private constructor: instances are obtained through `Create`.
+    BloomFilterFileIndexReader(const FastHash::HashFunction& hash_function, 
BloomFilter64&& filter);
+
+ private:
+    // Maps a literal to the 64-bit hash that is tested against `filter_`.
+    FastHash::HashFunction hash_function_;
+    // Bloom filter holding the indexed values.
+    BloomFilter64 filter_;
+};
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_factory.cpp 
b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_factory.cpp
new file mode 100644
index 0000000..97b8baa
--- /dev/null
+++ 
b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_factory.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
"paimon/common/file_index/bloomfilter/bloom_filter_file_index_factory.h"
+
+#include <utility>
+
+#include "paimon/common/file_index/bloomfilter/bloom_filter_file_index.h"
+#include "paimon/factories/factory_creator.h"
+
+namespace paimon {
+
+// Identifier under which this factory is exposed (returned by `Identifier()`).
+const char BloomFilterFileIndexFactory::IDENTIFIER[] = "bloom-filter";
+
+// Creates a `BloomFilterFileIndex` constructed from `options`.
+Result<std::unique_ptr<FileIndexer>> BloomFilterFileIndexFactory::Create(
+    const std::map<std::string, std::string>& options) const {
+    return std::make_unique<BloomFilterFileIndex>(options);
+}
+
+// Registers the factory through the project's REGISTER_PAIMON_FACTORY macro.
+REGISTER_PAIMON_FACTORY(BloomFilterFileIndexFactory);
+
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_factory.h 
b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_factory.h
new file mode 100644
index 0000000..813adcd
--- /dev/null
+++ b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_factory.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/file_index/file_indexer_factory.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Factory that creates `FileIndexer` instances for the bloom filter file index.
+class BloomFilterFileIndexFactory : public FileIndexerFactory {
+ public:
+    /// Identifier of this factory; defined in the corresponding .cpp file.
+    static const char IDENTIFIER[];
+
+    /// Returns `IDENTIFIER`.
+    const char* Identifier() const override {
+        return IDENTIFIER;
+    }
+    /// Creates the file indexer from `options`.
+    Result<std::unique_ptr<FileIndexer>> Create(
+        const std::map<std::string, std::string>& options) const override;
+};
+
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_test.cpp 
b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_test.cpp
new file mode 100644
index 0000000..209f0b4
--- /dev/null
+++ b/src/paimon/common/file_index/bloomfilter/bloom_filter_file_index_test.cpp
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bloomfilter/bloom_filter_file_index.h"
+
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+/// Fixture for the bloom filter index reader tests: owns a memory pool and offers a helper that
+/// exports a single-field Arrow schema.
+class BloomFilterIndexReaderTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+    void TearDown() override {
+        pool_.reset();
+    }
+
+    /// Exports a schema with one field `f0` of `data_type` through the Arrow C data interface.
+    std::unique_ptr<::ArrowSchema> CreateArrowSchema(
+        const std::shared_ptr<arrow::DataType>& data_type) const {
+        auto schema = arrow::schema({arrow::field("f0", data_type)});
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        EXPECT_TRUE(arrow::ExportSchema(*schema, c_schema.get()).ok());
+        return c_schema;
+    }
+
+ protected:
+    // Protected rather than private: TEST_F bodies live in classes derived from this fixture
+    // and read `pool_` directly.
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+// Every string stored in the serialized index, and a null literal, must be reported as "remain".
+TEST_F(BloomFilterIndexReaderTest, TestStringType) {
+    // data: "a", "b", ""
+    // The first four bytes {0, 0, 0, 6} are the hash function count; the rest is the bit set.
+    std::vector<uint8_t> index_bytes = {0, 0, 0, 6, 0, 32, 32, 3, 208, 32, 0, 
64, 73, 16, 201};
+    auto input_stream = std::make_shared<ByteArrayInputStream>(
+        reinterpret_cast<char*>(index_bytes.data()), index_bytes.size());
+
+    BloomFilterFileIndex file_index({});
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        file_index.CreateReader(CreateArrowSchema(arrow::utf8()).get(),
+                                /*start=*/0, /*length=*/index_bytes.size(), 
input_stream, pool_));
+    ASSERT_TRUE(reader);
+    ASSERT_TRUE(reader->VisitEqual(Literal(FieldType::STRING, "a", 
1)).value()->IsRemain().value());
+    ASSERT_TRUE(reader->VisitEqual(Literal(FieldType::STRING, "b", 
1)).value()->IsRemain().value());
+    ASSERT_TRUE(reader->VisitEqual(Literal(FieldType::STRING, "", 
0)).value()->IsRemain().value());
+    // A literal built from the field type only is the null literal.
+    
ASSERT_TRUE(reader->VisitEqual(Literal(FieldType::STRING)).value()->IsRemain().value());
+}
+
+// The same serialized index is read as int8, int16, int32, int64 and date32; every stored
+// value, and a null literal, must be reported as "remain" for each of these types.
+TEST_F(BloomFilterIndexReaderTest, TestIntegerTypes) {
+    // data: 1, 2, -1, 123
+    std::vector<uint8_t> index_bytes = {0, 0, 0, 6, 24, 4, 79, 0, 35, 128, 1, 
136, 26, 64, 129};
+    auto input_stream = std::make_shared<ByteArrayInputStream>(
+        reinterpret_cast<char*>(index_bytes.data()), index_bytes.size());
+
+    BloomFilterFileIndex file_index({});
+
+    // Creates a reader for `type` over the shared stream and checks all `literals`.
+    auto check_result = [&](const std::shared_ptr<arrow::DataType>& type,
+                            const std::vector<Literal>& literals) {
+        ASSERT_OK_AND_ASSIGN(auto reader,
+                             
file_index.CreateReader(CreateArrowSchema(type).get(),
+                                                     /*start=*/0, 
/*length=*/index_bytes.size(),
+                                                     input_stream, pool_));
+        ASSERT_TRUE(reader);
+        for (const Literal& literal : literals) {
+            
ASSERT_TRUE(reader->VisitEqual(literal).value()->IsRemain().value());
+        }
+        // A literal built from the field type only is the null literal.
+        ASSERT_OK_AND_ASSIGN(auto field_type, 
FieldTypeUtils::ConvertToFieldType(type->id()));
+        
ASSERT_TRUE(reader->VisitEqual(Literal(field_type)).value()->IsRemain().value());
+    };
+
+    check_result(arrow::int8(),
+                 {Literal(static_cast<int8_t>(1)), 
Literal(static_cast<int8_t>(2)),
+                  Literal(static_cast<int8_t>(-1)), 
Literal(static_cast<int8_t>(123))});
+    check_result(arrow::int16(),
+                 {Literal(static_cast<int16_t>(1)), 
Literal(static_cast<int16_t>(2)),
+                  Literal(static_cast<int16_t>(-1)), 
Literal(static_cast<int16_t>(123))});
+    check_result(arrow::int32(),
+                 {Literal(static_cast<int32_t>(1)), 
Literal(static_cast<int32_t>(2)),
+                  Literal(static_cast<int32_t>(-1)), 
Literal(static_cast<int32_t>(123))});
+    check_result(arrow::int64(),
+                 {Literal(static_cast<int64_t>(1)), 
Literal(static_cast<int64_t>(2)),
+                  Literal(static_cast<int64_t>(-1)), 
Literal(static_cast<int64_t>(123))});
+    check_result(arrow::date32(), {Literal(FieldType::DATE, 1), 
Literal(FieldType::DATE, 2),
+                                   Literal(FieldType::DATE, -1), 
Literal(FieldType::DATE, 123)});
+}
+
+// float32 and float64 each use their own serialized index; every stored value, and a null
+// literal, must be reported as "remain".
+TEST_F(BloomFilterIndexReaderTest, TestFloatTypes) {
+    // data: -0.3, -5.2, 0, 0.1, 123.12
+    // Creates a reader for `type` over `index_bytes` and checks all `literals`.
+    auto check_result = [&](const std::vector<uint8_t>& index_bytes,
+                            const std::shared_ptr<arrow::DataType>& type,
+                            const std::vector<Literal>& literals) {
+        auto input_stream = std::make_shared<ByteArrayInputStream>(
+            reinterpret_cast<const char*>(index_bytes.data()), 
index_bytes.size());
+
+        BloomFilterFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(auto reader,
+                             
file_index.CreateReader(CreateArrowSchema(type).get(),
+                                                     /*start=*/0, 
/*length=*/index_bytes.size(),
+                                                     input_stream, pool_));
+        ASSERT_TRUE(reader);
+        for (const Literal& literal : literals) {
+            
ASSERT_TRUE(reader->VisitEqual(literal).value()->IsRemain().value());
+        }
+        // A literal built from the field type only is the null literal.
+        ASSERT_OK_AND_ASSIGN(auto field_type, 
FieldTypeUtils::ConvertToFieldType(type->id()));
+        
ASSERT_TRUE(reader->VisitEqual(Literal(field_type)).value()->IsRemain().value());
+    };
+
+    {
+        // Index for the float32 values.
+        std::vector<uint8_t> index_bytes = {0,   0, 0, 6,  21,  130, 128, 2,
+                                            106, 0, 8, 25, 165, 32,  10};
+        check_result(
+            index_bytes, arrow::float32(),
+            {Literal(-0.3f), Literal(-5.2f), Literal(0.0f), Literal(0.1f), 
Literal(123.12f)});
+    }
+    {
+        // Index for the float64 values.
+        std::vector<uint8_t> index_bytes = {0, 0, 0, 6, 9, 6, 1, 48, 33, 6, 1, 
17, 105, 0, 21};
+        check_result(index_bytes, arrow::float64(),
+                     {Literal(-0.3), Literal(-5.2), Literal(0.0), 
Literal(0.1), Literal(123.12)});
+    }
+}
+
+// Every timestamp stored in the serialized index must be reported as "remain" when the column
+// is read as a nanosecond-unit Arrow timestamp.
+TEST_F(BloomFilterIndexReaderTest, TestTimestampType) {
+    // data, listed as the two arguments passed to `Timestamp(...)` below:
+    // 1745542802000 ms, 123000 ns
+    // 1745542902000 ms, 123000 ns
+    // 1745542602000 ms, 123000 ns
+    // -1745 ms, 123000 ns
+    // -1765 ms, 123000 ns
+    // 1745542802000 ms, 123001 ns
+    // -1725 ms, 123000 ns
+    std::vector<uint8_t> index_bytes = {0, 0, 0, 6, 37, 47, 72, 16, 193, 132, 
38, 199, 13, 46, 4};
+    auto input_stream = std::make_shared<ByteArrayInputStream>(
+        reinterpret_cast<char*>(index_bytes.data()), index_bytes.size());
+
+    BloomFilterFileIndex file_index({});
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        
file_index.CreateReader(CreateArrowSchema(arrow::timestamp(arrow::TimeUnit::NANO)).get(),
+                                /*start=*/0, /*length=*/index_bytes.size(), 
input_stream, pool_));
+    ASSERT_TRUE(reader);
+    ASSERT_TRUE(
+        reader->VisitEqual(Literal(Timestamp(1745542802000l, 
123000))).value()->IsRemain().value());
+    ASSERT_TRUE(
+        reader->VisitEqual(Literal(Timestamp(1745542902000l, 
123000))).value()->IsRemain().value());
+    ASSERT_TRUE(
+        reader->VisitEqual(Literal(Timestamp(1745542602000l, 
123000))).value()->IsRemain().value());
+    ASSERT_TRUE(reader->VisitEqual(Literal(Timestamp(-1745l, 
123000))).value()->IsRemain().value());
+    ASSERT_TRUE(reader->VisitEqual(Literal(Timestamp(-1765l, 
123000))).value()->IsRemain().value());
+    ASSERT_TRUE(
+        reader->VisitEqual(Literal(Timestamp(1745542802000l, 
123001))).value()->IsRemain().value());
+    ASSERT_TRUE(reader->VisitEqual(Literal(Timestamp(-1725l, 
123000))).value()->IsRemain().value());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/file_index/bloomfilter/fast_hash.cpp 
b/src/paimon/common/file_index/bloomfilter/fast_hash.cpp
new file mode 100644
index 0000000..ed9738e
--- /dev/null
+++ b/src/paimon/common/file_index/bloomfilter/fast_hash.cpp
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bloomfilter/fast_hash.h"
+
+#include <cassert>
+#include <cstring>
+#include <string>
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/status.h"
+#include "xxhash.h"  // NOLINT(build/include_subdir)
+
+namespace paimon {
+// Selects the hash function for the field type that corresponds to `arrow_type`.
+//
+// Integer-like and floating point values go through `GetLongHash` (floats via their raw bit
+// pattern), timestamps through `GetLongHash` of their millisecond or microsecond value, and
+// string/binary values through `Hash64`. Any other field type yields an Invalid status.
+Result<FastHash::HashFunction> FastHash::GetHashFunction(
+    const std::shared_ptr<arrow::DataType>& arrow_type) {
+    PAIMON_ASSIGN_OR_RAISE(FieldType field_type,
+                           FieldTypeUtils::ConvertToFieldType(arrow_type->id()));
+    switch (field_type) {
+        case FieldType::TINYINT:
+            return HashFunction([](const Literal& lit) -> int64_t {
+                const auto widened = static_cast<int64_t>(lit.GetValue<int8_t>());
+                return GetLongHash(widened);
+            });
+        case FieldType::SMALLINT:
+            return HashFunction([](const Literal& lit) -> int64_t {
+                const auto widened = static_cast<int64_t>(lit.GetValue<int16_t>());
+                return GetLongHash(widened);
+            });
+        case FieldType::DATE:
+        case FieldType::INT:
+            // Dates are hashed exactly like 32-bit integers.
+            return HashFunction([](const Literal& lit) -> int64_t {
+                const auto widened = static_cast<int64_t>(lit.GetValue<int32_t>());
+                return GetLongHash(widened);
+            });
+        case FieldType::BIGINT:
+            return HashFunction([](const Literal& lit) -> int64_t {
+                const auto widened = static_cast<int64_t>(lit.GetValue<int64_t>());
+                return GetLongHash(widened);
+            });
+        case FieldType::FLOAT:
+            return HashFunction([](const Literal& lit) -> int64_t {
+                // Hash the 32-bit pattern of the float, sign-extended to 64 bits.
+                const auto number = lit.GetValue<float>();
+                int32_t bit_pattern = 0;
+                std::memcpy(&bit_pattern, &number, sizeof(number));
+                return GetLongHash(bit_pattern);
+            });
+        case FieldType::DOUBLE:
+            return HashFunction([](const Literal& lit) -> int64_t {
+                // Hash the 64-bit pattern of the double.
+                const auto number = lit.GetValue<double>();
+                int64_t bit_pattern;
+                std::memcpy(&bit_pattern, &number, sizeof(number));
+                return GetLongHash(bit_pattern);
+            });
+        case FieldType::TIMESTAMP: {
+            auto timestamp_type =
+                arrow::internal::checked_pointer_cast<arrow::TimestampType>(arrow_type);
+            int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type);
+            assert(precision >= 0);
+            // Up to millisecond precision the millisecond value is hashed, above it the
+            // microsecond value.
+            const bool hash_millis = precision <= Timestamp::MILLIS_PRECISION;
+            return HashFunction([hash_millis](const Literal& lit) -> int64_t {
+                int64_t ticks = 0;
+                if (hash_millis) {
+                    ticks = lit.GetValue<Timestamp>().GetMillisecond();
+                } else {
+                    ticks = lit.GetValue<Timestamp>().ToMicrosecond();
+                }
+                return GetLongHash(ticks);
+            });
+        }
+        case FieldType::STRING:
+        case FieldType::BINARY:
+            return HashFunction([](const Literal& lit) -> int64_t {
+                auto content = lit.GetValue<std::string>();
+                return Hash64(content.data(), content.size());
+            });
+        default:
+            return Status::Invalid(fmt::format("bloom filter index does not support {}",
+                                               FieldTypeUtils::FieldTypeToString(field_type)));
+    }
+}
+
+// Hashes a 64-bit integer with a fixed sequence of shift/add/xor mixing steps.
+//
+// All arithmetic is done on uint64_t so that additions and left shifts wrap around instead of
+// overflowing a signed type; the signed right shifts of the Java code are emulated by `asr`.
+int64_t FastHash::GetLongHash(int64_t key) {
+    // NOTE: This hash function must produce results identical to the Java 
implementation.
+    // Java uses two's-complement wrapping arithmetic and arithmetic 
right-shift (>>).
+
+    // Reinterpret the bits of a signed value as unsigned (and back) without changing them.
+    auto to_unsigned = [](int64_t v) -> uint64_t {
+        uint64_t result;
+        std::memcpy(&result, &v, sizeof(result));
+        return result;
+    };
+    auto to_signed = [](uint64_t v) -> int64_t {
+        int64_t result;
+        std::memcpy(&result, &v, sizeof(result));
+        return result;
+    };
+    // Arithmetic right-shift: shift unsigned then sign-extend from the top.
+    // Only called below with shift amounts 24, 14 and 28.
+    auto asr = [](uint64_t v, int32_t shift) -> uint64_t {
+        bool sign = (v >> 63) != 0;
+        uint64_t shifted = v >> shift;
+        if (sign) {
+            // Fill the top `shift` bits with 1s.
+            shifted |= ~(~static_cast<uint64_t>(0) >> shift);
+        }
+        return shifted;
+    };
+
+    uint64_t k = to_unsigned(key);
+    k = (~k) + (k << 21);  // key = (key << 21) - key - 1;
+    k = k ^ asr(k, 24);
+    k = (k + (k << 3)) + (k << 8);  // key * 265
+    k = k ^ asr(k, 14);
+    k = (k + (k << 2)) + (k << 4);  // key * 21
+    k = k ^ asr(k, 28);
+    k = k + (k << 31);
+    return to_signed(k);
+}
+
+// Hashes `length` bytes starting at `data` with 64-bit xxHash (seed 0) and returns the digest
+// converted to a signed 64-bit integer.
+int64_t FastHash::Hash64(const char* data, size_t length) {
+    const auto digest = XXH64(data, length, /*seed=*/0);
+    return digest;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bloomfilter/fast_hash.h 
b/src/paimon/common/file_index/bloomfilter/fast_hash.h
new file mode 100644
index 0000000..1ca67fd
--- /dev/null
+++ b/src/paimon/common/file_index/bloomfilter/fast_hash.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include "arrow/type.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+
+namespace paimon {
+/// Hashes a literal to a 64-bit hash code.
+///
+/// The class only groups static functions; construction and destruction are deleted.
+class FastHash {
+ public:
+    FastHash() = delete;
+    ~FastHash() = delete;
+
+    // precondition: literal is not null
+    using HashFunction = std::function<int64_t(const Literal& literal)>;
+
+    /// Returns the hash function to use for values of `arrow_type`, or an error status.
+    static Result<HashFunction> GetHashFunction(const 
std::shared_ptr<arrow::DataType>& arrow_type);
+
+ private:
+    // Thomas Wang's integer hash function
+    // 
http://web.archive.org/web/20071223173210/http://www.concentric.net/~Ttwang/tech/inthash.htm
+    static int64_t GetLongHash(int64_t key);
+    // Hashes `length` bytes starting at `data`.
+    static int64_t Hash64(const char* data, size_t length);
+};
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bloomfilter/fast_hash_test.cpp 
b/src/paimon/common/file_index/bloomfilter/fast_hash_test.cpp
new file mode 100644
index 0000000..680d591
--- /dev/null
+++ b/src/paimon/common/file_index/bloomfilter/fast_hash_test.cpp
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bloomfilter/fast_hash.h"
+
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+/// Fixture for FastHash tests; carries no state and only provides the CheckResult helper.
+class FastHashTest : public ::testing::Test {
+ public:
+    void SetUp() override {}
+    void TearDown() override {}
+
+    /// Asserts that `hash_function` maps literals[i] to expected_hash[i] for every position.
+    ///
+    /// @param hash_function function under test
+    /// @param literals      inputs to hash; none of them may be null
+    /// @param expected_hash expected hash codes, written as unsigned 64-bit patterns
+    void CheckResult(const FastHash::HashFunction& hash_function,
+                     const std::vector<Literal>& literals,
+                     const std::vector<uint64_t>& expected_hash) {
+        ASSERT_EQ(literals.size(), expected_hash.size());
+        for (size_t i = 0; i < literals.size(); ++i) {
+            // HashFunction returns int64_t while the expectations are unsigned bit patterns;
+            // convert explicitly so the comparison does not rely on an implicit
+            // signed/unsigned conversion.
+            ASSERT_EQ(expected_hash[i], static_cast<uint64_t>(hash_function(literals[i])))
+                << "hash mismatch at literal index " << i;
+        }
+    }
+};
+
+// Pins the hash produced for each supported arrow type against fixed expected values and
+// checks that an unsupported type (boolean) is rejected with an error message.
+// NOTE(review): per the test name the expected constants presumably come from the Java
+// implementation — confirm their origin before changing any of them.
+TEST_F(FastHashTest, TestCompatibleWithJava) {
+    {
+        // test tiny int
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::int8()));
+        CheckResult(hash_function,
+                    {Literal(static_cast<int8_t>(-128)), 
Literal(static_cast<int8_t>(-100)),
+                     Literal(static_cast<int8_t>(-1)), 
Literal(static_cast<int8_t>(0)),
+                     Literal(static_cast<int8_t>(1)), 
Literal(static_cast<int8_t>(10)),
+                     Literal(static_cast<int8_t>(127))},
+                    {0xe547e8444a8fcdd1, 0xdb213b4e3642747d, 
0x5bca868437950d03, 0x0,
+                     0x5bca7c69b794f8ce, 0x95ea2955abd45275, 
0x897cf79712f9ec7c});
+    }
+    {
+        // test small int
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::int16()));
+        CheckResult(hash_function,
+                    {Literal(static_cast<int16_t>(-32768)), 
Literal(static_cast<int16_t>(-100)),
+                     Literal(static_cast<int16_t>(-1)), 
Literal(static_cast<int16_t>(0)),
+                     Literal(static_cast<int16_t>(1)), 
Literal(static_cast<int16_t>(10)),
+                     Literal(static_cast<int16_t>(32767))},
+                    {0x47ed1a480fd77cfb, 0xdb213b4e3642747d, 
0x5bca868437950d03, 0x0,
+                     0x5bca7c69b794f8ce, 0x95ea2955abd45275, 
0xe968161ed2cd74ae});
+    }
+    {
+        // test int
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::int32()));
+        CheckResult(
+            hash_function,
+            {Literal(static_cast<int32_t>(-2147483648)), 
Literal(static_cast<int32_t>(-1034556)),
+             Literal(static_cast<int32_t>(-1)), 
Literal(static_cast<int32_t>(0)),
+             Literal(static_cast<int32_t>(1)), 
Literal(static_cast<int32_t>(49647)),
+             Literal(static_cast<int32_t>(2147483647))},
+            {0x111ec0fd6aa8626c, 0xfb5d8d6df66551c8, 0x5bca868437950d03, 0x0, 
0x5bca7c69b794f8ce,
+             0x94147f05a824e009, 0xc6d8bcc4d61c69a4});
+    }
+    {
+        // test date
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::date32()));
+        CheckResult(
+            hash_function,
+            {Literal(static_cast<int32_t>(-2147483648)), 
Literal(static_cast<int32_t>(-1034556)),
+             Literal(static_cast<int32_t>(-1)), 
Literal(static_cast<int32_t>(0)),
+             Literal(static_cast<int32_t>(1)), 
Literal(static_cast<int32_t>(49647)),
+             Literal(static_cast<int32_t>(2147483647))},
+            {0x111ec0fd6aa8626c, 0xfb5d8d6df66551c8, 0x5bca868437950d03, 0x0, 
0x5bca7c69b794f8ce,
+             0x94147f05a824e009, 0xc6d8bcc4d61c69a4});
+    }
+    {
+        // test big int (int64)
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::int64()));
+        CheckResult(
+            hash_function,
+            
{Literal(static_cast<int64_t>(std::numeric_limits<int64_t>::min())),
+             Literal(static_cast<int64_t>(-4598654206466ll)), 
Literal(static_cast<int64_t>(-1)),
+             Literal(static_cast<int64_t>(0)), 
Literal(static_cast<int64_t>(1)),
+             Literal(static_cast<int64_t>(8548553896418ll)),
+             Literal(static_cast<int64_t>(9223372036854775807ll))},
+            {0x3be7d0f7780de548, 0xebd8376102414af8, 0x5bca868437950d03, 0x0, 
0x5bca7c69b794f8ce,
+             0xe7f3590a09b6693a, 0x81ad52718398e837});
+    }
+    {
+        // test Timestamp type
+        ASSERT_OK_AND_ASSIGN(auto hash_function,
+                             
FastHash::GetHashFunction(arrow::timestamp(arrow::TimeUnit::NANO)));
+        CheckResult(
+            hash_function,
+            {Literal(Timestamp(1745542802000l, 123000)), 
Literal(Timestamp(1745542902000l, 123000)),
+             Literal(Timestamp(1745542602000l, 123000)), 
Literal(Timestamp(-1745l, 123000)),
+             Literal(Timestamp(-1765l, 123000)), 
Literal(Timestamp(1745542802000l, 123000)),
+             Literal(Timestamp(-1725l, 123000))},
+            {0x3fa6477403e32e14, 0x1ec9ecd0a1b07aea, 0xd74122b773e5f45c, 
0xe7f352e64f55f259,
+             0xbe8a32867c820cf2, 0x3fa6477403e32e14, 0xd3ffe009a770b6a0});
+    }
+    {
+        // test float
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::float32()));
+        CheckResult(hash_function,
+                    {Literal(std::numeric_limits<float>::lowest()), 
Literal(-123.45f),
+                     Literal(-12345.6f), Literal(0.0f), Literal(2.1f), 
Literal(345.12f),
+                     Literal(std::numeric_limits<float>::max())},
+                    {0xecf6796dd7355dbc, 0x1f4dcc5b8a502b70, 
0xe678035506c03314, 0x0,
+                     0x8df65966db697d6, 0xdc9b5b8828f877ee, 
0xf9b567bea590d8d});
+    }
+    {
+        // test double
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::float64()));
+        CheckResult(hash_function,
+                    {Literal(std::numeric_limits<double>::lowest()), 
Literal(-123.45),
+                     Literal(-12345.6), Literal(0.0), Literal(2.1), 
Literal(345.12),
+                     Literal(std::numeric_limits<double>::max())},
+                    {0xb3c148792fed6cb9, 0x989ea602f70c211, 
0x8040a6007c7b22e0, 0x0,
+                     0xfca8098dd6548561, 0x477acb9b5361fc7f, 
0x939ba81c9dffe90});
+    }
+    {
+        // test string
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::utf8()));
+        CheckResult(hash_function,
+                    {Literal(FieldType::STRING, "", 0), 
Literal(FieldType::STRING, "example", 7),
+                     Literal(FieldType::STRING, "Have a nice day!", 16)},
+                    {0xef46db3751d8e999, 0xe6eda53558c41c5e, 
0xe663266d57d776c2});
+    }
+    {
+        // test binary (same inputs as the string case plus one multi-byte UTF-8 string)
+        ASSERT_OK_AND_ASSIGN(auto hash_function, 
FastHash::GetHashFunction(arrow::binary()));
+        std::string str = "我是一个粉刷匠";
+        CheckResult(
+            hash_function,
+            {Literal(FieldType::STRING, "", 0), Literal(FieldType::STRING, 
"example", 7),
+             Literal(FieldType::STRING, "Have a nice day!", 16),
+             Literal(FieldType::STRING, str.data(), str.size())},
+            {0xef46db3751d8e999, 0xe6eda53558c41c5e, 0xe663266d57d776c2, 
0x9f01fcd3f19877e6});
+    }
+    {
+        // test invalid type
+        ASSERT_NOK_WITH_MSG(FastHash::GetHashFunction(arrow::boolean()),
+                            "bloom filter index does not support BOOLEAN");
+    }
+}
+
+}  // namespace paimon::test
diff --git 
a/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.cpp 
b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.cpp
new file mode 100644
index 0000000..49ab89e
--- /dev/null
+++ b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.cpp
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h"
+
+#include <cassert>
+#include <cstddef>
+
+#include "fmt/format.h"
+#include "paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/file_index/bitmap_index_result.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// Constructs the BSI file indexer.
+///
+/// @param options indexer options; currently not read by this implementation, so the
+///                parameter is marked [[maybe_unused]] to keep unused-parameter warnings quiet.
+BitSliceIndexBitmapFileIndex::BitSliceIndexBitmapFileIndex(
+    [[maybe_unused]] const std::map<std::string, std::string>& options) {}
+
+/// Builds a reader for a serialized BSI index stored inside `input_stream`.
+///
+/// Fields consumed, in order: int8 version, int32 row count, bool has_positive (followed by
+/// the positive slice index when true), bool has_negative (followed by the negative slice
+/// index when true).
+///
+/// @param c_arrow_schema C schema that must contain exactly one field; it is imported here
+/// @param start          byte offset of the index inside `input_stream`; must be >= 0
+/// @param length         number of bytes occupied by the index; must be >= 0
+/// @param input_stream   stream the index bytes are read from
+/// @param pool           memory pool used for the temporary read buffer
+/// @return the reader, or Status::Invalid for a schema without exactly one field, a negative
+///         range, a short read or an unsupported version; errors from the stream, the slice
+///         index decoding and GetValueMapper are propagated unchanged
+Result<std::shared_ptr<FileIndexReader>> BitSliceIndexBitmapFileIndex::CreateReader(
+    ::ArrowSchema* c_arrow_schema, int32_t start, int32_t length,
+    const std::shared_ptr<InputStream>& input_stream,
+    const std::shared_ptr<MemoryPool>& pool) const {
+    // Import before any other validation so the C schema is always handed over to arrow.
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::Schema> arrow_schema,
+                                      arrow::ImportSchema(c_arrow_schema));
+    if (arrow_schema->num_fields() != 1) {
+        return Status::Invalid(
+            "invalid schema for BitSliceIndexBitmapFileIndexReader, supposed to have single "
+            "field.");
+    }
+    // Reject negative ranges explicitly: they must never reach the seek or the buffer
+    // allocation below.
+    if (start < 0 || length < 0) {
+        return Status::Invalid(
+            fmt::format("create reader for BitSliceIndexBitmapFileIndex failed, invalid start {} "
+                        "or length {}",
+                        start, length));
+    }
+    auto arrow_type = arrow_schema->field(0)->type();
+
+    // Load the whole serialized index into memory; everything below parses this buffer.
+    PAIMON_RETURN_NOT_OK(input_stream->Seek(start, SeekOrigin::FS_SEEK_SET));
+    auto bytes = std::make_unique<Bytes>(length, pool.get());
+    PAIMON_ASSIGN_OR_RAISE(int32_t actual_read_len,
+                           input_stream->Read(bytes->data(), bytes->size()));
+    if (static_cast<size_t>(actual_read_len) != bytes->size()) {
+        return Status::Invalid(
+            fmt::format("create reader for BitSliceIndexBitmapFileIndex failed, expected read len "
+                        "{}, actual read len {}",
+                        bytes->size(), actual_read_len));
+    }
+    auto byte_array_input_stream =
+        std::make_shared<ByteArrayInputStream>(bytes->data(), bytes->size());
+    DataInputStream data_input_stream(byte_array_input_stream);
+    PAIMON_ASSIGN_OR_RAISE(int8_t version, data_input_stream.ReadValue<int8_t>());
+    if (version > VERSION_1) {
+        return Status::Invalid(fmt::format(
+            "read bsi index file fail, do not support version {}, please update plugin version",
+            version));
+    }
+    PAIMON_ASSIGN_OR_RAISE(int32_t row_number, data_input_stream.ReadValue<int32_t>());
+
+    // The header reads above and the slice-index decoding below go through the same
+    // underlying byte stream, so the order of these statements is the on-disk order.
+    PAIMON_ASSIGN_OR_RAISE(bool has_positive, data_input_stream.ReadValue<bool>());
+    std::shared_ptr<BitSliceIndexRoaringBitmap> positive = BitSliceIndexRoaringBitmap::Empty();
+    if (has_positive) {
+        PAIMON_ASSIGN_OR_RAISE(positive,
+                               BitSliceIndexRoaringBitmap::Create(byte_array_input_stream));
+    }
+    PAIMON_ASSIGN_OR_RAISE(bool has_negative, data_input_stream.ReadValue<bool>());
+    std::shared_ptr<BitSliceIndexRoaringBitmap> negative = BitSliceIndexRoaringBitmap::Empty();
+    if (has_negative) {
+        PAIMON_ASSIGN_OR_RAISE(negative,
+                               BitSliceIndexRoaringBitmap::Create(byte_array_input_stream));
+    }
+    PAIMON_ASSIGN_OR_RAISE(BitSliceIndexBitmapFileIndex::ValueMapperType value_mapper,
+                           GetValueMapper(arrow_type));
+    return std::make_shared<BitSliceIndexBitmapFileIndexReader>(row_number, value_mapper, positive,
+                                                                negative);
+}
+
+/// Returns the function that converts a predicate literal of `arrow_type` into the signed
+/// 64-bit value used to query the bit-slice index.
+///
+/// Supported field types: TINYINT, SMALLINT, INT, DATE, BIGINT and TIMESTAMP. Integer-like
+/// literals are widened to int64_t. Timestamp literals are mapped to milliseconds when the
+/// type precision is at most Timestamp::MILLIS_PRECISION and to microseconds otherwise.
+///
+/// Every returned mapper expects a non-null literal and yields Status::Invalid for a null one.
+///
+/// @param arrow_type arrow type of the indexed field
+/// @return the mapper, or Status::Invalid when the field type is not supported
+Result<BitSliceIndexBitmapFileIndex::ValueMapperType> BitSliceIndexBitmapFileIndex::GetValueMapper(
+    const std::shared_ptr<arrow::DataType>& arrow_type) {
+    PAIMON_ASSIGN_OR_RAISE(FieldType field_type,
+                           FieldTypeUtils::ConvertToFieldType(arrow_type->id()));
+    switch (field_type) {
+        case FieldType::TINYINT:
+            return BitSliceIndexBitmapFileIndex::ValueMapperType(
+                [](const Literal& literal) -> Result<int64_t> {
+                    return GetValueFromLiteral<int8_t>(literal);
+                });
+        case FieldType::SMALLINT:
+            return BitSliceIndexBitmapFileIndex::ValueMapperType(
+                [](const Literal& literal) -> Result<int64_t> {
+                    return GetValueFromLiteral<int16_t>(literal);
+                });
+        case FieldType::DATE:
+        case FieldType::INT:
+            // DATE literals are read as int32_t, exactly like INT.
+            return BitSliceIndexBitmapFileIndex::ValueMapperType(
+                [](const Literal& literal) -> Result<int64_t> {
+                    return GetValueFromLiteral<int32_t>(literal);
+                });
+        case FieldType::BIGINT:
+            return BitSliceIndexBitmapFileIndex::ValueMapperType(
+                [](const Literal& literal) -> Result<int64_t> {
+                    return GetValueFromLiteral<int64_t>(literal);
+                });
+        case FieldType::TIMESTAMP: {
+            auto ts_type = arrow::internal::checked_pointer_cast<arrow::TimestampType>(arrow_type);
+            int64_t precision = DateTimeUtils::GetPrecisionFromType(ts_type);
+            assert(precision >= 0);
+            return BitSliceIndexBitmapFileIndex::ValueMapperType(
+                [precision](const Literal& literal) -> Result<int64_t> {
+                    if (literal.IsNull()) {
+                        return Status::Invalid(
+                            "literal cannot be null when GetValue in BitSliceIndexBitmapFileIndex");
+                    }
+                    // Precision decides the unit of the value compared against the index.
+                    if (precision <= Timestamp::MILLIS_PRECISION) {
+                        return literal.GetValue<Timestamp>().GetMillisecond();
+                    }
+                    return literal.GetValue<Timestamp>().ToMicrosecond();
+                });
+        }
+        default:
+            // TODO(xinyu.lxy): support decimal
+            // The message lists every type handled above; the original text is kept as a
+            // prefix so existing substring matches on it continue to hold.
+            return Status::Invalid(
+                "BitSliceIndexBitmapFileIndex only support "
+                "TINYINT/SMALLINT/INT/BIGINT/DATE/TIMESTAMP");
+    }
+}
+
+BitSliceIndexBitmapFileIndexReader::BitSliceIndexBitmapFileIndexReader(
+    int32_t row_number, const BitSliceIndexBitmapFileIndex::ValueMapperType& 
value_mapper,
+    const std::shared_ptr<BitSliceIndexRoaringBitmap>& positive,
+    const std::shared_ptr<BitSliceIndexRoaringBitmap>& negative)
+    : row_number_(row_number),
+      value_mapper_(value_mapper),
+      positive_(positive),
+      negative_(negative) {}
+
+/// Answers `field > literal`.
+///
+/// The bitmap is produced by a supplier handed to BitmapIndexResult. The supplier maps the
+/// literal through value_mapper_ and then:
+///  - for a negative bound, unions the negative_ rows that are LessThan the negated bound
+///    with every non-null row of positive_;
+///  - otherwise asks positive_ for the rows GreaterThan the bound.
+/// The supplier owns a copy of the literal and a shared_ptr to this reader.
+/// NOTE(review): negating the bound overflows when it equals INT64_MIN — confirm whether
+/// such a literal can reach this point.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitGreaterThan(
+    const Literal& literal) {
+    BitmapIndexResult::BitmapSupplier supplier =
+        [lit = literal, self = shared_from_this()]() -> Result<RoaringBitmap32> {
+        PAIMON_ASSIGN_OR_RAISE(int64_t bound, self->value_mapper_(lit));
+        if (bound < 0) {
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 matched, self->negative_->LessThan(-bound));
+            RoaringBitmap32 all_positive = self->positive_->IsNotNull();
+            matched |= all_positive;
+            return matched;
+        }
+        return self->positive_->GreaterThan(bound);
+    };
+    return std::make_shared<BitmapIndexResult>(supplier);
+}
+
+/// Answers `field >= literal`.
+///
+/// The bitmap is produced by a supplier handed to BitmapIndexResult. The supplier maps the
+/// literal through value_mapper_ and then:
+///  - for a negative bound, unions the negative_ rows that are LessOrEqual the negated bound
+///    with every non-null row of positive_;
+///  - otherwise asks positive_ for the rows GreaterOrEqual the bound.
+/// The supplier owns a copy of the literal and a shared_ptr to this reader.
+/// NOTE(review): negating the bound overflows when it equals INT64_MIN — confirm whether
+/// such a literal can reach this point.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitGreaterOrEqual(
+    const Literal& literal) {
+    BitmapIndexResult::BitmapSupplier supplier =
+        [lit = literal, self = shared_from_this()]() -> Result<RoaringBitmap32> {
+        PAIMON_ASSIGN_OR_RAISE(int64_t bound, self->value_mapper_(lit));
+        if (bound < 0) {
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 matched,
+                                   self->negative_->LessOrEqual(-bound));
+            RoaringBitmap32 all_positive = self->positive_->IsNotNull();
+            matched |= all_positive;
+            return matched;
+        }
+        return self->positive_->GreaterOrEqual(bound);
+    };
+    return std::make_shared<BitmapIndexResult>(supplier);
+}
+
+/// Answers `field < literal`.
+///
+/// The bitmap is produced by a supplier handed to BitmapIndexResult. The supplier maps the
+/// literal through value_mapper_ and then:
+///  - for a non-negative bound, unions the positive_ rows that are LessThan the bound with
+///    every non-null row of negative_;
+///  - otherwise asks negative_ for the rows GreaterThan the negated bound.
+/// The supplier owns a copy of the literal and a shared_ptr to this reader.
+/// NOTE(review): negating the bound overflows when it equals INT64_MIN — confirm whether
+/// such a literal can reach this point.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitLessThan(
+    const Literal& literal) {
+    BitmapIndexResult::BitmapSupplier supplier =
+        [lit = literal, self = shared_from_this()]() -> Result<RoaringBitmap32> {
+        PAIMON_ASSIGN_OR_RAISE(int64_t bound, self->value_mapper_(lit));
+        if (bound >= 0) {
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 matched, self->positive_->LessThan(bound));
+            RoaringBitmap32 all_negative = self->negative_->IsNotNull();
+            matched |= all_negative;
+            return matched;
+        }
+        return self->negative_->GreaterThan(-bound);
+    };
+    return std::make_shared<BitmapIndexResult>(supplier);
+}
+/// Answers `field <= literal`.
+///
+/// The bitmap is produced by a supplier handed to BitmapIndexResult. The supplier maps the
+/// literal through value_mapper_ and then:
+///  - for a non-negative bound, unions the positive_ rows that are LessOrEqual the bound
+///    with every non-null row of negative_;
+///  - otherwise asks negative_ for the rows GreaterOrEqual the negated bound.
+/// The supplier owns a copy of the literal and a shared_ptr to this reader.
+/// NOTE(review): negating the bound overflows when it equals INT64_MIN — confirm whether
+/// such a literal can reach this point.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitLessOrEqual(
+    const Literal& literal) {
+    BitmapIndexResult::BitmapSupplier supplier =
+        [lit = literal, self = shared_from_this()]() -> Result<RoaringBitmap32> {
+        PAIMON_ASSIGN_OR_RAISE(int64_t bound, self->value_mapper_(lit));
+        if (bound >= 0) {
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 matched,
+                                   self->positive_->LessOrEqual(bound));
+            RoaringBitmap32 all_negative = self->negative_->IsNotNull();
+            matched |= all_negative;
+            return matched;
+        }
+        return self->negative_->GreaterOrEqual(-bound);
+    };
+    return std::make_shared<BitmapIndexResult>(supplier);
+}
+
+/// Answers `field == literal` by delegating to VisitIn with a one-element list.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitEqual(
+    const Literal& literal) {
+    const std::vector<Literal> single_literal{literal};
+    return VisitIn(single_literal);
+}
+/// Answers `field != literal` by delegating to VisitNotIn with a one-element list.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitNotEqual(
+    const Literal& literal) {
+    const std::vector<Literal> single_literal{literal};
+    return VisitNotIn(single_literal);
+}
+
+/// Answers `field IN (literals)`.
+///
+/// The supplier handed to BitmapIndexResult maps every literal through value_mapper_, looks
+/// the value up with Equal in positive_ (non-negative values) or, negated, in negative_
+/// (negative values), and returns the union of all per-literal bitmaps. The first mapping or
+/// lookup error aborts the evaluation and is returned as is.
+/// NOTE(review): negating the value overflows when it equals INT64_MIN — confirm whether
+/// such a literal can reach this point.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitIn(
+    const std::vector<Literal>& literals) {
+    BitmapIndexResult::BitmapSupplier supplier =
+        [candidates = literals, self = shared_from_this()]() -> Result<RoaringBitmap32> {
+        std::vector<RoaringBitmap32> per_literal;
+        per_literal.reserve(candidates.size());
+        for (const Literal& candidate : candidates) {
+            PAIMON_ASSIGN_OR_RAISE(int64_t mapped, self->value_mapper_(candidate));
+            // The sign selects the slice index; the lookup itself always uses the magnitude.
+            const bool is_negative = mapped < 0;
+            const auto& slices = is_negative ? self->negative_ : self->positive_;
+            const int64_t magnitude = is_negative ? -mapped : mapped;
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 hits, slices->Equal(magnitude));
+            per_literal.emplace_back(std::move(hits));
+        }
+        return RoaringBitmap32::FastUnion(per_literal);
+    };
+    return std::make_shared<BitmapIndexResult>(supplier);
+}
+
+/// Answers `field NOT IN (literals)`.
+///
+/// The supplier handed to BitmapIndexResult starts from the rows that are non-null in either
+/// positive_ or negative_, builds the union of the Equal bitmaps of all literals (same lookup
+/// rule as VisitIn: sign selects the slice index, magnitude is looked up) and removes that
+/// union from the non-null rows. Rows that are null in both slice indexes are never returned.
+/// NOTE(review): negating the value overflows when it equals INT64_MIN — confirm whether
+/// such a literal can reach this point.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitNotIn(
+    const std::vector<Literal>& literals) {
+    BitmapIndexResult::BitmapSupplier supplier =
+        [candidates = literals, self = shared_from_this()]() -> Result<RoaringBitmap32> {
+        auto remaining =
+            RoaringBitmap32::Or(self->positive_->IsNotNull(), self->negative_->IsNotNull());
+        std::vector<RoaringBitmap32> per_literal;
+        per_literal.reserve(candidates.size());
+        for (const Literal& candidate : candidates) {
+            PAIMON_ASSIGN_OR_RAISE(int64_t mapped, self->value_mapper_(candidate));
+            const bool is_negative = mapped < 0;
+            const auto& slices = is_negative ? self->negative_ : self->positive_;
+            const int64_t magnitude = is_negative ? -mapped : mapped;
+            PAIMON_ASSIGN_OR_RAISE(RoaringBitmap32 hits, slices->Equal(magnitude));
+            per_literal.emplace_back(std::move(hits));
+        }
+        auto excluded = RoaringBitmap32::FastUnion(per_literal);
+        remaining -= excluded;
+        return remaining;
+    };
+    return std::make_shared<BitmapIndexResult>(supplier);
+}
+
+/// Answers `field IS NULL`.
+///
+/// The supplier handed to BitmapIndexResult unions the non-null rows of positive_ and
+/// negative_ and then flips that bitmap over the range passed as Flip(0, row_number_).
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitIsNull() {
+    BitmapIndexResult::BitmapSupplier supplier =
+        [self = shared_from_this()]() -> Result<RoaringBitmap32> {
+        auto rows =
+            RoaringBitmap32::Or(self->positive_->IsNotNull(), self->negative_->IsNotNull());
+        // `rows` holds the non-null rows up to here; the flip turns it into the null rows.
+        rows.Flip(0, self->row_number_);
+        return rows;
+    };
+    return std::make_shared<BitmapIndexResult>(supplier);
+}
+
+/// Answers `field IS NOT NULL`.
+///
+/// The supplier handed to BitmapIndexResult returns the union of the non-null rows reported
+/// by positive_ and by negative_.
+Result<std::shared_ptr<FileIndexResult>> BitSliceIndexBitmapFileIndexReader::VisitIsNotNull() {
+    BitmapIndexResult::BitmapSupplier supplier =
+        [self = shared_from_this()]() -> Result<RoaringBitmap32> {
+        return RoaringBitmap32::Or(self->positive_->IsNotNull(), self->negative_->IsNotNull());
+    };
+    return std::make_shared<BitmapIndexResult>(supplier);
+}
+
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h 
b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h
new file mode 100644
index 0000000..e661ea2
--- /dev/null
+++ b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/c/bridge.h"
+#include "arrow/type.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/file_index/file_index_reader.h"
+#include "paimon/file_index/file_index_result.h"
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/predicate/literal.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class BitSliceIndexRoaringBitmap;
+class InputStream;
+class MemoryPool;
+enum class FieldType;
+
+/// implementation of BSI file index.
+///
+/// Only reading is provided: CreateReader is declared here and defined in the .cpp file,
+/// while CreateWriter always returns Status::NotImplemented.
+class BitSliceIndexBitmapFileIndex : public FileIndexer {
+ public:
+    /// @param options indexer options handed over by the caller
+    explicit BitSliceIndexBitmapFileIndex(const std::map<std::string, 
std::string>& options);
+    ~BitSliceIndexBitmapFileIndex() override = default;
+
+    /// Creates a reader for the index stored in `input_stream`.
+    /// @param arrow_schema C schema describing the indexed field
+    /// @param start        start position of the index within `input_stream`
+    /// @param length       length of the index
+    /// @param input_stream stream containing the index
+    /// @param pool         memory pool made available to the reader creation
+    Result<std::shared_ptr<FileIndexReader>> CreateReader(
+        ::ArrowSchema* arrow_schema, int32_t start, int32_t length,
+        const std::shared_ptr<InputStream>& input_stream,
+        const std::shared_ptr<MemoryPool>& pool) const override;
+
+    /// Writing is not supported: the C schema is imported and then Status::NotImplemented is
+    /// returned. `pool` is not used.
+    /// NOTE(review): the import presumably exists only to release `arrow_schema` per the
+    /// Arrow C data interface — confirm before removing the otherwise unused `arrow_type`.
+    Result<std::shared_ptr<FileIndexWriter>> CreateWriter(
+        ::ArrowSchema* arrow_schema, const std::shared_ptr<MemoryPool>& pool) 
const override {
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr<arrow::DataType> 
arrow_type,
+                                          arrow::ImportType(arrow_schema));
+        return Status::NotImplemented("do not support index writer in bsi");
+    }
+
+    /// Converts a predicate literal into an int64 value, or reports an error status.
+    using ValueMapperType = std::function<Result<int64_t>(const Literal& 
literal)>;
+
+ private:
+    /// Returns the ValueMapperType to use for fields of `arrow_type`, or an error status.
+    static Result<ValueMapperType> GetValueMapper(
+        const std::shared_ptr<arrow::DataType>& arrow_type);
+
+    /// Reads the literal as `T` and widens it to int64_t.
+    /// @return Status::Invalid when the literal is null
+    template <typename T>
+    static Result<int64_t> GetValueFromLiteral(const Literal& literal) {
+        if (literal.IsNull()) {
+            return Status::Invalid(
+                "literal cannot be null when GetValue in 
BitSliceIndexBitmapFileIndex");
+        }
+        return static_cast<int64_t>(literal.GetValue<T>());
+    }
+
+ private:
+    // Index format version number.
+    static constexpr int8_t VERSION_1 = 1;
+};
+
+/// Reader that answers predicates against a BSI index made of two slice indexes
+/// (`positive` and `negative`).
+///
+/// Derives from std::enable_shared_from_this, so instances are expected to be owned by a
+/// std::shared_ptr.
+class BitSliceIndexBitmapFileIndexReader
+    : public FileIndexReader,
+      public std::enable_shared_from_this<BitSliceIndexBitmapFileIndexReader> {
+ public:
+    /// @param row_number   row count associated with the index
+    /// @param value_mapper converts predicate literals to int64 values
+    /// @param positive     first slice index
+    /// @param negative     second slice index
+    BitSliceIndexBitmapFileIndexReader(
+        int32_t row_number, const 
BitSliceIndexBitmapFileIndex::ValueMapperType& value_mapper,
+        const std::shared_ptr<BitSliceIndexRoaringBitmap>& positive,
+        const std::shared_ptr<BitSliceIndexRoaringBitmap>& negative);
+
+    // Range predicates against a single literal.
+    Result<std::shared_ptr<FileIndexResult>> VisitGreaterThan(const Literal& 
literal) override;
+    Result<std::shared_ptr<FileIndexResult>> VisitGreaterOrEqual(const 
Literal& literal) override;
+    Result<std::shared_ptr<FileIndexResult>> VisitLessThan(const Literal& 
literal) override;
+    Result<std::shared_ptr<FileIndexResult>> VisitLessOrEqual(const Literal& 
literal) override;
+
+    // Equality predicates against a single literal.
+    Result<std::shared_ptr<FileIndexResult>> VisitEqual(const Literal& 
literal) override;
+    Result<std::shared_ptr<FileIndexResult>> VisitNotEqual(const Literal& 
literal) override;
+
+    // Membership predicates against a list of literals.
+    Result<std::shared_ptr<FileIndexResult>> VisitIn(const 
std::vector<Literal>& literals) override;
+    Result<std::shared_ptr<FileIndexResult>> VisitNotIn(
+        const std::vector<Literal>& literals) override;
+
+    // Null checks.
+    Result<std::shared_ptr<FileIndexResult>> VisitIsNull() override;
+    Result<std::shared_ptr<FileIndexResult>> VisitIsNotNull() override;
+
+ private:
+    // Row count handed to the constructor.
+    int32_t row_number_;
+    // Literal-to-int64 conversion handed to the constructor.
+    BitSliceIndexBitmapFileIndex::ValueMapperType value_mapper_;
+    // Slice indexes handed to the constructor.
+    // NOTE(review): presumably positive_ holds values >= 0 and negative_ holds the
+    // magnitudes of values < 0 — confirm against the writer of the index format.
+    std::shared_ptr<BitSliceIndexRoaringBitmap> positive_;
+    std::shared_ptr<BitSliceIndexRoaringBitmap> negative_;
+};
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_factory.cpp
 
b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_factory.cpp
new file mode 100644
index 0000000..0afa35e
--- /dev/null
+++ 
b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_factory.cpp
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include 
"paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_factory.h"
+
+#include <utility>
+
+#include "paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h"
+#include "paimon/factories/factory.h"
+
+namespace paimon {
+
+// Identifier string of the BSI file index factory.
+const char BitSliceIndexBitmapFileIndexFactory::IDENTIFIER[] = "bsi";
+
+// Creates a BitSliceIndexBitmapFileIndex, forwarding `options` to its constructor.
+Result<std::unique_ptr<FileIndexer>> 
BitSliceIndexBitmapFileIndexFactory::Create(
+    const std::map<std::string, std::string>& options) const {
+    return std::make_unique<BitSliceIndexBitmapFileIndex>(options);
+}
+
+// NOTE(review): presumably makes the factory discoverable through the registry declared in
+// paimon/factories/factory.h — confirm the macro's exact effect there.
+REGISTER_PAIMON_FACTORY(BitSliceIndexBitmapFileIndexFactory);
+
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_factory.h 
b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_factory.h
new file mode 100644
index 0000000..7a10483
--- /dev/null
+++ 
b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_factory.h
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+
+#include "paimon/file_index/file_indexer.h"
+#include "paimon/file_index/file_indexer_factory.h"
+#include "paimon/result.h"
+
+namespace paimon {
+
+/// Factory for the BSI file indexer.
+class BitSliceIndexBitmapFileIndexFactory : public FileIndexerFactory {
+ public:
+    /// Identifier string of this factory; defined in the .cpp file.
+    static const char IDENTIFIER[];
+
+    /// Returns IDENTIFIER.
+    const char* Identifier() const override {
+        return IDENTIFIER;
+    }
+    /// Creates a file indexer from `options`; defined in the .cpp file.
+    Result<std::unique_ptr<FileIndexer>> Create(
+        const std::map<std::string, std::string>& options) const override;
+};
+
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_test.cpp 
b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_test.cpp
new file mode 100644
index 0000000..b678ba3
--- /dev/null
+++ 
b/src/paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index_test.cpp
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bsi/bit_slice_index_bitmap_file_index.h"
+
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/data/timestamp.h"
+#include "paimon/defs.h"
+#include "paimon/file_index/bitmap_index_result.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon::test {
+// Fixture shared by the BSI reader tests: owns the memory pool handed to CreateReader() and
+// provides helpers to build an Arrow C schema and to compare a predicate result with the
+// expected row positions.
+class BitSliceIndexBitmapIndexReaderTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+    }
+    void TearDown() override {
+        pool_.reset();
+    }
+
+    // Builds a one-column schema (field "f0" of `data_type`) and exports it as an ArrowSchema.
+    // An export failure is reported as a non-fatal test failure.
+    std::unique_ptr<::ArrowSchema> CreateArrowSchema(
+        const std::shared_ptr<arrow::DataType>& data_type) const {
+        auto schema = arrow::schema({arrow::field("f0", data_type)});
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        EXPECT_TRUE(arrow::ExportSchema(*schema, c_schema.get()).ok());
+        return c_schema;
+    }
+
+    // Asserts that `result` is a BitmapIndexResult whose bitmap holds exactly `expected`.
+    // On mismatch both bitmaps are printed.
+    void CheckResult(const std::shared_ptr<FileIndexResult>& result,
+                     const std::vector<int32_t>& expected) const {
+        auto typed_result = std::dynamic_pointer_cast<BitmapIndexResult>(result);
+        ASSERT_TRUE(typed_result);
+        ASSERT_OK_AND_ASSIGN(const RoaringBitmap32* bitmap, typed_result->GetBitmap());
+        ASSERT_TRUE(bitmap);
+        // Reuse the pointer obtained above instead of calling GetBitmap() again for every use.
+        const RoaringBitmap32 expected_bitmap = RoaringBitmap32::From(expected);
+        ASSERT_EQ(*bitmap, expected_bitmap)
+            << "result=" << bitmap->ToString() << ", expected=" << expected_bitmap.ToString();
+    }
+
+    // protected (not private): TEST_F bodies are compiled as subclasses of the fixture and
+    // read pool_ directly.
+ protected:
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+// Runs every predicate of the reader against a column that mixes positive, negative, zero
+// and null values.
+TEST_F(BitSliceIndexBitmapIndexReaderTest, TestMix) {
+    // data: 1, 2, null, -2, -2, -1, null, 2, 0, 5, null
+    // Serialized index bytes for the rows listed above. The expected sets below are 0-based
+    // row positions into that data.
+    std::vector<char> index_bytes = {
+        1, 0, 0, 0,  11, 1,  1,  0, 0, 0, 0,  0, 0, 0, 0, 0,  0,  0,  0,  0,  
0, 0, 5, 58, 48,
+        0, 0, 1, 0,  0,  0,  0,  0, 4, 0, 16, 0, 0, 0, 0, 0,  1,  0,  7,  0,  
8, 0, 9, 0,  0,
+        0, 0, 3, 58, 48, 0,  0,  1, 0, 0, 0,  0, 0, 1, 0, 16, 0,  0,  0,  0,  
0, 9, 0, 58, 48,
+        0, 0, 1, 0,  0,  0,  0,  0, 1, 0, 16, 0, 0, 0, 1, 0,  7,  0,  58, 48, 
0, 0, 1, 0,  0,
+        0, 0, 0, 0,  0,  16, 0,  0, 0, 9, 0,  1, 1, 0, 0, 0,  0,  0,  0,  0,  
0, 0, 0, 0,  0,
+        0, 0, 0, 2,  58, 48, 0,  0, 1, 0, 0,  0, 0, 0, 2, 0,  16, 0,  0,  0,  
3, 0, 4, 0,  5,
+        0, 0, 0, 0,  2,  58, 48, 0, 0, 1, 0,  0, 0, 0, 0, 0,  0,  16, 0,  0,  
0, 5, 0, 58, 48,
+        0, 0, 1, 0,  0,  0,  0,  0, 1, 0, 16, 0, 0, 0, 3, 0,  4,  0};
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(index_bytes.data(), 
index_bytes.size());
+    BitSliceIndexBitmapFileIndex file_index({});
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        file_index.CreateReader(CreateArrowSchema(arrow::int32()).get(),
+                                /*start=*/0, /*length=*/index_bytes.size(), 
input_stream, pool_));
+    // test equal
+    CheckResult(reader->VisitEqual(Literal(2)).value(), {1, 7});
+    CheckResult(reader->VisitEqual(Literal(-2)).value(), {3, 4});
+    CheckResult(reader->VisitEqual(Literal(100)).value(), {});
+
+    // test not equal
+    // Null rows (2, 6, 10) are never part of a not-equal result, even for a literal that
+    // matches no row.
+    CheckResult(reader->VisitNotEqual(Literal(2)).value(), {0, 3, 4, 5, 8, 9});
+    CheckResult(reader->VisitNotEqual(Literal(-2)).value(), {0, 1, 5, 7, 8, 
9});
+    CheckResult(reader->VisitNotEqual(Literal(100)).value(), {0, 1, 3, 4, 5, 
7, 8, 9});
+
+    // test in
+    CheckResult(reader->VisitIn({Literal(-1), Literal(1), Literal(2), 
Literal(3)}).value(),
+                {0, 1, 5, 7});
+
+    // test not in
+    CheckResult(reader->VisitNotIn({Literal(-1), Literal(1), Literal(2), 
Literal(3)}).value(),
+                {3, 4, 8, 9});
+
+    // test null
+    CheckResult(reader->VisitIsNull().value(), {2, 6, 10});
+
+    // test not null
+    CheckResult(reader->VisitIsNotNull().value(), {0, 1, 3, 4, 5, 7, 8, 9});
+
+    // test less than
+    CheckResult(reader->VisitLessThan(Literal(2)).value(), {0, 3, 4, 5, 8});
+    CheckResult(reader->VisitLessOrEqual(Literal(2)).value(), {0, 1, 3, 4, 5, 
7, 8});
+    CheckResult(reader->VisitLessThan(Literal(-1)).value(), {3, 4});
+    CheckResult(reader->VisitLessOrEqual(Literal(-1)).value(), {3, 4, 5});
+
+    // test greater than
+    CheckResult(reader->VisitGreaterThan(Literal(-2)).value(), {0, 1, 5, 7, 8, 
9});
+    CheckResult(reader->VisitGreaterOrEqual(Literal(-2)).value(), {0, 1, 3, 4, 
5, 7, 8, 9});
+    CheckResult(reader->VisitGreaterThan(Literal(2)).value(), {9});
+    CheckResult(reader->VisitGreaterOrEqual(Literal(2)).value(), {1, 7, 9});
+}
+
+// Same predicate coverage as the mixed case, on a column holding only non-negative values
+// and nulls. Negative literals are used to probe values below everything in the column.
+TEST_F(BitSliceIndexBitmapIndexReaderTest, TestPositiveOnly) {
+    // data: 0, 1, null, 3, 4, 5, 6, 0, null
+    // Serialized index bytes for the rows listed above.
+    std::vector<char> index_bytes = {
+        1,  0,  0, 0, 9, 1, 1, 0, 0,  0, 0, 0,  0,  0, 0, 0, 0, 0, 0, 0,  0,  
0, 6,
+        59, 48, 0, 0, 1, 0, 0, 6, 0,  2, 0, 0,  0,  1, 0, 3, 0, 4, 0, 0,  0,  
0, 3,
+        58, 48, 0, 0, 1, 0, 0, 0, 0,  0, 2, 0,  16, 0, 0, 0, 1, 0, 3, 0,  5,  
0, 58,
+        48, 0,  0, 1, 0, 0, 0, 0, 0,  1, 0, 16, 0,  0, 0, 3, 0, 6, 0, 58, 48, 
0, 0,
+        1,  0,  0, 0, 0, 0, 2, 0, 16, 0, 0, 0,  4,  0, 5, 0, 6, 0, 0};
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(index_bytes.data(), 
index_bytes.size());
+    BitSliceIndexBitmapFileIndex file_index({});
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        file_index.CreateReader(CreateArrowSchema(arrow::int32()).get(),
+                                /*start=*/0, /*length=*/index_bytes.size(), 
input_stream, pool_));
+    // test equal
+    CheckResult(reader->VisitEqual(Literal(0)).value(), {0, 7});
+    CheckResult(reader->VisitEqual(Literal(1)).value(), {1});
+    CheckResult(reader->VisitEqual(Literal(-1)).value(), {});
+
+    // test not equal
+    // Literals 2 and -2 match no row, so every non-null row is returned.
+    CheckResult(reader->VisitNotEqual(Literal(2)).value(), {0, 1, 3, 4, 5, 6, 
7});
+    CheckResult(reader->VisitNotEqual(Literal(-2)).value(), {0, 1, 3, 4, 5, 6, 
7});
+    CheckResult(reader->VisitNotEqual(Literal(3)).value(), {0, 1, 4, 5, 6, 7});
+
+    // test in
+    CheckResult(reader->VisitIn({Literal(-1), Literal(1), Literal(2), 
Literal(3)}).value(), {1, 3});
+
+    // test not in
+    CheckResult(reader->VisitNotIn({Literal(-1), Literal(1), Literal(2), 
Literal(3)}).value(),
+                {0, 4, 5, 6, 7});
+
+    // test null
+    CheckResult(reader->VisitIsNull().value(), {2, 8});
+
+    // test not null
+    CheckResult(reader->VisitIsNotNull().value(), {0, 1, 3, 4, 5, 6, 7});
+
+    // test less than
+    // A negative literal is below every stored value, so less-than yields nothing.
+    CheckResult(reader->VisitLessThan(Literal(3)).value(), {0, 1, 7});
+    CheckResult(reader->VisitLessOrEqual(Literal(3)).value(), {0, 1, 3, 7});
+    CheckResult(reader->VisitLessThan(Literal(-1)).value(), {});
+    CheckResult(reader->VisitLessOrEqual(Literal(-1)).value(), {});
+
+    // test greater than
+    // A negative literal is below every stored value, so greater-than yields all non-null rows.
+    CheckResult(reader->VisitGreaterThan(Literal(-2)).value(), {0, 1, 3, 4, 5, 
6, 7});
+    CheckResult(reader->VisitGreaterOrEqual(Literal(-2)).value(), {0, 1, 3, 4, 
5, 6, 7});
+    CheckResult(reader->VisitGreaterThan(Literal(1)).value(), {3, 4, 5, 6});
+    CheckResult(reader->VisitGreaterOrEqual(Literal(1)).value(), {1, 3, 4, 5, 
6});
+}
+
+// Same predicate coverage on a column holding only negative values and nulls. Positive
+// literals are used to probe values above everything in the column.
+TEST_F(BitSliceIndexBitmapIndexReaderTest, TestNegativeOnly) {
+    // data: null, -1, null, -3, -4, -5, -6, -1, null
+    // Serialized index bytes for the rows listed above.
+    std::vector<char> index_bytes = {
+        1,  0,  0,  0,  9, 0, 1, 1, 0, 0, 0, 0,  0, 0,  0,  0, 0, 0, 0, 0, 0, 
0, 0,
+        6,  59, 48, 0,  0, 1, 0, 0, 5, 0, 2, 0,  1, 0,  0,  0, 3, 0, 4, 0, 0, 
0, 0,
+        3,  58, 48, 0,  0, 1, 0, 0, 0, 0, 0, 3,  0, 16, 0,  0, 0, 1, 0, 3, 0, 
5, 0,
+        7,  0,  58, 48, 0, 0, 1, 0, 0, 0, 0, 0,  1, 0,  16, 0, 0, 0, 3, 0, 6, 
0, 58,
+        48, 0,  0,  1,  0, 0, 0, 0, 0, 2, 0, 16, 0, 0,  0,  4, 0, 5, 0, 6, 0};
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(index_bytes.data(), 
index_bytes.size());
+    BitSliceIndexBitmapFileIndex file_index({});
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        file_index.CreateReader(CreateArrowSchema(arrow::int32()).get(),
+                                /*start=*/0, /*length=*/index_bytes.size(), 
input_stream, pool_));
+    // test equal
+    CheckResult(reader->VisitEqual(Literal(1)).value(), {});
+    CheckResult(reader->VisitEqual(Literal(-2)).value(), {});
+    CheckResult(reader->VisitEqual(Literal(-1)).value(), {1, 7});
+
+    // test not equal
+    // Literals 2 and -2 match no row, so every non-null row is returned.
+    CheckResult(reader->VisitNotEqual(Literal(2)).value(), {1, 3, 4, 5, 6, 7});
+    CheckResult(reader->VisitNotEqual(Literal(-2)).value(), {1, 3, 4, 5, 6, 
7});
+    CheckResult(reader->VisitNotEqual(Literal(-3)).value(), {1, 4, 5, 6, 7});
+
+    // test in
+    CheckResult(reader->VisitIn({Literal(-1), Literal(-4), Literal(-2), 
Literal(3)}).value(),
+                {1, 4, 7});
+
+    // test not in
+    CheckResult(reader->VisitNotIn({Literal(-1), Literal(-4), Literal(-2), 
Literal(3)}).value(),
+                {3, 5, 6});
+
+    // test null
+    CheckResult(reader->VisitIsNull().value(), {0, 2, 8});
+
+    // test not null
+    CheckResult(reader->VisitIsNotNull().value(), {1, 3, 4, 5, 6, 7});
+
+    // test less than
+    // A positive literal is above every stored value, so less-than yields all non-null rows.
+    CheckResult(reader->VisitLessThan(Literal(-3)).value(), {4, 5, 6});
+    CheckResult(reader->VisitLessOrEqual(Literal(-3)).value(), {3, 4, 5, 6});
+    CheckResult(reader->VisitLessThan(Literal(1)).value(), {1, 3, 4, 5, 6, 7});
+    CheckResult(reader->VisitLessOrEqual(Literal(1)).value(), {1, 3, 4, 5, 6, 
7});
+
+    // test greater than
+    // A positive literal is above every stored value, so greater-than yields nothing.
+    CheckResult(reader->VisitGreaterThan(Literal(-3)).value(), {1, 7});
+    CheckResult(reader->VisitGreaterOrEqual(Literal(-3)).value(), {1, 3, 7});
+    CheckResult(reader->VisitGreaterThan(Literal(1)).value(), {});
+    CheckResult(reader->VisitGreaterOrEqual(Literal(1)).value(), {});
+}
+
+// Reads the same serialized index through readers created for several column types and
+// checks that a literal of the matching type gives the same row sets each time.
+TEST_F(BitSliceIndexBitmapIndexReaderTest, TestPrimitiveType) {
+    // data: null, 1, null, 2, -1
+    // `type` is the Arrow type of the indexed column; `literal` is the comparison value
+    // expressed in that type.
+    auto check_result = [&](const std::shared_ptr<arrow::DataType>& type, 
const Literal& literal) {
+        std::vector<char> index_bytes = {
+            1,  0, 0, 0,  5,  1,  1,  0, 0, 0, 0, 0,  0, 0, 0, 0,  0, 0,  0,  
0, 0, 0, 2, 58,
+            48, 0, 0, 1,  0,  0,  0,  0, 0, 1, 0, 16, 0, 0, 0, 1,  0, 3,  0,  
0, 0, 0, 2, 58,
+            48, 0, 0, 1,  0,  0,  0,  0, 0, 0, 0, 16, 0, 0, 0, 1,  0, 58, 48, 
0, 0, 1, 0, 0,
+            0,  0, 0, 0,  0,  16, 0,  0, 0, 3, 0, 1,  1, 0, 0, 0,  0, 0,  0,  
0, 0, 0, 0, 0,
+            0,  0, 0, 0,  1,  58, 48, 0, 0, 1, 0, 0,  0, 0, 0, 0,  0, 16, 0,  
0, 0, 4, 0, 0,
+            0,  0, 1, 58, 48, 0,  0,  1, 0, 0, 0, 0,  0, 0, 0, 16, 0, 0,  0,  
4, 0};
+        auto input_stream =
+            std::make_shared<ByteArrayInputStream>(index_bytes.data(), 
index_bytes.size());
+        BitSliceIndexBitmapFileIndex file_index({});
+        ASSERT_OK_AND_ASSIGN(auto reader,
+                             
file_index.CreateReader(CreateArrowSchema(type).get(),
+                                                     /*start=*/0, 
/*length=*/index_bytes.size(),
+                                                     input_stream, pool_));
+        CheckResult(reader->VisitIsNull().value(), {0, 2});
+        CheckResult(reader->VisitIsNotNull().value(), {1, 3, 4});
+        CheckResult(reader->VisitGreaterThan(literal).value(), {3});
+        CheckResult(reader->VisitLessOrEqual(literal).value(), {1, 4});
+        // test invalid case
+        // A Literal built from only a field type carries no value. VisitEqual() itself
+        // still returns a result; the error is reported when IsRemain() is called on it.
+        ASSERT_OK_AND_ASSIGN(auto field_type, 
FieldTypeUtils::ConvertToFieldType(type->id()));
+        
ASSERT_NOK_WITH_MSG(reader->VisitEqual(Literal(field_type)).value()->IsRemain(),
+                            "literal cannot be null when GetValue in 
BitSliceIndexBitmapFileIndex");
+    };
+    check_result(arrow::int8(), Literal(static_cast<int8_t>(1)));
+    check_result(arrow::int16(), Literal(static_cast<int16_t>(1)));
+    check_result(arrow::int32(), Literal(static_cast<int32_t>(1)));
+    check_result(arrow::date32(), Literal(FieldType::DATE, 1));
+    check_result(arrow::int64(), Literal(static_cast<int64_t>(1)));
+    // NOTE(review): presumably Timestamp(0, 1000) (0 ms + 1000 ns) maps to the stored value 1
+    // after normalization to microseconds - confirm against the reader implementation.
+    check_result(arrow::timestamp(arrow::TimeUnit::NANO), Literal(Timestamp(0, 
1000)));
+}
+
+// Checks predicate evaluation on a timestamp column, including negative timestamps, a null
+// row and two rows that differ only below microsecond precision.
+TEST_F(BitSliceIndexBitmapIndexReaderTest, TestTimestampType) {
+    // Serialized index bytes for the eight rows described in the "data:" comment below.
+    std::vector<char> index_bytes = {
+        1,  0,  0,  0,  8,  1,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  6,  
51,  -113, -32, -99, 41,
+        -5, 58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0, 
  0,    1,   0,   2,
+        0,  6,  0,  0,  0,  0,  51, 58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3, 
  0,    16,  0,   0,
+        0,  0,  0,  1,  0,  2,  0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0, 
  0,    3,   0,   16,
+        0,  0,  0,  0,  0,  1,  0,  2,  0,  6,  0,  58, 48, 0,  0,  0,  0,  0, 
  0,    58,  48,  0,
+        0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1,  0,  2, 
  0,    6,   0,   58,
+        48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1, 
  0,    2,   0,   6,
+        0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0, 
  0,    1,   0,   2,
+        0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0, 
  0,    0,   0,   1,
+        0,  2,  0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  
16,  0,    0,   0,   0,
+        0,  1,  0,  2,  0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  0, 
  0,    16,  0,   0,
+        0,  1,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  16, 0,  0, 
  0,    2,   0,   58,
+        48, 0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  16, 0,  0,  0,  2,  0,  
58,  48,   0,   0,   1,
+        0,  0,  0,  0,  0,  2,  0,  16, 0,  0,  0,  0,  0,  1,  0,  6,  0,  
58,  48,   0,   0,   0,
+        0,  0,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  16, 0,  0, 
  0,    1,   0,   58,
+        48, 0,  0,  1,  0,  0,  0,  0,  0,  1,  0,  16, 0,  0,  0,  0,  0,  6, 
  0,    58,  48,  0,
+        0,  1,  0,  0,  0,  0,  0,  0,  0,  16, 0,  0,  0,  2,  0,  58, 48, 0, 
  0,    1,   0,   0,
+        0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1,  0,  2,  0,  6,  0,  
58,  48,   0,   0,   1,
+        0,  0,  0,  0,  0,  2,  0,  16, 0,  0,  0,  0,  0,  2,  0,  6,  0,  
58,  48,   0,   0,   1,
+        0,  0,  0,  0,  0,  2,  0,  16, 0,  0,  0,  0,  0,  1,  0,  6,  0,  
58,  48,   0,   0,   1,
+        0,  0,  0,  0,  0,  1,  0,  16, 0,  0,  0,  1,  0,  2,  0,  58, 48, 0, 
  0,    1,   0,   0,
+        0,  0,  0,  1,  0,  16, 0,  0,  0,  1,  0,  2,  0,  58, 48, 0,  0,  1, 
  0,    0,   0,   0,
+        0,  2,  0,  16, 0,  0,  0,  0,  0,  2,  0,  6,  0,  58, 48, 0,  0,  0, 
  0,    0,   0,   58,
+        48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1, 
  0,    2,   0,   6,
+        0,  58, 48, 0,  0,  0,  0,  0,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0, 
  0,    2,   0,   16,
+        0,  0,  0,  0,  0,  2,  0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0, 
  0,    0,   0,   16,
+        0,  0,  0,  2,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  2,  0,  
16,  0,    0,   0,   0,
+        0,  2,  0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  1,  0,  
16,  0,    0,   0,   0,
+        0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  16, 0,  0, 
  0,    1,   0,   58,
+        48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1, 
  0,    2,   0,   6,
+        0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0, 
  0,    1,   0,   2,
+        0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0, 
  0,    0,   0,   1,
+        0,  2,  0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  
16,  0,    0,   0,   0,
+        0,  1,  0,  2,  0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  3, 
  0,    16,  0,   0,
+        0,  0,  0,  1,  0,  2,  0,  6,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0, 
  0,    3,   0,   16,
+        0,  0,  0,  0,  0,  1,  0,  2,  0,  6,  0,  58, 48, 0,  0,  0,  0,  0, 
  0,    58,  48,  0,
+        0,  0,  0,  0,  0,  58, 48, 0,  0,  0,  0,  0,  0,  58, 48, 0,  0,  1, 
  0,    0,   0,   0,
+        0,  3,  0,  16, 0,  0,  0,  0,  0,  1,  0,  2,  0,  6,  0,  58, 48, 0, 
  0,    1,   0,   0,
+        0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1,  0,  2,  0,  6,  0,  
58,  48,   0,   0,   1,
+        0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1,  0,  2,  0,  6, 
  0,    58,  48,  0,
+        0,  0,  0,  0,  0,  58, 48, 0,  0,  0,  0,  0,  0,  58, 48, 0,  0,  1, 
  0,    0,   0,   0,
+        0,  3,  0,  16, 0,  0,  0,  0,  0,  1,  0,  2,  0,  6,  0,  58, 48, 0, 
  0,    1,   0,   0,
+        0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1,  0,  2,  0,  6,  0,  
58,  48,   0,   0,   0,
+        0,  0,  0,  58, 48, 0,  0,  0,  0,  0,  0,  58, 48, 0,  0,  0,  0,  0, 
  0,    58,  48,  0,
+        0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1,  0,  2, 
  0,    6,   0,   58,
+        48, 0,  0,  1,  0,  0,  0,  0,  0,  3,  0,  16, 0,  0,  0,  0,  0,  1, 
  0,    2,   0,   6,
+        0,  1,  1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  26, 
-18, 13,   58,  48,  0,
+        0,  1,  0,  0,  0,  0,  0,  2,  0,  16, 0,  0,  0,  3,  0,  4,  0,  7, 
  0,    0,   0,   0,
+        21, 58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  2,  0,  16, 0,  0,  0,  3, 
  0,    4,   0,   7,
+        0,  58, 48, 0,  0,  0,  0,  0,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0, 
  0,    2,   0,   16,
+        0,  0,  0,  3,  0,  4,  0,  7,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0, 
  0,    2,   0,   16,
+        0,  0,  0,  3,  0,  4,  0,  7,  0,  58, 48, 0,  0,  0,  0,  0,  0,  
58,  48,   0,   0,   1,
+        0,  0,  0,  0,  0,  0,  0,  16, 0,  0,  0,  3,  0,  58, 48, 0,  0,  1, 
  0,    0,   0,   0,
+        0,  1,  0,  16, 0,  0,  0,  3,  0,  7,  0,  58, 48, 0,  0,  1,  0,  0, 
  0,    0,   0,   1,
+        0,  16, 0,  0,  0,  3,  0,  7,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0, 
  0,    1,   0,   16,
+        0,  0,  0,  3,  0,  7,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  1, 
  0,    16,  0,   0,
+        0,  3,  0,  4,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  1,  0,  
16,  0,    0,   0,   3,
+        0,  4,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  1,  0,  16, 0,  0, 
  0,    3,   0,   4,
+        0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  1,  0,  16, 0,  0,  0,  3, 
  0,    7,   0,   58,
+        48, 0,  0,  1,  0,  0,  0,  0,  0,  0,  0,  16, 0,  0,  0,  4,  0,  
58,  48,   0,   0,   1,
+        0,  0,  0,  0,  0,  1,  0,  16, 0,  0,  0,  4,  0,  7,  0,  58, 48, 0, 
  0,    1,   0,   0,
+        0,  0,  0,  1,  0,  16, 0,  0,  0,  3,  0,  4,  0,  58, 48, 0,  0,  0, 
  0,    0,   0,   58,
+        48, 0,  0,  1,  0,  0,  0,  0,  0,  2,  0,  16, 0,  0,  0,  3,  0,  4, 
  0,    7,   0,   58,
+        48, 0,  0,  0,  0,  0,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  2, 
  0,    16,  0,   0,
+        0,  3,  0,  4,  0,  7,  0,  58, 48, 0,  0,  1,  0,  0,  0,  0,  0,  2, 
  0,    16,  0,   0,
+        0,  3,  0,  4,  0,  7,  0};
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(index_bytes.data(), 
index_bytes.size());
+    BitSliceIndexBitmapFileIndex file_index({});
+    ASSERT_OK_AND_ASSIGN(
+        auto reader,
+        
file_index.CreateReader(CreateArrowSchema(arrow::timestamp(arrow::TimeUnit::NANO)).get(),
+                                /*start=*/0, /*length=*/index_bytes.size(), 
input_stream, pool_));
+    // data (row position: the two arguments of the Timestamp(...) literals used below):
+    // row 0: 1745542802000 ms, 123000 ns
+    // row 1: 1745542902000 ms, 123000 ns
+    // row 2: 1745542602000 ms, 123000 ns
+    // row 3: -1745 ms, 123000 ns
+    // row 4: -1765 ms, 123000 ns
+    // row 5: null
+    // row 6: 1745542802000 ms, 123001 ns
+    // row 7: -1725 ms, 123000 ns
+    CheckResult(reader->VisitIsNull().value(), {5});
+    CheckResult(reader->VisitIsNotNull().value(), {0, 1, 2, 3, 4, 6, 7});
+    // as timestamp is normalized by micro seconds, there is a loss of 
precision in the nanosecond
+    // part
+    // Consequence visible below: row 6 (123001 ns) behaves exactly like row 0 (123000 ns).
+    CheckResult(reader->VisitGreaterThan(Literal(Timestamp(1745542802000l, 
123000))).value(), {1});
+    CheckResult(reader->VisitGreaterOrEqual(Literal(Timestamp(1745542802000l, 
123000))).value(),
+                {0, 1, 6});
+    CheckResult(reader->VisitLessThan(Literal(Timestamp(-1745, 
123000))).value(), {4});
+    CheckResult(reader->VisitLessOrEqual(Literal(Timestamp(0, 
123000))).value(), {3, 4, 7});
+    CheckResult(reader->VisitEqual(Literal(Timestamp(1745542502000l, 
123000))).value(), {});
+    CheckResult(reader->VisitEqual(Literal(Timestamp(1745542802000l, 
123000))).value(), {0, 6});
+    CheckResult(reader->VisitNotEqual(Literal(Timestamp(1745542802000l, 
123000))).value(),
+                {1, 2, 3, 4, 7});
+    CheckResult(reader
+                    ->VisitIn({Literal(Timestamp(1745542802000l, 123000)),
+                               Literal(Timestamp(-1745, 123000)),
+                               Literal(Timestamp(1745542602000, 123000))})
+                    .value(),
+                {0, 2, 3, 6});
+    CheckResult(reader
+                    ->VisitNotIn({Literal(Timestamp(1745542802000l, 123000)),
+                                  Literal(Timestamp(-1745, 123000)),
+                                  Literal(Timestamp(1745542602000, 123000))})
+                    .value(),
+                {1, 4, 7});
+
+    // test invalid case
+    // A Literal built from only a field type carries no value. VisitEqual() itself still
+    // returns a result; the error is reported when IsRemain() is called on it.
+    
ASSERT_NOK_WITH_MSG(reader->VisitEqual(Literal(FieldType::TIMESTAMP)).value()->IsRemain(),
+                        "literal cannot be null when GetValue in 
BitSliceIndexBitmapFileIndex");
+}
+
+// CreateReader() must fail with a descriptive error when the column type (boolean here) is
+// not one the index supports.
+TEST_F(BitSliceIndexBitmapIndexReaderTest, TestUnInvalidType) {
+    // Same serialized bytes as the fixture used in TestPrimitiveType.
+    std::vector<char> index_bytes = {
+        1,  0, 0, 0,  5,  1,  1,  0, 0, 0, 0, 0,  0, 0, 0, 0,  0, 0,  0,  0, 
0, 0, 2, 58,
+        48, 0, 0, 1,  0,  0,  0,  0, 0, 1, 0, 16, 0, 0, 0, 1,  0, 3,  0,  0, 
0, 0, 2, 58,
+        48, 0, 0, 1,  0,  0,  0,  0, 0, 0, 0, 16, 0, 0, 0, 1,  0, 58, 48, 0, 
0, 1, 0, 0,
+        0,  0, 0, 0,  0,  16, 0,  0, 0, 3, 0, 1,  1, 0, 0, 0,  0, 0,  0,  0, 
0, 0, 0, 0,
+        0,  0, 0, 0,  1,  58, 48, 0, 0, 1, 0, 0,  0, 0, 0, 0,  0, 16, 0,  0, 
0, 4, 0, 0,
+        0,  0, 1, 58, 48, 0,  0,  1, 0, 0, 0, 0,  0, 0, 0, 16, 0, 0,  0,  4, 
0};
+    auto input_stream =
+        std::make_shared<ByteArrayInputStream>(index_bytes.data(), 
index_bytes.size());
+    BitSliceIndexBitmapFileIndex file_index({});
+    ASSERT_NOK_WITH_MSG(
+        file_index.CreateReader(CreateArrowSchema(arrow::boolean()).get(),
+                                /*start=*/0, /*length=*/index_bytes.size(), 
input_stream, pool_),
+        "BitSliceIndexBitmapFileIndex only support 
TINYINT/SMALLINT/INT/BIGINT/DATE");
+}
+
+}  // namespace paimon::test
diff --git 
a/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.cpp 
b/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.cpp
new file mode 100644
index 0000000..c51d533
--- /dev/null
+++ b/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.cpp
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.h"
+
+#include <utility>
+
+#include "fmt/format.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+
+namespace paimon {
+class MemoryPool;
+
+// Serialization format version. Create() rejects any stream whose version byte is greater.
+const int8_t BitSliceIndexRoaringBitmap::VERSION_1 = 1;
+// Returns a reference to one shared empty index: min == max == 0, an empty not-null bitmap
+// and no slices. The instance is built on the first call and reused afterwards.
+const std::shared_ptr<BitSliceIndexRoaringBitmap>& BitSliceIndexRoaringBitmap::Empty() {
+    static std::shared_ptr<BitSliceIndexRoaringBitmap> empty_instance(
+        new BitSliceIndexRoaringBitmap(/*min=*/0, /*max=*/0, RoaringBitmap32(), {}));
+    return empty_instance;
+}
+
+// Stores `min` and `max` as given and moves the not-null bitmap (`ebm`) and the slice bitmaps
+// into the new object; the arguments are left in a moved-from state.
+BitSliceIndexRoaringBitmap::BitSliceIndexRoaringBitmap(int64_t min, int64_t 
max,
+                                                       RoaringBitmap32&& ebm,
+                                                       
std::vector<RoaringBitmap32>&& slices)
+    : min_(min), max_(max), ebm_(std::move(ebm)), slices_(std::move(slices)) {}
+
+// Deserializes an index from `input_stream`. Fields are read in this order: version (int8),
+// min (int64), max (int64), the not-null bitmap, the slice count (int32), then that many
+// slice bitmaps.
+//
+// Returns Status::Invalid for a version newer than VERSION_1 or for a negative slice count.
+// Read errors and bitmap deserialization errors are propagated unchanged.
+Result<std::shared_ptr<BitSliceIndexRoaringBitmap>> BitSliceIndexRoaringBitmap::Create(
+    const std::shared_ptr<ByteArrayInputStream>& input_stream) {
+    // Upper bound for the up-front vector reservation only (one slice per bit of an int64
+    // value). It never rejects input: a larger count simply grows the vector incrementally,
+    // so a corrupted count cannot trigger one huge allocation before any slice is read.
+    constexpr int32_t kMaxReservedSlices = 64;
+
+    DataInputStream data_input_stream(input_stream);
+    PAIMON_ASSIGN_OR_RAISE(int8_t version, data_input_stream.ReadValue<int8_t>());
+    if (version > VERSION_1) {
+        return Status::Invalid(fmt::format(
+            "deserialize bsi index fail, do not support version {}, please update plugin version",
+            version));
+    }
+    // deserialize min & max
+    PAIMON_ASSIGN_OR_RAISE(int64_t min, data_input_stream.ReadValue<int64_t>());
+    PAIMON_ASSIGN_OR_RAISE(int64_t max, data_input_stream.ReadValue<int64_t>());
+
+    // deserialize ebm (read directly from the stream that data_input_stream wraps)
+    RoaringBitmap32 ebm;
+    PAIMON_RETURN_NOT_OK(ebm.Deserialize(input_stream.get()));
+
+    // deserialize slices
+    PAIMON_ASSIGN_OR_RAISE(int32_t slice_num, data_input_stream.ReadValue<int32_t>());
+    if (slice_num < 0) {
+        // A negative count would otherwise be converted to a huge unsigned value by reserve()
+        // and surface as a C++ exception instead of a Status.
+        return Status::Invalid(
+            fmt::format("deserialize bsi index fail, invalid slice number {}", slice_num));
+    }
+    std::vector<RoaringBitmap32> slices;
+    slices.reserve(slice_num < kMaxReservedSlices ? slice_num : kMaxReservedSlices);
+    for (int32_t i = 0; i < slice_num; i++) {
+        RoaringBitmap32 slice;
+        PAIMON_RETURN_NOT_OK(slice.Deserialize(input_stream.get()));
+        slices.emplace_back(std::move(slice));
+    }
+    return std::shared_ptr<BitSliceIndexRoaringBitmap>(
+        new BitSliceIndexRoaringBitmap(min, max, std::move(ebm), std::move(slices)));
+}
+
+// Two indexes are equal when they are the same object, or when min, max, the not-null bitmap
+// and the slice bitmaps all compare equal (checked in that order).
+bool BitSliceIndexRoaringBitmap::operator==(const BitSliceIndexRoaringBitmap& other) const {
+    if (this != &other) {
+        // TODO(xinyu.lxy): Java does not compare max
+        const bool same_range = (min_ == other.min_) && (max_ == other.max_);
+        return same_range && ebm_ == other.ebm_ && slices_ == other.slices_;
+    }
+    return true;
+}
+// Delegates to Compare() with Function::Type::EQUAL.
+Result<RoaringBitmap32> BitSliceIndexRoaringBitmap::Equal(int64_t literal) 
const {
+    return Compare(Function::Type::EQUAL, literal);
+}
+// Delegates to Compare() with Function::Type::LESS_THAN.
+Result<RoaringBitmap32> BitSliceIndexRoaringBitmap::LessThan(int64_t literal) 
const {
+    return Compare(Function::Type::LESS_THAN, literal);
+}
+// Delegates to Compare() with Function::Type::LESS_OR_EQUAL.
+Result<RoaringBitmap32> BitSliceIndexRoaringBitmap::LessOrEqual(int64_t 
literal) const {
+    return Compare(Function::Type::LESS_OR_EQUAL, literal);
+}
+// Delegates to Compare() with Function::Type::GREATER_THAN.
+Result<RoaringBitmap32> BitSliceIndexRoaringBitmap::GreaterThan(int64_t 
literal) const {
+    return Compare(Function::Type::GREATER_THAN, literal);
+}
+// Delegates to Compare() with Function::Type::GREATER_OR_EQUAL.
+Result<RoaringBitmap32> BitSliceIndexRoaringBitmap::GreaterOrEqual(int64_t 
literal) const {
+    return Compare(Function::Type::GREATER_OR_EQUAL, literal);
+}
+// Returns a reference to ebm_, the bitmap this index reports as its not-null rows. The
+// reference is to a member, so it is only valid while this object is alive.
+const RoaringBitmap32& BitSliceIndexRoaringBitmap::IsNotNull() const {
+    return ebm_;
+}
+
+// Evaluates `operation` against `literal` and returns the resulting bitmap.
+//
+// CompareUsingMinMax() is tried first; when it can decide the outcome from min_/max_ alone,
+// its bitmap is returned as is. Otherwise the comparison is run by ONeilCompare() on the
+// literal rebased to min_ (literal - min_). Errors from either helper are propagated.
+Result<RoaringBitmap32> BitSliceIndexRoaringBitmap::Compare(const Function::Type& operation,
+                                                            int64_t literal) const {
+    // using min/max to fast skip
+    PAIMON_ASSIGN_OR_RAISE(std::optional<RoaringBitmap32> min_max_compare_result,
+                           CompareUsingMinMax(operation, literal));
+    if (min_max_compare_result.has_value()) {
+        // The optional is a local about to be destroyed: move the bitmap out rather than
+        // copying it into the returned Result.
+        return std::move(*min_max_compare_result);
+    }
+    return ONeilCompare(operation, literal - min_);
+}
+
+/// Tries to answer a comparison from the stored value range alone.
+///
+/// @param operation comparison to evaluate
+/// @param literal value compared against (in the original value domain, not shifted by min_)
+/// @return the existence bitmap when the range proves every indexed row matches, an empty
+///         bitmap when it proves none can match, std::nullopt when the range cannot decide,
+///         or Status::Invalid for an unsupported operation
+Result<std::optional<RoaringBitmap32>> BitSliceIndexRoaringBitmap::CompareUsingMinMax(
+    const Function::Type& operation, int64_t literal) const {
+    using Shortcut = std::optional<RoaringBitmap32>;
+    auto no_rows = []() -> Shortcut { return RoaringBitmap32(); };
+    auto all_rows = [this]() -> Shortcut { return IsNotNull(); };
+
+    // Predicates shared by the EQUAL / NOT_EQUAL branches.
+    const bool only_value = min_ == max_ && min_ == literal;
+    const bool outside_range = literal < min_ || literal > max_;
+
+    switch (operation) {
+        case Function::Type::EQUAL:
+            if (only_value) {
+                return all_rows();
+            }
+            if (outside_range) {
+                return no_rows();
+            }
+            break;
+        case Function::Type::NOT_EQUAL:
+            if (only_value) {
+                return no_rows();
+            }
+            if (outside_range) {
+                return all_rows();
+            }
+            break;
+        case Function::Type::GREATER_OR_EQUAL:
+            if (literal <= min_) {
+                return all_rows();
+            }
+            if (literal > max_) {
+                return no_rows();
+            }
+            break;
+        case Function::Type::GREATER_THAN:
+            if (literal < min_) {
+                return all_rows();
+            }
+            if (literal >= max_) {
+                return no_rows();
+            }
+            break;
+        case Function::Type::LESS_OR_EQUAL:
+            if (literal >= max_) {
+                return all_rows();
+            }
+            if (literal < min_) {
+                return no_rows();
+            }
+            break;
+        case Function::Type::LESS_THAN:
+            if (literal > max_) {
+                return all_rows();
+            }
+            if (literal <= min_) {
+                return no_rows();
+            }
+            break;
+        default:
+            return Status::Invalid(
+                "Invalid Function::Type in CompareUsingMinMax of BitSliceIndex, only support "
+                "EQUAL/NOT_EQUAL/GREATER_OR_EQUAL/GREATER_THAN/LESS_OR_EQUAL/LESS_THAN");
+    }
+    // The literal falls inside the range: the caller has to inspect the slices.
+    return Shortcut();
+}
+
+/// Bit-sliced comparison: walks the slices from the highest bit to bit 0 while maintaining
+/// three rid sets (still equal / already greater / already less than `literal`).
+///
+/// @param operation comparison to evaluate
+/// @param literal value to compare against; Compare passes it already shifted by min_
+/// @return the matching rids, or Status::Invalid for an unsupported operation
+Result<RoaringBitmap32> BitSliceIndexRoaringBitmap::ONeilCompare(const Function::Type& operation,
+                                                                 int64_t literal) const {
+    RoaringBitmap32 gt;
+    RoaringBitmap32 lt;
+    RoaringBitmap32 eq = ebm_;
+
+    for (auto idx = static_cast<int32_t>(slices_.size()); idx-- > 0;) {
+        const RoaringBitmap32& slice = slices_[idx];
+        const bool literal_bit_set = ((literal >> idx) & 1) == 1;
+        if (literal_bit_set) {
+            // Rows that were equal so far but have a 0 here are smaller than the literal.
+            lt |= RoaringBitmap32::AndNot(eq, slice);
+            eq &= slice;
+        } else {
+            // Rows that were equal so far but have a 1 here are larger than the literal.
+            gt |= RoaringBitmap32::And(eq, slice);
+            eq -= slice;
+        }
+    }
+    eq &= ebm_;
+
+    switch (operation) {
+        case Function::Type::EQUAL:
+            return eq;
+        case Function::Type::NOT_EQUAL:
+            return RoaringBitmap32::AndNot(ebm_, eq);
+        case Function::Type::GREATER_THAN:
+        case Function::Type::GREATER_OR_EQUAL: {
+            gt &= ebm_;
+            if (operation == Function::Type::GREATER_OR_EQUAL) {
+                gt |= eq;
+            }
+            return gt;
+        }
+        case Function::Type::LESS_THAN:
+        case Function::Type::LESS_OR_EQUAL: {
+            lt &= ebm_;
+            if (operation == Function::Type::LESS_OR_EQUAL) {
+                lt |= eq;
+            }
+            return lt;
+        }
+        default:
+            return Status::Invalid(
+                "Invalid Function::Type in ONeilCompare of BitSliceIndex, only support "
+                "EQUAL/NOT_EQUAL/GREATER_OR_EQUAL/GREATER_THAN/LESS_OR_EQUAL/LESS_THAN");
+    }
+}
+
+namespace {
+/// Number of zero bits below the lowest set bit of `value`; 64 when `value` is 0.
+/// The zero case is handled explicitly because __builtin_ctzll is undefined for 0.
+int32_t NumberOfTrailingZeros(int64_t value) {
+    const auto bits = static_cast<uint64_t>(value);
+    return bits != 0 ? __builtin_ctzll(bits) : 64;
+}
+/// Number of zero bits above the highest set bit of `value`; 64 when `value` is 0.
+/// The zero case is handled explicitly because __builtin_clzll is undefined for 0.
+int32_t NumberOfLeadingZeros(int64_t value) {
+    const auto bits = static_cast<uint64_t>(value);
+    return bits != 0 ? __builtin_clzll(bits) : 64;
+}
+}  // namespace
+
+/// Creates an appender for values in the range [min, max].
+///
+/// @param min smallest value that will be appended; must be non-negative
+/// @param max largest value that will be appended; must not be smaller than `min`
+/// @return the appender, or Status::Invalid when the bounds are rejected
+Result<std::unique_ptr<BitSliceIndexRoaringBitmap::Appender>>
+BitSliceIndexRoaringBitmap::Appender::Create(int64_t min, int64_t max) {
+    if (min < 0) {
+        return Status::Invalid("values should be non-negative in BitSliceIndexRoaringBitmap");
+    }
+    if (min > max) {
+        return Status::Invalid("min should be less than max in BitSliceIndexRoaringBitmap");
+    }
+    // One slice per significant bit of the widest shifted value (max - min);
+    // this is zero slices when min == max.
+    const int32_t slice_count = 64 - NumberOfLeadingZeros(max - min);
+    std::vector<RoaringBitmap32> slices(slice_count);
+    std::shared_ptr<BitSliceIndexRoaringBitmap> bsi(
+        new BitSliceIndexRoaringBitmap(min, max, RoaringBitmap32(), std::move(slices)));
+    return std::unique_ptr<Appender>(new Appender(std::move(bsi)));
+}
+
+/// Stores the index instance that Append, Serialize and Build operate on.
+BitSliceIndexRoaringBitmap::Appender::Appender(std::shared_ptr<BitSliceIndexRoaringBitmap>&& bitmap)
+    : bsi_(std::move(bitmap)) {}
+
+/// Records that row `rid` holds `value`.
+///
+/// @param rid row id; each rid can be appended only once
+/// @param value value to index; must lie within the [min, max] range of the index
+/// @return Status::OK on success, Status::Invalid when `value` is out of range or `rid`
+///         has already been appended
+Status BitSliceIndexRoaringBitmap::Appender::Append(int32_t rid, int64_t value) {
+    if (value > bsi_->max_) {
+        return Status::Invalid(
+            fmt::format("value {} is too large for append to BitSliceIndexRoaringBitmap", value));
+    }
+    // A value below min_ turns negative once shifted by min_; its high bits are then set and
+    // the slice lookup below would index past the end of slices_. Reject it up front.
+    if (value < bsi_->min_) {
+        return Status::Invalid(
+            fmt::format("value {} is too small for append to BitSliceIndexRoaringBitmap", value));
+    }
+    if (bsi_->ebm_.Contains(rid)) {
+        return Status::Invalid(
+            fmt::format("rid {} is already exists for append to BitSliceIndexRoaringBitmap", rid));
+    }
+
+    // reduce the number of slices
+    value = value - bsi_->min_;
+
+    // only bit=1 need to set
+    while (value != 0) {
+        bsi_->slices_[NumberOfTrailingZeros(value)].Add(rid);
+        // Clear the lowest set bit and move on to the next one.
+        value &= (value - 1);
+    }
+    bsi_->ebm_.Add(rid);
+    return Status::OK();
+}
+
+/// Returns true once the existence bitmap contains at least one rid.
+bool BitSliceIndexRoaringBitmap::Appender::IsNotEmpty() const {
+    return !bsi_->ebm_.IsEmpty();
+}
+
+/// Serializes the index. Fields are written in this order: version byte, min, max,
+/// existence bitmap, slice count (int32), then every slice.
+///
+/// @param pool memory pool used for the output stream and the returned bytes
+/// @return the serialized index
+std::shared_ptr<Bytes> BitSliceIndexRoaringBitmap::Appender::Serialize(
+    const std::shared_ptr<MemoryPool>& pool) {
+    MemoryPool* raw_pool = pool.get();
+    MemorySegmentOutputStream out(
+        /*segment_size=*/MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
+
+    // Header: format version followed by the value range.
+    out.WriteValue<int8_t>(BitSliceIndexRoaringBitmap::VERSION_1);
+    out.WriteValue<int64_t>(bsi_->min_);
+    out.WriteValue<int64_t>(bsi_->max_);
+
+    // Existence bitmap.
+    std::shared_ptr<Bytes> existence_bytes = bsi_->ebm_.Serialize(raw_pool);
+    out.WriteBytes(existence_bytes);
+
+    // Slice count followed by the slices in index order.
+    auto& all_slices = bsi_->slices_;
+    out.WriteValue<int32_t>(all_slices.size());
+    for (size_t idx = 0; idx < all_slices.size(); ++idx) {
+        std::shared_ptr<Bytes> one_slice = all_slices[idx].Serialize(raw_pool);
+        out.WriteBytes(one_slice);
+    }
+    return MemorySegmentUtils::CopyToBytes(out.Segments(), 0, out.CurrentSize(), raw_pool);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.h 
b/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.h
new file mode 100644
index 0000000..ad7f586
--- /dev/null
+++ b/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.h
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/predicate/function.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/utils/roaring_bitmap32.h"
+
+namespace paimon {
+class ByteArrayInputStream;
+class MemoryPool;
+
+/// A bit slice index compressed bitmap.
+///
+/// Values are stored relative to `min_`: slice `i` holds the rids whose shifted value
+/// (value - min) has bit `i` set, and a separate existence bitmap holds every rid that
+/// has a value.
+class BitSliceIndexRoaringBitmap {
+ public:
+    /// Deserializes an index from `input_stream`.
+    static Result<std::shared_ptr<BitSliceIndexRoaringBitmap>> Create(
+        const std::shared_ptr<ByteArrayInputStream>& input_stream);
+
+    /// Comparison queries; each returns the rids whose value satisfies the predicate
+    /// against `literal`.
+    Result<RoaringBitmap32> Equal(int64_t literal) const;
+    Result<RoaringBitmap32> LessThan(int64_t literal) const;
+    Result<RoaringBitmap32> LessOrEqual(int64_t literal) const;
+    Result<RoaringBitmap32> GreaterThan(int64_t literal) const;
+    Result<RoaringBitmap32> GreaterOrEqual(int64_t literal) const;
+    /// Returns the existence bitmap (rids that have a value).
+    const RoaringBitmap32& IsNotNull() const;
+
+    /// Compares min, max, the existence bitmap and all slices.
+    bool operator==(const BitSliceIndexRoaringBitmap& other) const;
+
+    /// Builds an index incrementally, one (rid, value) pair at a time.
+    class Appender {
+     public:
+        /// Creates an appender for values in [min, max]; rejects min < 0 and min > max.
+        static Result<std::unique_ptr<Appender>> Create(int64_t min, int64_t max);
+        /// Adds `value` for row `rid`; fails when the value exceeds max or the rid was
+        /// already appended.
+        Status Append(int32_t rid, int64_t value);
+        /// True once at least one rid has been appended.
+        bool IsNotEmpty() const;
+        // TODO(xinyu.lxy): may use data output stream
+        /// Serializes the index built so far into bytes allocated from `pool`.
+        std::shared_ptr<Bytes> Serialize(const std::shared_ptr<MemoryPool>& pool);
+        /// Returns the index being built. This is the same shared instance the appender
+        /// holds, so later Append calls remain visible through the returned pointer.
+        std::shared_ptr<BitSliceIndexRoaringBitmap> Build() const {
+            return bsi_;
+        }
+
+     private:
+        explicit Appender(std::shared_ptr<BitSliceIndexRoaringBitmap>&& bsi);
+
+     private:
+        std::shared_ptr<BitSliceIndexRoaringBitmap> bsi_;
+    };
+
+ public:
+    /// Version tag written as the first byte of the serialized form.
+    static const int8_t VERSION_1;
+    static const std::shared_ptr<BitSliceIndexRoaringBitmap>& Empty();
+
+ private:
+    BitSliceIndexRoaringBitmap(int64_t min, int64_t max, RoaringBitmap32&& ebm,
+                               std::vector<RoaringBitmap32>&& slices);
+
+    /// Answers via CompareUsingMinMax when possible, otherwise via ONeilCompare.
+    Result<RoaringBitmap32> Compare(const Function::Type& operation, int64_t literal) const;
+
+    /// Returns a definitive result when min/max alone decide the comparison, std::nullopt
+    /// when the slices must be consulted, or an error for an unsupported operation.
+    Result<std::optional<RoaringBitmap32>> CompareUsingMinMax(const Function::Type& operation,
+                                                              int64_t literal) const;
+
+    /// O'Neil bit-sliced index compare algorithm.
+    ///
+    /// See <a href="https://dl.acm.org/doi/10.1145/253262.253268">Improved query performance with
+    /// variant indexes</a>
+    ///
+    /// @param operation compare operation
+    /// @param literal the value to compare against; Compare passes it already shifted by min_
+    /// @return the set of rids in this bsi that satisfy the condition, as a RoaringBitmap
+    Result<RoaringBitmap32> ONeilCompare(const Function::Type& operation, int64_t literal) const;
+
+ private:
+    int64_t min_;                          // lower bound of the indexed values
+    int64_t max_;                          // upper bound of the indexed values
+    RoaringBitmap32 ebm_;                  // existence bitmap: rids that have a value
+    std::vector<RoaringBitmap32> slices_;  // slices_[i]: rids whose (value - min_) has bit i set
+};
+}  // namespace paimon
diff --git 
a/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap_test.cpp 
b/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap_test.cpp
new file mode 100644
index 0000000..75cda90
--- /dev/null
+++ b/src/paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap_test.cpp
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/file_index/bsi/bit_slice_index_roaring_bitmap.h"
+
+#include <algorithm>
+#include <cstdlib>
+#include <map>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/date_time_utils.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+/// Fixture that builds a BitSliceIndexRoaringBitmap over NUM_OF_ROWS rows of random values
+/// and keeps a value -> rids map as the brute-force reference for the tests.
+class BitSliceIndexRoaringBitmapTest : public ::testing::Test {
+ public:
+    void SetUp() override {
+        const int64_t time_seed = DateTimeUtils::GetCurrentUTCTimeUs();
+        srand(time_seed);
+        pool_ = GetDefaultPool();
+
+        int64_t min = 0;
+        int64_t max = 0;
+        // Updates the running min/max (0 means "not set yet"; generated values are >= 1)
+        // and records the row in the reference map.
+        auto record = [&](int32_t rid, int64_t value) {
+            min = (min == 0) ? value : std::min(min, value);
+            max = (max == 0) ? value : std::max(max, value);
+            expected_map_[value].push_back(rid);
+        };
+
+        // force add min 1, max VALUE_BOUND
+        const std::vector<int64_t> bounds = {1, VALUE_BOUND};
+        int32_t rid = 0;
+        for (const int64_t bound : bounds) {
+            record(rid++, bound);
+        }
+        // Remaining rows get random values; every fifth row is skipped to act as a null.
+        for (; rid < NUM_OF_ROWS; ++rid) {
+            if (rid % 5 != 0) {
+                record(rid, GenerateNextValue());
+            }
+        }
+
+        ASSERT_OK_AND_ASSIGN(auto appender, BitSliceIndexRoaringBitmap::Appender::Create(min, max));
+        for (const auto& entry : expected_map_) {
+            for (const int32_t row : entry.second) {
+                ASSERT_OK(appender->Append(row, entry.first));
+            }
+        }
+        bsi_ = appender->Build();
+    }
+    void TearDown() override {
+        pool_.reset();
+        bsi_.reset();
+        expected_map_.clear();
+    }
+
+    int64_t GenerateNextValue() const {
+        // return a value in the range [1, VALUE_BOUND]
+        return rand() % VALUE_BOUND + 1;
+    }
+
+    int64_t GenerateNextValueExceptMinMax() const {
+        // return a value in the range (1, VALUE_BOUND)
+        return 2 + paimon::test::RandomNumber(0, (VALUE_BOUND - 2) - 1);
+    }
+
+    static constexpr int32_t NUM_OF_ROWS = 100000;
+    static constexpr int32_t VALUE_BOUND = 1000;
+    static constexpr int32_t VALUE_LT_MIN = 0;
+    static constexpr int32_t VALUE_GT_MAX = VALUE_BOUND + 100;
+
+ private:
+    std::shared_ptr<BitSliceIndexRoaringBitmap> bsi_;
+    // value : [rid]
+    std::map<int64_t, std::vector<int32_t>> expected_map_;
+    std::shared_ptr<MemoryPool> pool_;
+};
+
+// Round-trips a small hand-built index through Serialize/Create and checks that invalid
+// appends are rejected along the way.
+TEST_F(BitSliceIndexRoaringBitmapTest, TestSerializeAndDeserialize) {
+    ASSERT_OK_AND_ASSIGN(auto appender,
+                         BitSliceIndexRoaringBitmap::Appender::Create(/*min=*/0, /*max=*/10));
+    ASSERT_FALSE(appender->IsNotEmpty());
+    ASSERT_OK(appender->Append(/*rid=*/0, /*value=*/0));
+    ASSERT_OK(appender->Append(/*rid=*/1, /*value=*/1));
+    ASSERT_OK(appender->Append(/*rid=*/2, /*value=*/2));
+    ASSERT_OK(appender->Append(/*rid=*/10, /*value=*/6));
+
+    // A value above max and a duplicate rid must both be refused.
+    ASSERT_NOK_WITH_MSG(appender->Append(11, 20),
+                        "value 20 is too large for append to BitSliceIndexRoaringBitmap");
+    ASSERT_NOK_WITH_MSG(appender->Append(10, 6),
+                        "rid 10 is already exists for append to BitSliceIndexRoaringBitmap");
+
+    ASSERT_TRUE(appender->IsNotEmpty());
+    auto built = appender->Build();
+
+    auto bytes = appender->Serialize(pool_);
+    auto stream = std::make_shared<ByteArrayInputStream>(bytes->data(), bytes->size());
+    ASSERT_OK_AND_ASSIGN(auto restored, BitSliceIndexRoaringBitmap::Create(stream));
+    ASSERT_EQ(*built, *restored);
+}
+
+TEST_F(BitSliceIndexRoaringBitmapTest, TestEqual) {
+    // Literals inside the value bound must return exactly the rids recorded for that value.
+    for (int32_t round = 0; round < 10; ++round) {
+        const int64_t literal = GenerateNextValue();
+        ASSERT_OK_AND_ASSIGN(auto result, bsi_->Equal(literal));
+        const RoaringBitmap32 expected = RoaringBitmap32::From(expected_map_[literal]);
+        ASSERT_EQ(result, expected);
+    }
+    // Literals outside the value bound match nothing.
+    ASSERT_TRUE(bsi_->Equal(VALUE_LT_MIN).value().IsEmpty());
+    ASSERT_TRUE(bsi_->Equal(VALUE_GT_MAX).value().IsEmpty());
+}
+
+TEST_F(BitSliceIndexRoaringBitmapTest, TestNotEqualInONeilCompare) {
+    // NOT_EQUAL is exercised on ONeilCompare directly, so the literal is shifted by min_ here.
+    for (int32_t round = 0; round < 10; ++round) {
+        const int64_t literal = GenerateNextValue();
+        ASSERT_OK_AND_ASSIGN(auto result,
+                             bsi_->ONeilCompare(Function::Type::NOT_EQUAL, literal - bsi_->min_));
+        const RoaringBitmap32 matching = RoaringBitmap32::From(expected_map_[literal]);
+        ASSERT_EQ(result, RoaringBitmap32::AndNot(bsi_->ebm_, matching));
+    }
+    // Unsupported operations are reported as an error.
+    ASSERT_NOK_WITH_MSG(bsi_->ONeilCompare(Function::Type::IN, bsi_->min_),
+                        "Invalid Function::Type in ONeilCompare of BitSliceIndex, only support "
+                        "EQUAL/NOT_EQUAL/GREATER_OR_EQUAL/GREATER_THAN/LESS_OR_EQUAL/LESS_THAN");
+}
+
+TEST_F(BitSliceIndexRoaringBitmapTest, TestLT) {
+    // Literals inside the value bound: compare with a brute-force scan of the reference map.
+    for (int32_t round = 0; round < 10; ++round) {
+        const int64_t literal = GenerateNextValue();
+        RoaringBitmap32 expected;
+        for (const auto& entry : expected_map_) {
+            if (entry.first >= literal) {
+                continue;
+            }
+            expected |= RoaringBitmap32::From(entry.second);
+        }
+        ASSERT_OK_AND_ASSIGN(auto result, bsi_->LessThan(literal));
+        ASSERT_EQ(result, expected);
+    }
+    // Literals outside the value bound: nothing below the minimum, everything below the maximum.
+    ASSERT_TRUE(bsi_->LessThan(VALUE_LT_MIN).value().IsEmpty());
+    ASSERT_EQ(bsi_->LessThan(VALUE_GT_MAX).value(), bsi_->IsNotNull());
+}
+
+TEST_F(BitSliceIndexRoaringBitmapTest, TestLTE) {
+    // Literals inside the value bound: compare with a brute-force scan of the reference map.
+    for (int32_t round = 0; round < 10; ++round) {
+        const int64_t literal = GenerateNextValue();
+        RoaringBitmap32 expected;
+        for (const auto& entry : expected_map_) {
+            if (entry.first > literal) {
+                continue;
+            }
+            expected |= RoaringBitmap32::From(entry.second);
+        }
+        ASSERT_OK_AND_ASSIGN(auto result, bsi_->LessOrEqual(literal));
+        ASSERT_EQ(result, expected);
+    }
+    // Literals outside the value bound: nothing below the minimum, everything below the maximum.
+    ASSERT_TRUE(bsi_->LessOrEqual(VALUE_LT_MIN).value().IsEmpty());
+    ASSERT_EQ(bsi_->LessOrEqual(VALUE_GT_MAX).value(), bsi_->IsNotNull());
+}
+
+TEST_F(BitSliceIndexRoaringBitmapTest, TestGT) {
+    // Literals inside the value bound: compare with a brute-force scan of the reference map.
+    for (int32_t round = 0; round < 10; ++round) {
+        const int64_t literal = GenerateNextValue();
+        RoaringBitmap32 expected;
+        for (const auto& entry : expected_map_) {
+            if (entry.first <= literal) {
+                continue;
+            }
+            expected |= RoaringBitmap32::From(entry.second);
+        }
+        ASSERT_OK_AND_ASSIGN(auto result, bsi_->GreaterThan(literal));
+        ASSERT_EQ(result, expected);
+    }
+    // Literals outside the value bound: nothing above the maximum, everything above the minimum.
+    ASSERT_TRUE(bsi_->GreaterThan(VALUE_GT_MAX).value().IsEmpty());
+    ASSERT_EQ(bsi_->GreaterThan(VALUE_LT_MIN).value(), bsi_->IsNotNull());
+}
+
+TEST_F(BitSliceIndexRoaringBitmapTest, TestGTE) {
+    // Literals inside the value bound: compare with a brute-force scan of the reference map.
+    for (int32_t round = 0; round < 10; ++round) {
+        const int64_t literal = GenerateNextValue();
+        RoaringBitmap32 expected;
+        for (const auto& entry : expected_map_) {
+            if (entry.first < literal) {
+                continue;
+            }
+            expected |= RoaringBitmap32::From(entry.second);
+        }
+        ASSERT_OK_AND_ASSIGN(auto result, bsi_->GreaterOrEqual(literal));
+        ASSERT_EQ(result, expected);
+    }
+    // Literals outside the value bound: nothing above the maximum, everything above the minimum.
+    ASSERT_TRUE(bsi_->GreaterOrEqual(VALUE_GT_MAX).value().IsEmpty());
+    ASSERT_EQ(bsi_->GreaterOrEqual(VALUE_LT_MIN).value(), bsi_->IsNotNull());
+}
+
+TEST_F(BitSliceIndexRoaringBitmapTest, TestIsNotNull) {
+    // The existence bitmap must be the union of every rid recorded in the reference map.
+    RoaringBitmap32 every_rid;
+    for (const auto& entry : expected_map_) {
+        every_rid |= RoaringBitmap32::From(entry.second);
+    }
+    ASSERT_EQ(bsi_->IsNotNull(), every_rid);
+}
+
+TEST_F(BitSliceIndexRoaringBitmapTest, TestCompareUsingMinMax) {
+    // a predicate in the value bound
+    const int64_t literal = GenerateNextValue();
+    const int64_t literal_except_min_max = GenerateNextValueExceptMinMax();
+
+    // The three possible outcomes of the min/max shortcut.
+    const std::optional<RoaringBitmap32> empty = RoaringBitmap32();
+    const std::optional<RoaringBitmap32> not_null = bsi_->IsNotNull();
+    const std::optional<RoaringBitmap32> in_bound = std::nullopt;
+
+    // Unwraps the shortcut result for one (operation, literal) pair.
+    auto range_check = [this](Function::Type op, int64_t value) {
+        return bsi_->CompareUsingMinMax(op, value).value();
+    };
+
+    // test eq & neq
+    ASSERT_EQ(range_check(Function::Type::EQUAL, literal), in_bound);
+    ASSERT_EQ(range_check(Function::Type::EQUAL, VALUE_LT_MIN), empty);
+    ASSERT_EQ(range_check(Function::Type::EQUAL, VALUE_GT_MAX), empty);
+    ASSERT_EQ(range_check(Function::Type::NOT_EQUAL, literal), in_bound);
+    ASSERT_EQ(range_check(Function::Type::NOT_EQUAL, VALUE_LT_MIN), not_null);
+    ASSERT_EQ(range_check(Function::Type::NOT_EQUAL, VALUE_GT_MAX), not_null);
+
+    // test lt & lte
+    ASSERT_EQ(range_check(Function::Type::LESS_THAN, literal_except_min_max), in_bound);
+    ASSERT_EQ(range_check(Function::Type::LESS_OR_EQUAL, literal_except_min_max), in_bound);
+    ASSERT_EQ(range_check(Function::Type::LESS_THAN, VALUE_LT_MIN), empty);
+    ASSERT_EQ(range_check(Function::Type::LESS_OR_EQUAL, VALUE_LT_MIN), empty);
+    ASSERT_EQ(range_check(Function::Type::LESS_THAN, VALUE_GT_MAX), not_null);
+    ASSERT_EQ(range_check(Function::Type::LESS_OR_EQUAL, VALUE_GT_MAX), not_null);
+
+    // test gt & gte
+    ASSERT_EQ(range_check(Function::Type::GREATER_THAN, literal_except_min_max), in_bound);
+    ASSERT_EQ(range_check(Function::Type::GREATER_OR_EQUAL, literal_except_min_max), in_bound);
+    ASSERT_EQ(range_check(Function::Type::GREATER_THAN, VALUE_LT_MIN), not_null);
+    ASSERT_EQ(range_check(Function::Type::GREATER_OR_EQUAL, VALUE_LT_MIN), not_null);
+    ASSERT_EQ(range_check(Function::Type::GREATER_THAN, VALUE_GT_MAX), empty);
+    ASSERT_EQ(range_check(Function::Type::GREATER_OR_EQUAL, VALUE_GT_MAX), empty);
+
+    // test invalid case
+    ASSERT_NOK_WITH_MSG(
+        bsi_->CompareUsingMinMax(Function::Type::IN, 10),
+        "Invalid Function::Type in CompareUsingMinMax of BitSliceIndex, only support "
+        "EQUAL/NOT_EQUAL/GREATER_OR_EQUAL/GREATER_THAN/LESS_OR_EQUAL/LESS_THAN");
+}
+
+// With min == max the index has no slices; results come from the min/max shortcut alone.
+TEST_F(BitSliceIndexRoaringBitmapTest, TestSingleValue) {
+    ASSERT_OK_AND_ASSIGN(auto appender,
+                         BitSliceIndexRoaringBitmap::Appender::Create(/*min=*/1, /*max=*/1));
+    for (const int32_t rid : {0, 1, 3}) {
+        ASSERT_OK(appender->Append(rid, /*value=*/1));
+    }
+    auto single = appender->Build();
+    ASSERT_EQ(single->Equal(1).value(), RoaringBitmap32::From({0, 1, 3}));
+    ASSERT_EQ(single->CompareUsingMinMax(Function::Type::NOT_EQUAL, 1).value(),
+              RoaringBitmap32::From({}));
+}
+
+}  // namespace paimon::test

Reply via email to