This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 133742da feat: add roaring-based position bitmap (#595)
133742da is described below

commit 133742da37bdcf4b2c3882d8b5fb49ea148c8e57
Author: Gang Wu <[email protected]>
AuthorDate: Wed Mar 25 21:50:17 2026 +0800

    feat: add roaring-based position bitmap (#595)
    
    Co-authored-by: emkornfield <[email protected]>
---
 src/iceberg/CMakeLists.txt                       |   2 +
 src/iceberg/deletes/CMakeLists.txt               |  18 +
 src/iceberg/deletes/meson.build                  |  18 +
 src/iceberg/deletes/roaring_position_bitmap.cc   | 270 +++++++++++
 src/iceberg/deletes/roaring_position_bitmap.h    | 110 +++++
 src/iceberg/meson.build                          |   2 +
 src/iceberg/test/CMakeLists.txt                  |   1 +
 src/iceberg/test/meson.build                     |   1 +
 src/iceberg/test/resources/64map32bitvals.bin    | Bin 0 -> 48 bytes
 src/iceberg/test/resources/64mapempty.bin        | Bin 0 -> 8 bytes
 src/iceberg/test/resources/64maphighvals.bin     | Bin 0 -> 1086 bytes
 src/iceberg/test/resources/64mapspreadvals.bin   | Bin 0 -> 408 bytes
 src/iceberg/test/roaring_position_bitmap_test.cc | 565 +++++++++++++++++++++++
 13 files changed, 987 insertions(+)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index b503a41e..2f34fd03 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -25,6 +25,7 @@ set(ICEBERG_SOURCES
     data/position_delete_writer.cc
     data/writer.cc
     delete_file_index.cc
+    deletes/roaring_position_bitmap.cc
     expression/aggregate.cc
     expression/binder.cc
     expression/evaluator.cc
@@ -166,6 +167,7 @@ iceberg_install_all_headers(iceberg)
 
 add_subdirectory(catalog)
 add_subdirectory(data)
+add_subdirectory(deletes)
 add_subdirectory(expression)
 add_subdirectory(manifest)
 add_subdirectory(puffin)
diff --git a/src/iceberg/deletes/CMakeLists.txt 
b/src/iceberg/deletes/CMakeLists.txt
new file mode 100644
index 00000000..2ce7ccf1
--- /dev/null
+++ b/src/iceberg/deletes/CMakeLists.txt
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Register the public headers of this directory for installation.
+# NOTE(review): iceberg_install_all_headers is a project helper defined
+# elsewhere; confirm it picks up every header under iceberg/deletes.
+iceberg_install_all_headers(iceberg/deletes)
diff --git a/src/iceberg/deletes/meson.build b/src/iceberg/deletes/meson.build
new file mode 100644
index 00000000..28a01de1
--- /dev/null
+++ b/src/iceberg/deletes/meson.build
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Install the public header into the 'iceberg/deletes' include subdirectory.
+install_headers(['roaring_position_bitmap.h'], subdir: 'iceberg/deletes')
diff --git a/src/iceberg/deletes/roaring_position_bitmap.cc 
b/src/iceberg/deletes/roaring_position_bitmap.cc
new file mode 100644
index 00000000..fec03788
--- /dev/null
+++ b/src/iceberg/deletes/roaring_position_bitmap.cc
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/deletes/roaring_position_bitmap.h"
+
+#include <cstring>
+#include <exception>
+#include <limits>
+#include <utility>
+#include <vector>
+
+#include <roaring/roaring.hh>
+
+#include "iceberg/util/endian.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+// Width of the bitmap-count field that starts the serialized form. It is
+// written with WriteLE64 and read back with ReadLE64.
+constexpr size_t kBitmapCountSizeBytes = 8;
+// Width of the key that precedes each serialized bitmap. It is written with
+// WriteLE32 and read back with ReadLE32.
+constexpr size_t kBitmapKeySizeBytes = 4;
+
+// Returns the bitmap key of `pos`, i.e. its upper 32 bits.
+int32_t Key(int64_t pos) {
+  const int64_t upper_bits = pos >> 32;
+  return static_cast<int32_t>(upper_bits);
+}
+
+// Returns the lower 32 bits of `pos`, i.e. its offset inside the bitmap
+// selected by Key(pos).
+uint32_t Pos32Bits(int64_t pos) {
+  constexpr int64_t kLow32Mask = 0xFFFFFFFF;
+  return static_cast<uint32_t>(pos & kLow32Mask);
+}
+
+// Rebuilds a 64-bit position from its two halves: `key` supplies the upper
+// 32 bits and `pos32` the lower 32 bits. `pos32` is unsigned, so widening it
+// zero-extends and cannot disturb the key bits.
+int64_t ToPosition(int32_t key, uint32_t pos32) {
+  const int64_t upper = int64_t{key} << 32;
+  const int64_t lower = int64_t{pos32};
+  return upper | lower;
+}
+
+// Converts `value` with ToLittleEndian and copies the resulting 8 bytes to
+// `buf`. memcpy is used so `buf` does not have to be aligned.
+void WriteLE64(char* buf, int64_t value) {
+  const auto encoded = ToLittleEndian(static_cast<uint64_t>(value));
+  std::memcpy(buf, &encoded, sizeof(encoded));
+}
+
+// Converts `value` with ToLittleEndian and copies the resulting 4 bytes to
+// `buf`. memcpy is used so `buf` does not have to be aligned.
+void WriteLE32(char* buf, int32_t value) {
+  const auto encoded = ToLittleEndian(static_cast<uint32_t>(value));
+  std::memcpy(buf, &encoded, sizeof(encoded));
+}
+
+// Copies 8 bytes from `buf`, converts them with FromLittleEndian and returns
+// the result as a signed 64-bit value.
+int64_t ReadLE64(const char* buf) {
+  uint64_t raw;
+  std::memcpy(&raw, buf, sizeof(raw));
+  const auto decoded = FromLittleEndian(raw);
+  return static_cast<int64_t>(decoded);
+}
+
+// Copies 4 bytes from `buf`, converts them with FromLittleEndian and returns
+// the result as a signed 32-bit value.
+int32_t ReadLE32(const char* buf) {
+  uint32_t raw;
+  std::memcpy(&raw, buf, sizeof(raw));
+  const auto decoded = FromLittleEndian(raw);
+  return static_cast<int32_t>(decoded);
+}
+
+// Accepts positions in [0, RoaringPositionBitmap::kMaxPosition]; any other
+// value yields an InvalidArgument error naming the offending position.
+Status ValidatePosition(int64_t pos) {
+  const bool in_range = pos >= 0 && pos <= RoaringPositionBitmap::kMaxPosition;
+  if (in_range) {
+    return {};
+  }
+  return InvalidArgument("Bitmap supports positions that are >= 0 and <= {}: {}",
+                         RoaringPositionBitmap::kMaxPosition, pos);
+}
+
+}  // namespace
+
+// Private state of RoaringPositionBitmap.
+struct RoaringPositionBitmap::Impl {
+  // One 32-bit Roaring bitmap per key; the vector index is the key.
+  std::vector<roaring::Roaring> bitmaps;
+
+  // Grows `bitmaps` with empty bitmaps until it holds at least
+  // `required_length` entries. Never shrinks the vector.
+  void AllocateBitmapsIfNeeded(int32_t required_length) {
+    if (!std::cmp_less(bitmaps.size(), required_length)) {
+      return;
+    }
+    bitmaps.resize(static_cast<size_t>(required_length));
+  }
+};
+
+// Creates an empty bitmap backed by a fresh Impl with no per-key bitmaps.
+RoaringPositionBitmap::RoaringPositionBitmap() : impl_(std::make_unique<Impl>()) {}
+
+// Defaulted here rather than in the header because Impl is only
+// forward-declared there; std::unique_ptr<Impl> needs the complete type to
+// destroy it.
+RoaringPositionBitmap::~RoaringPositionBitmap() = default;
+
+// Deep-copies `other`. When `other` holds no Impl (a null impl_, which the
+// defaulted move operations leave behind), the copy gets a fresh empty Impl.
+RoaringPositionBitmap::RoaringPositionBitmap(const RoaringPositionBitmap& other)
+    : impl_(other.impl_ ? std::make_unique<Impl>(*other.impl_)
+                        : std::make_unique<Impl>()) {}
+
+// Replaces this bitmap's state with a deep copy of `other`. Self-assignment
+// is a no-op. A source without an Impl produces a fresh empty Impl.
+RoaringPositionBitmap& RoaringPositionBitmap::operator=(
+    const RoaringPositionBitmap& other) {
+  if (this != &other) {
+    if (other.impl_ == nullptr) {
+      impl_ = std::make_unique<Impl>();
+    } else {
+      impl_ = std::make_unique<Impl>(*other.impl_);
+    }
+  }
+  return *this;
+}
+
+// Defaulted move operations: ownership of the Impl is transferred, which
+// leaves the moved-from object's impl_ null. The other member functions
+// dereference impl_ without a null check, so a moved-from bitmap should only
+// be destroyed or assigned to.
+RoaringPositionBitmap::RoaringPositionBitmap(RoaringPositionBitmap&&) noexcept = default;
+
+RoaringPositionBitmap& RoaringPositionBitmap::operator=(
+    RoaringPositionBitmap&&) noexcept = default;
+
+// Private constructor that adopts an already-built Impl (used by Deserialize).
+RoaringPositionBitmap::RoaringPositionBitmap(std::unique_ptr<Impl> impl)
+    : impl_{std::move(impl)} {}
+
+// Sets `pos`. Fails with the ValidatePosition error when `pos` is out of
+// range; otherwise grows the per-key vector as needed and sets the low
+// 32 bits in the bitmap that belongs to the position's key.
+Status RoaringPositionBitmap::Add(int64_t pos) {
+  ICEBERG_RETURN_UNEXPECTED(ValidatePosition(pos));
+  const int32_t key = Key(pos);
+  impl_->AllocateBitmapsIfNeeded(key + 1);
+  auto& target = impl_->bitmaps[key];
+  target.add(Pos32Bits(pos));
+  return {};
+}
+
+// Sets every position in [pos_start, pos_end).
+//
+// An empty range (pos_start >= pos_end) is a successful no-op. Both ends of a
+// non-empty range are validated before anything is inserted, so an invalid
+// range fails without first inserting its valid prefix and leaving the
+// bitmap partially modified.
+//
+// NOTE(review): positions are still added one by one so the resulting
+// container layout stays the same as before; a bulk CRoaring range insert
+// may choose different container types -- confirm before switching.
+Status RoaringPositionBitmap::AddRange(int64_t pos_start, int64_t pos_end) {
+  if (pos_start >= pos_end) {
+    return {};
+  }
+
+  // pos_end > pos_start here, so pos_end - 1 cannot overflow.
+  const int64_t pos_last = pos_end - 1;
+  ICEBERG_RETURN_UNEXPECTED(ValidatePosition(pos_start));
+  ICEBERG_RETURN_UNEXPECTED(ValidatePosition(pos_last));
+
+  // Allocate once for the highest key touched by the range.
+  impl_->AllocateBitmapsIfNeeded(Key(pos_last) + 1);
+  for (int64_t pos = pos_start; pos < pos_end; ++pos) {
+    impl_->bitmaps[Key(pos)].add(Pos32Bits(pos));
+  }
+  return {};
+}
+
+// Reports whether `pos` is set. Out-of-range positions yield the
+// ValidatePosition error. A key with no allocated bitmap means "not set".
+Result<bool> RoaringPositionBitmap::Contains(int64_t pos) const {
+  ICEBERG_RETURN_UNEXPECTED(ValidatePosition(pos));
+  const int32_t key = Key(pos);
+  if (!std::cmp_less(key, impl_->bitmaps.size())) {
+    return false;
+  }
+  return impl_->bitmaps[key].contains(Pos32Bits(pos));
+}
+
+// Returns true when no position is set. Stops at the first non-empty bitmap
+// instead of summing the cardinality of every bitmap.
+bool RoaringPositionBitmap::IsEmpty() const {
+  for (const auto& bitmap : impl_->bitmaps) {
+    if (!bitmap.isEmpty()) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Returns the number of set positions, summed over the bitmaps of all keys.
+size_t RoaringPositionBitmap::Cardinality() const {
+  const auto& bitmaps = impl_->bitmaps;
+  size_t count = 0;
+  for (size_t key = 0; key < bitmaps.size(); ++key) {
+    count += bitmaps[key].cardinality();
+  }
+  return count;
+}
+
+// In-place union: after the call this bitmap holds every position that was
+// set in either operand. `other` is left untouched.
+void RoaringPositionBitmap::Or(const RoaringPositionBitmap& other) {
+  const auto& rhs = other.impl_->bitmaps;
+  // Make sure there is a destination bitmap for every key present in `other`.
+  impl_->AllocateBitmapsIfNeeded(static_cast<int32_t>(rhs.size()));
+  auto& lhs = impl_->bitmaps;
+  for (size_t key = 0; key < rhs.size(); ++key) {
+    lhs[key] |= rhs[key];
+  }
+}
+
+// Calls runOptimize() on every per-key bitmap (no short-circuiting) and
+// returns true if at least one of those calls returned true.
+bool RoaringPositionBitmap::Optimize() {
+  bool any_true = false;
+  for (auto& bitmap : impl_->bitmaps) {
+    if (bitmap.runOptimize()) {
+      any_true = true;
+    }
+  }
+  return any_true;
+}
+
+// Invokes `fn` once per set position. Keys are visited in increasing order
+// and each bitmap is iterated in its own iteration order.
+void RoaringPositionBitmap::ForEach(
+    const std::function<void(int64_t)>& fn) const {
+  const auto& bitmaps = impl_->bitmaps;
+  for (size_t index = 0; index < bitmaps.size(); ++index) {
+    const auto key = static_cast<int32_t>(index);
+    for (const uint32_t pos32 : bitmaps[index]) {
+      fn(ToPosition(key, pos32));
+    }
+  }
+}
+
+// Returns the exact buffer size Serialize() needs: the count field, plus one
+// key field and the portable-format size for every bitmap (empty ones too).
+size_t RoaringPositionBitmap::SerializedSizeInBytes() const {
+  const auto& bitmaps = impl_->bitmaps;
+  size_t total = kBitmapCountSizeBytes + kBitmapKeySizeBytes * bitmaps.size();
+  for (const auto& bitmap : bitmaps) {
+    total += bitmap.getSizeInBytes(/*portable=*/true);
+  }
+  return total;
+}
+
+// Serializes using the portable format (little-endian).
+// See https://iceberg.apache.org/puffin-spec/#deletion-vector-v1-blob-type
+//
+// Layout: a 64-bit little-endian bitmap count (empty bitmaps included),
+// followed for each key in ascending order by a 32-bit little-endian key and
+// the bitmap in CRoaring's portable format.
+Result<std::string> RoaringPositionBitmap::Serialize() const {
+  const auto& bitmaps = impl_->bitmaps;
+  std::string out(SerializedSizeInBytes(), '\0');
+  char* cursor = out.data();
+
+  WriteLE64(cursor, static_cast<int64_t>(bitmaps.size()));
+  cursor += kBitmapCountSizeBytes;
+
+  for (size_t index = 0; index < bitmaps.size(); ++index) {
+    WriteLE32(cursor, static_cast<int32_t>(index));
+    cursor += kBitmapKeySizeBytes;
+
+    const size_t bytes_written = bitmaps[index].write(cursor, /*portable=*/true);
+    cursor += bytes_written;
+  }
+
+  return out;
+}
+
+// Parses the layout written by Serialize(): a 64-bit little-endian bitmap
+// count followed by that many entries, each a 32-bit little-endian key and a
+// portable-format Roaring bitmap. Keys must be non-negative and strictly
+// ascending. Keys skipped by the input are materialized as empty bitmaps so
+// that the vector index always equals the key.
+Result<RoaringPositionBitmap> RoaringPositionBitmap::Deserialize(
+    std::string_view bytes) {
+  const char* cursor = bytes.data();
+  size_t bytes_left = bytes.size();
+
+  ICEBERG_PRECHECK(bytes_left >= kBitmapCountSizeBytes,
+                   "Buffer too small for bitmap count: {} bytes", bytes_left);
+
+  const int64_t bitmap_count = ReadLE64(cursor);
+  cursor += kBitmapCountSizeBytes;
+  bytes_left -= kBitmapCountSizeBytes;
+
+  ICEBERG_PRECHECK(
+      bitmap_count >= 0 && bitmap_count <= std::numeric_limits<int32_t>::max(),
+      "Invalid bitmap count: {}", bitmap_count);
+
+  auto impl = std::make_unique<Impl>();
+  auto& bitmaps = impl->bitmaps;
+  int32_t previous_key = -1;
+
+  for (int64_t entry = 0; entry < bitmap_count; ++entry) {
+    ICEBERG_PRECHECK(bytes_left >= kBitmapKeySizeBytes,
+                     "Buffer too small for bitmap key: {} bytes", bytes_left);
+
+    const int32_t key = ReadLE32(cursor);
+    cursor += kBitmapKeySizeBytes;
+    bytes_left -= kBitmapKeySizeBytes;
+
+    ICEBERG_PRECHECK(key >= 0, "Invalid unsigned key: {}", key);
+    ICEBERG_PRECHECK(key < std::numeric_limits<int32_t>::max(),
+                     "Key is too large: {}", key);
+    ICEBERG_PRECHECK(key > previous_key,
+                     "Keys must be sorted in ascending order, got key {} after {}",
+                     key, previous_key);
+
+    // Fill gaps with empty bitmaps. The vector holds previous_key + 1 entries
+    // at this point and key > previous_key, so this can only grow it.
+    bitmaps.resize(static_cast<size_t>(key));
+
+    // Read the bitmap with the bounds-checked portable reader.
+    // CRoaring's readSafe may throw on corrupted data.
+    roaring::Roaring bitmap;
+    try {
+      bitmap = roaring::Roaring::readSafe(cursor, bytes_left);
+    } catch (const std::exception& e) {
+      return InvalidArgument("Failed to deserialize bitmap at key {}: {}", key,
+                             e.what());
+    }
+
+    // Advance by the portable size of the bitmap that was just read.
+    const size_t bitmap_size = bitmap.getSizeInBytes(/*portable=*/true);
+    ICEBERG_PRECHECK(
+        bitmap_size <= bytes_left,
+        "Buffer too small for bitmap key {}: {} bytes needed, {} bytes available",
+        key, bitmap_size, bytes_left);
+    cursor += bitmap_size;
+    bytes_left -= bitmap_size;
+
+    bitmaps.emplace_back(std::move(bitmap));
+    previous_key = key;
+  }
+
+  return RoaringPositionBitmap(std::move(impl));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/deletes/roaring_position_bitmap.h 
b/src/iceberg/deletes/roaring_position_bitmap.h
new file mode 100644
index 00000000..37a3fe14
--- /dev/null
+++ b/src/iceberg/deletes/roaring_position_bitmap.h
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/deletes/roaring_position_bitmap.h
+/// A 64-bit position bitmap using an array of 32-bit Roaring bitmaps.
+
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <string_view>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+
+namespace iceberg {
+
+/// \brief A bitmap that supports non-negative 64-bit positions, optimized
+/// for cases where most positions fit in 32 bits.
+///
+/// Incoming 64-bit positions are divided into a 32-bit "key" using the
+/// most significant 4 bytes and a 32-bit position using the least
+/// significant 4 bytes. For each key, a 32-bit Roaring bitmap is
+/// maintained to store positions for that key.
+///
+/// \note This class is used to represent deletion vectors. The Puffin
+/// reader/writer handles adding the additional required framing (length
+/// prefix, magic bytes, CRC-32) for `deletion-vector-v1` persistence.
+class ICEBERG_EXPORT RoaringPositionBitmap {
+ public:
+  /// \brief Maximum supported position (aligned with the Java implementation).
+  /// Add() and Contains() reject positions above this value.
+  static constexpr int64_t kMaxPosition = 0x7FFFFFFE80000000LL;
+
+  /// \brief Creates an empty bitmap.
+  RoaringPositionBitmap();
+  ~RoaringPositionBitmap();
+
+  /// \brief Moving transfers the internal state; the moved-from bitmap should
+  /// only be destroyed or assigned to afterwards.
+  RoaringPositionBitmap(RoaringPositionBitmap&& other) noexcept;
+  RoaringPositionBitmap& operator=(RoaringPositionBitmap&& other) noexcept;
+
+  /// \brief Copying performs a deep copy; the copy is independent of `other`.
+  RoaringPositionBitmap(const RoaringPositionBitmap& other);
+  RoaringPositionBitmap& operator=(const RoaringPositionBitmap& other);
+
+  /// \brief Sets a position in the bitmap.
+  /// \param pos the position (must be >= 0 and <= kMaxPosition)
+  /// \return Status indicating success or InvalidArgument error
+  [[nodiscard]] Status Add(int64_t pos);
+
+  /// \brief Sets a range of positions [pos_start, pos_end).
+  /// \param pos_start first position to set (inclusive)
+  /// \param pos_end end of the range (exclusive); nothing is set when
+  /// pos_start >= pos_end
+  /// \return Status indicating success or InvalidArgument error
+  [[nodiscard]] Status AddRange(int64_t pos_start, int64_t pos_end);
+
+  /// \brief Checks if a position is set in the bitmap.
+  /// \param pos the position (must be >= 0 and <= kMaxPosition)
+  /// \return Result<bool> or InvalidArgument error
+  [[nodiscard]] Result<bool> Contains(int64_t pos) const;
+
+  /// \brief Returns true if the bitmap has no positions set.
+  bool IsEmpty() const;
+
+  /// \brief Returns the number of set positions in the bitmap.
+  size_t Cardinality() const;
+
+  /// \brief Merges all positions from the other bitmap into this one
+  /// (in-place union). `other` is not modified.
+  void Or(const RoaringPositionBitmap& other);
+
+  /// \brief Optimizes the bitmap by applying run-length encoding to
+  /// containers where it is more space efficient than array or bitset
+  /// representations.
+  /// \return true if any container was changed
+  /// \note NOTE(review): the value is the OR of CRoaring's runOptimize()
+  /// results, which CRoaring documents as "the result has at least one run
+  /// container"; confirm that "was changed" is the intended wording.
+  bool Optimize();
+
+  /// \brief Iterates over all set positions in ascending order.
+  /// \param fn callback invoked once for every set position
+  void ForEach(const std::function<void(int64_t)>& fn) const;
+
+  /// \brief Returns the serialized size in bytes, i.e. the length of the
+  /// string produced by Serialize().
+  size_t SerializedSizeInBytes() const;
+
+  /// \brief Serializes using the portable format (little-endian).
+  ///
+  /// The output is a 64-bit bitmap count followed, for every key, by the
+  /// 32-bit key and the portable-format Roaring bitmap for that key.
+  Result<std::string> Serialize() const;
+
+  /// \brief Deserializes a bitmap from bytes.
+  /// \param bytes data in the layout produced by Serialize()
+  /// \return the bitmap, or an error if the data is truncated or malformed
+  static Result<RoaringPositionBitmap> Deserialize(std::string_view bytes);
+
+ private:
+  // Implementation state; Impl is defined in the .cc file.
+  struct Impl;
+  std::unique_ptr<Impl> impl_;
+
+  // Adopts an already-built Impl (used by Deserialize).
+  explicit RoaringPositionBitmap(std::unique_ptr<Impl> impl);
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 2cf1065b..42f46d5a 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -43,6 +43,7 @@ iceberg_sources = files(
     'arrow_c_data_guard_internal.cc',
     'catalog/memory/in_memory_catalog.cc',
     'delete_file_index.cc',
+    'deletes/roaring_position_bitmap.cc',
     'expression/aggregate.cc',
     'expression/binder.cc',
     'expression/evaluator.cc',
@@ -221,6 +222,7 @@ install_headers(
 )
 
 subdir('catalog')
+subdir('deletes')
 subdir('expression')
 subdir('manifest')
 subdir('puffin')
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 768e0507..c802df8a 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -115,6 +115,7 @@ add_iceberg_test(util_test
                  endian_test.cc
                  formatter_test.cc
                  location_util_test.cc
+                 roaring_position_bitmap_test.cc
                  string_util_test.cc
                  transform_util_test.cc
                  truncate_util_test.cc
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index df2d5db8..1eafc5fe 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -90,6 +90,7 @@ iceberg_tests = {
             'endian_test.cc',
             'formatter_test.cc',
             'location_util_test.cc',
+            'roaring_position_bitmap_test.cc',
             'string_util_test.cc',
             'transform_util_test.cc',
             'truncate_util_test.cc',
diff --git a/src/iceberg/test/resources/64map32bitvals.bin 
b/src/iceberg/test/resources/64map32bitvals.bin
new file mode 100644
index 00000000..475b8944
Binary files /dev/null and b/src/iceberg/test/resources/64map32bitvals.bin 
differ
diff --git a/src/iceberg/test/resources/64mapempty.bin 
b/src/iceberg/test/resources/64mapempty.bin
new file mode 100644
index 00000000..1b1cb4d4
Binary files /dev/null and b/src/iceberg/test/resources/64mapempty.bin differ
diff --git a/src/iceberg/test/resources/64maphighvals.bin 
b/src/iceberg/test/resources/64maphighvals.bin
new file mode 100644
index 00000000..d4312b8d
Binary files /dev/null and b/src/iceberg/test/resources/64maphighvals.bin differ
diff --git a/src/iceberg/test/resources/64mapspreadvals.bin 
b/src/iceberg/test/resources/64mapspreadvals.bin
new file mode 100644
index 00000000..83c72f6b
Binary files /dev/null and b/src/iceberg/test/resources/64mapspreadvals.bin 
differ
diff --git a/src/iceberg/test/roaring_position_bitmap_test.cc 
b/src/iceberg/test/roaring_position_bitmap_test.cc
new file mode 100644
index 00000000..8d7020b0
--- /dev/null
+++ b/src/iceberg/test/roaring_position_bitmap_test.cc
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/deletes/roaring_position_bitmap.h"
+
+#include <cstddef>
+#include <cstdint>
+#include <filesystem>
+#include <fstream>
+#include <random>
+#include <set>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/test_config.h"
+
+namespace iceberg {
+
+namespace {
+
+// Largest offset inside a single 32-bit bitmap (2^32 - 1).
+constexpr int64_t kBitmapSize = 0xFFFFFFFFL;
+// Distance between the first positions of two consecutive bitmaps (2^32).
+constexpr int64_t kBitmapOffset = kBitmapSize + 1L;
+// Largest value inside a single container (2^16 - 1).
+constexpr int64_t kContainerSize = 0xFFFF;  // Java's Character.MAX_VALUE
+// Distance between the first values of two consecutive containers (2^16).
+constexpr int64_t kContainerOffset = kContainerSize + 1L;
+
+// Builds an absolute position from a bitmap index (the key), a container
+// index inside that bitmap, and a value inside that container.
+int64_t Position(int32_t bitmap_index, int32_t container_index, int64_t value) {
+  const int64_t bitmap_base = bitmap_index * kBitmapOffset;
+  const int64_t container_base = container_index * kContainerOffset;
+  return bitmap_base + container_base + value;
+}
+
+// Reads a file from the test resource directory into a string, byte for
+// byte. Records a non-fatal test failure when the file cannot be opened.
+std::string ReadTestResource(const std::string& filename) {
+  const auto path = std::filesystem::path(ICEBERG_TEST_RESOURCES) / filename;
+  std::ifstream file(path, std::ios::binary);
+  EXPECT_TRUE(file.good()) << "Cannot open: " << path;
+
+  std::istreambuf_iterator<char> first(file);
+  std::istreambuf_iterator<char> last;
+  return std::string(first, last);
+}
+
+// Serializes `bitmap` and immediately deserializes the bytes again, expecting
+// both steps to succeed. Returns the deserialized bitmap.
+RoaringPositionBitmap RoundTripSerialize(const RoaringPositionBitmap& bitmap) {
+  auto bytes = bitmap.Serialize();
+  EXPECT_THAT(bytes, IsOk());
+
+  auto restored = RoaringPositionBitmap::Deserialize(bytes.value());
+  EXPECT_THAT(restored, IsOk());
+
+  return std::move(restored.value());
+}
+
+// Asserts that `bitmap` holds exactly the positions in `positions`: same
+// cardinality, every expected position is present, and iteration yields no
+// position outside the expected set.
+void AssertEqualContent(const RoaringPositionBitmap& bitmap,
+                        const std::set<int64_t>& positions) {
+  ASSERT_EQ(bitmap.Cardinality(), positions.size());
+
+  for (const int64_t expected : positions) {
+    ASSERT_THAT(bitmap.Contains(expected), HasValue(::testing::Eq(true)))
+        << "Position not found: " << expected;
+  }
+
+  const auto check_is_expected = [&positions](int64_t actual) {
+    ASSERT_TRUE(positions.contains(actual)) << "Unexpected position: " << actual;
+  };
+  bitmap.ForEach(check_is_expected);
+}
+
+// Checks `bitmap` against `positions` three times: as-is, after one
+// serialize/deserialize round trip, and after a round trip of the optimized
+// bitmap. Note that `bitmap` itself is optimized in place.
+void AssertEqual(RoaringPositionBitmap& bitmap, const std::set<int64_t>& positions) {
+  AssertEqualContent(bitmap, positions);
+
+  const auto plain_copy = RoundTripSerialize(bitmap);
+  AssertEqualContent(plain_copy, positions);
+
+  bitmap.Optimize();
+  const auto optimized_copy = RoundTripSerialize(bitmap);
+  AssertEqualContent(optimized_copy, positions);
+}
+
+// Creates a bitmap containing `positions`, expecting every Add to succeed.
+RoaringPositionBitmap BuildBitmap(const std::set<int64_t>& positions) {
+  RoaringPositionBitmap result;
+  for (auto it = positions.begin(); it != positions.end(); ++it) {
+    EXPECT_THAT(result.Add(*it), IsOk());
+  }
+  return result;
+}
+
+// One AddRange scenario for the parameterized AddRange test.
+struct AddRangeParams {
+  // Scenario name, used as the generated test-name suffix.
+  const char* name;
+  // First position of the range (inclusive).
+  int64_t start;
+  // End of the range (exclusive).
+  int64_t end;
+  // Positions that must NOT be set after the range was added.
+  std::vector<int64_t> absent_positions;
+};
+
+// Fixture for the parameterized AddRange scenarios.
+class RoaringPositionBitmapAddRangeTest
+    : public ::testing::TestWithParam<AddRangeParams> {};
+
+// After AddRange(start, end) the bitmap must contain exactly [start, end) and
+// none of the scenario's explicitly listed absent positions.
+TEST_P(RoaringPositionBitmapAddRangeTest, AddsExpectedPositions) {
+  const AddRangeParams& scenario = GetParam();
+  RoaringPositionBitmap bitmap;
+  ASSERT_THAT(bitmap.AddRange(scenario.start, scenario.end), IsOk());
+
+  std::set<int64_t> expected;
+  for (int64_t pos = scenario.start; pos < scenario.end; ++pos) {
+    expected.insert(pos);
+  }
+  AssertEqualContent(bitmap, expected);
+
+  for (const int64_t absent : scenario.absent_positions) {
+    ASSERT_FALSE(bitmap.Contains(absent).value());
+  }
+}
+
+// Scenarios: a range that stays within one key, and a range that straddles
+// the boundary between key 0 and key 1 (positions around 2^32).
+INSTANTIATE_TEST_SUITE_P(
+    AddRangeScenarios, RoaringPositionBitmapAddRangeTest,
+    ::testing::Values(AddRangeParams{.name = "single_key",
+                                     .start = 10,
+                                     .end = 20,
+                                     .absent_positions = {9, 20}},
+                      AddRangeParams{
+                          .name = "across_keys",
+                          .start = (int64_t{1} << 32) - 5,
+                          .end = (int64_t{1} << 32) + 5,
+                          .absent_positions = {0, (int64_t{1} << 32) + 5},
+                      }),
+    // Use the scenario name as the test-name suffix.
+    [](const ::testing::TestParamInfo<AddRangeParams>& info) { return info.param.name; });
+
+// One Or() scenario for the parameterized Or test.
+struct OrParams {
+  // Scenario name, used as the generated test-name suffix.
+  const char* name;
+  // Positions of the left-hand bitmap before the union.
+  std::set<int64_t> lhs_input;
+  // Positions of the right-hand bitmap before the union.
+  std::set<int64_t> rhs_input;
+  // Expected positions of the left-hand bitmap after lhs.Or(rhs).
+  std::set<int64_t> lhs_expected;
+  // Expected positions of the right-hand bitmap after lhs.Or(rhs).
+  std::set<int64_t> rhs_expected;
+};
+
+// Fixture for the parameterized Or() scenarios.
+class RoaringPositionBitmapOrTest : public ::testing::TestWithParam<OrParams> {};
+
+// lhs.Or(rhs) must leave the union in lhs and must not modify rhs.
+TEST_P(RoaringPositionBitmapOrTest, ProducesExpectedUnionAndKeepsRhsUnchanged) {
+  const OrParams& scenario = GetParam();
+  auto left = BuildBitmap(scenario.lhs_input);
+  auto right = BuildBitmap(scenario.rhs_input);
+
+  left.Or(right);
+
+  AssertEqualContent(left, scenario.lhs_expected);
+  AssertEqualContent(right, scenario.rhs_expected);
+}
+
+// Scenarios: disjoint operands, an empty right-hand side, overlapping
+// operands, and operands whose positions live under different keys.
+INSTANTIATE_TEST_SUITE_P(
+    OrScenarios, RoaringPositionBitmapOrTest,
+    ::testing::Values(
+        OrParams{
+            .name = "disjoint",
+            .lhs_input = {10L, 20L},
+            .rhs_input = {30L, 40L, int64_t{2} << 32},
+            .lhs_expected = {10L, 20L, 30L, 40L, int64_t{2} << 32},
+            .rhs_expected = {30L, 40L, int64_t{2} << 32},
+        },
+        OrParams{
+            .name = "rhs_empty",
+            .lhs_input = {10L, 20L},
+            .rhs_input = {},
+            .lhs_expected = {10L, 20L},
+            .rhs_expected = {},
+        },
+        OrParams{
+            .name = "overlapping",
+            .lhs_input = {10L, 20L, 30L},
+            .rhs_input = {20L, 40L},
+            .lhs_expected = {10L, 20L, 30L, 40L},
+            .rhs_expected = {20L, 40L},
+        },
+        OrParams{
+            .name = "sparse_multi_key",
+            .lhs_input = {100L, (int64_t{1} << 32) | 200L},
+            .rhs_input = {(int64_t{2} << 32) | 300L, (int64_t{3} << 32) | 400L},
+            .lhs_expected = {100L, (int64_t{1} << 32) | 200L, (int64_t{2} << 32) | 300L,
+                             (int64_t{3} << 32) | 400L},
+            .rhs_expected = {(int64_t{2} << 32) | 300L, (int64_t{3} << 32) | 400L},
+        }),
+    // Use the scenario name as the test-name suffix.
+    [](const ::testing::TestParamInfo<OrParams>& info) { return info.param.name; });
+
+// Coarse description of the positions expected in a bitmap, as checked by
+// AssertInteropBitmapShape.
+enum class InteropBitmapShape {
+  // No positions at all.
+  kEmpty,
+  // At least one position, all of them below 2^32.
+  kOnly32BitPositions,
+  // Positions both below 2^32 and at or above 2^32.
+  kSpreadAcrossKeys,
+};
+
+// Pairs a serialized bitmap file with the shape it is expected to have.
+struct InteropCase {
+  // Name of the file holding the serialized bitmap.
+  // NOTE(review): presumably relative to the test resource directory; confirm.
+  const char* file_name;
+  // Shape the deserialized bitmap must have.
+  InteropBitmapShape expected_shape;
+};
+
+// Asserts that `bitmap` matches `expected_shape`, judged by whether it holds
+// positions below 2^32 and/or positions at or above 2^32.
+void AssertInteropBitmapShape(const RoaringPositionBitmap& bitmap,
+                              InteropBitmapShape expected_shape) {
+  constexpr int64_t kFirstPosBeyond32Bits = int64_t{1} << 32;
+  bool has_low_pos = false;
+  bool has_high_pos = false;
+
+  bitmap.ForEach([&](int64_t pos) {
+    (pos < kFirstPosBeyond32Bits ? has_low_pos : has_high_pos) = true;
+  });
+
+  if (expected_shape == InteropBitmapShape::kEmpty) {
+    ASSERT_TRUE(bitmap.IsEmpty());
+    ASSERT_EQ(bitmap.Cardinality(), 0u);
+  } else if (expected_shape == InteropBitmapShape::kOnly32BitPositions) {
+    ASSERT_GT(bitmap.Cardinality(), 0u);
+    ASSERT_TRUE(has_low_pos);
+    ASSERT_FALSE(has_high_pos);
+  } else if (expected_shape == InteropBitmapShape::kSpreadAcrossKeys) {
+    ASSERT_GT(bitmap.Cardinality(), 0u);
+    ASSERT_TRUE(has_low_pos);
+    ASSERT_TRUE(has_high_pos);
+  }
+}
+
+}  // namespace
+
+// Added positions are reported by Contains; adding a duplicate also succeeds.
+TEST(RoaringPositionBitmapTest, TestAdd) {
+  RoaringPositionBitmap bitmap;
+
+  // 10 appears twice on purpose: the second Add is the duplicate case.
+  for (const int64_t pos : {int64_t{10}, int64_t{0}, int64_t{10}}) {
+    ASSERT_THAT(bitmap.Add(pos), IsOk());
+    ASSERT_TRUE(bitmap.Contains(pos).value());
+  }
+}
+
+// Positions under keys 0, 1, 2 and 100 must all be stored, and the serialized
+// size must match the expected value for that key layout.
+TEST(RoaringPositionBitmapTest, TestAddPositionsRequiringMultipleBitmaps) {
+  // std::set iterates in ascending order, i.e. key 0, 1, 2, then 100.
+  const std::set<int64_t> positions = {
+      10L,
+      (int64_t{1} << 32) | 20L,
+      (int64_t{2} << 32) | 30L,
+      (int64_t{100} << 32) | 40L,
+  };
+
+  RoaringPositionBitmap bitmap;
+  for (const int64_t pos : positions) {
+    ASSERT_THAT(bitmap.Add(pos), IsOk());
+  }
+
+  AssertEqualContent(bitmap, positions);
+  ASSERT_EQ(bitmap.SerializedSizeInBytes(), 1260);
+}
+
+// A range whose start equals its end adds nothing and still succeeds.
+TEST(RoaringPositionBitmapTest, TestAddEmptyRange) {
+  constexpr int64_t kPos = 10;
+  RoaringPositionBitmap bitmap;
+  ASSERT_THAT(bitmap.AddRange(kPos, kPos), IsOk());
+  ASSERT_TRUE(bitmap.IsEmpty());
+}
+
+// Cardinality counts distinct positions; re-adding one does not change it.
+TEST(RoaringPositionBitmapTest, TestCardinality) {
+  RoaringPositionBitmap bitmap;
+  ASSERT_EQ(bitmap.Cardinality(), 0);
+
+  for (const int64_t pos : {int64_t{10}, int64_t{20}, int64_t{30}}) {
+    ASSERT_THAT(bitmap.Add(pos), IsOk());
+  }
+  ASSERT_EQ(bitmap.Cardinality(), 3);
+
+  // 10 is already present.
+  ASSERT_THAT(bitmap.Add(10L), IsOk());
+  ASSERT_EQ(bitmap.Cardinality(), 3);
+}
+
+// Cardinality sums over all keys, including keys separated by large gaps.
+TEST(RoaringPositionBitmapTest, TestCardinalitySparseBitmaps) {
+  const std::vector<int64_t> positions = {
+      100L,
+      101L,
+      105L,
+      (int64_t{1} << 32) | 200L,
+      (int64_t{100} << 32) | 300L,
+  };
+
+  RoaringPositionBitmap bitmap;
+  for (const int64_t pos : positions) {
+    ASSERT_THAT(bitmap.Add(pos), IsOk());
+  }
+
+  ASSERT_EQ(bitmap.Cardinality(), 5);
+}
+
+// A bitmap spanning two keys survives a serialize/deserialize round trip.
+TEST(RoaringPositionBitmapTest, TestSerializeDeserializeRoundTrip) {
+  const int64_t high_pos = (int64_t{1} << 32) | 30L;
+
+  RoaringPositionBitmap original;
+  ASSERT_THAT(original.Add(10L), IsOk());
+  ASSERT_THAT(original.Add(20L), IsOk());
+  ASSERT_THAT(original.Add(high_pos), IsOk());
+
+  auto restored = RoundTripSerialize(original);
+  AssertEqualContent(restored, {10L, 20L, high_pos});
+}
+
+// The copy constructor duplicates the content and yields an independent
+// bitmap.
+TEST(RoaringPositionBitmapTest, TestCopyConstructor) {
+  const int64_t high_pos = (int64_t{2} << 32) | 30L;
+
+  RoaringPositionBitmap source;
+  ASSERT_THAT(source.Add(10L), IsOk());
+  ASSERT_THAT(source.Add(high_pos), IsOk());
+
+  RoaringPositionBitmap copy(source);
+  AssertEqualContent(copy, {10L, high_pos});
+
+  // Changing the copy must not leak into the source.
+  ASSERT_THAT(copy.Add(99L), IsOk());
+  ASSERT_THAT(source.Contains(99L), HasValue(::testing::Eq(false)));
+}
+
+// Copy assignment replaces the previous content and yields an independent
+// bitmap.
+TEST(RoaringPositionBitmapTest, TestCopyAssignment) {
+  const int64_t high_pos = (int64_t{3} << 32) | 40L;
+
+  RoaringPositionBitmap source;
+  ASSERT_THAT(source.Add(10L), IsOk());
+  ASSERT_THAT(source.Add(high_pos), IsOk());
+
+  // The target starts with its own content, which the assignment replaces.
+  RoaringPositionBitmap target;
+  ASSERT_THAT(target.Add(1L), IsOk());
+
+  target = source;
+  AssertEqualContent(target, {10L, high_pos});
+
+  // Changing the source afterwards must not leak into the target.
+  ASSERT_THAT(source.Add(200L), IsOk());
+  ASSERT_THAT(target.Contains(200L), HasValue(::testing::Eq(false)));
+}
+
+// An empty bitmap stays empty across a serialize/deserialize round trip.
+TEST(RoaringPositionBitmapTest, TestSerializeDeserializeEmpty) {
+  RoaringPositionBitmap empty;
+  auto restored = RoundTripSerialize(empty);
+  ASSERT_TRUE(restored.IsEmpty());
+  ASSERT_EQ(restored.Cardinality(), 0);
+}
+
+// Fills two bitmaps (keys 0 and 1) with three differently populated
+// containers each, optimizes, and checks that a round trip keeps every
+// position.
+TEST(RoaringPositionBitmapTest, TestSerializeDeserializeAllContainerBitmap) {
+  // One past the last value added to the nearly full containers.
+  const int64_t nearly_full_end = kContainerOffset - 1;
+  RoaringPositionBitmap bitmap;
+
+  // bitmap 0, container 0 (array - few elements)
+  ASSERT_THAT(bitmap.Add(Position(0, 0, 5)), IsOk());
+  ASSERT_THAT(bitmap.Add(Position(0, 0, 7)), IsOk());
+
+  // bitmap 0, container 1 (array that can be compressed)
+  ASSERT_THAT(bitmap.AddRange(Position(0, 1, 1), Position(0, 1, 1000)), IsOk());
+
+  // bitmap 0, container 2 (bitset - nearly full container)
+  ASSERT_THAT(bitmap.AddRange(Position(0, 2, 1), Position(0, 2, nearly_full_end)),
+              IsOk());
+
+  // bitmap 1, container 0 (array)
+  ASSERT_THAT(bitmap.Add(Position(1, 0, 10)), IsOk());
+  ASSERT_THAT(bitmap.Add(Position(1, 0, 20)), IsOk());
+
+  // bitmap 1, container 1 (array that can be compressed)
+  ASSERT_THAT(bitmap.AddRange(Position(1, 1, 10), Position(1, 1, 500)), IsOk());
+
+  // bitmap 1, container 2 (bitset)
+  ASSERT_THAT(bitmap.AddRange(Position(1, 2, 1), Position(1, 2, nearly_full_end)),
+              IsOk());
+
+  ASSERT_TRUE(bitmap.Optimize());
+
+  auto restored = RoundTripSerialize(bitmap);
+
+  // The optimized source bitmap is the reference for the restored copy.
+  std::set<int64_t> expected;
+  bitmap.ForEach([&expected](int64_t pos) { expected.insert(pos); });
+
+  AssertEqualContent(restored, expected);
+}
+
+TEST(RoaringPositionBitmapTest, TestForEach) {
+  // Positions are added out of order; ForEach is expected to report them in
+  // ascending order.
+  RoaringPositionBitmap bitmap;
+  for (const int64_t position :
+       {int64_t{30}, int64_t{10}, int64_t{20}, (int64_t{1} << 32) | int64_t{5}}) {
+    ASSERT_THAT(bitmap.Add(position), IsOk());
+  }
+
+  std::vector<int64_t> visited;
+  bitmap.ForEach([&visited](int64_t position) { visited.push_back(position); });
+
+  const std::vector<int64_t> expected = {10L, 20L, 30L, (int64_t{1} << 32) | 5L};
+  ASSERT_EQ(visited.size(), 4u);
+  for (size_t i = 0; i < expected.size(); ++i) {
+    ASSERT_EQ(visited[i], expected[i]);
+  }
+}
+
+TEST(RoaringPositionBitmapTest, TestIsEmpty) {
+  RoaringPositionBitmap positions;
+
+  // Nothing has been added yet.
+  ASSERT_TRUE(positions.IsEmpty());
+
+  // After one successful Add the bitmap must no longer report empty.
+  ASSERT_THAT(positions.Add(10L), IsOk());
+  ASSERT_FALSE(positions.IsEmpty());
+}
+
+TEST(RoaringPositionBitmapTest, TestOptimize) {
+  constexpr int64_t kRangeEnd = 10000;
+
+  RoaringPositionBitmap bitmap;
+  ASSERT_THAT(bitmap.AddRange(0, kRangeEnd), IsOk());
+  const size_t bytes_before = bitmap.SerializedSizeInBytes();
+
+  // Optimize() has to report a change for this dense range ...
+  ASSERT_TRUE(bitmap.Optimize());
+
+  // ... and the serialized size has to shrink as a result.
+  const size_t bytes_after = bitmap.SerializedSizeInBytes();
+  ASSERT_LT(bytes_after, bytes_before);
+
+  // Optimizing must leave the content untouched: positions 0 .. kRangeEnd - 1.
+  std::set<int64_t> expected;
+  for (int64_t position = 0; position < kRangeEnd; ++position) {
+    expected.insert(expected.end(), position);
+  }
+  AssertEqualContent(bitmap, expected);
+
+  // The optimized bitmap must also keep its content across a round trip.
+  auto restored = RoundTripSerialize(bitmap);
+  AssertEqualContent(restored, expected);
+}
+
+TEST(RoaringPositionBitmapTest, TestUnsupportedPositions) {
+  const int64_t negative = -1L;
+  const int64_t above_max = RoaringPositionBitmap::kMaxPosition + 1L;
+
+  RoaringPositionBitmap bitmap;
+
+  // A negative position must be rejected by Add and by Contains.
+  ASSERT_THAT(bitmap.Add(negative), IsError(ErrorKind::kInvalidArgument));
+  ASSERT_THAT(bitmap.Contains(negative), IsError(ErrorKind::kInvalidArgument));
+
+  // The same holds for the first position past kMaxPosition.
+  ASSERT_THAT(bitmap.Add(above_max), IsError(ErrorKind::kInvalidArgument));
+  ASSERT_THAT(bitmap.Contains(above_max), IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST(RoaringPositionBitmapTest, TestRandomSparseBitmap) {
+  // Adds 100000 random draws to both the bitmap and a std::set, compares the
+  // two, then cross-checks Contains() against the set on random lookups. Both
+  // generators use fixed seeds.
+  std::mt19937_64 rng(42);
+  RoaringPositionBitmap bitmap;
+  std::set<int64_t> positions;
+
+  // Draws come from the closed interval [0, 5<<32]; duplicates are possible and
+  // collapse in the set.
+  std::uniform_int_distribution<int64_t> dist(0, int64_t{5} << 32);
+
+  for (int i = 0; i < 100000; ++i) {
+    int64_t pos = dist(rng);
+    positions.insert(pos);
+    ASSERT_THAT(bitmap.Add(pos), IsOk());
+  }
+
+  AssertEqual(bitmap, positions);
+
+  // Random lookups over [0, kMaxPosition]: Contains() must succeed and agree
+  // with membership in the reference set.
+  std::mt19937_64 rng2(123);
+  std::uniform_int_distribution<int64_t> lookup_dist(0,
+                                                     RoaringPositionBitmap::kMaxPosition);
+  for (int i = 0; i < 20000; ++i) {
+    int64_t pos = lookup_dist(rng2);
+    auto result = bitmap.Contains(pos);
+    ASSERT_THAT(result, IsOk());
+    ASSERT_EQ(result.value(), positions.contains(pos));
+  }
+}
+
+TEST(RoaringPositionBitmapTest, TestRandomDenseBitmap) {
+  constexpr int64_t kRunLength = 10000;
+
+  RoaringPositionBitmap bitmap;
+  std::set<int64_t> expected;
+
+  // One contiguous run of kRunLength positions starting at each base; the bases
+  // differ in their upper 32 bits.
+  for (const int64_t base : {int64_t{0}, int64_t{2} << 32, int64_t{5} << 32}) {
+    for (int64_t step = 0; step < kRunLength; ++step) {
+      const int64_t position = base + step;
+      ASSERT_THAT(bitmap.Add(position), IsOk());
+      expected.insert(position);
+    }
+  }
+
+  AssertEqual(bitmap, expected);
+}
+
+TEST(RoaringPositionBitmapTest, TestRandomMixedBitmap) {
+  // Mixes sparse random positions with a dense run and compares the bitmap with
+  // a std::set that received the same positions. The generator uses a fixed seed.
+  std::mt19937_64 rng(42);
+  RoaringPositionBitmap bitmap;
+  std::set<int64_t> positions;
+
+  // Sparse positions in [3<<32, 5<<32). std::uniform_int_distribution samples a
+  // closed interval, so the bound passed in is one below the exclusive end.
+  std::uniform_int_distribution<int64_t> dist1(int64_t{3} << 32,
+                                               (int64_t{5} << 32) - 1);
+  for (int i = 0; i < 50000; ++i) {
+    int64_t pos = dist1(rng);
+    positions.insert(pos);
+    ASSERT_THAT(bitmap.Add(pos), IsOk());
+  }
+
+  // Dense range in [0, 10000)
+  for (int64_t i = 0; i < 10000; ++i) {
+    ASSERT_THAT(bitmap.Add(i), IsOk());
+    positions.insert(i);
+  }
+
+  // More sparse positions in [0, 1<<32); again the closed upper bound is one
+  // below the exclusive end.
+  std::uniform_int_distribution<int64_t> dist2(0, (int64_t{1} << 32) - 1);
+  for (int i = 0; i < 5000; ++i) {
+    int64_t pos = dist2(rng);
+    positions.insert(pos);
+    ASSERT_THAT(bitmap.Add(pos), IsOk());
+  }
+
+  AssertEqual(bitmap, positions);
+}
+
+TEST(RoaringPositionBitmapTest, TestDeserializeInvalidData) {
+  // An empty buffer is too small to deserialize.
+  auto empty_result = RoaringPositionBitmap::Deserialize("");
+  ASSERT_THAT(empty_result, IsError(ErrorKind::kInvalidArgument));
+
+  // Eight 0xFF bytes, intended to read as an invalid (very large) bitmap count.
+  const std::string all_ones(8, '\xFF');
+  auto oversized_result = RoaringPositionBitmap::Deserialize(all_ones);
+  ASSERT_THAT(oversized_result, IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST(RoaringPositionBitmapInteropTest, TestDeserializeSupportedRoaringExamples) {
+  // These .bin fixtures are copied from the Apache Iceberg Java repository's
+  // roaring position bitmap interoperability test resources.
+  static const std::vector<InteropCase> kCases = {
+      {.file_name = "64map32bitvals.bin",
+       .expected_shape = InteropBitmapShape::kOnly32BitPositions},
+      {.file_name = "64mapempty.bin", .expected_shape = InteropBitmapShape::kEmpty},
+      {.file_name = "64mapspreadvals.bin",
+       .expected_shape = InteropBitmapShape::kSpreadAcrossKeys},
+  };
+
+  for (const auto& test_case : kCases) {
+    // Attach the fixture name to any failure reported in this iteration.
+    SCOPED_TRACE(test_case.file_name);
+    std::string data = ReadTestResource(test_case.file_name);
+    auto result = RoaringPositionBitmap::Deserialize(data);
+    ASSERT_THAT(result, IsOk());
+
+    const auto& bitmap = result.value();
+    AssertInteropBitmapShape(bitmap, test_case.expected_shape);
+
+    // Collect what ForEach reports and compare it with the bitmap's content.
+    std::set<int64_t> positions;
+    bitmap.ForEach([&](int64_t pos) { positions.insert(pos); });
+    AssertEqualContent(bitmap, positions);
+
+    // Re-serializing the deserialized bitmap must keep the same positions.
+    auto copy = RoundTripSerialize(bitmap);
+    AssertEqualContent(copy, positions);
+  }
+}
+
+TEST(RoaringPositionBitmapInteropTest, TestDeserializeUnsupportedRoaringExample) {
+  // This file is copied from the Apache Iceberg Java repository and it contains
+  // a value with key larger than max supported
+  std::string data = ReadTestResource("64maphighvals.bin");
+  auto result = RoaringPositionBitmap::Deserialize(data);
+
+  // Deserialization must fail with kInvalidArgument, and the error must satisfy
+  // HasErrorMessage("Invalid unsigned key").
+  ASSERT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  ASSERT_THAT(result, HasErrorMessage("Invalid unsigned key"));
+}
+
+}  // namespace iceberg


Reply via email to