This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 4d855383 Make BITPOS in Bitmap handling `stop_given` (#2085)
4d855383 is described below

commit 4d85538375eff11e8d264822ed45fa6e808e62cf
Author: mwish <[email protected]>
AuthorDate: Fri Feb 2 18:24:48 2024 +0800

    Make BITPOS in Bitmap handling `stop_given` (#2085)
---
 src/types/redis_bitmap.cc          | 59 ++++++++++++++++++++++++++++----------
 tests/cppunit/types/bitmap_test.cc | 28 ++++++++++++++++++
 2 files changed, 72 insertions(+), 15 deletions(-)

diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index 9fc93eb8..01360829 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -285,6 +285,10 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool 
bit, int64_t start, i
   std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, 
static_cast<int64_t>(metadata.size));
   auto u_start = static_cast<uint32_t>(start);
   auto u_stop = static_cast<uint32_t>(stop);
+  if (u_start > u_stop) {
+    *pos = -1;
+    return rocksdb::Status::OK();
+  }
 
   auto bit_pos_in_byte = [](char byte, bool bit) -> int {
     for (int i = 0; i < 8; i++) {
@@ -300,8 +304,9 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, bool 
bit, int64_t start, i
   uint32_t start_index = u_start / kBitmapSegmentBytes;
   uint32_t stop_index = u_stop / kBitmapSegmentBytes;
   // Don't use multi get to prevent large range query, and take too much memory
-  rocksdb::PinnableSlice pin_value;
+  // Searching bits in segments [start_index, stop_index].
   for (uint32_t i = start_index; i <= stop_index; i++) {
+    rocksdb::PinnableSlice pin_value;
     std::string sub_key =
         InternalKey(ns_key, std::to_string(i * kBitmapSegmentBytes), 
metadata.version, storage_->IsSlotIdEncoded())
             .Encode();
@@ -309,32 +314,56 @@ rocksdb::Status Bitmap::BitPos(const Slice &user_key, 
bool bit, int64_t start, i
     if (!s.ok() && !s.IsNotFound()) return s;
     if (s.IsNotFound()) {
       if (!bit) {
+        // Note: even if stop is given, we can return immediately when bit is 
0,
+        // because bit_pos will always be greater.
         *pos = i * kBitmapSegmentBits;
         return rocksdb::Status::OK();
       }
       continue;
     }
-    size_t j = 0;
-    if (i == start_index) j = u_start % kBitmapSegmentBytes;
-    for (; j < pin_value.size(); j++) {
-      if (i == stop_index && j > (u_stop % kBitmapSegmentBytes)) break;
-      if (bit_pos_in_byte(pin_value[j], bit) != -1) {
-        *pos = static_cast<int64_t>(i * kBitmapSegmentBits + j * 8 + 
bit_pos_in_byte(pin_value[j], bit));
+    size_t byte_pos_in_segment = 0;
+    if (i == start_index) byte_pos_in_segment = u_start % kBitmapSegmentBytes;
+    size_t stop_byte_in_segment = pin_value.size();
+    if (i == stop_index) {
+      DCHECK_LE(u_stop % kBitmapSegmentBytes + 1, pin_value.size());
+      stop_byte_in_segment = u_stop % kBitmapSegmentBytes + 1;
+    }
+    // Invariant:
+    // 1. pin_value.size() <= kBitmapSegmentBytes.
+    // 2. If it's the last segment, metadata.size % kBitmapSegmentBytes <= 
pin_value.size().
+    for (; byte_pos_in_segment < stop_byte_in_segment; byte_pos_in_segment++) {
+      int bit_pos_in_byte_value = 
bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
+      if (bit_pos_in_byte_value != -1) {
+        *pos = static_cast<int64_t>(i * kBitmapSegmentBits + 
byte_pos_in_segment * 8 + bit_pos_in_byte_value);
         return rocksdb::Status::OK();
       }
     }
-    if (!bit && pin_value.size() < kBitmapSegmentBytes) {
-      // ExpandBitmapSegment might generate a segment with size less than 
kBitmapSegmentBytes,
-      // but larger than logical size, so we need to align the last segment 
with `metadata.size`
-      // rather than `pin_value.size()`.
-      auto last_segment_bytes = metadata.size % kBitmapSegmentBytes;
-      DCHECK_LE(last_segment_bytes, pin_value.size());
-      *pos = static_cast<int64_t>(i * kBitmapSegmentBits + last_segment_bytes 
* 8);
+    if (bit) {
+      continue;
+    }
+    // There are two cases where `pin_value.size() < kBitmapSegmentBytes`:
+    // 1. If it's the last segment, we've done searching in the above loop.
+    // 2. If it's not the last segment, we can check if the segment is all 0.
+    if (pin_value.size() < kBitmapSegmentBytes) {
+      if (i == stop_index) {
+        continue;
+      }
+      *pos = static_cast<int64_t>(i * kBitmapSegmentBits + pin_value.size() * 
8);
       return rocksdb::Status::OK();
     }
-    pin_value.Reset();
   }
   // bit was not found
+  /* If we are looking for clear bits, and the user specified an exact
+   * range with start-end, we can't consider the right of the range as
+   * zero padded (as we do when no explicit end is given).
+   *
+   * So if redisBitpos() returns the first bit outside the range,
+   * we return -1 to the caller, to mean, in the specified range there
+   * is not a single "0" bit. */
+  if (stop_given && bit == 0) {
+    *pos = -1;
+    return rocksdb::Status::OK();
+  }
   *pos = bit ? -1 : static_cast<int64_t>(metadata.size * 8);
   return rocksdb::Status::OK();
 }
diff --git a/tests/cppunit/types/bitmap_test.cc 
b/tests/cppunit/types/bitmap_test.cc
index 7c6b64f0..92a12e48 100644
--- a/tests/cppunit/types/bitmap_test.cc
+++ b/tests/cppunit/types/bitmap_test.cc
@@ -194,6 +194,34 @@ TEST_P(RedisBitmapTest, BitPosNegative) {
   auto s = bitmap_->Del(key_);
 }
 
+// When `stop_given` is true, even when searching for 0,
+// we cannot exceed the stop position.
+TEST_P(RedisBitmapTest, BitPosStopGiven) {
+  for (int i = 0; i < 8; ++i) {
+    bool bit = true;
+    bitmap_->SetBit(key_, i, true, &bit);
+    EXPECT_FALSE(bit);
+  }
+  int64_t pos = 0;
+  bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos);
+  EXPECT_EQ(-1, pos);
+  bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/false, &pos);
+  EXPECT_EQ(8, pos);
+
+  // Setting a bit at position 8 does not affect that
+  {
+    bool bit = true;
+    bitmap_->SetBit(key_, 8, true, &bit);
+    EXPECT_FALSE(bit);
+  }
+  bitmap_->BitPos(key_, false, 0, 0, /*stop_given=*/true, &pos);
+  EXPECT_EQ(-1, pos);
+  bitmap_->BitPos(key_, false, 0, 1, /*stop_given=*/false, &pos);
+  EXPECT_EQ(9, pos);
+
+  auto s = bitmap_->Del(key_);
+}
+
 TEST_P(RedisBitmapTest, BitfieldGetSetTest) {
   constexpr uint32_t magic = 0xdeadbeef;
 

Reply via email to