PragmaTwice commented on code in PR #1901:
URL: https://github.com/apache/kvrocks/pull/1901#discussion_r1400839029


##########
src/types/redis_bitmap.cc:
##########
@@ -538,6 +545,281 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const 
std::string &op_name, co
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+namespace {
+using SegmentCache = std::pair<std::string, std::string>;
+
+// used to simplify accessing bitfield that occupies multiple continuous 
segments.
+// you can access multiple segments like accessing in one big segments.
+class BitmapSegmentsView {
+ public:
+  BitmapSegmentsView() = default;
+  explicit BitmapSegmentsView(std::vector<SegmentCache *> segments, bool 
expand_in_advance = true)
+      : segments_(std::move(segments)) {
+    if (expand_in_advance && segments_.size() > 1) {
+      // expand segments during iteration (not in advance) maybe increase 
allocation times.
+      // we assume that all end of each segment except last one will be 
accessed.
+      // note: segments_.size() is unsigned.
+      for (size_t i = 0; i < segments_.size() - 1; ++i) {
+        ExpandBitmapSegment(&segments_[i]->second, kBitmapSegmentBytes);
+      }
+    }
+  }
+
+  char &operator[](uint32_t index) noexcept {
+    std::string &content = segments_[index / kBitmapSegmentBytes]->second;
+    index %= kBitmapSegmentBytes;
+    ExpandBitmapSegment(&content, index + 1);
+    return content[index];
+  }
+
+  const char &operator[](uint32_t index) const noexcept {
+    std::string &content = segments_[index / kBitmapSegmentBytes]->second;
+    index %= kBitmapSegmentBytes;
+    ExpandBitmapSegment(&content, index + 1);
+    return content[index];
+  }
+
+  SegmentCache &GetSegment(uint32_t index) noexcept { return 
*segments_[index]; }
+
+  const SegmentCache &GetSegment(uint32_t index) const noexcept { return 
*segments_[index]; }
+
+  size_t LastSegmentSize() const noexcept { return 
segments_.back()->second.size(); }
+
+  size_t SegmentsCount() const noexcept { return segments_.size(); }
+
+  auto SegmentBegin() noexcept { return segments_.begin(); }
+
+  auto SegmentEnd() noexcept { return segments_.end(); }
+
+ private:
+  std::vector<SegmentCache *> segments_;
+};
+}  // namespace
+
+template <bool ReadOnly>
+rocksdb::Status Bitmap::bitfield(const Slice &user_key, const 
std::vector<BitfieldOperation> &ops,
+                                 std::vector<std::string> *rets) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  LockGuard guard;
+  if constexpr (!ReadOnly) {
+    guard = LockGuard(storage_->GetLockManager(), ns_key);
+  }
+
+  BitmapMetadata metadata;
+  std::string raw_value;
+  auto s = GetMetadata(ns_key, &metadata, &raw_value);
+  if (!s.ok() && !s.IsNotFound()) {
+    return s;
+  }
+
+  if (metadata.Type() == RedisType::kRedisString) {
+    if constexpr (ReadOnly) {
+      s = BitmapString::BitfieldReadOnly(ns_key, raw_value, ops, rets);
+    } else {
+      s = BitmapString(storage_, namespace_).Bitfield(ns_key, &raw_value, ops, 
rets);
+    }
+    return s;
+  }
+
+  if (metadata.Type() != RedisType::kRedisBitmap) {
+    return rocksdb::Status::InvalidArgument("The value is not a bitmap or 
string.");
+  }
+
+  std::unordered_map<uint32_t, SegmentCache> seg_cache;
+  auto get_segment = [&](uint32_t index, SegmentCache **res) {
+    auto [seg_itor, no_cache] = seg_cache.try_emplace(index);
+    auto &[sub_key, value] = seg_itor->second;
+    if (no_cache) {
+      auto sub_sub_key = std::to_string(index * kBitmapSegmentBytes);
+      sub_key = InternalKey(ns_key, sub_sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+      rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), sub_key, 
&value);
+      if (!s.ok() && !s.IsNotFound()) {
+        return s;
+      }
+    }
+    *res = &seg_itor->second;
+    return rocksdb::Status::OK();
+  };
+
+  auto init_view = [&get_segment](uint32_t offset, uint32_t bits, 
BitmapSegmentsView *view) {
+    uint32_t first_segment = offset / kBitmapSegmentBits;
+    uint32_t last_segment = (offset + bits - 1) / kBitmapSegmentBits;
+
+    std::vector<SegmentCache *> segments(last_segment - first_segment + 1);
+    for (auto &seg : segments) {
+      auto seg_status = get_segment(first_segment++, &seg);
+      if (!seg_status.ok()) {
+        return seg_status;
+      }
+    }
+    *view = BitmapSegmentsView(std::move(segments));
+    return rocksdb::Status::OK();
+  };
+
+  // get bitfield value from entire bitmap
+  auto get = [&init_view](uint32_t offset, BitfieldEncoding enc, uint64_t 
*res) {
+    BitmapSegmentsView view;
+    auto view_status = init_view(offset, enc.Bits(), &view);
+    if (!view_status.ok()) {
+      return view_status;
+    }
+
+    uint32_t first_bit_segment_offset = offset % kBitmapSegmentBits;
+    if (enc.IsSigned()) {
+      *res = GetSignedBitfield(view, first_bit_segment_offset, enc.Bits());
+    } else {
+      *res = GetUnsignedBitfield(view, first_bit_segment_offset, enc.Bits());
+    }
+
+    return rocksdb::Status::OK();
+  };
+
+  if constexpr (ReadOnly) {
+    std::string ret;
+    for (BitfieldOperation op : ops) {
+      if (op.type != BitfieldOperation::Type::kGet) {
+        return rocksdb::Status::InvalidArgument("Write bitfield in read-only 
mode.");
+      }
+
+      uint64_t unsigned_old_value = 0;
+      s = get(op.offset, op.encoding, &unsigned_old_value);
+      if (!s.ok()) {
+        return s;
+      }
+
+      if (op.encoding.IsSigned()) {
+        ret = 
redis::Integer(CastToSignedWithoutBitChanges(unsigned_old_value));
+      } else {
+        ret = redis::Integer(unsigned_old_value);
+      }
+
+      rets->push_back(std::move(ret));
+    }
+    return rocksdb::Status::OK();
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  bitfieldWriteAheadLog(batch, ops);
+
+  // set bitfield value among segments
+  auto set = [&](uint32_t offset, BitfieldEncoding enc, 
BitfieldOverflowBehavior overflow, uint64_t value) {
+    BitmapSegmentsView view;
+    s = init_view(offset, enc.Bits(), &view);
+    if (!s.ok()) {
+      return s;
+    }
+
+    uint32_t first_bit_segment_offset = offset % kBitmapSegmentBits;
+    SetBitfield(view, first_bit_segment_offset, enc.Bits(), value);
+
+    for (auto segment = view.SegmentBegin(); segment != view.SegmentEnd(); 
++segment) {
+      batch->Put((*segment)->first, (*segment)->second);
+    }
+
+    uint64_t segments_cnt = view.SegmentsCount();
+    uint64_t used_size = (offset / 8) + (segments_cnt - 1) * 
kBitmapSegmentBytes + view.LastSegmentSize();
+    if (metadata.size < used_size) {
+      metadata.size = used_size;
+      std::string bytes;
+      metadata.Encode(&bytes);
+      batch->Put(metadata_cf_handle_, ns_key, bytes);
+    }
+
+    return rocksdb::Status::OK();
+  };
+
+  std::string ret;
+  for (BitfieldOperation op : ops) {
+    uint64_t unsigned_old_value = 0;
+    s = get(op.offset, op.encoding, &unsigned_old_value);
+    if (!s.ok()) {
+      return s;
+    }
+
+    uint64_t unsigned_new_value = 0;
+    if (BitfieldOp(op, unsigned_old_value, &unsigned_new_value)) {
+      if (op.type != BitfieldOperation::Type::kGet) {
+        s = set(op.offset, op.encoding, op.overflow, unsigned_new_value);
+        if (!s.ok()) {
+          return s;
+        }
+      }
+
+      if (op.type == BitfieldOperation::Type::kSet) {
+        unsigned_new_value = unsigned_old_value;
+      }
+
+      if (op.encoding.IsSigned()) {
+        ret = 
redis::Integer(CastToSignedWithoutBitChanges(unsigned_new_value));
+      } else {
+        ret = redis::Integer(unsigned_new_value);
+      }
+    } else {
+      ret = redis::NilString();
+    }

Review Comment:
   I think it would be better to return `vector<optional<integer>>` here rather 
   than a vector of RESP strings, so that we can reuse the result.



##########
src/types/redis_bitmap_string.cc:
##########
@@ -204,4 +205,113 @@ int64_t BitmapString::RawBitpos(const uint8_t *c, int64_t 
count, bool bit) {
   return res;
 }
 
+namespace {
+class ReadOnlyStringBitmapView {
+ public:
+  explicit ReadOnlyStringBitmapView(std::string_view str) noexcept : str_(str) 
{}
+
+  char operator[](uint32_t index) const noexcept {
+    if (index >= str_.size()) {
+      return 0;
+    }
+    return str_[index];
+  }
+
+ private:
+  std::string_view str_;
+};
+
+class ReadWriteStringBitmapView {
+ public:
+  explicit ReadWriteStringBitmapView(std::string *str) : str_(str) {}
+
+  char &operator[](uint32_t index) {
+    if (index >= str_->size()) {
+      str_->append((index + 1) - str_->size(), 0);
+    }
+    return (*str_)[index];
+  }
+
+  const char &operator[](uint32_t index) const {
+    if (index >= str_->size()) {
+      str_->append((index + 1) - str_->size(), 0);
+    }
+    return (*str_)[index];
+  }
+
+ private:
+  std::string *str_;
+};
+}  // namespace
+
+rocksdb::Status BitmapString::Bitfield(const Slice &ns_key, std::string 
*raw_value,
+                                       const std::vector<BitfieldOperation> 
&ops, std::vector<std::string> *rets) {
+  auto header_offset = Metadata::GetOffsetAfterExpire((*raw_value)[0]);
+  std::string string_value = raw_value->substr(header_offset);
+  ReadWriteStringBitmapView view(&string_value);
+  std::string ret;
+  for (BitfieldOperation op : ops) {
+    uint64_t unsigned_old_value = 0;
+    if (op.encoding.IsSigned()) {
+      unsigned_old_value = GetSignedBitfield(view, op.offset, 
op.encoding.Bits());
+    } else {
+      unsigned_old_value = GetUnsignedBitfield(view, op.offset, 
op.encoding.Bits());
+    }
+
+    uint64_t unsigned_new_value = 0;
+    if (BitfieldOp(op, unsigned_old_value, &unsigned_new_value)) {
+      if (op.type != BitfieldOperation::Type::kGet) {
+        SetBitfield(view, op.offset, op.encoding.Bits(), unsigned_new_value);
+      }
+
+      if (op.type == BitfieldOperation::Type::kSet) {
+        unsigned_new_value = unsigned_old_value;
+      }
+
+      if (op.encoding.IsSigned()) {
+        ret = 
redis::Integer(CastToSignedWithoutBitChanges(unsigned_new_value));
+      } else {
+        ret = redis::Integer(unsigned_new_value);
+      }
+    } else {
+      ret = redis::NilString();
+    }

Review Comment:
   ditto



##########
src/types/redis_bitmap_string.cc:
##########
@@ -204,4 +205,113 @@ int64_t BitmapString::RawBitpos(const uint8_t *c, int64_t 
count, bool bit) {
   return res;
 }
 
+namespace {
+class ReadOnlyStringBitmapView {
+ public:
+  explicit ReadOnlyStringBitmapView(std::string_view str) noexcept : str_(str) 
{}
+
+  char operator[](uint32_t index) const noexcept {
+    if (index >= str_.size()) {
+      return 0;
+    }
+    return str_[index];
+  }
+
+ private:
+  std::string_view str_;
+};
+
+class ReadWriteStringBitmapView {
+ public:
+  explicit ReadWriteStringBitmapView(std::string *str) : str_(str) {}
+
+  char &operator[](uint32_t index) {
+    if (index >= str_->size()) {
+      str_->append((index + 1) - str_->size(), 0);

Review Comment:
   ```suggestion
         str_->resize(index + 1);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to