This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new fb7acf6a Bitmap: Extract common bit operations like GetBit and
SetBitTo (#2105)
fb7acf6a is described below
commit fb7acf6a5f3c010fa61f30f1f8fbef63efd0b31d
Author: mwish <[email protected]>
AuthorDate: Mon Feb 19 17:20:30 2024 +0800
Bitmap: Extract common bit operations like GetBit and SetBitTo (#2105)
This is a refactor, see: https://github.com/apache/kvrocks/issues/2076
Previously, ad hoc bit operations were used. This patch extracts the `GetBit` and
`SetBitTo` operations to simplify the implementation.
Also, because the LSB/MSB format we're using is different in BitmapString
and Bitmap (see https://github.com/apache/kvrocks-website/pull/198), separate `lsb` and
`msb` namespaces are extracted to prevent mistakes.
---
src/common/bit_util.h | 149 +++++++++++++++++++++++++++++++++++++++
src/storage/batch_extractor.cc | 3 +-
src/types/redis_bitmap.cc | 69 ++++++++++--------
src/types/redis_bitmap.h | 9 ++-
src/types/redis_bitmap_string.cc | 113 ++++-------------------------
src/types/redis_bitmap_string.h | 8 +--
6 files changed, 212 insertions(+), 139 deletions(-)
diff --git a/src/common/bit_util.h b/src/common/bit_util.h
new file mode 100644
index 00000000..9c23d78b
--- /dev/null
+++ b/src/common/bit_util.h
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+namespace util {
+
+/* Count the number of bits set in the byte array pointed to by 'p', which is
+ * 'count' bytes long. The implementation of this function is required to
+ * work with an input string length up to 512 MB.
+ * */
+inline size_t RawPopcount(const uint8_t *p, int64_t count) {
+ size_t bits = 0;
+
+ for (; count >= 8; p += 8, count -= 8) {  // consume the input one 8-byte word at a time
+ bits += __builtin_popcountll(*reinterpret_cast<const uint64_t *>(p));  // NOTE(review): reads through a uint64_t pointer; assumes 'p' may be accessed that way (alignment) - confirm
+ }
+
+ if (count > 0) {  // fewer than 8 bytes remain
+ uint64_t v = 0;  // zero-filled so the bytes not overwritten below add no set bits
+ __builtin_memcpy(&v, p, count);
+ bits += __builtin_popcountll(v);
+ }
+
+ return bits;
+}
+
+template <typename T = void>  // T is referenced only by the static_assert in the final branch
+inline int ClzllWithEndian(uint64_t x) {  // leading-zero count of x after arranging it so that, for a word loaded from memory in native byte order, the lowest-addressed byte is the most significant
+ if constexpr (IsLittleEndian()) {
+ return __builtin_clzll(__builtin_bswap64(x));  // swap so the lowest-addressed byte becomes the top byte before counting
+ } else if constexpr (IsBigEndian()) {
+ return __builtin_clzll(x);  // the lowest-addressed byte is already the top byte
+ } else {
+ static_assert(AlwaysFalse<T>);  // presumably a compile-time rejection of platforms that are neither little- nor big-endian; AlwaysFalse is defined outside this view - confirm
+ }
+}
+
+// Return the number of bytes needed to fit the given number of bits (bits / 8, rounded up)
+constexpr int64_t BytesForBits(int64_t bits) {
+ // This formula avoids the integer overflow that adding 7 before dividing could hit on very large `bits`
+ return (bits >> 3) + ((bits & 7) != 0);  // whole bytes, plus one more if any bits are left over
+}
+
+namespace lsb {  // LSB numbering: bit 0 of each byte is its least significant bit (mask 1)
+static constexpr bool GetBit(const uint8_t *bits, uint64_t i) { return (bits[i
>> 3] >> (i & 0x07)) & 1; }  // byte i / 8, shifted right by i % 8 so the wanted bit lands in position 0
+
+// Bitmask selecting the k-th bit in a byte, counting from the least significant bit
+static constexpr uint8_t kBitmask[] = {1, 2, 4, 8, 16, 32, 64, 128};
+
+// Gets the i-th bit from a byte. Should only be used with i <= 7, since kBitmask has 8 entries.
+static constexpr bool GetBitFromByte(uint8_t byte, uint8_t i) { return byte &
kBitmask[i]; }
+
+static inline void SetBitTo(uint8_t *bits, int64_t i, bool bit_is_set) {  // writes bit_is_set into bit i of the array; no bounds check is performed here
+ // https://graphics.stanford.edu/~seander/bithacks.html
+ // "Conditionally set or clear bits without branching"
+ // NOTE: this seems to confuse Valgrind as it reads from potentially
+ // uninitialized memory
+ bits[i / 8] ^= static_cast<uint8_t>(-static_cast<uint8_t>(bit_is_set) ^
bits[i / 8]) & kBitmask[i % 8];  // (all-ones or zero) XOR the byte marks the bits that differ from the target value; the mask keeps only bit i, and ^= flips it when it differs
+}
+} // namespace lsb
+
+namespace msb {  // MSB numbering: bit 0 of each byte is its most significant bit (mask 128)
+static constexpr bool GetBit(const uint8_t *bits, uint64_t i) { return (bits[i
>> 3] >> (7 - (i & 0x07))) & 1; }  // byte i / 8, shifted right by 7 - i % 8 so the wanted bit lands in position 0
+
+// Bitmask selecting the k-th bit in a byte, counting from the most significant bit
+static constexpr uint8_t kBitmask[] = {128, 64, 32, 16, 8, 4, 2, 1};
+
+// Gets the i-th bit from a byte. Should only be used with i <= 7, since kBitmask has 8 entries.
+static constexpr bool GetBitFromByte(uint8_t byte, uint8_t i) { return byte &
kBitmask[i]; }
+
+static inline void SetBitTo(uint8_t *bits, int64_t i, bool bit_is_set) {  // writes bit_is_set into bit i of the array; no bounds check is performed here
+ // https://graphics.stanford.edu/~seander/bithacks.html
+ // "Conditionally set or clear bits without branching"
+ // NOTE: this seems to confuse Valgrind as it reads from potentially
+ // uninitialized memory
+ bits[i / 8] ^= static_cast<uint8_t>(-static_cast<uint8_t>(bit_is_set) ^
bits[i / 8]) & kBitmask[i % 8];  // (all-ones or zero) XOR the byte marks the bits that differ from the target value; the mask keeps only bit i, and ^= flips it when it differs
+}
+
+/* Return the position of the first bit set to one (if 'bit' is 1) or
+ * zero (if 'bit' is 0) in the bitmap starting at 'c' and 'count' bytes long.
+ *
+ * The function is guaranteed to return a value >= 0 if 'bit' is 0 since if
+ * no zero bit is found, it returns count*8 assuming the string is zero
+ * padded on the right. However if 'bit' is 1 it is possible that there is
+ * not a single set bit in the bitmap. In this special case -1 is returned.
+ * */
+inline int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit) {
+ int64_t res = 0;  // bit position accumulated so far
+
+ if (bit) {
+ int64_t ct = count;  // original length, kept because 'count' is consumed by the loop below
+
+ for (; count >= 8; c += 8, count -= 8) {
+ uint64_t x = *reinterpret_cast<const uint64_t *>(c);
+ if (x != 0) {  // this word holds a set bit
+ return res + ClzllWithEndian(x);
+ }
+ res += 64;
+ }
+
+ if (count > 0) {
+ uint64_t v = 0;  // zero padding cannot be mistaken for a set bit
+ __builtin_memcpy(&v, c, count);
+ res += v == 0 ? count * 8 : ClzllWithEndian(v);
+ }
+
+ if (res == ct * 8) {  // every bit was skipped without finding a one
+ return -1;
+ }
+ } else {
+ for (; count >= 8; c += 8, count -= 8) {
+ uint64_t x = *reinterpret_cast<const uint64_t *>(c);
+ if (x != (uint64_t)-1) {  // this word holds a clear bit
+ return res + ClzllWithEndian(~x);  // invert so the first clear bit becomes the first set bit
+ }
+ res += 64;
+ }
+
+ if (count > 0) {
+ uint64_t v = -1;  // all-ones padding cannot be mistaken for a clear bit
+ __builtin_memcpy(&v, c, count);
+ res += v == (uint64_t)-1 ? count * 8 : ClzllWithEndian(~v);
+ }
+ }
+
+ return res;
+}
+
+} // namespace msb
+
+} // namespace util
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 56f58874..f60669eb 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -214,8 +214,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
return rocksdb::Status::InvalidArgument(
fmt::format("failed to parse an offset of SETBIT: {}",
parsed_offset.Msg()));
}
-
- bool bit_value =
redis::Bitmap::GetBitFromValueAndOffset(value.ToString(), *parsed_offset);
+ bool bit_value =
redis::Bitmap::GetBitFromValueAndOffset(value.ToStringView(), *parsed_offset);
command_args = {"SETBIT", user_key, (*args)[1], bit_value ? "1" :
"0"};
break;
}
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index 48b1125b..d2d7ff3a 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -24,16 +24,17 @@
#include <memory>
#include <vector>
+#include "common/bit_util.h"
#include "db_util.h"
#include "parse_util.h"
#include "redis_bitmap_string.h"
namespace redis {
-const uint32_t kBitmapSegmentBits = 1024 * 8;
-const uint32_t kBitmapSegmentBytes = 1024;
+constexpr uint32_t kBitmapSegmentBits = 1024 * 8;
+constexpr uint32_t kBitmapSegmentBytes = 1024;
-const char kErrBitmapStringOutOfRange[] =
+constexpr char kErrBitmapStringOutOfRange[] =
"The size of the bitmap string exceeds the "
"configuration item max-bitmap-to-string-mb";
@@ -81,6 +82,13 @@ void ExpandBitmapSegment(std::string *segment, size_t
min_bytes) {
}
}
+// Computes the sub-key index of the segment that contains bit_offset, see:
+//
https://kvrocks.apache.org/community/data-structure-on-rocksdb#bitmap-sub-keys-values
+// The value is also equal to the byte offset at which that segment starts in the bitmap.
+uint32_t SegmentSubKeyIndexForBit(uint32_t bit_offset) {
+ return (bit_offset / kBitmapSegmentBits) * kBitmapSegmentBytes;  // segment number times bytes per segment
+}
+
rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key, BitmapMetadata
*metadata, std::string *raw_value) {
auto s = GetRawMetadata(ns_key, raw_value);
if (!s.ok()) return s;
@@ -89,7 +97,7 @@ rocksdb::Status Bitmap::GetMetadata(const Slice &ns_key,
BitmapMetadata *metadat
return ParseMetadata({kRedisBitmap, kRedisString}, &slice, metadata);
}
-rocksdb::Status Bitmap::GetBit(const Slice &user_key, uint32_t offset, bool
*bit) {
+rocksdb::Status Bitmap::GetBit(const Slice &user_key, uint32_t bit_offset,
bool *bit) {
*bit = false;
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);
@@ -100,20 +108,23 @@ rocksdb::Status Bitmap::GetBit(const Slice &user_key,
uint32_t offset, bool *bit
if (metadata.Type() == kRedisString) {
redis::BitmapString bitmap_string_db(storage_, namespace_);
- return bitmap_string_db.GetBit(raw_value, offset, bit);
+ return bitmap_string_db.GetBit(raw_value, bit_offset, bit);
}
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
- uint32_t index = (offset / kBitmapSegmentBits) * kBitmapSegmentBytes;
rocksdb::PinnableSlice value;
- std::string sub_key =
- InternalKey(ns_key, std::to_string(index), metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string sub_key = InternalKey(ns_key,
std::to_string(SegmentSubKeyIndexForBit(bit_offset)), metadata.version,
+ storage_->IsSlotIdEncoded())
+ .Encode();
s = storage_->Get(read_options, sub_key, &value);
+ // If s.IsNotFound(), it means all bits in this segment are 0,
+ // so we can return with *bit == false directly.
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
- uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
- if ((byte_index < value.size() && (value[byte_index] & (1 << (offset %
8))))) {
+ uint32_t bit_offset_in_segment = bit_offset % kBitmapSegmentBits;
+ if (bit_offset_in_segment / 8 < value.size() &&
+ util::lsb::GetBit(reinterpret_cast<const uint8_t *>(value.data()),
bit_offset_in_segment)) {
*bit = true;
}
return rocksdb::Status::OK();
@@ -163,7 +174,7 @@ rocksdb::Status Bitmap::GetString(const Slice &user_key,
const uint32_t max_btos
return rocksdb::Status::OK();
}
-rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t offset, bool
new_bit, bool *old_bit) {
+rocksdb::Status Bitmap::SetBit(const Slice &user_key, uint32_t bit_offset,
bool new_bit, bool *old_bit) {
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);
@@ -174,31 +185,28 @@ rocksdb::Status Bitmap::SetBit(const Slice &user_key,
uint32_t offset, bool new_
if (metadata.Type() == kRedisString) {
redis::BitmapString bitmap_string_db(storage_, namespace_);
- return bitmap_string_db.SetBit(ns_key, &raw_value, offset, new_bit,
old_bit);
+ return bitmap_string_db.SetBit(ns_key, &raw_value, bit_offset, new_bit,
old_bit);
}
std::string value;
- uint32_t index = (offset / kBitmapSegmentBits) * kBitmapSegmentBytes;
+ uint32_t segment_index = SegmentSubKeyIndexForBit(bit_offset);
std::string sub_key =
- InternalKey(ns_key, std::to_string(index), metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ InternalKey(ns_key, std::to_string(segment_index), metadata.version,
storage_->IsSlotIdEncoded()).Encode();
if (s.ok()) {
s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value);
if (!s.ok() && !s.IsNotFound()) return s;
}
- uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
- uint64_t used_size = index + byte_index + 1;
+ uint32_t bit_offset_in_segment = bit_offset % kBitmapSegmentBits;
+ uint32_t byte_index = (bit_offset / 8) % kBitmapSegmentBytes;
+ uint64_t used_size = segment_index + byte_index + 1;
uint64_t bitmap_size = std::max(used_size, metadata.size);
// NOTE: value.size() might be greater than metadata.size.
ExpandBitmapSegment(&value, byte_index + 1);
- uint32_t bit_offset = offset % 8;
- *old_bit = (value[byte_index] & (1 << bit_offset)) != 0;
- if (new_bit) {
- value[byte_index] = static_cast<char>(value[byte_index] | (1 <<
bit_offset));
- } else {
- value[byte_index] = static_cast<char>(value[byte_index] & (~(1 <<
bit_offset)));
- }
+ auto *data_ptr = reinterpret_cast<uint8_t *>(value.data());
+ *old_bit = util::lsb::GetBit(data_ptr, bit_offset_in_segment);
+ util::lsb::SetBitTo(data_ptr, bit_offset_in_segment, new_bit);
auto batch = storage_->GetWriteBatchBase();
- WriteBatchLogData log_data(kRedisBitmap, {std::to_string(kRedisCmdSetBit),
std::to_string(offset)});
+ WriteBatchLogData log_data(kRedisBitmap, {std::to_string(kRedisCmdSetBit),
std::to_string(bit_offset)});
batch->PutLogData(log_data.Encode());
batch->Put(sub_key, value);
if (metadata.size != bitmap_size) {
@@ -270,20 +278,20 @@ rocksdb::Status Bitmap::BitCount(const Slice &user_key,
int64_t start, int64_t s
if (is_bit_index && start_in_segment <= readable_stop_in_segment &&
first_byte_neg_mask != 0) {
uint8_t first_mask_byte =
kBitSwapTable[static_cast<uint8_t>(pin_value[start_in_segment])] &
first_byte_neg_mask;
- mask_cnt += BitmapString::RawPopcount(&first_mask_byte, 1);
+ mask_cnt += util::RawPopcount(&first_mask_byte, 1);
}
}
if (i == stop_index) {
stop_in_segment = u_stop % kBitmapSegmentBytes;
if (is_bit_index && stop_in_segment <= readable_stop_in_segment &&
last_byte_neg_mask != 0) {
uint8_t last_mask_byte =
kBitSwapTable[static_cast<uint8_t>(pin_value[stop_in_segment])] &
last_byte_neg_mask;
- mask_cnt += BitmapString::RawPopcount(&last_mask_byte, 1);
+ mask_cnt += util::RawPopcount(&last_mask_byte, 1);
}
}
if (stop_in_segment >= start_in_segment && readable_stop_in_segment >=
start_in_segment) {
int64_t bytes = 0;
bytes = std::min(stop_in_segment, readable_stop_in_segment) -
start_in_segment + 1;
- *cnt += BitmapString::RawPopcount(reinterpret_cast<const uint8_t
*>(pin_value.data()) + start_in_segment, bytes);
+ *cnt += util::RawPopcount(reinterpret_cast<const uint8_t
*>(pin_value.data()) + start_in_segment, bytes);
}
}
*cnt -= mask_cnt;
@@ -909,10 +917,11 @@ bool Bitmap::bitfieldWriteAheadLog(const
ObserverOrUniquePtr<rocksdb::WriteBatch
return false;
}
-bool Bitmap::GetBitFromValueAndOffset(const std::string &value, uint32_t
offset) {
+bool Bitmap::GetBitFromValueAndOffset(std::string_view value, uint32_t
bit_offset) {
bool bit = false;
- uint32_t byte_index = (offset / 8) % kBitmapSegmentBytes;
- if ((byte_index < value.size() && (value[byte_index] & (1 << (offset %
8))))) {
+ uint32_t byte_index = (bit_offset / 8) % kBitmapSegmentBytes;
+ if (byte_index < value.size() &&
+ util::lsb::GetBit(reinterpret_cast<const uint8_t *>(value.data()),
bit_offset % kBitmapSegmentBits)) {
bit = true;
}
return bit;
diff --git a/src/types/redis_bitmap.h b/src/types/redis_bitmap.h
index b6b0cc69..9466593d 100644
--- a/src/types/redis_bitmap.h
+++ b/src/types/redis_bitmap.h
@@ -41,14 +41,17 @@ enum BitOpFlags {
namespace redis {
+// We use least-significant bit (LSB) numbering (also known as bit-endianness).
+// This means that within a group of 8 bits, we read right-to-left.
+// This is different from applying "bit" commands to string, which uses MSB.
class Bitmap : public Database {
public:
class SegmentCacheStore;
Bitmap(engine::Storage *storage, const std::string &ns) : Database(storage,
ns) {}
- rocksdb::Status GetBit(const Slice &user_key, uint32_t offset, bool *bit);
+ rocksdb::Status GetBit(const Slice &user_key, uint32_t bit_offset, bool
*bit);
rocksdb::Status GetString(const Slice &user_key, uint32_t max_btos_size,
std::string *value);
- rocksdb::Status SetBit(const Slice &user_key, uint32_t offset, bool new_bit,
bool *old_bit);
+ rocksdb::Status SetBit(const Slice &user_key, uint32_t bit_offset, bool
new_bit, bool *old_bit);
rocksdb::Status BitCount(const Slice &user_key, int64_t start, int64_t stop,
bool is_bit_index, uint32_t *cnt);
rocksdb::Status BitPos(const Slice &user_key, bool bit, int64_t start,
int64_t stop, bool stop_given, int64_t *pos);
rocksdb::Status BitOp(BitOpFlags op_flag, const std::string &op_name, const
Slice &user_key,
@@ -63,7 +66,7 @@ class Bitmap : public Database {
std::vector<std::optional<BitfieldValue>>
*rets) {
return bitfield<true>(user_key, ops, rets);
}
- static bool GetBitFromValueAndOffset(const std::string &value, uint32_t
offset);
+ static bool GetBitFromValueAndOffset(std::string_view value, uint32_t
bit_offset);
static bool IsEmptySegment(const Slice &segment);
private:
diff --git a/src/types/redis_bitmap_string.cc b/src/types/redis_bitmap_string.cc
index 3583ed42..b226d9c2 100644
--- a/src/types/redis_bitmap_string.cc
+++ b/src/types/redis_bitmap_string.cc
@@ -24,6 +24,7 @@
#include <cstdint>
+#include "common/bit_util.h"
#include "redis_string.h"
#include "server/redis_reply.h"
#include "storage/redis_metadata.h"
@@ -31,33 +32,28 @@
namespace redis {
-rocksdb::Status BitmapString::GetBit(const std::string &raw_value, uint32_t
offset, bool *bit) {
+rocksdb::Status BitmapString::GetBit(const std::string &raw_value, uint32_t
bit_offset, bool *bit) {
std::string_view string_value =
std::string_view{raw_value}.substr(Metadata::GetOffsetAfterExpire(raw_value[0]));
- uint32_t byte_index = offset >> 3;
- uint32_t bit_val = 0;
- uint32_t bit_offset = 7 - (offset & 0x7);
+ uint32_t byte_index = bit_offset >> 3;
if (byte_index < string_value.size()) {
- bit_val = string_value[byte_index] & (1 << bit_offset);
+ *bit = util::msb::GetBit(reinterpret_cast<const uint8_t
*>(string_value.data()), bit_offset);
+ } else {
+ *bit = false;
}
- *bit = bit_val != 0;
return rocksdb::Status::OK();
}
-rocksdb::Status BitmapString::SetBit(const Slice &ns_key, std::string
*raw_value, uint32_t offset, bool new_bit,
+rocksdb::Status BitmapString::SetBit(const Slice &ns_key, std::string
*raw_value, uint32_t bit_offset, bool new_bit,
bool *old_bit) {
size_t header_offset = Metadata::GetOffsetAfterExpire((*raw_value)[0]);
auto string_value = raw_value->substr(header_offset);
- uint32_t byte_index = offset >> 3;
+ uint32_t byte_index = bit_offset >> 3;
if (byte_index >= string_value.size()) { // expand the bitmap
string_value.append(byte_index - string_value.size() + 1, 0);
}
- uint32_t bit_offset = 7 - (offset & 0x7);
- auto byteval = string_value[byte_index];
- *old_bit = (byteval & (1 << bit_offset)) != 0;
-
- byteval = static_cast<char>(byteval & (~(1 << bit_offset)));
- byteval = static_cast<char>(byteval | ((new_bit & 0x1) << bit_offset));
- string_value[byte_index] = byteval;
+ auto *data_ptr = reinterpret_cast<uint8_t *>(string_value.data());
+ *old_bit = util::msb::GetBit(data_ptr, bit_offset);
+ util::msb::SetBitTo(data_ptr, bit_offset, new_bit);
*raw_value = raw_value->substr(0, header_offset);
raw_value->append(string_value);
@@ -92,12 +88,12 @@ rocksdb::Status BitmapString::BitCount(const std::string
&raw_value, int64_t sta
/* Precondition: end >= 0 && end < strlen, so the only condition where
* zero can be returned is: start > stop. */
int64_t bytes = stop_byte - start_byte + 1;
- *cnt = RawPopcount(reinterpret_cast<const uint8_t *>(string_value.data()) +
start_byte, bytes);
+ *cnt = util::RawPopcount(reinterpret_cast<const uint8_t
*>(string_value.data()) + start_byte, bytes);
if (first_byte_neg_mask != 0 || last_byte_neg_mask != 0) {
uint8_t firstlast[2] = {0, 0};
if (first_byte_neg_mask != 0) firstlast[0] = string_value[start_byte] &
first_byte_neg_mask;
if (last_byte_neg_mask != 0) firstlast[1] = string_value[stop_byte] &
last_byte_neg_mask;
- *cnt -= RawPopcount(firstlast, 2);
+ *cnt -= util::RawPopcount(firstlast, 2);
}
return rocksdb::Status::OK();
@@ -114,7 +110,7 @@ rocksdb::Status BitmapString::BitPos(const std::string
&raw_value, bool bit, int
*pos = -1;
} else {
int64_t bytes = stop - start + 1;
- *pos = RawBitpos(reinterpret_cast<const uint8_t *>(string_value.data()) +
start, bytes, bit);
+ *pos = util::msb::RawBitpos(reinterpret_cast<const uint8_t
*>(string_value.data()) + start, bytes, bit);
/* If we are looking for clear bits, and the user specified an exact
* range with start-end, we can't consider the right of the range as
@@ -132,87 +128,6 @@ rocksdb::Status BitmapString::BitPos(const std::string
&raw_value, bool bit, int
return rocksdb::Status::OK();
}
-/* Count number of bits set in the binary array pointed by 's' and long
- * 'count' bytes. The implementation of this function is required to
- * work with a input string length up to 512 MB.
- * */
-size_t BitmapString::RawPopcount(const uint8_t *p, int64_t count) {
- size_t bits = 0;
-
- for (; count >= 8; p += 8, count -= 8) {
- bits += __builtin_popcountll(*reinterpret_cast<const uint64_t *>(p));
- }
-
- if (count > 0) {
- uint64_t v = 0;
- __builtin_memcpy(&v, p, count);
- bits += __builtin_popcountll(v);
- }
-
- return bits;
-}
-
-template <typename T = void>
-inline int ClzllWithEndian(uint64_t x) {
- if constexpr (IsLittleEndian()) {
- return __builtin_clzll(__builtin_bswap64(x));
- } else if constexpr (IsBigEndian()) {
- return __builtin_clzll(x);
- } else {
- static_assert(AlwaysFalse<T>);
- }
-}
-
-/* Return the position of the first bit set to one (if 'bit' is 1) or
- * zero (if 'bit' is 0) in the bitmap starting at 's' and long 'count' bytes.
- *
- * The function is guaranteed to return a value >= 0 if 'bit' is 0 since if
- * no zero bit is found, it returns count*8 assuming the string is zero
- * padded on the right. However if 'bit' is 1 it is possible that there is
- * not a single set bit in the bitmap. In this special case -1 is returned.
- * */
-int64_t BitmapString::RawBitpos(const uint8_t *c, int64_t count, bool bit) {
- int64_t res = 0;
-
- if (bit) {
- int64_t ct = count;
-
- for (; count >= 8; c += 8, count -= 8) {
- uint64_t x = *reinterpret_cast<const uint64_t *>(c);
- if (x != 0) {
- return res + ClzllWithEndian(x);
- }
- res += 64;
- }
-
- if (count > 0) {
- uint64_t v = 0;
- __builtin_memcpy(&v, c, count);
- res += v == 0 ? count * 8 : ClzllWithEndian(v);
- }
-
- if (res == ct * 8) {
- return -1;
- }
- } else {
- for (; count >= 8; c += 8, count -= 8) {
- uint64_t x = *reinterpret_cast<const uint64_t *>(c);
- if (x != (uint64_t)-1) {
- return res + ClzllWithEndian(~x);
- }
- res += 64;
- }
-
- if (count > 0) {
- uint64_t v = -1;
- __builtin_memcpy(&v, c, count);
- res += v == (uint64_t)-1 ? count * 8 : ClzllWithEndian(~v);
- }
- }
-
- return res;
-}
-
std::pair<int64_t, int64_t> BitmapString::NormalizeRange(int64_t origin_start,
int64_t origin_end, int64_t length) {
if (origin_start < 0) origin_start = length + origin_start;
if (origin_end < 0) origin_end = length + origin_end;
diff --git a/src/types/redis_bitmap_string.h b/src/types/redis_bitmap_string.h
index ed4559c5..7997165a 100644
--- a/src/types/redis_bitmap_string.h
+++ b/src/types/redis_bitmap_string.h
@@ -30,11 +30,12 @@
namespace redis {
+// BitmapString handles bits using MSB numbering (also known as
bit-endianness).
class BitmapString : public Database {
public:
BitmapString(engine::Storage *storage, const std::string &ns) :
Database(storage, ns) {}
- static rocksdb::Status GetBit(const std::string &raw_value, uint32_t offset,
bool *bit);
- rocksdb::Status SetBit(const Slice &ns_key, std::string *raw_value, uint32_t
offset, bool new_bit, bool *old_bit);
+ static rocksdb::Status GetBit(const std::string &raw_value, uint32_t
bit_offset, bool *bit);
+ rocksdb::Status SetBit(const Slice &ns_key, std::string *raw_value, uint32_t
bit_offset, bool new_bit, bool *old_bit);
static rocksdb::Status BitCount(const std::string &raw_value, int64_t start,
int64_t stop, bool is_bit_index,
uint32_t *cnt);
static rocksdb::Status BitPos(const std::string &raw_value, bool bit,
int64_t start, int64_t stop, bool stop_given,
@@ -45,9 +46,6 @@ class BitmapString : public Database {
const std::vector<BitfieldOperation>
&ops,
std::vector<std::optional<BitfieldValue>> *rets);
- static size_t RawPopcount(const uint8_t *p, int64_t count);
- static int64_t RawBitpos(const uint8_t *c, int64_t count, bool bit);
-
// NormalizeRange converts a range to a normalized range, which is a range
with start and stop in [0, length).
//
// If start/end is negative, it will be converted to positive by adding
length to it, and if the result is still