jjz921024 commented on code in PR #2402:
URL: https://github.com/apache/kvrocks/pull/2402#discussion_r1693914981
##########
src/types/redis_hash.cc:
##########
@@ -418,4 +469,272 @@ rocksdb::Status Hash::RandField(const Slice &user_key,
int64_t command_count, st
return rocksdb::Status::OK();
}
+rocksdb::Status Hash::ExpireFields(const Slice &user_key, uint64_t expire_ms,
const std::vector<Slice> &fields,
+ HashFieldExpireType type,
std::vector<int8_t> *ret) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ HashMetadata metadata(false);
+ LatestSnapShot ss(storage_);
+ rocksdb::Status s = GetMetadata(GetOptions{ss.GetSnapShot()}, ns_key,
&metadata);
+ if (!s.ok() && !s.IsNotFound()) return s;
+ if (s.IsNotFound()) {
+ ret->resize(fields.size(), -2);
+ return rocksdb::Status::OK();
+ }
+
+ rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
+ read_options.snapshot = ss.GetSnapShot();
+
+ std::vector<rocksdb::Slice> keys;
+ keys.reserve(fields.size());
+ std::vector<std::string> sub_keys;
+ sub_keys.resize(fields.size());
+ for (size_t i = 0; i < fields.size(); i++) {
+ auto &field = fields[i];
+ sub_keys[i] = InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ keys.emplace_back(sub_keys[i]);
+ }
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisHash);
+ batch->PutLogData(log_data.Encode());
+
+ // expire special field
+ std::vector<rocksdb::PinnableSlice> values;
+ values.resize(sub_keys.size());
+ std::vector<rocksdb::Status> statuses;
+ statuses.resize(sub_keys.size());
+ storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(),
keys.size(), keys.data(), values.data(),
+ statuses.data());
+
+ auto now = util::GetTimeStampMS();
+ for (size_t i = 0; i < keys.size(); i++) {
+ if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i];
+
+ // no such field exists
+ if (statuses[i].IsNotFound()) {
+ ret->emplace_back(-2);
+ continue;
+ }
+
+ // expire with a pass time
+ if (expire_ms <= now) {
+ batch->Delete(sub_keys[i]);
+ ret->emplace_back(2);
+ metadata.size -= 1;
+ continue;
+ }
+
+ auto value = values[i].ToString();
+ uint64_t field_expire = 0;
+ decodeExpireFromValue(metadata, &value, field_expire);
+
+ // if a field has no associated expiration, we treated it expiration is
infinite
+ auto treated_expire = field_expire == 0 ? UINT64_MAX : field_expire;
+ if (type == HashFieldExpireType::None || (type == HashFieldExpireType::NX
&& field_expire == 0) ||
+ (type == HashFieldExpireType::XX && field_expire != 0) ||
+ (type == HashFieldExpireType::GT && expire_ms > treated_expire) ||
+ (type == HashFieldExpireType::LT && expire_ms < treated_expire)) {
+ encodeExpireToValue(&value, expire_ms);
+ batch->Put(sub_keys[i], value);
+ // 1 if expiration was updated
+ ret->emplace_back(1);
+ } else {
+ // 0 if condition has not been met
+ ret->emplace_back(0);
+ }
+ }
+
+ // convert rest field encoding
+ if (!metadata.IsFieldExpirationEnabled()) {
+ metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL;
+
+ std::unordered_set<std::string_view> field_set;
+ for (auto field : fields) {
+ if (!field_set.emplace(field.ToStringView()).second) {
+ continue;
+ }
+ }
+
+ std::string prefix_key = InternalKey(ns_key, "", metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::Slice upper_bound(next_version_prefix_key);
+ read_options.iterate_upper_bound = &upper_bound;
+
+ auto iter = util::UniqueIterator(storage_, read_options);
+ for (iter->Seek(prefix_key); iter->Valid() &&
iter->key().starts_with(prefix_key); iter->Next()) {
+ InternalKey sub_ikey(iter->key(), storage_->IsSlotIdEncoded());
+ auto value = iter->value().ToString();
+ if (field_set.find(sub_ikey.GetSubKey().ToStringView()) ==
field_set.end()) {
+ encodeExpireToValue(&value, 0);
+ batch->Put(sub_ikey.Encode(), value);
+ }
+ }
+ }
+
+ std::string bytes;
+ metadata.Encode(&bytes);
+ batch->Put(metadata_cf_handle_, ns_key, bytes);
+
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+}
+
+rocksdb::Status Hash::PersistFields(const Slice &user_key, const
std::vector<Slice> &fields, std::vector<int8_t> *ret) {
Review Comment:
Sorry, I don't understand what you meat. Persist a field is equivalent to
setting the expiration of the field to 0.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]