This is an automated email from the ASF dual-hosted git repository.

leaves12138 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 036dd3a  feat: add RoaringBitmap32, RoaringBitmap64, and BitSet 
utilities (#35)
036dd3a is described below

commit 036dd3a7fdd0fed5b0ab380792b173c5e855e39e
Author: lxy <[email protected]>
AuthorDate: Mon Jun 1 17:11:41 2026 +0800

    feat: add RoaringBitmap32, RoaringBitmap64, and BitSet utilities (#35)
---
 include/paimon/utils/roaring_bitmap32.h           | 171 ++++++
 include/paimon/utils/roaring_bitmap64.h           | 184 +++++++
 src/paimon/common/utils/bit_set.cpp               |  70 +++
 src/paimon/common/utils/bit_set.h                 |  72 +++
 src/paimon/common/utils/bit_set_test.cpp          | 108 ++++
 src/paimon/common/utils/roaring_bitmap32.cpp      | 315 +++++++++++
 src/paimon/common/utils/roaring_bitmap32_test.cpp | 371 +++++++++++++
 src/paimon/common/utils/roaring_bitmap64.cpp      | 389 +++++++++++++
 src/paimon/common/utils/roaring_bitmap64_test.cpp | 633 ++++++++++++++++++++++
 9 files changed, 2313 insertions(+)

diff --git a/include/paimon/utils/roaring_bitmap32.h 
b/include/paimon/utils/roaring_bitmap32.h
new file mode 100644
index 0000000..24b5c98
--- /dev/null
+++ b/include/paimon/utils/roaring_bitmap32.h
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class ByteArrayInputStream;
+class RoaringBitmap64;
+
+/// A compressed bitmap for 32-bit integer.
+class PAIMON_EXPORT RoaringBitmap32 {
+ public:
+    RoaringBitmap32();
+    ~RoaringBitmap32();
+
+    RoaringBitmap32(const RoaringBitmap32&) noexcept;
+    RoaringBitmap32& operator=(const RoaringBitmap32&) noexcept;
+
+    RoaringBitmap32(RoaringBitmap32&&) noexcept;
+    RoaringBitmap32& operator=(RoaringBitmap32&&) noexcept;
+
+    class PAIMON_EXPORT Iterator {
+     public:
+        friend class RoaringBitmap32;
+        explicit Iterator(const RoaringBitmap32& bitmap);
+        ~Iterator();
+        Iterator(const Iterator&) noexcept;
+        Iterator(Iterator&&) noexcept;
+        Iterator& operator=(const Iterator&) noexcept;
+        Iterator& operator=(Iterator&&) noexcept;
+
+        /// Return the current value of iterator.
+        int32_t operator*() const;
+        /// Move the iterator to next value.
+        Iterator& operator++();
+        bool operator==(const Iterator& other) const;
+        bool operator!=(const Iterator& other) const;
+
+     private:
+        void* iterator_ = nullptr;
+    };
+
+    static constexpr int32_t MAX_VALUE = std::numeric_limits<int32_t>::max();
+
+    /// @param x value added to bitmap
+    void Add(int32_t x);
+
+    /// @param x value added to bitmap
+    /// @return false if contain x; true if not contain x
+    bool CheckedAdd(int32_t x);
+
+    /// @return true if contain x; false if not contain x
+    bool Contains(int32_t x) const;
+
+    /// @return true if bitmap is empty
+    bool IsEmpty() const;
+
+    /// @return the cardinality of bitmap, i.e., the number of unique value 
added
+    int32_t Cardinality() const;
+
+    /// Computes the negation of the roaring bitmap within the half-open 
interval [min, max).
+    /// Areas outside the interval are unchanged.
+    void Flip(int32_t min, int32_t max);
+
+    /// Adds all values in the half-open interval [min, max).
+    void AddRange(int32_t min, int32_t max);
+
+    /// Removes all values in the half-open interval [min, max).
+    void RemoveRange(int32_t min, int32_t max);
+
+    /// Contain any value in the half-open interval [min, max).
+    bool ContainsAny(int32_t min, int32_t max) const;
+
+    /// Serialize bitmap to bytes.
+    /// @note Cannot accurately compare the output byte streams with Java and 
C++ versions,
+    /// as the `runOptimize` function of roaring bitmap may optimize 
consecutive integers
+    /// differently.
+    PAIMON_UNIQUE_PTR<Bytes> Serialize(MemoryPool* pool) const;
+
+    /// Deserialize bitmap from input stream.
+    Status Deserialize(ByteArrayInputStream* input_stream);
+
+    /// Deserialize bitmap from buffer with begin and length.
+    Status Deserialize(const char* begin, size_t length);
+
+    /// @return How many bytes are required to serialize this bitmap.
+    size_t GetSizeInBytes() const;
+
+    bool operator==(const RoaringBitmap32& other) const noexcept;
+
+    /// Compute the union of the current bitmap and the provided bitmap,
+    /// writing the result in the current bitmap. The provided bitmap is not
+    /// modified.
+    RoaringBitmap32& operator|=(const RoaringBitmap32& other);
+
+    /// Compute the intersection of the current bitmap and the provided bitmap,
+    /// writing the result in the current bitmap. The provided bitmap is not
+    /// modified.
+    /// @note If you are computing the intersection between several
+    /// bitmaps, two-by-two, it is best to start with the smallest bitmap.
+    RoaringBitmap32& operator&=(const RoaringBitmap32& other);
+
+    /// Compute the difference of the current bitmap and the provided bitmap,
+    /// writing the result in the current bitmap. The provided bitmap is not
+    /// modified.
+    RoaringBitmap32& operator-=(const RoaringBitmap32& other);
+
+    std::string ToString() const;
+
+    Iterator Begin() const;
+    Iterator End() const;
+    /// @return the iterator moved to the value which is equal or larger than 
key
+    Iterator EqualOrLarger(int32_t key) const;
+
+    /// Computes the intersection between two bitmaps and returns new bitmap.
+    /// The current bitmap and the provided bitmap are unchanged.
+    ///
+    /// @note If you are computing the intersection between several bitmaps, 
two-by-two, it is best
+    /// to start with the smallest bitmap. Consider also using the operator &= 
to avoid needlessly
+    /// creating many temporary bitmaps.
+    static RoaringBitmap32 And(const RoaringBitmap32& lhs, const 
RoaringBitmap32& rhs);
+
+    /// Computes the union between two bitmaps and returns new bitmap.
+    /// The current bitmap and the provided bitmap are unchanged.
+    static RoaringBitmap32 Or(const RoaringBitmap32& lhs, const 
RoaringBitmap32& rhs);
+
+    /// Computes the difference between two bitmaps and returns new bitmap.
+    /// The current bitmap and the provided bitmap are unchanged.
+    static RoaringBitmap32 AndNot(const RoaringBitmap32& lhs, const 
RoaringBitmap32& rhs);
+
+    /// @return a bitmap contains input values
+    static RoaringBitmap32 From(const std::vector<int32_t>& values);
+
+    /// Fast union multiple bitmaps.
+    static RoaringBitmap32 FastUnion(const std::vector<const 
RoaringBitmap32*>& inputs);
+
+    /// Fast union multiple bitmaps.
+    static RoaringBitmap32 FastUnion(const std::vector<RoaringBitmap32>& 
inputs);
+
+    friend class RoaringBitmap64;
+
+ private:
+    void* roaring_bitmap_ = nullptr;
+};
+}  // namespace paimon
diff --git a/include/paimon/utils/roaring_bitmap64.h 
b/include/paimon/utils/roaring_bitmap64.h
new file mode 100644
index 0000000..289622b
--- /dev/null
+++ b/include/paimon/utils/roaring_bitmap64.h
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstddef>
+#include <cstdint>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/status.h"
+#include "paimon/utils/roaring_bitmap32.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class ByteArrayInputStream;
+
+/// A compressed bitmap for 64-bit integer.
+class PAIMON_EXPORT RoaringBitmap64 {
+ public:
+    RoaringBitmap64();
+    ~RoaringBitmap64();
+
+    RoaringBitmap64(const RoaringBitmap64&) noexcept;
+    RoaringBitmap64& operator=(const RoaringBitmap64&) noexcept;
+
+    RoaringBitmap64(RoaringBitmap64&&) noexcept;
+    RoaringBitmap64& operator=(RoaringBitmap64&&) noexcept;
+
+    explicit RoaringBitmap64(const RoaringBitmap32&) noexcept;
+    RoaringBitmap64& operator=(const RoaringBitmap32&) noexcept;
+
+    class PAIMON_EXPORT Iterator {
+     public:
+        friend class RoaringBitmap64;
+        explicit Iterator(const RoaringBitmap64& bitmap);
+        ~Iterator();
+        Iterator(const Iterator&) noexcept;
+        Iterator(Iterator&&) noexcept;
+        Iterator& operator=(const Iterator&) noexcept;
+        Iterator& operator=(Iterator&&) noexcept;
+
+        /// Return the current value of iterator.
+        int64_t operator*() const;
+        /// Move the iterator to next value.
+        Iterator& operator++();
+        bool operator==(const Iterator& other) const;
+        bool operator!=(const Iterator& other) const;
+        /// Move the iterator to the value which is equal or larger than input 
value
+        void EqualOrLarger(int64_t value);
+
+     private:
+        void* iterator_ = nullptr;
+    };
+
+    static constexpr int64_t MAX_VALUE = std::numeric_limits<int64_t>::max();
+
+    /// @param x value added to bitmap
+    void Add(int64_t x);
+
+    /// Bulk-insert `n` values into the bitmap.
+    ///
+    /// Compared to repeatedly calling `Add`, this implementation:
+    ///   1. Buckets the input values by their high-32 bits in a single pass.
+    ///   2. Feeds each bucket to the inner 32-bit Roaring's true-batch
+    ///      `addMany(uint32_t*)` path, which performs container-level bulk
+    ///      insertion.
+    ///
+    /// This avoids the per-value `std::map` lookup of the 64-bit wrapper and
+    /// the per-value insertion overhead inside the 32-bit array container.
+    /// Values may be unsorted; ordering is handled internally.
+    void AddMany(size_t n, const int64_t* values);
+
+    /// @param x value added to bitmap
+    /// @return false if contain x; true if not contain x
+    bool CheckedAdd(int64_t x);
+
+    /// @return true if contain x; false if not contain x
+    bool Contains(int64_t x) const;
+
+    /// @return true if bitmap is empty
+    bool IsEmpty() const;
+
+    /// @return the cardinality of bitmap, i.e., the number of unique value 
added
+    int64_t Cardinality() const;
+
+    /// Computes the negation of the roaring bitmap within the half-open 
interval [min, max).
+    /// Areas outside the interval are unchanged.
+    void Flip(int64_t min, int64_t max);
+
+    /// Adds all values in the half-open interval [min, max).
+    void AddRange(int64_t min, int64_t max);
+
+    /// Removes all values in the half-open interval [min, max).
+    void RemoveRange(int64_t min, int64_t max);
+
+    /// Contain any value in the half-open interval [min, max).
+    bool ContainsAny(int64_t min, int64_t max) const;
+
+    /// Serialize bitmap to bytes.
+    PAIMON_UNIQUE_PTR<Bytes> Serialize(MemoryPool* pool) const;
+
+    /// Deserialize bitmap from input stream.
+    Status Deserialize(ByteArrayInputStream* input_stream);
+
+    /// Deserialize bitmap from buffer with begin and length.
+    Status Deserialize(const char* begin, size_t length);
+
+    /// @return How many bytes are required to serialize this bitmap.
+    size_t GetSizeInBytes() const;
+
+    bool operator==(const RoaringBitmap64& other) const noexcept;
+
+    /// Compute the union of the current bitmap and the provided bitmap,
+    /// writing the result in the current bitmap. The provided bitmap is not
+    /// modified.
+    RoaringBitmap64& operator|=(const RoaringBitmap64& other);
+
+    /// Compute the intersection of the current bitmap and the provided bitmap,
+    /// writing the result in the current bitmap. The provided bitmap is not
+    /// modified.
+    /// @note If you are computing the intersection between several
+    /// bitmaps, two-by-two, it is best to start with the smallest bitmap.
+    RoaringBitmap64& operator&=(const RoaringBitmap64& other);
+
+    /// Compute the difference of the current bitmap and the provided bitmap,
+    /// writing the result in the current bitmap. The provided bitmap is not
+    /// modified.
+    RoaringBitmap64& operator-=(const RoaringBitmap64& other);
+
+    std::string ToString() const;
+
+    Iterator Begin() const;
+    Iterator End() const;
+    /// @return the iterator moved to the value which is equal or larger than 
key
+    Iterator EqualOrLarger(int64_t key) const;
+
+    /// Computes the intersection between two bitmaps and returns new bitmap.
+    /// The current bitmap and the provided bitmap are unchanged.
+    ///
+    /// @note If you are computing the intersection between several bitmaps, 
two-by-two, it is best
+    /// to start with the smallest bitmap. Consider also using the operator &= 
to avoid needlessly
+    /// creating many temporary bitmaps.
+    static RoaringBitmap64 And(const RoaringBitmap64& lhs, const 
RoaringBitmap64& rhs);
+
+    /// Computes the union between two bitmaps and returns new bitmap.
+    /// The current bitmap and the provided bitmap are unchanged.
+    static RoaringBitmap64 Or(const RoaringBitmap64& lhs, const 
RoaringBitmap64& rhs);
+
+    /// Computes the difference between two bitmaps and returns new bitmap.
+    /// The current bitmap and the provided bitmap are unchanged.
+    static RoaringBitmap64 AndNot(const RoaringBitmap64& lhs, const 
RoaringBitmap64& rhs);
+
+    /// @return a bitmap contains input values
+    static RoaringBitmap64 From(const std::vector<int64_t>& values);
+
+    /// Fast union multiple bitmaps.
+    static RoaringBitmap64 FastUnion(const std::vector<const 
RoaringBitmap64*>& inputs);
+
+    /// Fast union multiple bitmaps.
+    static RoaringBitmap64 FastUnion(const std::vector<RoaringBitmap64>& 
inputs);
+
+ private:
+    void* roaring_bitmap_ = nullptr;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bit_set.cpp 
b/src/paimon/common/utils/bit_set.cpp
new file mode 100644
index 0000000..1cdf89c
--- /dev/null
+++ b/src/paimon/common/utils/bit_set.cpp
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bit_set.h"
+namespace paimon {
+
+Status BitSet::SetMemorySegment(MemorySegment segment, int32_t offset) {
+    if (offset < 0) {
+        return Status::Invalid("Offset should be positive integer.");
+    }
+    if (offset + byte_length_ > segment.Size()) {
+        return Status::Invalid("Could not set MemorySegment, the remain 
buffers is not enough.");
+    }
+    segment_ = segment;
+    offset_ = offset;
+    return Status::OK();
+}
+
+void BitSet::UnsetMemorySegment() {
+    segment_ = MemorySegment();
+}
+
+Status BitSet::Set(uint32_t index) {
+    if (index >= bit_size_) {
+        return Status::IndexError("Index out of bound");
+    }
+    uint32_t byte_index = index >> 3;
+    auto val = segment_.Get(offset_ + byte_index);
+    val |= static_cast<char>(1u << (index & BYTE_INDEX_MASK));
+    segment_.PutValue(offset_ + byte_index, val);
+    return Status::OK();
+}
+
+bool BitSet::Get(uint32_t index) {
+    if (index >= bit_size_) {
+        return false;
+    }
+    uint32_t byte_index = index >> 3;
+    auto val = segment_.Get(offset_ + byte_index);
+    return (val & static_cast<char>(1u << (index & BYTE_INDEX_MASK))) != 0;
+}
+
+void BitSet::Clear() {
+    int32_t index = 0;
+    while (index + 8 <= byte_length_) {
+        segment_.PutValue(offset_ + index, 0L);
+        index += 8;
+    }
+    while (index < byte_length_) {
+        segment_.PutValue(offset_ + index, static_cast<char>(0));
+        index += 1;
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bit_set.h 
b/src/paimon/common/utils/bit_set.h
new file mode 100644
index 0000000..7751dc9
--- /dev/null
+++ b/src/paimon/common/utils/bit_set.h
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <cstdint>
+#include <functional>
+#include <memory>
+
+#include "paimon/common/memory/memory_slice.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/status.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class MemoryPool;
+
+/// BitSet based on MemorySegment.
+class PAIMON_EXPORT BitSet {
+ public:
+    explicit BitSet(int64_t byte_length)
+        : byte_length_(byte_length), 
bit_size_(static_cast<uint64_t>(byte_length) << 3) {}
+
+    Status SetMemorySegment(MemorySegment segment, int32_t offset = 0);
+    void UnsetMemorySegment();
+
+    const MemorySegment& GetMemorySegment() const {
+        return segment_;
+    }
+
+    MemorySlice ToSlice() {
+        return MemorySlice(segment_, offset_, byte_length_);
+    }
+
+    int32_t Offset() const {
+        return offset_;
+    }
+    int64_t BitSize() const {
+        return bit_size_;
+    }
+    int64_t ByteLength() const {
+        return byte_length_;
+    }
+
+    Status Set(uint32_t index);
+    bool Get(uint32_t index);
+    void Clear();
+
+ private:
+    static constexpr int32_t BYTE_INDEX_MASK = 0x00000007;
+
+ private:
+    int64_t byte_length_;
+    int64_t bit_size_;
+    int32_t offset_;
+    MemorySegment segment_;
+};
+}  // namespace paimon
diff --git a/src/paimon/common/utils/bit_set_test.cpp 
b/src/paimon/common/utils/bit_set_test.cpp
new file mode 100644
index 0000000..e5c3178
--- /dev/null
+++ b/src/paimon/common/utils/bit_set_test.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/common/utils/bit_set.h"
+
+#include <cstring>
+#include <limits>
+#include <random>
+#include <set>
+#include <utility>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+TEST(BitSetTest, TestBitSet) {
+    auto bit_set = std::make_shared<BitSet>(1024);
+    auto pool = GetDefaultPool();
+    auto seg = MemorySegment::AllocateHeapMemory(1024, pool.get());
+    ASSERT_OK(bit_set->SetMemorySegment(seg));
+    for (int32_t i = 0; i < 100; i++) {
+        ASSERT_OK(bit_set->Set(i * 2 + 1));
+    }
+    for (int32_t i = 0; i < 100; i++) {
+        ASSERT_TRUE(bit_set->Get(i * 2 + 1));
+    }
+    bit_set->Clear();
+    for (int32_t i = 0; i < 100; i++) {
+        ASSERT_FALSE(bit_set->Get(i * 2 + 1));
+    }
+}
+
+TEST(BitSetTest, TestGetOutOfBound) {
+    auto pool = GetDefaultPool();
+    // 2 bytes = 16 bits
+    auto bit_set = std::make_shared<BitSet>(2);
+    auto seg = MemorySegment::AllocateHeapMemory(2, pool.get());
+    ASSERT_OK(bit_set->SetMemorySegment(seg));
+
+    ASSERT_OK(bit_set->Set(0));
+    ASSERT_OK(bit_set->Set(15));
+    ASSERT_TRUE(bit_set->Get(0));
+    ASSERT_TRUE(bit_set->Get(15));
+
+    // Out-of-bound access should return false (bit_size_ = 16)
+    ASSERT_FALSE(bit_set->Get(16));
+    ASSERT_FALSE(bit_set->Get(100));
+    ASSERT_FALSE(bit_set->Get(std::numeric_limits<uint32_t>::max()));
+
+    // Set out-of-bound should return error
+    ASSERT_NOK_WITH_MSG(bit_set->Set(16), "Index out of bound");
+}
+
+TEST(BitSetTest, TestClearNonAligned) {
+    auto pool = GetDefaultPool();
+    // 5 bytes = 40 bits, not a multiple of 8 bytes,
+    // exercises both the 8-byte loop (never enters) and the tail byte-by-byte 
loop
+    auto bit_set = std::make_shared<BitSet>(5);
+    auto seg = MemorySegment::AllocateHeapMemory(8, pool.get());
+    ASSERT_OK(bit_set->SetMemorySegment(seg));
+
+    // Set every bit
+    for (uint32_t i = 0; i < 40; i++) {
+        ASSERT_OK(bit_set->Set(i));
+    }
+    for (uint32_t i = 0; i < 40; i++) {
+        ASSERT_TRUE(bit_set->Get(i));
+    }
+
+    // Clear and verify all bits are unset
+    bit_set->Clear();
+    for (uint32_t i = 0; i < 40; i++) {
+        ASSERT_FALSE(bit_set->Get(i));
+    }
+
+    // 16 bytes = 128 bits, exact multiple of 8 bytes, exercises the 8-byte 
loop + tail loop
+    auto bit_set_aligned = std::make_shared<BitSet>(16);
+    auto seg2 = MemorySegment::AllocateHeapMemory(16, pool.get());
+    ASSERT_OK(bit_set_aligned->SetMemorySegment(seg2));
+
+    for (uint32_t i = 0; i < 128; i++) {
+        ASSERT_OK(bit_set_aligned->Set(i));
+    }
+    bit_set_aligned->Clear();
+    for (uint32_t i = 0; i < 128; i++) {
+        ASSERT_FALSE(bit_set_aligned->Get(i));
+    }
+}
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/roaring_bitmap32.cpp 
b/src/paimon/common/utils/roaring_bitmap32.cpp
new file mode 100644
index 0000000..af5214b
--- /dev/null
+++ b/src/paimon/common/utils/roaring_bitmap32.cpp
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/utils/roaring_bitmap32.h"
+
+#include <cassert>
+#include <memory>
+#include <utility>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/result.h"
+#include "roaring.hh"  // NOLINT(build/include_subdir)
+
+namespace paimon {
+namespace {
+roaring::Roaring::const_iterator& GetIterator(void* iter) {
+    return *(static_cast<roaring::Roaring::const_iterator*>(iter));
+}
+roaring::Roaring& GetRoaringBitmap(void* bitmap) {
+    return *(static_cast<roaring::Roaring*>(bitmap));
+}
+}  // namespace
+
+RoaringBitmap32::Iterator::Iterator(const RoaringBitmap32& bitmap) {
+    iterator_ =
+        new 
roaring::Roaring::const_iterator(GetRoaringBitmap(bitmap.roaring_bitmap_).begin());
+}
+
+RoaringBitmap32::Iterator::~Iterator() {
+    if (iterator_) {
+        delete static_cast<roaring::Roaring::const_iterator*>(iterator_);
+    }
+}
+
+RoaringBitmap32::Iterator::Iterator(const RoaringBitmap32::Iterator& other) 
noexcept {
+    *this = other;
+}
+RoaringBitmap32::Iterator& RoaringBitmap32::Iterator::operator=(
+    const RoaringBitmap32::Iterator& other) noexcept {
+    if (&other == this) {
+        return *this;
+    }
+    if (!iterator_) {
+        iterator_ = new 
roaring::Roaring::const_iterator(GetIterator(other.iterator_));
+    } else {
+        GetIterator(iterator_) = GetIterator(other.iterator_);
+    }
+    return *this;
+}
+
+RoaringBitmap32::Iterator::Iterator(RoaringBitmap32::Iterator&& other) 
noexcept {
+    *this = std::move(other);
+}
+
+RoaringBitmap32::Iterator& RoaringBitmap32::Iterator::operator=(
+    RoaringBitmap32::Iterator&& other) noexcept {
+    if (&other == this) {
+        return *this;
+    }
+    if (iterator_) {
+        delete static_cast<roaring::Roaring::const_iterator*>(iterator_);
+    }
+    iterator_ = other.iterator_;
+    other.iterator_ = nullptr;
+    return *this;
+}
+
+int32_t RoaringBitmap32::Iterator::operator*() const {
+    return *GetIterator(iterator_);
+}
+RoaringBitmap32::Iterator& RoaringBitmap32::Iterator::operator++() {
+    ++GetIterator(iterator_);
+    return *this;
+}
+bool RoaringBitmap32::Iterator::operator==(const Iterator& other) const {
+    if (&other == this) {
+        return true;
+    }
+    assert(iterator_ && other.iterator_);
+    return GetIterator(iterator_) == GetIterator(other.iterator_);
+}
+bool RoaringBitmap32::Iterator::operator!=(const Iterator& other) const {
+    return !(*this == other);
+}
+
+RoaringBitmap32::RoaringBitmap32() {
+    roaring_bitmap_ = new roaring::Roaring();
+}
+RoaringBitmap32::~RoaringBitmap32() {
+    if (roaring_bitmap_) {
+        delete static_cast<roaring::Roaring*>(roaring_bitmap_);
+    }
+}
+
+RoaringBitmap32::RoaringBitmap32(const RoaringBitmap32& other) noexcept {
+    *this = other;
+}
+RoaringBitmap32& RoaringBitmap32::operator=(const RoaringBitmap32& other) 
noexcept {
+    if (&other == this) {
+        return *this;
+    }
+    if (!roaring_bitmap_) {
+        roaring_bitmap_ = new 
roaring::Roaring(GetRoaringBitmap(other.roaring_bitmap_));
+    } else {
+        GetRoaringBitmap(roaring_bitmap_) = 
GetRoaringBitmap(other.roaring_bitmap_);
+    }
+    return *this;
+}
+
+RoaringBitmap32::RoaringBitmap32(RoaringBitmap32&& other) noexcept {
+    *this = std::move(other);
+}
+
+RoaringBitmap32& RoaringBitmap32::operator=(RoaringBitmap32&& other) noexcept {
+    if (&other == this) {
+        return *this;
+    }
+    if (roaring_bitmap_) {
+        delete static_cast<roaring::Roaring*>(roaring_bitmap_);
+    }
+    roaring_bitmap_ = other.roaring_bitmap_;
+    other.roaring_bitmap_ = nullptr;
+    return *this;
+}
+
+RoaringBitmap32& RoaringBitmap32::operator|=(const RoaringBitmap32& other) {
+    GetRoaringBitmap(roaring_bitmap_) |= 
GetRoaringBitmap(other.roaring_bitmap_);
+    return *this;
+}
+RoaringBitmap32& RoaringBitmap32::operator&=(const RoaringBitmap32& other) {
+    GetRoaringBitmap(roaring_bitmap_) &= 
GetRoaringBitmap(other.roaring_bitmap_);
+    return *this;
+}
+RoaringBitmap32& RoaringBitmap32::operator-=(const RoaringBitmap32& other) {
+    GetRoaringBitmap(roaring_bitmap_) -= 
GetRoaringBitmap(other.roaring_bitmap_);
+    return *this;
+}
+
+void RoaringBitmap32::Add(int32_t x) {
+    GetRoaringBitmap(roaring_bitmap_).add(x);
+}
+
+void RoaringBitmap32::AddRange(int32_t min, int32_t max) {
+    GetRoaringBitmap(roaring_bitmap_).addRange(min, max);
+}
+
+void RoaringBitmap32::RemoveRange(int32_t min, int32_t max) {
+    GetRoaringBitmap(roaring_bitmap_).removeRange(min, max);
+}
+
+bool RoaringBitmap32::ContainsAny(int32_t min, int32_t max) const {
+    auto iter = EqualOrLarger(min);
+    if (iter != End() && *iter < max) {
+        return true;
+    }
+    return false;
+}
+
+bool RoaringBitmap32::CheckedAdd(int32_t x) {
+    if (Contains(x)) {
+        return false;
+    }
+    Add(x);
+    return true;
+}
+
+int32_t RoaringBitmap32::Cardinality() const {
+    return GetRoaringBitmap(roaring_bitmap_).cardinality();
+}
+
+bool RoaringBitmap32::Contains(int32_t x) const {
+    return GetRoaringBitmap(roaring_bitmap_).contains(x);
+}
+
+bool RoaringBitmap32::IsEmpty() const {
+    return GetRoaringBitmap(roaring_bitmap_).isEmpty();
+}
+
+size_t RoaringBitmap32::GetSizeInBytes() const {
+    return GetRoaringBitmap(roaring_bitmap_).getSizeInBytes();
+}
+
+void RoaringBitmap32::Flip(int32_t min, int32_t max) {
+    GetRoaringBitmap(roaring_bitmap_).flip(min, max);
+}
+
+bool RoaringBitmap32::operator==(const RoaringBitmap32& other) const noexcept {
+    if (this == &other) {
+        return true;
+    }
+    assert(roaring_bitmap_ && other.roaring_bitmap_);
+    return GetRoaringBitmap(roaring_bitmap_) == 
GetRoaringBitmap(other.roaring_bitmap_);
+}
+
+PAIMON_UNIQUE_PTR<Bytes> RoaringBitmap32::Serialize(MemoryPool* pool) const {
+    GetRoaringBitmap(roaring_bitmap_).runOptimize();
+    auto& bitmap = GetRoaringBitmap(roaring_bitmap_);
+    // Use default pool if no pool is provided
+    if (pool == nullptr) {
+        pool = GetDefaultPool().get();
+    }
+    auto bytes = Bytes::AllocateBytes(bitmap.getSizeInBytes(), pool);
+    bitmap.write(bytes->data());
+    return bytes;
+}
+
+Status RoaringBitmap32::Deserialize(ByteArrayInputStream* input_stream) {
+    const char* data = input_stream->GetRawData();
+    PAIMON_ASSIGN_OR_RAISE(int64_t pos, input_stream->GetPos());
+    PAIMON_ASSIGN_OR_RAISE(int64_t total_length, input_stream->Length());
+    try {
+        GetRoaringBitmap(roaring_bitmap_) =
+            roaring::Roaring::readSafe(data, /*maxbytes=*/total_length - pos);
+    } catch (...) {
+        return Status::Invalid("catch exception in Deserialize() of 
RoaringBitmap32");
+    }
+    return 
input_stream->Seek(GetRoaringBitmap(roaring_bitmap_).getSizeInBytes(),
+                              SeekOrigin::FS_SEEK_CUR);
+}
+
+Status RoaringBitmap32::Deserialize(const char* begin, size_t length) {
+    try {
+        GetRoaringBitmap(roaring_bitmap_) = roaring::Roaring::readSafe(begin, 
length);
+    } catch (...) {
+        return Status::Invalid("catch exception in Deserialize() of 
RoaringBitmap32");
+    }
+    return Status::OK();
+}
+
+std::string RoaringBitmap32::ToString() const {
+    return GetRoaringBitmap(roaring_bitmap_).toString();
+}
+
+RoaringBitmap32 RoaringBitmap32::And(const RoaringBitmap32& lhs, const 
RoaringBitmap32& rhs) {
+    RoaringBitmap32 res;
+    GetRoaringBitmap(res.roaring_bitmap_) =
+        (GetRoaringBitmap(lhs.roaring_bitmap_) & 
GetRoaringBitmap(rhs.roaring_bitmap_));
+    return res;
+}
+
+RoaringBitmap32 RoaringBitmap32::Or(const RoaringBitmap32& lhs, const 
RoaringBitmap32& rhs) {
+    RoaringBitmap32 res;
+    GetRoaringBitmap(res.roaring_bitmap_) =
+        (GetRoaringBitmap(lhs.roaring_bitmap_) | 
GetRoaringBitmap(rhs.roaring_bitmap_));
+    return res;
+}
+
+RoaringBitmap32 RoaringBitmap32::AndNot(const RoaringBitmap32& lhs, const 
RoaringBitmap32& rhs) {
+    RoaringBitmap32 res;
+    GetRoaringBitmap(res.roaring_bitmap_) =
+        (GetRoaringBitmap(lhs.roaring_bitmap_) - 
GetRoaringBitmap(rhs.roaring_bitmap_));
+    return res;
+}
+
+RoaringBitmap32 RoaringBitmap32::From(const std::vector<int32_t>& values) {
+    RoaringBitmap32 res;
+    for (const auto& value : values) {
+        res.Add(value);
+    }
+    return res;
+}
+
+RoaringBitmap32 RoaringBitmap32::FastUnion(const std::vector<const 
RoaringBitmap32*>& inputs) {
+    std::vector<roaring::Roaring*> roaring_inputs;
+    roaring_inputs.reserve(inputs.size());
+    for (const auto* roaring : inputs) {
+        roaring_inputs.push_back(&GetRoaringBitmap(roaring->roaring_bitmap_));
+    }
+    RoaringBitmap32 res;
+    GetRoaringBitmap(res.roaring_bitmap_) = roaring::Roaring::fastunion(
+        roaring_inputs.size(), const_cast<const 
roaring::Roaring**>(roaring_inputs.data()));
+    return res;
+}
+
+RoaringBitmap32 RoaringBitmap32::FastUnion(const std::vector<RoaringBitmap32>& 
inputs) {
+    std::vector<const RoaringBitmap32*> inputs_ptr;
+    inputs_ptr.reserve(inputs.size());
+    for (const auto& bitmap : inputs) {
+        inputs_ptr.push_back(&bitmap);
+    }
+    return FastUnion(inputs_ptr);
+}
+RoaringBitmap32::Iterator RoaringBitmap32::Begin() const {
+    return RoaringBitmap32::Iterator(*this);
+}
+
+RoaringBitmap32::Iterator RoaringBitmap32::End() const {
+    RoaringBitmap32::Iterator iter(*this);
+    GetIterator(iter.iterator_) = GetRoaringBitmap(roaring_bitmap_).end();
+    return iter;
+}
+
+RoaringBitmap32::Iterator RoaringBitmap32::EqualOrLarger(int32_t key) const {
+    RoaringBitmap32::Iterator iter(*this);
+    GetIterator(iter.iterator_).equalorlarger(key);
+    return iter;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/roaring_bitmap32_test.cpp 
b/src/paimon/common/utils/roaring_bitmap32_test.cpp
new file mode 100644
index 0000000..008ae25
--- /dev/null
+++ b/src/paimon/common/utils/roaring_bitmap32_test.cpp
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/utils/roaring_bitmap32.h"
+
+#include <cstdlib>
+#include <cstring>
+#include <ctime>
+#include <memory>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+TEST(RoaringBitmap32Test, TestSimple) {
+    RoaringBitmap32 roaring;
+    ASSERT_TRUE(roaring.IsEmpty());
+    roaring.Add(10);
+    roaring.Add(100);
+    ASSERT_TRUE(roaring.Contains(10));
+    ASSERT_TRUE(roaring.Contains(100));
+    ASSERT_FALSE(roaring.CheckedAdd(10));
+    ASSERT_TRUE(roaring.CheckedAdd(20));
+    ASSERT_TRUE(roaring.Contains(20));
+    ASSERT_FALSE(roaring.IsEmpty());
+    ASSERT_EQ("{10,20,100}", roaring.ToString());
+    ASSERT_EQ(3, roaring.Cardinality());
+    roaring.AddRange(10, 15);
+    ASSERT_EQ("{10,11,12,13,14,20,100}", roaring.ToString());
+    ASSERT_EQ(7, roaring.Cardinality());
+    roaring.RemoveRange(12, 20);
+    ASSERT_EQ("{10,11,20,100}", roaring.ToString());
+    ASSERT_EQ(4, roaring.Cardinality());
+}
+TEST(RoaringBitmap32Test, TestCompatibleWithJava) {
+    auto pool = GetDefaultPool();
+    {
+        RoaringBitmap32 roaring;
+        for (int32_t i = 0; i < 100; i++) {
+            roaring.Add(i);
+        }
+        for (int32_t i = 0; i < 100; i++) {
+            ASSERT_TRUE(roaring.Contains(i));
+        }
+        auto bytes = roaring.Serialize(pool.get());
+        std::vector<uint8_t> result(bytes->data(), bytes->data() + 
bytes->size());
+        std::vector<uint8_t> expected = {59, 48, 0, 0, 1, 0, 0, 99, 0, 1, 0, 
0, 0, 99, 0};
+        ASSERT_EQ(result, expected);
+
+        RoaringBitmap32 de_roaring;
+        ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+        for (int32_t i = 0; i < 100; i++) {
+            ASSERT_TRUE(de_roaring.Contains(i));
+        }
+    }
+    {
+        RoaringBitmap32 roaring;
+        for (int32_t i = RoaringBitmap32::MAX_VALUE - 1; i > 
RoaringBitmap32::MAX_VALUE - 101;
+             i--) {
+            roaring.Add(i);
+        }
+        for (int32_t i = RoaringBitmap32::MAX_VALUE - 1; i > 
RoaringBitmap32::MAX_VALUE - 101;
+             i--) {
+            ASSERT_TRUE(roaring.Contains(i));
+        }
+        auto bytes = roaring.Serialize(pool.get());
+        std::vector<uint8_t> result(bytes->data(), bytes->data() + 
bytes->size());
+        std::vector<uint8_t> expected = {59, 48, 0, 0, 1, 255, 127, 99, 0, 1, 
0, 155, 255, 99, 0};
+        ASSERT_EQ(result, expected);
+
+        RoaringBitmap32 de_roaring;
+        ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+        for (int32_t i = RoaringBitmap32::MAX_VALUE - 1; i > 
RoaringBitmap32::MAX_VALUE - 101;
+             i--) {
+            ASSERT_TRUE(de_roaring.Contains(i));
+        }
+    }
+    {
+        RoaringBitmap32 roaring;
+        for (int32_t i = 5000; i < 10000; i += 17) {
+            roaring.Add(i);
+        }
+        for (int32_t i = 5000; i < 10000; i += 17) {
+            ASSERT_TRUE(roaring.Contains(i));
+        }
+        auto bytes = roaring.Serialize(pool.get());
+        std::vector<uint8_t> result(bytes->data(), bytes->data() + 
bytes->size());
+        std::vector<uint8_t> expected = {
+            58,  48, 0,   0,  1,   0,  0,   0,  0,   0,  38,  1,  16,  0,  0,  
 0,  136, 19,
+            153, 19, 170, 19, 187, 19, 204, 19, 221, 19, 238, 19, 255, 19, 16, 
 20, 33,  20,
+            50,  20, 67,  20, 84,  20, 101, 20, 118, 20, 135, 20, 152, 20, 
169, 20, 186, 20,
+            203, 20, 220, 20, 237, 20, 254, 20, 15,  21, 32,  21, 49,  21, 66, 
 21, 83,  21,
+            100, 21, 117, 21, 134, 21, 151, 21, 168, 21, 185, 21, 202, 21, 
219, 21, 236, 21,
+            253, 21, 14,  22, 31,  22, 48,  22, 65,  22, 82,  22, 99,  22, 
116, 22, 133, 22,
+            150, 22, 167, 22, 184, 22, 201, 22, 218, 22, 235, 22, 252, 22, 13, 
 23, 30,  23,
+            47,  23, 64,  23, 81,  23, 98,  23, 115, 23, 132, 23, 149, 23, 
166, 23, 183, 23,
+            200, 23, 217, 23, 234, 23, 251, 23, 12,  24, 29,  24, 46,  24, 63, 
 24, 80,  24,
+            97,  24, 114, 24, 131, 24, 148, 24, 165, 24, 182, 24, 199, 24, 
216, 24, 233, 24,
+            250, 24, 11,  25, 28,  25, 45,  25, 62,  25, 79,  25, 96,  25, 
113, 25, 130, 25,
+            147, 25, 164, 25, 181, 25, 198, 25, 215, 25, 232, 25, 249, 25, 10, 
 26, 27,  26,
+            44,  26, 61,  26, 78,  26, 95,  26, 112, 26, 129, 26, 146, 26, 
163, 26, 180, 26,
+            197, 26, 214, 26, 231, 26, 248, 26, 9,   27, 26,  27, 43,  27, 60, 
 27, 77,  27,
+            94,  27, 111, 27, 128, 27, 145, 27, 162, 27, 179, 27, 196, 27, 
213, 27, 230, 27,
+            247, 27, 8,   28, 25,  28, 42,  28, 59,  28, 76,  28, 93,  28, 
110, 28, 127, 28,
+            144, 28, 161, 28, 178, 28, 195, 28, 212, 28, 229, 28, 246, 28, 7,  
 29, 24,  29,
+            41,  29, 58,  29, 75,  29, 92,  29, 109, 29, 126, 29, 143, 29, 
160, 29, 177, 29,
+            194, 29, 211, 29, 228, 29, 245, 29, 6,   30, 23,  30, 40,  30, 57, 
 30, 74,  30,
+            91,  30, 108, 30, 125, 30, 142, 30, 159, 30, 176, 30, 193, 30, 
210, 30, 227, 30,
+            244, 30, 5,   31, 22,  31, 39,  31, 56,  31, 73,  31, 90,  31, 
107, 31, 124, 31,
+            141, 31, 158, 31, 175, 31, 192, 31, 209, 31, 226, 31, 243, 31, 4,  
 32, 21,  32,
+            38,  32, 55,  32, 72,  32, 89,  32, 106, 32, 123, 32, 140, 32, 
157, 32, 174, 32,
+            191, 32, 208, 32, 225, 32, 242, 32, 3,   33, 20,  33, 37,  33, 54, 
 33, 71,  33,
+            88,  33, 105, 33, 122, 33, 139, 33, 156, 33, 173, 33, 190, 33, 
207, 33, 224, 33,
+            241, 33, 2,   34, 19,  34, 36,  34, 53,  34, 70,  34, 87,  34, 
104, 34, 121, 34,
+            138, 34, 155, 34, 172, 34, 189, 34, 206, 34, 223, 34, 240, 34, 1,  
 35, 18,  35,
+            35,  35, 52,  35, 69,  35, 86,  35, 103, 35, 120, 35, 137, 35, 
154, 35, 171, 35,
+            188, 35, 205, 35, 222, 35, 239, 35, 0,   36, 17,  36, 34,  36, 51, 
 36, 68,  36,
+            85,  36, 102, 36, 119, 36, 136, 36, 153, 36, 170, 36, 187, 36, 
204, 36, 221, 36,
+            238, 36, 255, 36, 16,  37, 33,  37, 50,  37, 67,  37, 84,  37, 
101, 37, 118, 37,
+            135, 37, 152, 37, 169, 37, 186, 37, 203, 37, 220, 37, 237, 37, 
254, 37, 15,  38,
+            32,  38, 49,  38, 66,  38, 83,  38, 100, 38, 117, 38, 134, 38, 
151, 38, 168, 38,
+            185, 38, 202, 38, 219, 38, 236, 38, 253, 38, 14,  39};
+        ASSERT_EQ(result, expected);
+
+        RoaringBitmap32 de_roaring;
+        ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+        for (int32_t i = 5000; i < 10000; i += 17) {
+            ASSERT_TRUE(de_roaring.Contains(i));
+        }
+    }
+    // empty
+    {
+        RoaringBitmap32 roaring;
+        auto bytes = roaring.Serialize(pool.get());
+        std::vector<uint8_t> result(bytes->data(), bytes->data() + 
bytes->size());
+        std::vector<uint8_t> expected = {58, 48, 0, 0, 0, 0, 0, 0};
+        ASSERT_EQ(result, expected);
+        RoaringBitmap32 de_roaring;
+        ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+        ASSERT_TRUE(de_roaring.IsEmpty());
+        ASSERT_FALSE(de_roaring.Contains(58));
+    }
+}
+
+TEST(RoaringBitmap32Test, TestDeserializeFailed) {
+    RoaringBitmap32 roaring;
+    std::vector<char> invalid_bytes = {0, 100};
+    ASSERT_NOK_WITH_MSG(roaring.Deserialize(invalid_bytes.data(), 
invalid_bytes.size()),
+                        "catch exception in Deserialize() of RoaringBitmap32");
+}
+
+TEST(RoaringBitmap32Test, TestAndOr) {
+    RoaringBitmap32 roaring1 = RoaringBitmap32::From({10, 100});
+    RoaringBitmap32 roaring2 = RoaringBitmap32::From({20, 100, 200});
+
+    auto and_roaring = RoaringBitmap32::And(roaring1, roaring2);
+    ASSERT_EQ(and_roaring, RoaringBitmap32::From({100}));
+
+    auto or_roaring = RoaringBitmap32::Or(roaring1, roaring2);
+    ASSERT_EQ(or_roaring, RoaringBitmap32::From({10, 20, 100, 200}));
+}
+
+TEST(RoaringBitmap32Test, TestAssignAndMove) {
+    RoaringBitmap32 roaring1 = RoaringBitmap32::From({10, 100});
+    RoaringBitmap32 roaring2 = roaring1;
+    ASSERT_EQ(roaring1, roaring2);
+    ASSERT_FALSE(roaring1.IsEmpty());
+    ASSERT_FALSE(roaring2.IsEmpty());
+
+    RoaringBitmap32 roaring3 = std::move(roaring1);
+    ASSERT_EQ(roaring3, roaring2);
+    ASSERT_FALSE(roaring1.roaring_bitmap_);  // NOLINT(bugprone-use-after-move)
+
+    roaring3.Add(20);
+    ASSERT_FALSE(roaring3 == roaring2);
+
+    roaring3 = roaring2;
+    ASSERT_EQ(roaring3, roaring2);
+
+    roaring2 = std::move(roaring3);
+    ASSERT_FALSE(roaring3.roaring_bitmap_);  // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ("{10,100}", roaring2.ToString());
+
+    roaring3 = roaring2;
+    ASSERT_EQ("{10,100}", roaring3.ToString());
+
+    roaring3 = std::move(roaring2);
+    ASSERT_EQ("{10,100}", roaring3.ToString());
+    ASSERT_FALSE(roaring2.roaring_bitmap_);  // NOLINT(bugprone-use-after-move)
+}
+
+TEST(RoaringBitmap32Test, TestFastUnion) {
+    RoaringBitmap32 roaring1 = RoaringBitmap32::From({10, 100});
+    RoaringBitmap32 roaring2 = RoaringBitmap32::From({1, 10, 20, 100, 300});
+    RoaringBitmap32 roaring3 = RoaringBitmap32::From({2, 100, 800});
+
+    RoaringBitmap32 res = RoaringBitmap32::FastUnion({&roaring1, &roaring2, 
&roaring3});
+    ASSERT_EQ(res, RoaringBitmap32::From({1, 2, 10, 20, 100, 300, 800}));
+
+    std::vector<RoaringBitmap32> roarings = {roaring1, roaring2, roaring3};
+    RoaringBitmap32 res2 = RoaringBitmap32::FastUnion(roarings);
+    ASSERT_EQ(res2, RoaringBitmap32::From({1, 2, 10, 20, 100, 300, 800}));
+}
+
+TEST(RoaringBitmap32Test, TestFlip) {
+    RoaringBitmap32 roaring = RoaringBitmap32::From({1, 2, 4});
+    roaring.Flip(0, 5);
+    ASSERT_EQ(roaring, RoaringBitmap32::From({0, 3}));
+}
+
+TEST(RoaringBitmap32Test, TestAndNot) {
+    RoaringBitmap32 roaring1 = RoaringBitmap32::From({10, 20, 100, 200});
+    RoaringBitmap32 roaring2 = RoaringBitmap32::From({5, 20, 80, 200, 250});
+
+    auto andnot_roaring = RoaringBitmap32::AndNot(roaring1, roaring2);
+    ASSERT_EQ(andnot_roaring, RoaringBitmap32::From({10, 100}));
+}
+
+TEST(RoaringBitmap32Test, TestIterator) {
+    RoaringBitmap32 roaring = RoaringBitmap32::From({10, 20});
+    auto iter = roaring.Begin();
+    ASSERT_EQ(*iter, 10);
+    auto iter2 = ++iter;
+    ASSERT_EQ(*iter, 20);
+    ASSERT_EQ(*iter2, 20);
+    ++iter;
+    ASSERT_EQ(iter, roaring.End());
+
+    iter = roaring.EqualOrLarger(5);
+    ASSERT_EQ(*iter, 10);
+    iter = roaring.EqualOrLarger(10);
+    ASSERT_EQ(*iter, 10);
+    iter = roaring.EqualOrLarger(15);
+    ASSERT_EQ(*iter, 20);
+    iter = roaring.EqualOrLarger(20);
+    ASSERT_EQ(*iter, 20);
+    iter = roaring.EqualOrLarger(100);
+    ASSERT_EQ(iter, roaring.End());
+}
+
+TEST(RoaringBitmap32Test, TestIteratorAssignAndMove) {
+    RoaringBitmap32 roaring = RoaringBitmap32::From({10, 100, 200});
+
+    auto iter1 = roaring.EqualOrLarger(10);
+    auto iter2 = iter1;
+    ASSERT_EQ(iter1, iter2);
+    ASSERT_EQ(10, *iter1);
+    ASSERT_EQ(10, *iter2);
+
+    auto iter3 = std::move(iter1);
+    ASSERT_EQ(iter2, iter3);
+    ASSERT_FALSE(iter1.iterator_);  // NOLINT(bugprone-use-after-move)
+
+    ++iter3;
+    ASSERT_NE(iter3, iter2);
+    ASSERT_EQ(100, *iter3);
+
+    iter3 = iter2;
+    ASSERT_EQ(iter3, iter2);
+
+    iter2 = std::move(iter3);
+    ASSERT_FALSE(iter3.iterator_);  // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ(10, *iter2);
+
+    iter3 = iter2;
+    ASSERT_EQ(iter2, iter3);
+    ASSERT_EQ(10, *iter2);
+    ASSERT_EQ(10, *iter3);
+
+    iter3 = std::move(iter2);
+    ASSERT_EQ(10, *iter3);
+    ASSERT_FALSE(iter2.iterator_);  // NOLINT(bugprone-use-after-move)
+}
+
+TEST(RoaringBitmap32Test, TestDeserializeFromInputStream) {
+    RoaringBitmap32 roaring1 = RoaringBitmap32::From({10, 20, 100});
+    RoaringBitmap32 roaring2 = RoaringBitmap32::From({10, 50, 150, 200});
+    RoaringBitmap32 roaring3 = RoaringBitmap32::From({2, 4, 6, 1000});
+
+    size_t len1 = roaring1.GetSizeInBytes();
+    size_t len2 = roaring2.GetSizeInBytes();
+    size_t len3 = roaring3.GetSizeInBytes();
+
+    auto pool = GetDefaultPool();
+    auto bytes1 = roaring1.Serialize(pool.get());
+    ASSERT_EQ(bytes1->size(), len1);
+    auto bytes2 = roaring2.Serialize(pool.get());
+    ASSERT_EQ(bytes2->size(), len2);
+    auto bytes3 = roaring3.Serialize(pool.get());
+    ASSERT_EQ(bytes3->size(), len3);
+
+    auto concat_bytes = std::make_shared<Bytes>(len1 + len2 + len3, 
pool.get());
+    memcpy(concat_bytes->data(), bytes1->data(), len1);
+    memcpy(concat_bytes->data() + len1, bytes2->data(), len2);
+    memcpy(concat_bytes->data() + len1 + len2, bytes3->data(), len3);
+
+    ByteArrayInputStream byte_array_input_stream(concat_bytes->data(), 
concat_bytes->size());
+    RoaringBitmap32 de_roaring1;
+    ASSERT_OK(de_roaring1.Deserialize(&byte_array_input_stream));
+    ASSERT_EQ(de_roaring1, roaring1);
+    ASSERT_EQ(byte_array_input_stream.GetPos().value(), len1);
+
+    RoaringBitmap32 de_roaring2;
+    ASSERT_OK(de_roaring2.Deserialize(&byte_array_input_stream));
+    ASSERT_EQ(de_roaring2, roaring2);
+    ASSERT_EQ(byte_array_input_stream.GetPos().value(), len1 + len2);
+
+    RoaringBitmap32 de_roaring3;
+    ASSERT_OK(de_roaring3.Deserialize(&byte_array_input_stream));
+    ASSERT_EQ(de_roaring3, roaring3);
+    ASSERT_EQ(byte_array_input_stream.GetPos().value(), len1 + len2 + len3);
+}
+
+TEST(RoaringBitmap32Test, TestHighCardinality) {
+    std::srand(time(nullptr));
+    auto pool = GetDefaultPool();
+    RoaringBitmap32 roaring;
+    for (int32_t i = 0; i < 10000; i++) {
+        roaring.Add(std::rand());
+    }
+    auto bytes = roaring.Serialize(pool.get());
+    RoaringBitmap32 de_roaring;
+    ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+    ASSERT_EQ(roaring, de_roaring);
+}
+
+TEST(RoaringBitmap32Test, TestInplaceAndOr) {
+    RoaringBitmap32 roaring = RoaringBitmap32::From({0, 1, 100});
+    RoaringBitmap32 roaring1 = RoaringBitmap32::From({1, 2, 5, 200});
+    roaring |= roaring1;
+    ASSERT_EQ(roaring.ToString(), "{0,1,2,5,100,200}");
+    RoaringBitmap32 roaring2 = RoaringBitmap32::From({1, 2, 3, 5, 100, 500});
+    roaring &= roaring2;
+    ASSERT_EQ(roaring.ToString(), "{1,2,5,100}");
+    RoaringBitmap32 roaring3 = RoaringBitmap32::From({2, 100});
+    roaring -= roaring3;
+    ASSERT_EQ(roaring.ToString(), "{1,5}");
+}
+
+TEST(RoaringBitmap32Test, TestContainsAny) {
+    RoaringBitmap32 roaring = RoaringBitmap32::From({10, 11, 100});
+    ASSERT_TRUE(roaring.ContainsAny(10, 11));
+    ASSERT_TRUE(roaring.ContainsAny(10, 20));
+    ASSERT_TRUE(roaring.ContainsAny(10, 101));
+    ASSERT_TRUE(roaring.ContainsAny(20, 200));
+    ASSERT_TRUE(roaring.ContainsAny(5, 11));
+    ASSERT_FALSE(roaring.ContainsAny(5, 10));
+    ASSERT_FALSE(roaring.ContainsAny(20, 100));
+    ASSERT_FALSE(roaring.ContainsAny(20, 30));
+    ASSERT_FALSE(roaring.ContainsAny(500, 520));
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/common/utils/roaring_bitmap64.cpp 
b/src/paimon/common/utils/roaring_bitmap64.cpp
new file mode 100644
index 0000000..510ef4e
--- /dev/null
+++ b/src/paimon/common/utils/roaring_bitmap64.cpp
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/utils/roaring_bitmap64.h"
+
+#include <cassert>
+#include <memory>
+#include <utility>
+
+#include "paimon/fs/file_system.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "roaring.hh"  // NOLINT(build/include_subdir)
+
+namespace paimon {
+namespace {
+roaring::Roaring64Map::const_iterator& GetIterator(void* iter) {
+    return *(static_cast<roaring::Roaring64Map::const_iterator*>(iter));
+}
+roaring::Roaring64Map& GetRoaringBitmap(void* bitmap) {
+    return *(static_cast<roaring::Roaring64Map*>(bitmap));
+}
+}  // namespace
+
+RoaringBitmap64::Iterator::Iterator(const RoaringBitmap64& bitmap) {
+    iterator_ =
+        new 
roaring::Roaring64Map::const_iterator(GetRoaringBitmap(bitmap.roaring_bitmap_).begin());
+}
+
+RoaringBitmap64::Iterator::~Iterator() {
+    if (iterator_) {
+        delete static_cast<roaring::Roaring64Map::const_iterator*>(iterator_);
+    }
+}
+
+RoaringBitmap64::Iterator::Iterator(const RoaringBitmap64::Iterator& other) 
noexcept {
+    *this = other;
+}
+RoaringBitmap64::Iterator& RoaringBitmap64::Iterator::operator=(
+    const RoaringBitmap64::Iterator& other) noexcept {
+    if (&other == this) {
+        return *this;
+    }
+    if (!iterator_) {
+        iterator_ = new 
roaring::Roaring64Map::const_iterator(GetIterator(other.iterator_));
+    } else {
+        GetIterator(iterator_) = GetIterator(other.iterator_);
+    }
+    return *this;
+}
+
+RoaringBitmap64::Iterator::Iterator(RoaringBitmap64::Iterator&& other) 
noexcept {
+    *this = std::move(other);
+}
+
+RoaringBitmap64::Iterator& RoaringBitmap64::Iterator::operator=(
+    RoaringBitmap64::Iterator&& other) noexcept {
+    if (&other == this) {
+        return *this;
+    }
+    if (iterator_) {
+        delete static_cast<roaring::Roaring64Map::const_iterator*>(iterator_);
+    }
+    iterator_ = other.iterator_;
+    other.iterator_ = nullptr;
+    return *this;
+}
+
+int64_t RoaringBitmap64::Iterator::operator*() const {
+    return *GetIterator(iterator_);
+}
+RoaringBitmap64::Iterator& RoaringBitmap64::Iterator::operator++() {
+    ++GetIterator(iterator_);
+    return *this;
+}
+bool RoaringBitmap64::Iterator::operator==(const Iterator& other) const {
+    if (&other == this) {
+        return true;
+    }
+    assert(iterator_ && other.iterator_);
+    return GetIterator(iterator_) == GetIterator(other.iterator_);
+}
+bool RoaringBitmap64::Iterator::operator!=(const Iterator& other) const {
+    return !(*this == other);
+}
+
+void RoaringBitmap64::Iterator::EqualOrLarger(int64_t value) {
+    [[maybe_unused]] bool _ = GetIterator(iterator_).move(value);
+}
+
+RoaringBitmap64::RoaringBitmap64() {
+    roaring_bitmap_ = new roaring::Roaring64Map();
+}
+RoaringBitmap64::~RoaringBitmap64() {
+    if (roaring_bitmap_) {
+        delete static_cast<roaring::Roaring64Map*>(roaring_bitmap_);
+    }
+}
+
+RoaringBitmap64::RoaringBitmap64(const RoaringBitmap64& other) noexcept {
+    *this = other;
+}
+RoaringBitmap64& RoaringBitmap64::operator=(const RoaringBitmap64& other) 
noexcept {
+    if (&other == this) {
+        return *this;
+    }
+    if (!roaring_bitmap_) {
+        roaring_bitmap_ = new 
roaring::Roaring64Map(GetRoaringBitmap(other.roaring_bitmap_));
+    } else {
+        GetRoaringBitmap(roaring_bitmap_) = 
GetRoaringBitmap(other.roaring_bitmap_);
+    }
+    return *this;
+}
+
+RoaringBitmap64::RoaringBitmap64(const RoaringBitmap32& other) noexcept {
+    *this = other;
+}
+
+RoaringBitmap64& RoaringBitmap64::operator=(const RoaringBitmap32& other) 
noexcept {
+    auto bitmap32 = static_cast<roaring::Roaring*>(other.roaring_bitmap_);
+    if (!roaring_bitmap_) {
+        roaring_bitmap_ = new roaring::Roaring64Map(*bitmap32);
+    } else {
+        GetRoaringBitmap(roaring_bitmap_) = roaring::Roaring64Map(*bitmap32);
+    }
+    return *this;
+}
+
+RoaringBitmap64::RoaringBitmap64(RoaringBitmap64&& other) noexcept {
+    *this = std::move(other);
+}
+
+RoaringBitmap64& RoaringBitmap64::operator=(RoaringBitmap64&& other) noexcept {
+    if (&other == this) {
+        return *this;
+    }
+    if (roaring_bitmap_) {
+        delete static_cast<roaring::Roaring64Map*>(roaring_bitmap_);
+    }
+    roaring_bitmap_ = other.roaring_bitmap_;
+    other.roaring_bitmap_ = nullptr;
+    return *this;
+}
+
+RoaringBitmap64& RoaringBitmap64::operator|=(const RoaringBitmap64& other) {
+    GetRoaringBitmap(roaring_bitmap_) |= 
GetRoaringBitmap(other.roaring_bitmap_);
+    return *this;
+}
+RoaringBitmap64& RoaringBitmap64::operator&=(const RoaringBitmap64& other) {
+    GetRoaringBitmap(roaring_bitmap_) &= 
GetRoaringBitmap(other.roaring_bitmap_);
+    return *this;
+}
+RoaringBitmap64& RoaringBitmap64::operator-=(const RoaringBitmap64& other) {
+    GetRoaringBitmap(roaring_bitmap_) -= 
GetRoaringBitmap(other.roaring_bitmap_);
+    return *this;
+}
+
+void RoaringBitmap64::Add(int64_t x) {
+    GetRoaringBitmap(roaring_bitmap_).add(static_cast<uint64_t>(x));
+}
+
+void RoaringBitmap64::AddMany(size_t n, const int64_t* values) {
+    if (n == 0 || !values) {
+        return;
+    }
+    auto& bitmap = GetRoaringBitmap(roaring_bitmap_);
+
+    // Bucket values by their high-32 bits in a single pass. K (the number of
+    // distinct buckets) is typically very small (often 1, rarely > a handful).
+    struct Bucket {
+        uint32_t high;
+        std::vector<uint32_t> lows;
+        explicit Bucket(uint32_t high_val) : high(high_val) {}
+    };
+    std::vector<Bucket> buckets;
+    buckets.reserve(4);
+
+    for (size_t i = 0; i < n; ++i) {
+        const auto v = static_cast<uint64_t>(values[i]);
+        const auto high = static_cast<uint32_t>(v >> 32);
+        const auto low = static_cast<uint32_t>(v);
+
+        Bucket* target = nullptr;
+        // Fast path: the previous bucket is overwhelmingly likely to match for
+        // sequential row-id streams produced by a B-tree scan.
+        if (!buckets.empty() && buckets.back().high == high) {
+            target = &buckets.back();
+        } else {
+            for (auto& b : buckets) {
+                if (b.high == high) {
+                    target = &b;
+                    break;
+                }
+            }
+            if (target == nullptr) {
+                buckets.emplace_back(high);
+                // Reserve generously on first encounter; we may end up using
+                // more memory than necessary if K turns out to be > 1, but
+                // this avoids repeated grow-and-copy in the common K == 1 
case.
+                buckets.back().lows.reserve(buckets.size() == 1 ? n : n / 4);
+                target = &buckets.back();
+            }
+        }
+        target->lows.push_back(low);
+    }
+
+    // Hand each bucket to the inner 32-bit Roaring's true-batch addMany,
+    // which performs container-level bulk insertion.
+    for (const auto& b : buckets) {
+        bitmap.getOrCreateInner(b.high).addMany(b.lows.size(), b.lows.data());
+    }
+}
+
+void RoaringBitmap64::AddRange(int64_t min, int64_t max) {
+    GetRoaringBitmap(roaring_bitmap_).addRange(min, max);
+}
+
+void RoaringBitmap64::RemoveRange(int64_t min, int64_t max) {
+    GetRoaringBitmap(roaring_bitmap_).removeRange(min, max);
+}
+
+bool RoaringBitmap64::ContainsAny(int64_t min, int64_t max) const {
+    auto iter = EqualOrLarger(min);
+    if (iter != End() && *iter < max) {
+        return true;
+    }
+    return false;
+}
+
+bool RoaringBitmap64::CheckedAdd(int64_t x) {
+    if (Contains(x)) {
+        return false;
+    }
+    Add(x);
+    return true;
+}
+
+int64_t RoaringBitmap64::Cardinality() const {
+    return GetRoaringBitmap(roaring_bitmap_).cardinality();
+}
+
+bool RoaringBitmap64::Contains(int64_t x) const {
+    return 
GetRoaringBitmap(roaring_bitmap_).contains(static_cast<uint64_t>(x));
+}
+
+bool RoaringBitmap64::IsEmpty() const {
+    return GetRoaringBitmap(roaring_bitmap_).isEmpty();
+}
+
+size_t RoaringBitmap64::GetSizeInBytes() const {
+    return GetRoaringBitmap(roaring_bitmap_).getSizeInBytes();
+}
+
+void RoaringBitmap64::Flip(int64_t min, int64_t max) {
+    GetRoaringBitmap(roaring_bitmap_).flip(min, max);
+}
+
+bool RoaringBitmap64::operator==(const RoaringBitmap64& other) const noexcept {
+    if (this == &other) {
+        return true;
+    }
+    assert(roaring_bitmap_ && other.roaring_bitmap_);
+    return GetRoaringBitmap(roaring_bitmap_) == 
GetRoaringBitmap(other.roaring_bitmap_);
+}
+
+PAIMON_UNIQUE_PTR<Bytes> RoaringBitmap64::Serialize(MemoryPool* pool) const {
+    GetRoaringBitmap(roaring_bitmap_).runOptimize();
+    auto& bitmap = GetRoaringBitmap(roaring_bitmap_);
+    // Use default pool if no pool is provided
+    if (pool == nullptr) {
+        pool = GetDefaultPool().get();
+    }
+    auto bytes = Bytes::AllocateBytes(bitmap.getSizeInBytes(), pool);
+    bitmap.write(bytes->data());
+    return bytes;
+}
+
+Status RoaringBitmap64::Deserialize(ByteArrayInputStream* input_stream) {
+    const char* data = input_stream->GetRawData();
+    PAIMON_ASSIGN_OR_RAISE(int64_t pos, input_stream->GetPos());
+    PAIMON_ASSIGN_OR_RAISE(int64_t total_length, input_stream->Length());
+    try {
+        GetRoaringBitmap(roaring_bitmap_) =
+            roaring::Roaring64Map::readSafe(data, /*maxbytes*/ total_length - 
pos);
+    } catch (...) {
+        return Status::Invalid("catch exception in Deserialize() of 
RoaringBitmap64");
+    }
+    return 
input_stream->Seek(GetRoaringBitmap(roaring_bitmap_).getSizeInBytes(),
+                              SeekOrigin::FS_SEEK_CUR);
+}
+
+Status RoaringBitmap64::Deserialize(const char* begin, size_t length) {
+    try {
+        GetRoaringBitmap(roaring_bitmap_) = 
roaring::Roaring64Map::readSafe(begin, length);
+    } catch (...) {
+        return Status::Invalid("catch exception in Deserialize() of 
RoaringBitmap64");
+    }
+    return Status::OK();
+}
+
+std::string RoaringBitmap64::ToString() const {
+    return GetRoaringBitmap(roaring_bitmap_).toString();
+}
+
+RoaringBitmap64 RoaringBitmap64::And(const RoaringBitmap64& lhs, const 
RoaringBitmap64& rhs) {
+    RoaringBitmap64 res;
+    GetRoaringBitmap(res.roaring_bitmap_) =
+        (GetRoaringBitmap(lhs.roaring_bitmap_) & 
GetRoaringBitmap(rhs.roaring_bitmap_));
+    return res;
+}
+
+RoaringBitmap64 RoaringBitmap64::Or(const RoaringBitmap64& lhs, const 
RoaringBitmap64& rhs) {
+    RoaringBitmap64 res;
+    GetRoaringBitmap(res.roaring_bitmap_) =
+        (GetRoaringBitmap(lhs.roaring_bitmap_) | 
GetRoaringBitmap(rhs.roaring_bitmap_));
+    return res;
+}
+
+RoaringBitmap64 RoaringBitmap64::AndNot(const RoaringBitmap64& lhs, const 
RoaringBitmap64& rhs) {
+    RoaringBitmap64 res;
+    GetRoaringBitmap(res.roaring_bitmap_) =
+        (GetRoaringBitmap(lhs.roaring_bitmap_) - 
GetRoaringBitmap(rhs.roaring_bitmap_));
+    return res;
+}
+
+RoaringBitmap64 RoaringBitmap64::From(const std::vector<int64_t>& values) {
+    RoaringBitmap64 res;
+    for (const auto& value : values) {
+        res.Add(value);
+    }
+    return res;
+}
+
+RoaringBitmap64 RoaringBitmap64::FastUnion(const std::vector<const 
RoaringBitmap64*>& inputs) {
+    std::vector<roaring::Roaring64Map*> roaring_inputs;
+    roaring_inputs.reserve(inputs.size());
+    for (const auto* roaring : inputs) {
+        roaring_inputs.push_back(&GetRoaringBitmap(roaring->roaring_bitmap_));
+    }
+    RoaringBitmap64 res;
+    GetRoaringBitmap(res.roaring_bitmap_) = roaring::Roaring64Map::fastunion(
+        roaring_inputs.size(), const_cast<const 
roaring::Roaring64Map**>(roaring_inputs.data()));
+    return res;
+}
+
+RoaringBitmap64 RoaringBitmap64::FastUnion(const std::vector<RoaringBitmap64>& 
inputs) {
+    std::vector<const RoaringBitmap64*> inputs_ptr;
+    inputs_ptr.reserve(inputs.size());
+    for (const auto& bitmap : inputs) {
+        inputs_ptr.push_back(&bitmap);
+    }
+    return FastUnion(inputs_ptr);
+}
+RoaringBitmap64::Iterator RoaringBitmap64::Begin() const {
+    return RoaringBitmap64::Iterator(*this);
+}
+
+RoaringBitmap64::Iterator RoaringBitmap64::End() const {
+    RoaringBitmap64::Iterator iter(*this);
+    GetIterator(iter.iterator_) = GetRoaringBitmap(roaring_bitmap_).end();
+    return iter;
+}
+
+RoaringBitmap64::Iterator RoaringBitmap64::EqualOrLarger(int64_t key) const {
+    RoaringBitmap64::Iterator iter(*this);
+    bool not_end = GetIterator(iter.iterator_).move(key);
+    if (!not_end) {
+        return End();
+    }
+    return iter;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/common/utils/roaring_bitmap64_test.cpp 
b/src/paimon/common/utils/roaring_bitmap64_test.cpp
new file mode 100644
index 0000000..6a51d29
--- /dev/null
+++ b/src/paimon/common/utils/roaring_bitmap64_test.cpp
@@ -0,0 +1,633 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/utils/roaring_bitmap64.h"
+
+#include <climits>
+#include <cstdlib>
+#include <cstring>
+#include <ctime>
+#include <memory>
+#include <utility>
+
+#include "gtest/gtest.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/result.h"
+#include "paimon/testing/utils/testharness.h"
+#include "paimon/utils/range.h"
+
+namespace paimon::test {
+TEST(RoaringBitmap64Test, TestSimple) {
+    RoaringBitmap64 roaring;
+    ASSERT_TRUE(roaring.IsEmpty());
+    roaring.Add(4147483647l);
+    roaring.Add(614748364721l);
+    ASSERT_TRUE(roaring.Contains(4147483647l));
+    ASSERT_TRUE(roaring.Contains(614748364721l));
+    ASSERT_FALSE(roaring.CheckedAdd(614748364721l));
+    ASSERT_TRUE(roaring.CheckedAdd(8147483647210l));
+    ASSERT_TRUE(roaring.Contains(614748364721l));
+    ASSERT_FALSE(roaring.IsEmpty());
+    ASSERT_EQ("{4147483647,614748364721,8147483647210}", roaring.ToString());
+    ASSERT_EQ(3, roaring.Cardinality());
+    roaring.AddRange(614748364720, 614748364725);
+    ASSERT_EQ(
+        
"{4147483647,614748364720,614748364721,614748364722,614748364723,614748364724,"
+        "8147483647210}",
+        roaring.ToString());
+    ASSERT_EQ(7, roaring.Cardinality());
+    roaring.RemoveRange(614748364721l, 614748364723l);
+    
ASSERT_EQ("{4147483647,614748364720,614748364723,614748364724,8147483647210}",
+              roaring.ToString());
+    ASSERT_EQ(5, roaring.Cardinality());
+}
+TEST(RoaringBitmap64Test, TestCompatibleWithJava) {
+    auto pool = GetDefaultPool();
+    {
+        RoaringBitmap64 roaring;
+        for (int64_t i = 0; i < 100; i++) {
+            roaring.Add(i + 100000000000l);
+        }
+        for (int64_t i = 0; i < 100; i++) {
+            ASSERT_TRUE(roaring.Contains(i + 100000000000l));
+        }
+        auto bytes = roaring.Serialize(pool.get());
+        std::vector<uint8_t> result(bytes->data(), bytes->data() + 
bytes->size());
+        std::vector<uint8_t> expected = {1, 0, 0, 0,   0,  0,  0, 0, 23, 0, 0, 
  0,  59, 48,
+                                         0, 0, 1, 118, 72, 99, 0, 1, 0,  0, 
232, 99, 0};
+        ASSERT_EQ(result, expected);
+
+        RoaringBitmap64 de_roaring;
+        ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+        for (int64_t i = 0; i < 100; i++) {
+            ASSERT_TRUE(de_roaring.Contains(i + 100000000000l));
+        }
+    }
+    {
+        RoaringBitmap64 roaring;
+        for (int64_t i = RoaringBitmap64::MAX_VALUE - 1; i > 
RoaringBitmap64::MAX_VALUE - 101;
+             i--) {
+            roaring.Add(i);
+        }
+        for (int64_t i = RoaringBitmap64::MAX_VALUE - 1; i > 
RoaringBitmap64::MAX_VALUE - 101;
+             i--) {
+            ASSERT_TRUE(roaring.Contains(i));
+        }
+        auto bytes = roaring.Serialize(pool.get());
+        std::vector<uint8_t> result(bytes->data(), bytes->data() + 
bytes->size());
+        std::vector<uint8_t> expected = {1, 0, 0, 0,   0,   0,  0, 0, 255, 
255, 255, 127, 59, 48,
+                                         0, 0, 1, 255, 255, 99, 0, 1, 0,   
155, 255, 99,  0};
+        ASSERT_EQ(result, expected);
+
+        RoaringBitmap64 de_roaring;
+        ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+        for (int64_t i = RoaringBitmap64::MAX_VALUE - 1; i > 
RoaringBitmap64::MAX_VALUE - 101;
+             i--) {
+            ASSERT_TRUE(de_roaring.Contains(i));
+        }
+    }
+    {
+        RoaringBitmap64 roaring;
+        for (int64_t i = 5000; i < 10000; i += 17) {
+            roaring.Add(i + 300000000000l);
+        }
+        for (int64_t i = 5000; i < 10000; i += 17) {
+            ASSERT_TRUE(roaring.Contains(i + 300000000000l));
+        }
+        auto bytes = roaring.Serialize(pool.get());
+        std::vector<uint8_t> result(bytes->data(), bytes->data() + 
bytes->size());
+        std::vector<uint8_t> expected = {
+            1,   0,   0,   0,   0,   0,   0,   0,   69,  0,   0,   0,   58,  
48,  0,   0,   1,
+            0,   0,   0,   100, 217, 38,  1,   16,  0,   0,   0,   136, 203, 
153, 203, 170, 203,
+            187, 203, 204, 203, 221, 203, 238, 203, 255, 203, 16,  204, 33,  
204, 50,  204, 67,
+            204, 84,  204, 101, 204, 118, 204, 135, 204, 152, 204, 169, 204, 
186, 204, 203, 204,
+            220, 204, 237, 204, 254, 204, 15,  205, 32,  205, 49,  205, 66,  
205, 83,  205, 100,
+            205, 117, 205, 134, 205, 151, 205, 168, 205, 185, 205, 202, 205, 
219, 205, 236, 205,
+            253, 205, 14,  206, 31,  206, 48,  206, 65,  206, 82,  206, 99,  
206, 116, 206, 133,
+            206, 150, 206, 167, 206, 184, 206, 201, 206, 218, 206, 235, 206, 
252, 206, 13,  207,
+            30,  207, 47,  207, 64,  207, 81,  207, 98,  207, 115, 207, 132, 
207, 149, 207, 166,
+            207, 183, 207, 200, 207, 217, 207, 234, 207, 251, 207, 12,  208, 
29,  208, 46,  208,
+            63,  208, 80,  208, 97,  208, 114, 208, 131, 208, 148, 208, 165, 
208, 182, 208, 199,
+            208, 216, 208, 233, 208, 250, 208, 11,  209, 28,  209, 45,  209, 
62,  209, 79,  209,
+            96,  209, 113, 209, 130, 209, 147, 209, 164, 209, 181, 209, 198, 
209, 215, 209, 232,
+            209, 249, 209, 10,  210, 27,  210, 44,  210, 61,  210, 78,  210, 
95,  210, 112, 210,
+            129, 210, 146, 210, 163, 210, 180, 210, 197, 210, 214, 210, 231, 
210, 248, 210, 9,
+            211, 26,  211, 43,  211, 60,  211, 77,  211, 94,  211, 111, 211, 
128, 211, 145, 211,
+            162, 211, 179, 211, 196, 211, 213, 211, 230, 211, 247, 211, 8,   
212, 25,  212, 42,
+            212, 59,  212, 76,  212, 93,  212, 110, 212, 127, 212, 144, 212, 
161, 212, 178, 212,
+            195, 212, 212, 212, 229, 212, 246, 212, 7,   213, 24,  213, 41,  
213, 58,  213, 75,
+            213, 92,  213, 109, 213, 126, 213, 143, 213, 160, 213, 177, 213, 
194, 213, 211, 213,
+            228, 213, 245, 213, 6,   214, 23,  214, 40,  214, 57,  214, 74,  
214, 91,  214, 108,
+            214, 125, 214, 142, 214, 159, 214, 176, 214, 193, 214, 210, 214, 
227, 214, 244, 214,
+            5,   215, 22,  215, 39,  215, 56,  215, 73,  215, 90,  215, 107, 
215, 124, 215, 141,
+            215, 158, 215, 175, 215, 192, 215, 209, 215, 226, 215, 243, 215, 
4,   216, 21,  216,
+            38,  216, 55,  216, 72,  216, 89,  216, 106, 216, 123, 216, 140, 
216, 157, 216, 174,
+            216, 191, 216, 208, 216, 225, 216, 242, 216, 3,   217, 20,  217, 
37,  217, 54,  217,
+            71,  217, 88,  217, 105, 217, 122, 217, 139, 217, 156, 217, 173, 
217, 190, 217, 207,
+            217, 224, 217, 241, 217, 2,   218, 19,  218, 36,  218, 53,  218, 
70,  218, 87,  218,
+            104, 218, 121, 218, 138, 218, 155, 218, 172, 218, 189, 218, 206, 
218, 223, 218, 240,
+            218, 1,   219, 18,  219, 35,  219, 52,  219, 69,  219, 86,  219, 
103, 219, 120, 219,
+            137, 219, 154, 219, 171, 219, 188, 219, 205, 219, 222, 219, 239, 
219, 0,   220, 17,
+            220, 34,  220, 51,  220, 68,  220, 85,  220, 102, 220, 119, 220, 
136, 220, 153, 220,
+            170, 220, 187, 220, 204, 220, 221, 220, 238, 220, 255, 220, 16,  
221, 33,  221, 50,
+            221, 67,  221, 84,  221, 101, 221, 118, 221, 135, 221, 152, 221, 
169, 221, 186, 221,
+            203, 221, 220, 221, 237, 221, 254, 221, 15,  222, 32,  222, 49,  
222, 66,  222, 83,
+            222, 100, 222, 117, 222, 134, 222, 151, 222, 168, 222, 185, 222, 
202, 222, 219, 222,
+            236, 222, 253, 222, 14,  223};
+        ASSERT_EQ(result, expected);
+
+        RoaringBitmap64 de_roaring;
+        ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+        for (int64_t i = 5000; i < 10000; i += 17) {
+            ASSERT_TRUE(de_roaring.Contains(i + 300000000000l));
+        }
+    }
+    // empty
+    {
+        RoaringBitmap64 roaring;
+        auto bytes = roaring.Serialize(pool.get());
+        std::vector<uint8_t> result(bytes->data(), bytes->data() + 
bytes->size());
+        std::vector<uint8_t> expected = {0, 0, 0, 0, 0, 0, 0, 0};
+        ASSERT_EQ(result, expected);
+        RoaringBitmap64 de_roaring;
+        ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+        ASSERT_TRUE(de_roaring.IsEmpty());
+        ASSERT_FALSE(de_roaring.Contains(58));
+    }
+}
+
+TEST(RoaringBitmap64Test, TestDeserializeFailed) {
+    RoaringBitmap64 roaring;
+    std::vector<char> invalid_bytes = {0, 100};
+    ASSERT_NOK_WITH_MSG(roaring.Deserialize(invalid_bytes.data(), 
invalid_bytes.size()),
+                        "catch exception in Deserialize() of RoaringBitmap64");
+}
+
+TEST(RoaringBitmap64Test, TestAndOr) {
+    RoaringBitmap64 roaring1 = RoaringBitmap64::From({100000000000l, 
100000000000000l});
+    RoaringBitmap64 roaring2 =
+        RoaringBitmap64::From({200000000000l, 100000000000000l, 
200000000000000l});
+
+    auto and_roaring = RoaringBitmap64::And(roaring1, roaring2);
+    ASSERT_EQ(and_roaring, RoaringBitmap64::From({100000000000000l}));
+
+    auto or_roaring = RoaringBitmap64::Or(roaring1, roaring2);
+    ASSERT_EQ(or_roaring, RoaringBitmap64::From(
+                              {100000000000l, 200000000000l, 100000000000000l, 
200000000000000l}));
+}
+
+TEST(RoaringBitmap64Test, TestAssignAndMove) {
+    RoaringBitmap64 roaring1 = RoaringBitmap64::From({100000000000l, 
200000000000000l});
+    RoaringBitmap64 roaring2 = roaring1;
+    ASSERT_EQ(roaring1, roaring2);
+    ASSERT_FALSE(roaring1.IsEmpty());
+    ASSERT_FALSE(roaring2.IsEmpty());
+
+    RoaringBitmap64 roaring3 = std::move(roaring1);
+    ASSERT_EQ(roaring3, roaring2);
+    ASSERT_FALSE(roaring1.roaring_bitmap_);  // NOLINT(bugprone-use-after-move)
+
+    roaring3.Add(20);
+    ASSERT_FALSE(roaring3 == roaring2);
+
+    roaring3 = roaring2;
+    ASSERT_EQ(roaring3, roaring2);
+
+    roaring2 = std::move(roaring3);
+    ASSERT_FALSE(roaring3.roaring_bitmap_);  // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ("{100000000000,200000000000000}", roaring2.ToString());
+
+    roaring3 = roaring2;
+    ASSERT_EQ("{100000000000,200000000000000}", roaring3.ToString());
+
+    roaring3 = std::move(roaring2);
+    ASSERT_EQ("{100000000000,200000000000000}", roaring3.ToString());
+    ASSERT_FALSE(roaring2.roaring_bitmap_);  // NOLINT(bugprone-use-after-move)
+}
+
+TEST(RoaringBitmap64Test, TestFastUnion) {
+    RoaringBitmap64 roaring1 = RoaringBitmap64::From({100000000000l, 
200000000000000l});
+    RoaringBitmap64 roaring2 = RoaringBitmap64::From(
+        {1l, 100000000000l, 150000000000l, 200000000000000l, 
300000000000000l});
+    RoaringBitmap64 roaring3 = RoaringBitmap64::From({2l, 200000000000000l, 
800l});
+
+    RoaringBitmap64 res = RoaringBitmap64::FastUnion({&roaring1, &roaring2, 
&roaring3});
+    ASSERT_EQ(res, RoaringBitmap64::From({1l, 2l, 100000000000l, 
150000000000l, 200000000000000l,
+                                          300000000000000l, 800l}));
+
+    std::vector<RoaringBitmap64> roarings = {roaring1, roaring2, roaring3};
+    RoaringBitmap64 res2 = RoaringBitmap64::FastUnion(roarings);
+    ASSERT_EQ(res2, RoaringBitmap64::From({1l, 2l, 100000000000l, 
150000000000l, 200000000000000l,
+                                           300000000000000l, 800l}));
+}
+
+TEST(RoaringBitmap64Test, TestFlip) {
+    RoaringBitmap64 roaring =
+        RoaringBitmap64::From({1000000000001l, 1000000000002l, 
1000000000004l});
+    roaring.Flip(1000000000000l, 1000000000005l);
+    ASSERT_EQ(roaring, RoaringBitmap64::From({1000000000000l, 
1000000000003l}));
+}
+
+TEST(RoaringBitmap64Test, TestAndNot) {
+    RoaringBitmap64 roaring1 = RoaringBitmap64::From(
+        {10000000000010l, 10000000000020l, 100000000000100l, 
100000000000200l});
+    RoaringBitmap64 roaring2 = RoaringBitmap64::From(
+        {1000000000005l, 10000000000020l, 10000000000080l, 100000000000200l, 
100000000000250l});
+
+    auto andnot_roaring = RoaringBitmap64::AndNot(roaring1, roaring2);
+    ASSERT_EQ(andnot_roaring, RoaringBitmap64::From({10000000000010l, 
100000000000100l}));
+}
+
+TEST(RoaringBitmap64Test, TestIterator) {
+    RoaringBitmap64 roaring = RoaringBitmap64::From({100000000000l, 
200000000000l});
+    auto iter = roaring.Begin();
+    ASSERT_EQ(*iter, 100000000000l);
+    auto iter2 = ++iter;
+    ASSERT_EQ(*iter, 200000000000l);
+    ASSERT_EQ(*iter2, 200000000000l);
+    ++iter;
+    ASSERT_EQ(iter, roaring.End());
+
+    iter = roaring.EqualOrLarger(5);
+    ASSERT_EQ(*iter, 100000000000l);
+    iter = roaring.EqualOrLarger(100000000000l);
+    ASSERT_EQ(*iter, 100000000000l);
+    iter = roaring.EqualOrLarger(150000000000l);
+    ASSERT_EQ(*iter, 200000000000l);
+    iter = roaring.EqualOrLarger(200000000000l);
+    ASSERT_EQ(*iter, 200000000000l);
+    iter = roaring.EqualOrLarger(200000000001l);
+    ASSERT_EQ(iter, roaring.End());
+}
+
+TEST(RoaringBitmap64Test, TestIteratorAssignAndMove) {
+    RoaringBitmap64 roaring =
+        RoaringBitmap64::From({10000000000010l, 100000000000100l, 
100000000000200l});
+
+    auto iter1 = roaring.EqualOrLarger(10000000000010l);
+    auto iter2 = iter1;
+    ASSERT_EQ(iter1, iter2);
+    ASSERT_EQ(10000000000010l, *iter1);
+    ASSERT_EQ(10000000000010l, *iter2);
+
+    auto iter3 = std::move(iter1);
+    ASSERT_EQ(iter2, iter3);
+    ASSERT_FALSE(iter1.iterator_);  // NOLINT(bugprone-use-after-move)
+
+    ++iter3;
+    ASSERT_NE(iter3, iter2);
+    ASSERT_EQ(100000000000100l, *iter3);
+
+    iter3 = iter2;
+    ASSERT_EQ(iter3, iter2);
+
+    iter2 = std::move(iter3);
+    ASSERT_FALSE(iter3.iterator_);  // NOLINT(bugprone-use-after-move)
+    ASSERT_EQ(10000000000010l, *iter2);
+
+    iter3 = iter2;
+    ASSERT_EQ(iter2, iter3);
+    ASSERT_EQ(10000000000010l, *iter2);
+    ASSERT_EQ(10000000000010l, *iter3);
+
+    iter3 = std::move(iter2);
+    ASSERT_EQ(10000000000010l, *iter3);
+    ASSERT_FALSE(iter2.iterator_);  // NOLINT(bugprone-use-after-move)
+}
+
+TEST(RoaringBitmap64Test, TestDeserializeFromInputStream) {
+    RoaringBitmap64 roaring1 =
+        RoaringBitmap64::From({10000000000010l, 10000000000020l, 
100000000000100l});
+    RoaringBitmap64 roaring2 = RoaringBitmap64::From(
+        {10000000000010l, 10000000000050l, 100000000000150l, 
100000000000200l});
+    RoaringBitmap64 roaring3 =
+        RoaringBitmap64::From({1000000000002l, 1000000000004l, 1000000000006l, 
1000000000001000l});
+
+    size_t len1 = roaring1.GetSizeInBytes();
+    size_t len2 = roaring2.GetSizeInBytes();
+    size_t len3 = roaring3.GetSizeInBytes();
+
+    auto pool = GetDefaultPool();
+    auto bytes1 = roaring1.Serialize(pool.get());
+    ASSERT_EQ(bytes1->size(), len1);
+    auto bytes2 = roaring2.Serialize(pool.get());
+    ASSERT_EQ(bytes2->size(), len2);
+    auto bytes3 = roaring3.Serialize(pool.get());
+    ASSERT_EQ(bytes3->size(), len3);
+
+    auto concat_bytes = std::make_shared<Bytes>(len1 + len2 + len3, 
pool.get());
+    memcpy(concat_bytes->data(), bytes1->data(), len1);
+    memcpy(concat_bytes->data() + len1, bytes2->data(), len2);
+    memcpy(concat_bytes->data() + len1 + len2, bytes3->data(), len3);
+
+    ByteArrayInputStream byte_array_input_stream(concat_bytes->data(), 
concat_bytes->size());
+    RoaringBitmap64 de_roaring1;
+    ASSERT_OK(de_roaring1.Deserialize(&byte_array_input_stream));
+    ASSERT_EQ(de_roaring1, roaring1);
+    ASSERT_EQ(byte_array_input_stream.GetPos().value(), len1);
+
+    RoaringBitmap64 de_roaring2;
+    ASSERT_OK(de_roaring2.Deserialize(&byte_array_input_stream));
+    ASSERT_EQ(de_roaring2, roaring2);
+    ASSERT_EQ(byte_array_input_stream.GetPos().value(), len1 + len2);
+
+    RoaringBitmap64 de_roaring3;
+    ASSERT_OK(de_roaring3.Deserialize(&byte_array_input_stream));
+    ASSERT_EQ(de_roaring3, roaring3);
+    ASSERT_EQ(byte_array_input_stream.GetPos().value(), len1 + len2 + len3);
+}
+
+TEST(RoaringBitmap64Test, TestHighCardinality) {
+    std::srand(time(nullptr));
+    auto pool = GetDefaultPool();
+    RoaringBitmap64 roaring;
+    for (int64_t i = 0; i < 10000; i++) {
+        roaring.Add(static_cast<int64_t>(std::numeric_limits<int32_t>::max()) 
+ std::rand());
+    }
+    auto bytes = roaring.Serialize(pool.get());
+    RoaringBitmap64 de_roaring;
+    ASSERT_OK(de_roaring.Deserialize(bytes->data(), bytes->size()));
+    ASSERT_EQ(roaring, de_roaring);
+}
+
+TEST(RoaringBitmap64Test, TestInplaceAndOr) {
+    RoaringBitmap64 roaring = RoaringBitmap64::From({0l, 1l, 
100000000000100l});
+    RoaringBitmap64 roaring1 = RoaringBitmap64::From({1l, 2l, 1000000000005l, 
100000000000200l});
+    roaring |= roaring1;
+    ASSERT_EQ(roaring.ToString(), 
"{0,1,2,1000000000005,100000000000100,100000000000200}");
+    RoaringBitmap64 roaring2 =
+        RoaringBitmap64::From({1l, 2l, 3l, 1000000000005l, 100000000000100l, 
100000000000500l});
+    roaring &= roaring2;
+    ASSERT_EQ(roaring.ToString(), "{1,2,1000000000005,100000000000100}");
+    RoaringBitmap64 roaring3 = RoaringBitmap64::From({2l, 100000000000100l});
+    roaring -= roaring3;
+    ASSERT_EQ(roaring.ToString(), "{1,1000000000005}");
+}
+
+TEST(RoaringBitmap64Test, TestContainsAny) {
+    RoaringBitmap64 roaring =
+        RoaringBitmap64::From({10000000000010l, 10000000000011l, 
10000000000100l});
+    ASSERT_TRUE(roaring.ContainsAny(10000000000010l, 10000000000011l));
+    ASSERT_TRUE(roaring.ContainsAny(10000000000010l, 10000000000020l));
+    ASSERT_TRUE(roaring.ContainsAny(10000000000010l, 10000000000101l));
+    ASSERT_TRUE(roaring.ContainsAny(10000000000020l, 10000000000200l));
+    ASSERT_TRUE(roaring.ContainsAny(10000000000005l, 10000000000011l));
+    ASSERT_FALSE(roaring.ContainsAny(10000000000005l, 10000000000010l));
+    ASSERT_FALSE(roaring.ContainsAny(10000000000020l, 10000000000100l));
+    ASSERT_FALSE(roaring.ContainsAny(10000000000020l, 10000000000030l));
+    ASSERT_FALSE(roaring.ContainsAny(10000000000500l, 10000000000520l));
+}
+
+TEST(RoaringBitmap64Test, TestFromRoaringBitmap32) {
+    {
+        RoaringBitmap32 roaring32 = RoaringBitmap32::From({10, 20, 21});
+        RoaringBitmap64 roaring64(roaring32);
+        ASSERT_EQ(roaring64.ToString(), "{10,20,21}");
+    }
+    {
+        RoaringBitmap32 roaring32 = RoaringBitmap32::From({10, 20, 21});
+        RoaringBitmap64 roaring64;
+        roaring64 = roaring32;
+        ASSERT_EQ(roaring64.ToString(), "{10,20,21}");
+    }
+}
+
+TEST(RoaringBitmap64Test, TestIteratorEqualOrLarger) {
+    RoaringBitmap64 roaring = RoaringBitmap64::From({1l, 3l, 5l, 100l});
+    auto iter = roaring.Begin();
+    ASSERT_EQ(*iter, 1l);
+    iter.EqualOrLarger(5l);
+    ASSERT_EQ(*iter, 5l);
+    iter.EqualOrLarger(10l);
+    ASSERT_EQ(*iter, 100l);
+    iter.EqualOrLarger(200l);
+    ASSERT_EQ(iter, roaring.End());
+}
+
+// Helper function to convert a RoaringBitmap64 to a list of contiguous ranges.
+static std::vector<Range> ToRangeList(const RoaringBitmap64& bitmap) {
+    std::vector<Range> ranges;
+    if (bitmap.IsEmpty()) {
+        return ranges;
+    }
+
+    int64_t current_start = -1;
+    int64_t current_end = -1;
+
+    for (auto it = bitmap.Begin(); it != bitmap.End(); ++it) {
+        int64_t value = *it;
+        if (current_start == -1) {
+            current_start = value;
+            current_end = value;
+        } else if (value == current_end + 1) {
+            current_end = value;
+        } else {
+            ranges.emplace_back(current_start, current_end);
+            current_start = value;
+            current_end = value;
+        }
+    }
+
+    if (current_start != -1) {
+        ranges.emplace_back(current_start, current_end);
+    }
+
+    return ranges;
+}
+
+TEST(RoaringBitmap64Test, TestAddRangeBasic) {
+    RoaringBitmap64 bitmap;
+    bitmap.AddRange(5, 11);  // half-open interval [5, 11) == closed [5, 10]
+
+    ASSERT_EQ(bitmap.Cardinality(), 6);
+    ASSERT_FALSE(bitmap.Contains(4));
+    ASSERT_TRUE(bitmap.Contains(5));
+    ASSERT_TRUE(bitmap.Contains(7));
+    ASSERT_TRUE(bitmap.Contains(10));
+    ASSERT_FALSE(bitmap.Contains(11));
+}
+
+TEST(RoaringBitmap64Test, TestAddRangeSingleElement) {
+    RoaringBitmap64 bitmap;
+    bitmap.AddRange(100, 101);  // half-open interval [100, 101) == single 
element 100
+
+    ASSERT_EQ(bitmap.Cardinality(), 1);
+    ASSERT_FALSE(bitmap.Contains(99));
+    ASSERT_TRUE(bitmap.Contains(100));
+    ASSERT_FALSE(bitmap.Contains(101));
+}
+
+TEST(RoaringBitmap64Test, TestAddRangeMultipleNonOverlapping) {
+    RoaringBitmap64 bitmap;
+    bitmap.AddRange(0, 6);    // [0, 5]
+    bitmap.AddRange(10, 16);  // [10, 15]
+    bitmap.AddRange(20, 26);  // [20, 25]
+
+    ASSERT_EQ(bitmap.Cardinality(), 18);
+
+    ASSERT_FALSE(bitmap.Contains(6));
+    ASSERT_FALSE(bitmap.Contains(9));
+    ASSERT_FALSE(bitmap.Contains(16));
+    ASSERT_FALSE(bitmap.Contains(19));
+
+    ASSERT_TRUE(bitmap.Contains(0));
+    ASSERT_TRUE(bitmap.Contains(5));
+    ASSERT_TRUE(bitmap.Contains(10));
+    ASSERT_TRUE(bitmap.Contains(15));
+    ASSERT_TRUE(bitmap.Contains(20));
+    ASSERT_TRUE(bitmap.Contains(25));
+
+    std::vector<Range> ranges = ToRangeList(bitmap);
+    ASSERT_EQ(ranges.size(), 3);
+    ASSERT_EQ(ranges[0], Range(0, 5));
+    ASSERT_EQ(ranges[1], Range(10, 15));
+    ASSERT_EQ(ranges[2], Range(20, 25));
+}
+
+TEST(RoaringBitmap64Test, TestAddRangeLargeValues) {
+    RoaringBitmap64 bitmap;
+    int64_t start = static_cast<int64_t>(INT_MAX) + 100L;
+    int64_t end = static_cast<int64_t>(INT_MAX) + 200L;
+    bitmap.AddRange(start, end + 1);  // half-open interval [start, end+1) == 
closed [start, end]
+
+    ASSERT_EQ(bitmap.Cardinality(), 101);
+    ASSERT_FALSE(bitmap.Contains(start - 1));
+    ASSERT_TRUE(bitmap.Contains(start));
+    ASSERT_TRUE(bitmap.Contains(start + 50));
+    ASSERT_TRUE(bitmap.Contains(end));
+    ASSERT_FALSE(bitmap.Contains(end + 1));
+
+    std::vector<int64_t> values;
+    for (auto it = bitmap.Begin(); it != bitmap.End(); ++it) {
+        values.push_back(*it);
+    }
+    ASSERT_EQ(values.size(), 101);
+    ASSERT_EQ(values[0], start);
+    ASSERT_EQ(values[100], end);
+}
+
+TEST(RoaringBitmap64Test, TestAddMany) {
+    // empty input
+    {
+        RoaringBitmap64 bitmap;
+        bitmap.AddMany(0, nullptr);
+        ASSERT_TRUE(bitmap.IsEmpty());
+    }
+    // single element
+    {
+        RoaringBitmap64 bitmap;
+        int64_t value = 999999999999ll;
+        bitmap.AddMany(1, &value);
+        ASSERT_EQ(bitmap.Cardinality(), 1);
+        ASSERT_TRUE(bitmap.Contains(999999999999ll));
+    }
+    // single bucket (all values share the same high-32 bits)
+    {
+        RoaringBitmap64 bitmap;
+        std::vector<int64_t> values = {100000000001ll, 100000000005ll, 
100000000003ll,
+                                       100000000002ll};
+        bitmap.AddMany(values.size(), values.data());
+        ASSERT_EQ(bitmap.Cardinality(), 4);
+        ASSERT_TRUE(bitmap.Contains(100000000001ll));
+        ASSERT_TRUE(bitmap.Contains(100000000002ll));
+        ASSERT_TRUE(bitmap.Contains(100000000003ll));
+        ASSERT_TRUE(bitmap.Contains(100000000005ll));
+        ASSERT_FALSE(bitmap.Contains(100000000004ll));
+    }
+    // multiple buckets (values span different high-32 bit buckets)
+    {
+        RoaringBitmap64 bitmap;
+        std::vector<int64_t> values = {1ll, 100000000000ll, 200000000000ll, 
2ll, 100000000001ll};
+        bitmap.AddMany(values.size(), values.data());
+        ASSERT_EQ(bitmap.Cardinality(), 5);
+        ASSERT_TRUE(bitmap.Contains(1ll));
+        ASSERT_TRUE(bitmap.Contains(2ll));
+        ASSERT_TRUE(bitmap.Contains(100000000000ll));
+        ASSERT_TRUE(bitmap.Contains(100000000001ll));
+        ASSERT_TRUE(bitmap.Contains(200000000000ll));
+    }
+    // duplicates
+    {
+        RoaringBitmap64 bitmap;
+        std::vector<int64_t> values = {42ll, 42ll, 42ll, 100000000000ll, 
100000000000ll};
+        bitmap.AddMany(values.size(), values.data());
+        ASSERT_EQ(bitmap.Cardinality(), 2);
+        ASSERT_TRUE(bitmap.Contains(42ll));
+        ASSERT_TRUE(bitmap.Contains(100000000000ll));
+    }
+    // unsorted input (reverse order)
+    {
+        std::vector<int64_t> values;
+        for (int64_t i = 99; i >= 0; --i) {
+            values.push_back(i + 100000000000ll);
+        }
+        RoaringBitmap64 bitmap;
+        bitmap.AddMany(values.size(), values.data());
+        ASSERT_EQ(bitmap.Cardinality(), 100);
+        for (int64_t i = 0; i < 100; ++i) {
+            ASSERT_TRUE(bitmap.Contains(i + 100000000000ll));
+        }
+    }
+    // incremental insert (multiple AddMany calls accumulate)
+    {
+        RoaringBitmap64 bitmap;
+        std::vector<int64_t> batch1 = {10ll, 20ll, 30ll};
+        std::vector<int64_t> batch2 = {20ll, 40ll, 100000000000ll};
+        bitmap.AddMany(batch1.size(), batch1.data());
+        ASSERT_EQ(bitmap.Cardinality(), 3);
+        bitmap.AddMany(batch2.size(), batch2.data());
+        ASSERT_EQ(bitmap.Cardinality(), 5);
+        ASSERT_TRUE(bitmap.Contains(10ll));
+        ASSERT_TRUE(bitmap.Contains(20ll));
+        ASSERT_TRUE(bitmap.Contains(30ll));
+        ASSERT_TRUE(bitmap.Contains(40ll));
+        ASSERT_TRUE(bitmap.Contains(100000000000ll));
+    }
+    // large values near MAX_VALUE
+    {
+        std::vector<int64_t> values;
+        for (int64_t i = 0; i < 50; ++i) {
+            values.push_back(RoaringBitmap64::MAX_VALUE - 1 - i);
+        }
+        RoaringBitmap64 bitmap;
+        bitmap.AddMany(values.size(), values.data());
+        ASSERT_EQ(bitmap.Cardinality(), 50);
+        for (auto v : values) {
+            ASSERT_TRUE(bitmap.Contains(v));
+        }
+    }
+    // consistency with Add: AddMany must produce the same bitmap as repeated 
Add
+    {
+        std::vector<int64_t> values;
+        for (int64_t i = 0; i < 100; ++i) {
+            values.push_back(i + 100000000000ll);
+        }
+        for (int64_t i = 0; i < 50; ++i) {
+            values.push_back(i + 200000000000ll);
+        }
+        RoaringBitmap64 bitmap_add;
+        for (auto v : values) {
+            bitmap_add.Add(v);
+        }
+        RoaringBitmap64 bitmap_add_many;
+        bitmap_add_many.AddMany(values.size(), values.data());
+        ASSERT_EQ(bitmap_add, bitmap_add_many);
+    }
+}
+
+}  // namespace paimon::test


Reply via email to