This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new fb7acf6a Bitmap: Extract common bit operations like GetBit and 
SetBitTo (#2105)
fb7acf6a is described below

commit fb7acf6a5f3c010fa61f30f1f8fbef63efd0b31d
Author: mwish <[email protected]>
AuthorDate: Mon Feb 19 17:20:30 2024 +0800

    Bitmap: Extract common bit operations like GetBit and SetBitTo (#2105)
    
    This is a refactor, see: https://github.com/apache/kvrocks/issues/2076
    
    Previously, adhoc bit operations are used. This patch extract `GetBit` and 
`SetBitTo` operation to simplify the impl.
    
    Also, because the LSB/MSB format we're using it different in BitmapString 
and Bitmap( see https://github.com/apache/kvrocks-website/pull/198 ), `lsb` and 
`msb` is extracted to prevent from mistakes.
---
 src/common/bit_util.h            | 149 +++++++++++++++++++++++++++++++++++++++
 src/storage/batch_extractor.cc   |   3 +-
 src/types/redis_bitmap.cc        |  69 ++++++++++--------
 src/types/redis_bitmap.h         |   9 ++-
 src/types/redis_bitmap_string.cc | 113 ++++-------------------------
 src/types/redis_bitmap_string.h  |   8 +--
 6 files changed, 212 insertions(+), 139 deletions(-)

diff --git a/src/common/bit_util.h b/src/common/bit_util.h
new file mode 100644
index 00000000..9c23d78b
--- /dev/null
+++ b/src/common/bit_util.h
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+namespace util {
+
+/* Count number of bits set in the binary array pointed by 's' and long
+ * 'count' bytes. The implementation of this function is required to
+ * work with a input string length up to 512 MB.
+ * */
+inline size_t RawPopcount(const uint8_t *p, int64_t count) {
+  size_t bits = 0;
+
+  for (; count >= 8; p += 8, count -= 8) {
+    bits += __builtin_popcountll(*reinterpret_cast<const uint64_t *>(p));
+  }
+
+  if (count > 0) {
+    uint64_t v = 0;
+    __builtin_memcpy(&v, p, count);
+    bits += __builtin_popcountll(v);
+  }
+
+  return bits;
+}
+
+template <typename T = void>
+inline int ClzllWithEndian(uint64_t x) {
+  if constexpr (IsLittleEndian()) {
+    return __builtin_clzll(__builtin_bswap64(x));
+  } else if constexpr (IsBigEndian()) {
+    return __builtin_clzll(x);
+  } else {
+    static_assert(AlwaysFalse<T>);
+  }
+}
+
+// Return the number of bytes needed to fit the given number of bits
+constexpr int64_t BytesForBits(int64_t bits) {
+  // This formula avoids integer overflow on very large `bits`
+  return (bits >> 3) + ((bits & 7) != 0);
+}
+
+namespace lsb {
+static constexpr bool GetBit(const uint8_t *bits, uint64_t i) { return (bits[i 
>> 3] >> (i & 0x07)) & 1; }
+
+// Bitmask selecting the k-th bit in a byte
+static constexpr uint8_t kBitmask[] = {1, 2, 4, 8, 16, 32, 64, 128};
+
+// Gets the i-th bit from a byte. Should only be used with i <= 7.
+static constexpr bool GetBitFromByte(uint8_t byte, uint8_t i) { return byte & 
kBitmask[i]; }
+
+static inline void SetBitTo(uint8_t *bits, int64_t i, bool bit_is_set) {
+  // https://graphics.stanford.edu/~seander/bithacks.html
+  // "Conditionally set or clear bits without branching"
+  // NOTE: this seems to confuse Valgrind as it reads from potentially
+  // uninitialized memory
+  bits[i / 8] ^= static_cast<uint8_t>(-static_cast<uint8_t>(bit_is_set) ^ 
bits[i / 8]) & kBitmask[i % 8];
+}
+}  // namespace lsb
+
+namespace msb {
+static constexpr bool GetBit(const uint8_t *bits, uint64_t i) { return (bits[i 
>> 3] >> (7 - (i & 0x07))) & 1; }
+
+// Bitmask selecting the k-th bit in a byte
+static constexpr uint8_t kBitmask[] = {128, 64, 32, 16, 8, 4, 2, 1};
+
+// Gets the i-th bit from a byte. Should only be used with i <= 7.
+static constexpr bool GetBitFromByte(uint8_t byte, uint8_t i) { return byte & 
kBitmask[i]; }
+
+static inline void SetBitTo(uint8_t *bits, int64_t i, bool bit_is_set) {
+  // https://graphics.stanford.edu/~seander/bithacks.html
+  // "Conditionally set or clear bits without branching"
+  // NOTE: this seems to confuse Valgrind as it reads from potentially
+  // uninitialized memory
+  bits[i / 8] ^= static_cast<uint8_t>(-static_cast<uint8_t>(bit_is_set) ^ 
bits[i / 8]) & kBitmask[i % 8];
+}
+
+/* Return the position of the first bit set to one (if 'bit' is 1) or
+ * zero (if 'bit' is 0) in the bitmap starting at 's' and long 'count' bytes.
+ *
+ * The function is guaranteed to return a value >= 0 if 'bit' is 0 since if
+ * no zero bit is found, it returns count*8 assuming the string is zero
+ * padded on the right. However if 'bit' is 1 it is possible that there is
+ * not a single set bit in the bitmap. In this special case -1 is returned.
+ * */
+inline int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit) {
+  int64_t res = 0;
+
+  if (bit) {
+    int64_t ct = count;
+
+    for (; count >= 8; c += 8, count -= 8) {
+      uint64_t x = *reinterpret_cast<const uint64_t *>(c);
+      if (x != 0) {
+        return res + ClzllWithEndian(x);
+      }
+      res += 64;
+    }
+
+    if (count > 0) {
+      uint64_t v = 0;
+      __builtin_memcpy(&v, c, count);
+      res += v == 0 ? count * 8 : ClzllWithEndian(v);
+    }
+
+    if (res == ct * 8) {
+      return -1;
+    }
+  } else {
+    for (; count >= 8; c += 8, count -= 8) {
+      uint64_t x = *reinterpret_cast<const uint64_t *>(c);
+      if (x != (uint64_t)-1) {
+        return res + ClzllWithEndian(~x);
+      }
+      res += 64;
+    }
+
+    if (count > 0) {
+      uint64_t v = -1;
+      __builtin_memcpy(&v, c, count);
+      res += v == (uint64_t)-1 ? count * 8 : ClzllWithEndian(~v);
+    }
+  }
+
+  return res;
+}
+
+}  // namespace msb
+
+}  // namespace util
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 56f58874..f60669eb 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -214,8 +214,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
               return rocksdb::Status::InvalidArgument(
                   fmt::format("failed to parse an offset of SETBIT: {}", 
parsed_offset.Msg()));
             }
-
-            bool bit_value = 
redis::Bitmap::GetBitFromValueAndOffset(value.ToString(), *parsed_offset);
+            bool bit_value = 
redis::Bitmap::GetBitFromValueAndOffset(value.ToStringView(), *parsed_offset);
             command_args = {"SETBIT", user_key, (*args)[1], bit_value ? "1" : 
"0"};
             break;
           }
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index 48b1125b..d2d7ff3a 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -24,16 +24,17 @@
 #include <memory>
 #include <vector>
 
+#include "common/bit_util.h"
 #include "db_util.h"
 #include "parse_util.h"
 #include "redis_bitmap_string.h"
 
 namespace redis {
 
-const uint32_t kBitmapSegmentBits = 1024 * 8;
-const uint32_t kBitmapSegmentBytes = 1024;
+constexpr uint32_t kBitmapSegmentBits = 1024 * 8;
+constexpr uint32_t kBitmapSegmentBytes = 1024;
 
-const char kErrBitmapStringOutOfRange[] =
+constexpr char kErrBitmapStringOutOfRange[] =
     "The size of the bitmap string exceeds the "
     "configuration item max-bitmap-to-string-mb";
 
@@ -81,6 +82,13 @@ void ExpandBitmapSegment(std::string *segment, size_t 
min_bytes) {
   }
 }
 
+// Constructing sub-key index, see:
+// 
https://kvrocks.apache.org/community/data-structure-on-rocksdb#bitmap-sub-keys-values
+// The value is also equal to the offset of the bytes in the bitmap.
+uint32_t SegmentSubKeyIndexForBit(uint32_t bit_offset) {
+  return (bit_offset / kBitmapSegmentBits) * kBitmapSegmentBytes;
+}
+
 rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key, BitmapMetadata 
*metadata, std::string *raw_value) {
   auto s = GetRawMetadata(ns_key, raw_value);
   if (!s.ok()) return s;
@@ -89,7 +97,7 @@ rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key, 
BitmapMetadata *metadat
   return ParseMetadata({kRedisBitmap, kRedisString}, &slice, metadata);
 }
 
-rocksdb::Status Bitmap::GetBit(const Slice &user_key, uint32_t offset, bool 
*bit) {
+rocksdb::Status Bitmap::GetBit(const Slice &user_key, uint32_t bit_offset, 
bool *bit) {
   *bit = false;
   std::string raw_value;
   std::string ns_key = AppendNamespacePrefix(user_key);
@@ -100,20 +108,23 @@ rocksdb::Status Bitmap::GetBit(const Slice &user_key, 
uint32_t offset, bool *bit
 
   if (metadata.Type() == kRedisString) {
     redis::BitmapString bitmap_string_db(storage_, namespace_);
-    return bitmap_string_db.GetBit(raw_value, offset, bit);
+    return bitmap_string_db.GetBit(raw_value, bit_offset, bit);
   }
 
   LatestSnapShot ss(storage_);
   rocksdb::ReadOptions read_options;
   read_options.snapshot = ss.GetSnapShot();
-  uint32_t index = (offset / kBitmapSegmentBits) * kBitmapSegmentBytes;
   rocksdb::PinnableSlice value;
-  std::string sub_key =
-      InternalKey(ns_key, std::to_string(index), metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  std::string sub_key = InternalKey(ns_key, 
std::to_string(SegmentSubKeyIndexForBit(bit_offset)), metadata.version,
+                                    storage_->IsSlotIdEncoded())
+                            .Encode();
   s = storage_->Get(read_options, sub_key, &value);
+  // If s.IsNotFound(), it means all bits in this segment are 0,
+  // so we can return with *bit == false directly.
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
-  uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
-  if ((byte_index < value.size() && (value[byte_index] & (1 << (offset % 
8))))) {
+  uint32_t bit_offset_in_segment = bit_offset % kBitmapSegmentBits;
+  if (bit_offset_in_segment / 8 < value.size() &&
+      util::lsb::GetBit(reinterpret_cast<const uint8_t *>(value.data()), 
bit_offset_in_segment)) {
     *bit = true;
   }
   return rocksdb::Status::OK();
@@ -163,7 +174,7 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key, 
const uint32_t max_btos
   return rocksdb::Status::OK();
 }
 
-rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t offset, bool 
new_bit, bool *old_bit) {
+rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t bit_offset, 
bool new_bit, bool *old_bit) {
   std::string raw_value;
   std::string ns_key = AppendNamespacePrefix(user_key);
 
@@ -174,31 +185,28 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key, 
uint32_t offset, bool new_
 
   if (metadata.Type() == kRedisString) {
     redis::BitmapString bitmap_string_db(storage_, namespace_);
-    return bitmap_string_db.SetBit(ns_key, &raw_value, offset, new_bit, 
old_bit);
+    return bitmap_string_db.SetBit(ns_key, &raw_value, bit_offset, new_bit, 
old_bit);
   }
 
   std::string value;
-  uint32_t index = (offset / kBitmapSegmentBits) * kBitmapSegmentBytes;
+  uint32_t segment_index = SegmentSubKeyIndexForBit(bit_offset);
   std::string sub_key =
-      InternalKey(ns_key, std::to_string(index), metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+      InternalKey(ns_key, std::to_string(segment_index), metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
   if (s.ok()) {
     s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value);
     if (!s.ok() && !s.IsNotFound()) return s;
   }
-  uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
-  uint64_t used_size = index + byte_index + 1;
+  uint32_t bit_offset_in_segment = bit_offset % kBitmapSegmentBits;
+  uint32_t byte_index = (bit_offset / 8) % kBitmapSegmentBytes;
+  uint64_t used_size = segment_index + byte_index + 1;
   uint64_t bitmap_size = std::max(used_size, metadata.size);
   // NOTE: value.size() might be greater than metadata.size.
   ExpandBitmapSegment(&value, byte_index + 1);
-  uint32_t bit_offset = offset % 8;
-  *old_bit = (value[byte_index] & (1 << bit_offset)) != 0;
-  if (new_bit) {
-    value[byte_index] = static_cast<char>(value[byte_index] | (1 << 
bit_offset));
-  } else {
-    value[byte_index] = static_cast<char>(value[byte_index] & (~(1 << 
bit_offset)));
-  }
+  auto *data_ptr = reinterpret_cast<uint8_t *>(value.data());
+  *old_bit = util::lsb::GetBit(data_ptr, bit_offset_in_segment);
+  util::lsb::SetBitTo(data_ptr, bit_offset_in_segment, new_bit);
   auto batch = storage_->GetWriteBatchBase();
-  WriteBatchLogData log_data(kRedisBitmap, {std::to_string(kRedisCmdSetBit), 
std::to_string(offset)});
+  WriteBatchLogData log_data(kRedisBitmap, {std::to_string(kRedisCmdSetBit), 
std::to_string(bit_offset)});
   batch->PutLogData(log_data.Encode());
   batch->Put(sub_key, value);
   if (metadata.size != bitmap_size) {
@@ -270,20 +278,20 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key, 
int64_t start, int64_t s
       if (is_bit_index && start_in_segment <= readable_stop_in_segment && 
first_byte_neg_mask != 0) {
         uint8_t first_mask_byte =
             kBitSwapTable[static_cast<uint8_t>(pin_value[start_in_segment])] & 
first_byte_neg_mask;
-        mask_cnt += BitmapString::RawPopcount(&first_mask_byte, 1);
+        mask_cnt += util::RawPopcount(&first_mask_byte, 1);
       }
     }
     if (i == stop_index) {
       stop_in_segment = u_stop % kBitmapSegmentBytes;
       if (is_bit_index && stop_in_segment <= readable_stop_in_segment && 
last_byte_neg_mask != 0) {
         uint8_t last_mask_byte = 
kBitSwapTable[static_cast<uint8_t>(pin_value[stop_in_segment])] & 
last_byte_neg_mask;
-        mask_cnt += BitmapString::RawPopcount(&last_mask_byte, 1);
+        mask_cnt += util::RawPopcount(&last_mask_byte, 1);
       }
     }
     if (stop_in_segment >= start_in_segment && readable_stop_in_segment >= 
start_in_segment) {
       int64_t bytes = 0;
       bytes = std::min(stop_in_segment, readable_stop_in_segment) - 
start_in_segment + 1;
-      *cnt += BitmapString::RawPopcount(reinterpret_cast<const uint8_t 
*>(pin_value.data()) + start_in_segment, bytes);
+      *cnt += util::RawPopcount(reinterpret_cast<const uint8_t 
*>(pin_value.data()) + start_in_segment, bytes);
     }
   }
   *cnt -= mask_cnt;
@@ -909,10 +917,11 @@ bool Bitmap::bitfieldWriteAheadLog(const 
ObserverOrUniquePtr<rocksdb::WriteBatch
   return false;
 }
 
-bool Bitmap::GetBitFromValueAndOffset(const std::string &value, uint32_t 
offset) {
+bool Bitmap::GetBitFromValueAndOffset(std::string_view value, uint32_t 
bit_offset) {
   bool bit = false;
-  uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
-  if ((byte_index < value.size() && (value[byte_index] & (1 << (offset % 
8))))) {
+  uint32_t byte_index = (bit_offset / 8) % kBitmapSegmentBytes;
+  if (byte_index < value.size() &&
+      util::lsb::GetBit(reinterpret_cast<const uint8_t *>(value.data()), 
bit_offset % kBitmapSegmentBits)) {
     bit = true;
   }
   return bit;
diff --git a/src/types/redis_bitmap.h b/src/types/redis_bitmap.h
index b6b0cc69..9466593d 100644
--- a/src/types/redis_bitmap.h
+++ b/src/types/redis_bitmap.h
@@ -41,14 +41,17 @@ enum BitOpFlags {
 
 namespace redis {
 
+// We use least-significant bit (LSB) numbering (also known as bit-endianness).
+// This means that within a group of 8 bits, we read right-to-left.
+// This is different from applying "bit" commands to string, which uses MSB.
 class Bitmap : public Database {
  public:
   class SegmentCacheStore;
 
   Bitmap(engine::Storage *storage, const std::string &ns) : Database(storage, 
ns) {}
-  rocksdb::Status GetBit(const Slice &user_key, uint32_t offset, bool *bit);
+  rocksdb::Status GetBit(const Slice &user_key, uint32_t bit_offset, bool 
*bit);
   rocksdb::Status GetString(const Slice &user_key, uint32_t max_btos_size, 
std::string *value);
-  rocksdb::Status SetBit(const Slice &user_key, uint32_t offset, bool new_bit, 
bool *old_bit);
+  rocksdb::Status SetBit(const Slice &user_key, uint32_t bit_offset, bool 
new_bit, bool *old_bit);
   rocksdb::Status BitCount(const Slice &user_key, int64_t start, int64_t stop, 
bool is_bit_index, uint32_t *cnt);
   rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start, 
int64_t stop, bool stop_given, int64_t *pos);
   rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const 
Slice &user_key,
@@ -63,7 +66,7 @@ class Bitmap : public Database {
                                    std::vector<std::optional<BitfieldValue>> 
*rets) {
     return bitfield<true>(user_key, ops, rets);
   }
-  static bool GetBitFromValueAndOffset(const std::string &value, uint32_t 
offset);
+  static bool GetBitFromValueAndOffset(std::string_view value, uint32_t 
bit_offset);
   static bool IsEmptySegment(const Slice &segment);
 
  private:
diff --git a/src/types/redis_bitmap_string.cc b/src/types/redis_bitmap_string.cc
index 3583ed42..b226d9c2 100644
--- a/src/types/redis_bitmap_string.cc
+++ b/src/types/redis_bitmap_string.cc
@@ -24,6 +24,7 @@
 
 #include <cstdint>
 
+#include "common/bit_util.h"
 #include "redis_string.h"
 #include "server/redis_reply.h"
 #include "storage/redis_metadata.h"
@@ -31,33 +32,28 @@
 
 namespace redis {
 
-rocksdb::Status BitmapString::GetBit(const std::string &raw_value, uint32_t 
offset, bool *bit) {
+rocksdb::Status BitmapString::GetBit(const std::string &raw_value, uint32_t 
bit_offset, bool *bit) {
   std::string_view string_value = 
std::string_view{raw_value}.substr(Metadata::GetOffsetAfterExpire(raw_value[0]));
-  uint32_t byte_index = offset >> 3;
-  uint32_t bit_val = 0;
-  uint32_t bit_offset = 7 - (offset & 0x7);
+  uint32_t byte_index = bit_offset >> 3;
   if (byte_index < string_value.size()) {
-    bit_val = string_value[byte_index] & (1 << bit_offset);
+    *bit = util::msb::GetBit(reinterpret_cast<const uint8_t 
*>(string_value.data()), bit_offset);
+  } else {
+    *bit = false;
   }
-  *bit = bit_val != 0;
   return rocksdb::Status::OK();
 }
 
-rocksdb::Status BitmapString::SetBit(const Slice &ns_key, std::string 
*raw_value, uint32_t offset, bool new_bit,
+rocksdb::Status BitmapString::SetBit(const Slice &ns_key, std::string 
*raw_value, uint32_t bit_offset, bool new_bit,
                                      bool *old_bit) {
   size_t header_offset = Metadata::GetOffsetAfterExpire((*raw_value)[0]);
   auto string_value = raw_value->substr(header_offset);
-  uint32_t byte_index = offset >> 3;
+  uint32_t byte_index = bit_offset >> 3;
   if (byte_index >= string_value.size()) {  // expand the bitmap
     string_value.append(byte_index - string_value.size() + 1, 0);
   }
-  uint32_t bit_offset = 7 - (offset & 0x7);
-  auto byteval = string_value[byte_index];
-  *old_bit = (byteval & (1 << bit_offset)) != 0;
-
-  byteval = static_cast<char>(byteval & (~(1 << bit_offset)));
-  byteval = static_cast<char>(byteval | ((new_bit & 0x1) << bit_offset));
-  string_value[byte_index] = byteval;
+  auto *data_ptr = reinterpret_cast<uint8_t *>(string_value.data());
+  *old_bit = util::msb::GetBit(data_ptr, bit_offset);
+  util::msb::SetBitTo(data_ptr, bit_offset, new_bit);
 
   *raw_value = raw_value->substr(0, header_offset);
   raw_value->append(string_value);
@@ -92,12 +88,12 @@ rocksdb::Status BitmapString::BitCount(const std::string 
&raw_value, int64_t sta
   /* Precondition: end >= 0 && end < strlen, so the only condition where
    * zero can be returned is: start > stop. */
   int64_t bytes = stop_byte - start_byte + 1;
-  *cnt = RawPopcount(reinterpret_cast<const uint8_t *>(string_value.data()) + 
start_byte, bytes);
+  *cnt = util::RawPopcount(reinterpret_cast<const uint8_t 
*>(string_value.data()) + start_byte, bytes);
   if (first_byte_neg_mask != 0 || last_byte_neg_mask != 0) {
     uint8_t firstlast[2] = {0, 0};
     if (first_byte_neg_mask != 0) firstlast[0] = string_value[start_byte] & 
first_byte_neg_mask;
     if (last_byte_neg_mask != 0) firstlast[1] = string_value[stop_byte] & 
last_byte_neg_mask;
-    *cnt -= RawPopcount(firstlast, 2);
+    *cnt -= util::RawPopcount(firstlast, 2);
   }
 
   return rocksdb::Status::OK();
@@ -114,7 +110,7 @@ rocksdb::Status BitmapString::BitPos(const std::string 
&raw_value, bool bit, int
     *pos = -1;
   } else {
     int64_t bytes = stop - start + 1;
-    *pos = RawBitpos(reinterpret_cast<const uint8_t *>(string_value.data()) + 
start, bytes, bit);
+    *pos = util::msb::RawBitpos(reinterpret_cast<const uint8_t 
*>(string_value.data()) + start, bytes, bit);
 
     /* If we are looking for clear bits, and the user specified an exact
      * range with start-end, we can't consider the right of the range as
@@ -132,87 +128,6 @@ rocksdb::Status BitmapString::BitPos(const std::string 
&raw_value, bool bit, int
   return rocksdb::Status::OK();
 }
 
-/* Count number of bits set in the binary array pointed by 's' and long
- * 'count' bytes. The implementation of this function is required to
- * work with a input string length up to 512 MB.
- * */
-size_t BitmapString::RawPopcount(const uint8_t *p, int64_t count) {
-  size_t bits = 0;
-
-  for (; count >= 8; p += 8, count -= 8) {
-    bits += __builtin_popcountll(*reinterpret_cast<const uint64_t *>(p));
-  }
-
-  if (count > 0) {
-    uint64_t v = 0;
-    __builtin_memcpy(&v, p, count);
-    bits += __builtin_popcountll(v);
-  }
-
-  return bits;
-}
-
-template <typename T = void>
-inline int ClzllWithEndian(uint64_t x) {
-  if constexpr (IsLittleEndian()) {
-    return __builtin_clzll(__builtin_bswap64(x));
-  } else if constexpr (IsBigEndian()) {
-    return __builtin_clzll(x);
-  } else {
-    static_assert(AlwaysFalse<T>);
-  }
-}
-
-/* Return the position of the first bit set to one (if 'bit' is 1) or
- * zero (if 'bit' is 0) in the bitmap starting at 's' and long 'count' bytes.
- *
- * The function is guaranteed to return a value >= 0 if 'bit' is 0 since if
- * no zero bit is found, it returns count*8 assuming the string is zero
- * padded on the right. However if 'bit' is 1 it is possible that there is
- * not a single set bit in the bitmap. In this special case -1 is returned.
- * */
-int64_t BitmapString::RawBitpos(const uint8_t *c, int64_t count, bool bit) {
-  int64_t res = 0;
-
-  if (bit) {
-    int64_t ct = count;
-
-    for (; count >= 8; c += 8, count -= 8) {
-      uint64_t x = *reinterpret_cast<const uint64_t *>(c);
-      if (x != 0) {
-        return res + ClzllWithEndian(x);
-      }
-      res += 64;
-    }
-
-    if (count > 0) {
-      uint64_t v = 0;
-      __builtin_memcpy(&v, c, count);
-      res += v == 0 ? count * 8 : ClzllWithEndian(v);
-    }
-
-    if (res == ct * 8) {
-      return -1;
-    }
-  } else {
-    for (; count >= 8; c += 8, count -= 8) {
-      uint64_t x = *reinterpret_cast<const uint64_t *>(c);
-      if (x != (uint64_t)-1) {
-        return res + ClzllWithEndian(~x);
-      }
-      res += 64;
-    }
-
-    if (count > 0) {
-      uint64_t v = -1;
-      __builtin_memcpy(&v, c, count);
-      res += v == (uint64_t)-1 ? count * 8 : ClzllWithEndian(~v);
-    }
-  }
-
-  return res;
-}
-
 std::pair<int64_t, int64_t> BitmapString::NormalizeRange(int64_t origin_start, 
int64_t origin_end, int64_t length) {
   if (origin_start < 0) origin_start = length + origin_start;
   if (origin_end < 0) origin_end = length + origin_end;
diff --git a/src/types/redis_bitmap_string.h b/src/types/redis_bitmap_string.h
index ed4559c5..7997165a 100644
--- a/src/types/redis_bitmap_string.h
+++ b/src/types/redis_bitmap_string.h
@@ -30,11 +30,12 @@
 
 namespace redis {
 
+// BitmapString handling bits using MSB numbering (also known as 
bit-endianness).
 class BitmapString : public Database {
  public:
   BitmapString(engine::Storage *storage, const std::string &ns) : 
Database(storage, ns) {}
-  static rocksdb::Status GetBit(const std::string &raw_value, uint32_t offset, 
bool *bit);
-  rocksdb::Status SetBit(const Slice &ns_key, std::string *raw_value, uint32_t 
offset, bool new_bit, bool *old_bit);
+  static rocksdb::Status GetBit(const std::string &raw_value, uint32_t 
bit_offset, bool *bit);
+  rocksdb::Status SetBit(const Slice &ns_key, std::string *raw_value, uint32_t 
bit_offset, bool new_bit, bool *old_bit);
   static rocksdb::Status BitCount(const std::string &raw_value, int64_t start, 
int64_t stop, bool is_bit_index,
                                   uint32_t *cnt);
   static rocksdb::Status BitPos(const std::string &raw_value, bool bit, 
int64_t start, int64_t stop, bool stop_given,
@@ -45,9 +46,6 @@ class BitmapString : public Database {
                                           const std::vector<BitfieldOperation> 
&ops,
                                           
std::vector<std::optional<BitfieldValue>> *rets);
 
-  static size_t RawPopcount(const uint8_t *p, int64_t count);
-  static int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit);
-
   // NormalizeRange converts a range to a normalized range, which is a range 
with start and stop in [0, length).
   //
   // If start/end is negative, it will be converted to positive by adding 
length to it, and if the result is still

Reply via email to