torwig commented on code in PR #2402:
URL: https://github.com/apache/kvrocks/pull/2402#discussion_r1677092449
##########
src/types/redis_hash.cc:
##########
@@ -418,4 +473,206 @@ rocksdb::Status Hash::RandField(const Slice &user_key,
int64_t command_count, st
return rocksdb::Status::OK();
}
+rocksdb::Status Hash::ExpireFields(const Slice &user_key, uint64_t expire_ms,
+ const std::vector<Slice> &fields,
HashFieldExpireType type,
+ bool is_persist, std::vector<int8_t> *ret) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ HashMetadata metadata(false);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{ss.GetSnapShot()}, ns_key,
&metadata);
+ if (!s.ok()) {
+ ret->resize(fields.size(), -2);
+ return rocksdb::Status::OK();
+ }
+
+ rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
+ read_options.snapshot = ss.GetSnapShot();
+
+ std::vector<Slice> sub_keys;
+ sub_keys.reserve(fields.size());
+ for (auto field : fields) {
+ sub_keys.emplace_back(InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode());
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisHash);
+ batch->PutLogData(log_data.Encode());
+
+ // expire special field
+ std::vector<rocksdb::PinnableSlice> values_vector;
+ values_vector.resize(sub_keys.size());
+ std::vector<rocksdb::Status> statuses_vector;
+ statuses_vector.resize(sub_keys.size());
+ storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(),
sub_keys.size(), sub_keys.data(),
+ values_vector.data(), statuses_vector.data());
+
+ auto now = util::GetTimeStampMS();
+ for (size_t i = 0; i < sub_keys.size(); i++) {
+ if (!statuses_vector[i].ok() && !statuses_vector[i].IsNotFound()) return
statuses_vector[i];
+ auto status = statuses_vector[i];
+
+ // no such field exists
+ if (status.IsNotFound()) {
+ ret->emplace_back(-2);
+ continue;
+ }
+
+ InternalKey sub_ikey(ns_key, fields[i], metadata.version,
storage_->IsSlotIdEncoded());
+
+ // expire with a pass time
+ if (expire_ms <= now && !is_persist) {
+ batch->Delete(sub_ikey.Encode());
+ ret->emplace_back(2);
+ metadata.size -= 1;
+ continue;
+ }
+
+ auto value = values_vector[i].ToString();
+ uint64_t field_expire = 0;
+ decodeFieldValue(metadata, &value, field_expire);
+ if (isMeetCondition(type, expire_ms, field_expire)) {
+ encodeValueExpire(&value, expire_ms);
+ batch->Put(sub_ikey.Encode(), value);
+ if (is_persist && field_expire == 0) {
+ // for hpersist command, -1 if the field exists but has no associated
expiration
+ ret->emplace_back(-1);
+ } else {
+ // 1 if expiration was updated or removed
+ ret->emplace_back(1);
+ }
+ } else {
+ ret->emplace_back(0);
+ }
+ }
+
+ // convert rest field encoding
+ if (!metadata.IsEncodedFieldExpire()) {
+ metadata.flags |= METADATA_HASH_FIELD_EXPIRE_MASK;
+
+ std::unordered_set<std::string_view> field_set;
+ for (auto field : fields) {
+ if (!field_set.emplace(field.ToStringView()).second) {
+ continue;
+ }
+ }
+
+ std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::Slice upper_bound(next_version_prefix_key);
+ read_options.iterate_upper_bound = &upper_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options);
+ for (iter->Seek(prefix_key); iter->Valid() &&
iter->key().starts_with(prefix_key); iter->Next()) {
+ InternalKey sub_ikey(iter->key(), storage_->IsSlotIdEncoded());
+ auto value = iter->value().ToString();
+ if (field_set.find(sub_ikey.GetSubKey().ToStringView()) ==
field_set.end()) {
+ encodeValueExpire(&value, 0);
+ batch->Put(sub_ikey.Encode(), value);
+ }
+ }
+ }
+
+ std::string bytes;
+ metadata.Encode(&bytes);
+ batch->Put(metadata_cf_handle_, ns_key, bytes);
+
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
+rocksdb::Status Hash::TTLFields(const Slice &user_key, const
std::vector<Slice> &fields, std::vector<int64_t> *ret) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ HashMetadata metadata(false);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{ss.GetSnapShot()}, ns_key,
&metadata);
+ if (!s.ok()) {
+ ret->resize(fields.size(), -2);
+ return rocksdb::Status::OK();
+ }
+
+ rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
+ read_options.snapshot = ss.GetSnapShot();
+ std::vector<rocksdb::Slice> keys;
+
+ keys.reserve(fields.size());
+ std::vector<std::string> sub_keys;
+ sub_keys.resize(fields.size());
+ for (size_t i = 0; i < fields.size(); i++) {
+ auto &field = fields[i];
+ sub_keys[i] = InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ keys.emplace_back(sub_keys[i]);
+ }
+
+ std::vector<rocksdb::PinnableSlice> values_vector;
+ values_vector.resize(keys.size());
+ std::vector<rocksdb::Status> statuses_vector;
+ statuses_vector.resize(keys.size());
+ storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(),
keys.size(), keys.data(),
+ values_vector.data(), statuses_vector.data());
+
+ ret->reserve(fields.size());
+ auto now = util::GetTimeStampMS();
+ for (size_t i = 0; i < keys.size(); i++) {
+ if (!statuses_vector[i].ok() && !statuses_vector[i].IsNotFound()) return
statuses_vector[i];
+ auto value = values_vector[i].ToString();
+ auto status = statuses_vector[i];
+
+ if (status.IsNotFound()) {
+ ret->emplace_back(-2);
+ continue;
+ }
+
+ uint64_t expire = 0;
+ status = decodeFieldValue(metadata, &value, expire);
+ if (status.IsNotFound()) {
+ ret->emplace_back(-2);
+ } else if (expire == 0) {
+ ret->emplace_back(-1);
+ } else {
+ ret->emplace_back(int64_t(expire - now));
+ }
+ }
+ return rocksdb::Status::OK();
+}
+
+bool Hash::IsExpiredField(Metadata &metadata, const Slice &value) {
+ if (!(static_cast<HashMetadata*>(&metadata))->IsEncodedFieldExpire()) {
+ return false;
+ }
+ uint64_t expire = 0;
+ rocksdb::Slice data(value);
+ GetFixed64(&data, &expire);
+ return expire != 0 && expire < util::GetTimeStampMS();
+}
+
+rocksdb::Status Hash::decodeFieldValue(const HashMetadata &metadata,
std::string *value, uint64_t &expire) {
+ if (!metadata.IsEncodedFieldExpire()) {
+ return rocksdb::Status::OK();
+ }
+ rocksdb::Slice data(value->data(), value->size());
+ GetFixed64(&data, &expire);
+ *value = data.ToString();
+ return (expire == 0 || expire > util::GetTimeStampMS()) ?
rocksdb::Status::OK() : rocksdb::Status::NotFound();
+}
+
+rocksdb::Status Hash::encodeValueExpire(std::string *value, uint64_t expire) {
+ std::string buf;
+ PutFixed64(&buf, expire);
+ buf.append(*value);
+ value->assign(buf.data(), buf.size());
+ return rocksdb::Status::OK();
+}
+
+bool Hash::isMeetCondition(HashFieldExpireType type, uint64_t new_expire,
uint64_t old_expire) {
+ if (type == HashFieldExpireType::None) return true;
+ if (type == HashFieldExpireType::NX && old_expire == 0) return true;
+ if (type == HashFieldExpireType::XX && old_expire != 0) return true;
+ // if a filed has no associated expiration, we treated it expiration is
infinite
Review Comment:
Typo in the same word: `filed` should be `field`.
##########
src/storage/redis_db.cc:
##########
@@ -578,9 +579,20 @@ rocksdb::Status SubKeyScanner::Scan(RedisType type, const
Slice &user_key, const
break;
}
InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
+ auto value = iter->value().ToString();
+ // filter expired hash feild
Review Comment:
Typo in the same word: `feild` should be `field`. I also tend to make typos
in the word `field` :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]