julic20s commented on code in PR #1901:
URL: https://github.com/apache/kvrocks/pull/1901#discussion_r1403363941
##########
src/types/redis_bitmap.cc:
##########
@@ -538,6 +546,320 @@ rocksdb::Status Bitmap::BitOp(BitOpFlags op_flag, const
std::string &op_name, co
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
+// SegmentCacheStore is used to read segments from storage.
+class Bitmap::SegmentCacheStore {
+ public:
+ SegmentCacheStore(engine::Storage *storage, rocksdb::ColumnFamilyHandle
*metadata_cf_handle,
+ std::string namespace_key, const Metadata &bitmap_metadata)
+ : storage_(storage),
+ metadata_cf_handle_(metadata_cf_handle),
+ ns_key_(std::move(namespace_key)),
+ metadata_(bitmap_metadata) {}
+
+ // Get a read-only segment by given index
+ rocksdb::Status Get(uint32_t index, const std::string **cache) {
+ std::string *res = nullptr;
+ auto s = get(index, false, &res);
+ if (s.ok()) {
+ *cache = res;
+ }
+ return s;
+ }
+
+ // Get a segment by given index, and mark it dirty.
+ rocksdb::Status GetMut(uint32_t index, std::string **cache) { return
get(index, true, cache); }
+
+ // Add all dirty segments into write batch.
+ void BatchForFlush(ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch) {
+ uint64_t used_size = 0;
+ for (auto &[index, content] : cache_) {
+ if (content.first) {
+ InternalKey sub_key(ns_key_, getSegmentSubKey(index),
metadata_.version, storage_->IsSlotIdEncoded());
+ batch->Put(sub_key.Encode(), content.second);
+ used_size = std::max(used_size, index * kBitmapSegmentBytes +
content.second.size());
+ }
+ }
+ if (used_size > metadata_.size) {
+ metadata_.size = used_size;
+ std::string bytes;
+ metadata_.Encode(&bytes);
+ batch->Put(metadata_cf_handle_, ns_key_, bytes);
+ }
+ }
+
+ private:
+ rocksdb::Status get(uint32_t index, bool set_dirty, std::string **cache) {
+ auto [seg_itor, no_cache] = cache_.try_emplace(index);
+ auto &[is_dirty, str] = seg_itor->second;
+
+ if (no_cache) {
+ is_dirty = false;
+ InternalKey sub_key(ns_key_, getSegmentSubKey(index), metadata_.version,
storage_->IsSlotIdEncoded());
+ rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(),
sub_key.Encode(), &str);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+ }
+
+ is_dirty |= set_dirty;
+ *cache = &str;
+ return rocksdb::Status::OK();
+ }
+
+ static std::string getSegmentSubKey(uint32_t index) { return
std::to_string(index * kBitmapSegmentBytes); }
+
+ engine::Storage *storage_;
+ rocksdb::ColumnFamilyHandle *metadata_cf_handle_;
+ std::string ns_key_;
+ Metadata metadata_;
+ // Segment index -> [is_dirty, segment_cache_string]
+ std::unordered_map<uint32_t, std::pair<bool, std::string>> cache_;
+};
+
+namespace {
+
+// Copy a range of bytes from entire bitmap and store them into
ArrayBitfieldBitmap.
+static rocksdb::Status CopySegmentsBytesToBitfield(Bitmap::SegmentCacheStore
&store, uint32_t byte_offset,
+ uint32_t bytes,
ArrayBitfieldBitmap *bitfield) {
+ bitfield->SetOffset(byte_offset);
+ bitfield->Reset();
+
+ uint32_t segment_index = byte_offset / kBitmapSegmentBytes;
+ int64_t remain_bytes = bytes;
+ // the byte_offset in current segment.
+ int segment_byte_offset = static_cast<int>(byte_offset %
kBitmapSegmentBytes);
+ for (; remain_bytes > 0; ++segment_index) {
+ const std::string *cache = nullptr;
+ auto cache_status = store.Get(segment_index, &cache);
+ if (!cache_status.ok()) {
+ return cache_status;
+ }
+
+ int cache_size = static_cast<int>(cache->size());
+ auto copyable = std::max(0, cache_size - segment_byte_offset);
+ auto copy_count = std::min(static_cast<int>(remain_bytes), copyable);
+ auto src = reinterpret_cast<const uint8_t *>(cache->data() +
segment_byte_offset);
+ auto status = bitfield->Set(byte_offset, copy_count, src);
+ if (!status) {
+ return rocksdb::Status::InvalidArgument();
+ }
+
+ // next segment will copy from its front.
+ byte_offset = (segment_index + 1) * kBitmapSegmentBytes;
+ // maybe negative, but still correct.
+ remain_bytes -= kBitmapSegmentBytes - segment_byte_offset;
+ segment_byte_offset = 0;
+ }
+
+ return rocksdb::Status::OK();
+}
+
+static rocksdb::Status GetBitfieldInteger(const ArrayBitfieldBitmap &bitfield,
uint32_t offset, BitfieldEncoding enc,
+ uint64_t *res) {
+ if (enc.IsSigned()) {
+ auto status = bitfield.GetSignedBitfield(offset, enc.Bits());
+ if (!status) {
+ return rocksdb::Status::InvalidArgument();
+ }
+ *res = status.GetValue();
+ } else {
+ auto status = bitfield.GetUnsignedBitfield(offset, enc.Bits());
+ if (!status) {
+ return rocksdb::Status::InvalidArgument();
+ }
+ *res = status.GetValue();
+ }
+ return rocksdb::Status::OK();
+}
+
+static rocksdb::Status CopyBitfieldBytesToSegments(Bitmap::SegmentCacheStore
&store,
+ const ArrayBitfieldBitmap
&bitfield, uint32_t byte_offset,
+ uint32_t bytes) {
+ uint32_t segment_index = byte_offset / kBitmapSegmentBytes;
+ int segment_byte_offset = static_cast<int>(byte_offset %
kBitmapSegmentBytes);
+ auto remain_bytes = static_cast<int32_t>(bytes);
+ for (; remain_bytes > 0; ++segment_index) {
+ std::string *cache = nullptr;
+ auto cache_status = store.GetMut(segment_index, &cache);
+ if (!cache_status.ok()) {
+ return cache_status;
+ }
+
+ auto copy_count = std::min(remain_bytes,
static_cast<int32_t>(kBitmapSegmentBytes - segment_byte_offset));
+ if (static_cast<int>(cache->size()) < segment_byte_offset + copy_count) {
+ cache->resize(segment_byte_offset + copy_count);
+ }
+
+ auto status = bitfield.Get(byte_offset, copy_count, cache->data() +
segment_byte_offset);
+ if (!status) {
+ return rocksdb::Status::InvalidArgument();
+ }
+
+ // next segment will copy from its front.
+ byte_offset = (segment_index + 1) * kBitmapSegmentBytes;
+ // maybe negative, but still correct.
+ remain_bytes -= static_cast<int32_t>(kBitmapSegmentBytes -
segment_byte_offset);
+ segment_byte_offset = 0;
+ }
+ return rocksdb::Status::OK();
+}
+
+} // namespace
+
+template <bool ReadOnly>
+rocksdb::Status Bitmap::bitfield(const Slice &user_key, const
std::vector<BitfieldOperation> &ops,
+ std::vector<std::optional<BitfieldValue>>
*rets) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ std::optional<LockGuard> guard;
+ if constexpr (!ReadOnly) {
+ guard = LockGuard(storage_->GetLockManager(), ns_key);
+ }
+
+ BitmapMetadata metadata;
+ std::string raw_value;
+ auto s = GetMetadata(ns_key, &metadata, &raw_value);
+ if (!s.ok() && !s.IsNotFound()) {
+ return s;
+ }
+
+ if (metadata.Type() == RedisType::kRedisString) {
+ if constexpr (ReadOnly) {
+ s = BitmapString::BitfieldReadOnly(ns_key, raw_value, ops, rets);
+ } else {
+ s = BitmapString(storage_, namespace_).Bitfield(ns_key, &raw_value, ops,
rets);
+ }
+ return s;
+ }
+
+ if (metadata.Type() != RedisType::kRedisBitmap) {
+ return rocksdb::Status::InvalidArgument("The value is not a bitmap or
string.");
+ }
+
+ // We firstly do the bitfield operation by fetching segments into memory.
+ // Use SegmentCacheStore to record dirty segments. (if not read-only mode)
+ SegmentCacheStore cache(storage_, metadata_cf_handle_, ns_key, metadata);
+ runBitfieldOperationsWithCache<ReadOnly>(cache, ops, rets);
+
+ if constexpr (ReadOnly) {
+ return rocksdb::Status::OK();
+ }
+
+ // Write changes into storage.
+ auto batch = storage_->GetWriteBatchBase();
+ bitfieldWriteAheadLog(batch, ops);
+ cache.BatchForFlush(batch);
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
+template <bool ReadOnly>
+rocksdb::Status Bitmap::runBitfieldOperationsWithCache(SegmentCacheStore
&cache,
+ const
std::vector<BitfieldOperation> &ops,
+
std::vector<std::optional<BitfieldValue>> *rets) {
+ ArrayBitfieldBitmap tmp_bitfield;
+ for (BitfieldOperation op : ops) {
+ // found all bytes that contents the bitfield.
+ uint32_t first_byte = op.offset / 8;
+ uint32_t last_bytes = (op.offset + op.encoding.Bits() - 1) / 8 + 1;
+ uint32_t bytes = last_bytes - first_byte;
+
+ ArrayBitfieldBitmap bitfield;
+ auto segment_status = CopySegmentsBytesToBitfield(cache, first_byte,
bytes, &bitfield);
+ if (!segment_status.ok()) {
+ return segment_status;
+ }
+
+ // Covert the bitfield from a buffer to an integer.
+ uint64_t unsigned_old_value = 0;
+ auto s = GetBitfieldInteger(bitfield, op.offset, op.encoding,
&unsigned_old_value);
+ if (!s.ok()) {
+ return s;
+ }
+
+ if constexpr (ReadOnly) {
+ rets->emplace_back() = {op.encoding, unsigned_old_value};
+ continue;
+ }
+
+ auto &ret = rets->emplace_back();
+ uint64_t unsigned_new_value = 0;
+ // BitfieldOp failed only when the length or bits not illegal.
Review Comment:
There is a typo in this comment: it should read `BitfieldOp fails only when
the length or bits are illegal.`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]