git-hulk commented on code in PR #2402:
URL: https://github.com/apache/kvrocks/pull/2402#discussion_r1684372863


##########
src/commands/cmd_hash.cc:
##########
@@ -429,6 +430,234 @@ class CommandHRandField : public Commander {
   bool no_parameters_ = true;
 };
 
+class CommandFieldExpireBase : public Commander {
+ protected:
+  Status commonParse(const std::vector<std::string> &args, int start_idx) {
+    CommandParser parser(args, start_idx);
+    std::string_view expire_flag, num_flag;
+    uint64_t fields_num = 0;
+    while (parser.Good()) {
+      if (parser.EatEqICaseFlag("FIELDS", num_flag)) {
+        fields_num = GET_OR_RET(parser.template TakeInt<uint64_t>());
+        break;
+      } else if (parser.EatEqICaseFlag("NX", expire_flag)) {
+        field_expire_type_ = HashFieldExpireType::NX;
+      } else if (parser.EatEqICaseFlag("XX", expire_flag)) {
+        field_expire_type_ = HashFieldExpireType::XX;
+      } else if (parser.EatEqICaseFlag("GT", expire_flag)) {
+        field_expire_type_ = HashFieldExpireType::GT;
+      } else if (parser.EatEqICaseFlag("LT", expire_flag)) {
+        field_expire_type_ = HashFieldExpireType::LT;
+      } else {
+        return parser.InvalidSyntax();
+      }
+    }
+
+    auto remains = parser.Remains();
+    auto size = args.size();
+    if (remains != fields_num) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    for (size_t i = size - remains; i < size; i++) {
+      fields_.emplace_back(args_[i]);
+    }
+
+    return Status::OK();
+  }
+
+  Status expireFieldExecute(Server *srv, Connection *conn, std::string 
*output) {
+    std::vector<int8_t> ret;
+    redis::Hash hash_db(srv->storage, conn->GetNamespace());
+    auto s = hash_db.ExpireFields(args_[1], expire_, fields_, 
field_expire_type_, &ret);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    *output = redis::MultiLen(ret.size());
+    for (const auto &i : ret) {
+      output->append(redis::Integer(i));
+    }
+
+    return Status::OK();
+  }
+
+  Status ttlExpireExecute(Server *srv, Connection *conn, std::vector<int64_t> 
&ret) {
+    redis::Hash hash_db(srv->storage, conn->GetNamespace());
+    auto s = hash_db.TTLFields(args_[1], fields_, &ret);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    return Status::OK();
+  }
+
+  uint64_t expire_ = 0;
+  HashFieldExpireType field_expire_type_ = HashFieldExpireType::None;
+  std::vector<Slice> fields_;
+};
+
+class CommandHExpire : public CommandFieldExpireBase {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<uint64_t>(args[2], 10);
+    if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};
+
+    expire_ = *parse_result * 1000 + util::GetTimeStampMS();
+    return CommandFieldExpireBase::commonParse(args, 3);
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    return expireFieldExecute(srv, conn, output);
+  }
+};
+
+class CommandHExpireAt : public CommandFieldExpireBase {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<uint64_t>(args[2], 10);
+    if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};
+
+    expire_ = *parse_result * 1000;
+    return CommandFieldExpireBase::commonParse(args, 3);
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    return expireFieldExecute(srv, conn, output);
+  }
+};
+
+class CommandHPExpire : public CommandFieldExpireBase {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<uint64_t>(args[2], 10);
+    if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};
+
+    expire_ = *parse_result + util::GetTimeStampMS();
+    return CommandFieldExpireBase::commonParse(args, 3);
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    return expireFieldExecute(srv, conn, output);
+  }
+};
+
+class CommandHPExpireAt : public CommandFieldExpireBase {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<uint64_t>(args[2], 10);
+    if (!parse_result) return {Status::RedisParseErr, errValueNotInteger};
+
+    expire_ = *parse_result;
+    return CommandFieldExpireBase::commonParse(args, 3);
+  }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    return expireFieldExecute(srv, conn, output);
+  }
+};
+
+class CommandHExpireTime : public CommandFieldExpireBase {
+ public:
+  Status Parse(const std::vector<std::string> &args) override { return 
CommandFieldExpireBase::commonParse(args, 2); }
+
+  Status Execute(Server *srv, Connection *conn, std::string *output) override {
+    std::vector<int64_t> ret;
+    auto s = ttlExpireExecute(srv, conn, ret);
+    if (!s.IsOK()) {
+      return {Status::RedisExecErr, s.Msg()};
+    }
+    auto now = util::GetTimeStampMS();
+    *output = redis::MultiLen(ret.size());
+    for (const auto &ttl : ret) {
+      if (ttl > 0) {
+        output->append(redis::Integer((now + ttl) / 1000));
+      } else {
+        output->append(redis::Integer(ttl));
+      }
+    }
+    return Status::OK();
+  }
+};
+
+class CommandHPExpireTime : public CommandFieldExpireBase {

Review Comment:
   The command HExpireTime/HTTL/HPTTL doesn't support NX/XX/LT/GT flags. By the 
way,  it should be good to remove the ttlExpireExecute/expireFieldExecute and 
only add a reply helper function like `redis::IntArray`. @torwig @PragmaTwice 
What do you think?



##########
src/storage/redis_db.cc:
##########
@@ -655,6 +680,22 @@ rocksdb::Status Database::typeInternal(const Slice &key, 
RedisType *type) {
   if (!s.ok()) return s;
   if (metadata.Expired()) {
     *type = kRedisNone;
+  } else if (metadata.Type() == kRedisHash) {
+    HashMetadata hash_metadata(false);

Review Comment:
   Can add a `Count` method for the HashDB to avoid storing keys in memory?



##########
src/types/redis_hash.cc:
##########
@@ -45,7 +46,13 @@ rocksdb::Status Hash::Size(const Slice &user_key, uint64_t 
*size) {
   HashMetadata metadata(false);
   rocksdb::Status s = GetMetadata(Database::GetOptions{}, ns_key, &metadata);
   if (!s.ok()) return s;
-  *size = metadata.size;
+  if (!metadata.IsEncodedFieldExpire()) {
+    *size = metadata.size;
+  } else {
+    std::vector<FieldValue> field_values;
+    GetAll(user_key, &field_values, HashFetchType::kOnlyKey);

Review Comment:
   What @torwig means is adding a method called `Count` to count the number of 
keys instead of storing keys in memory. 



##########
src/types/redis_hash.cc:
##########
@@ -418,4 +469,272 @@ rocksdb::Status Hash::RandField(const Slice &user_key, 
int64_t command_count, st
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status Hash::ExpireFields(const Slice &user_key, uint64_t expire_ms, 
const std::vector<Slice> &fields,
+                                   HashFieldExpireType type, 
std::vector<int8_t> *ret) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  HashMetadata metadata(false);
+  LatestSnapShot ss(storage_);
+  rocksdb::Status s = GetMetadata(GetOptions{ss.GetSnapShot()}, ns_key, 
&metadata);
+  if (!s.ok() && !s.IsNotFound()) return s;
+  if (s.IsNotFound()) {
+    ret->resize(fields.size(), -2);
+    return rocksdb::Status::OK();
+  }
+
+  rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
+  read_options.snapshot = ss.GetSnapShot();
+
+  std::vector<rocksdb::Slice> keys;
+  keys.reserve(fields.size());
+  std::vector<std::string> sub_keys;
+  sub_keys.resize(fields.size());
+  for (size_t i = 0; i < fields.size(); i++) {
+    auto &field = fields[i];
+    sub_keys[i] = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+    keys.emplace_back(sub_keys[i]);
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisHash);
+  batch->PutLogData(log_data.Encode());
+
+  // expire special field
+  std::vector<rocksdb::PinnableSlice> values;
+  values.resize(sub_keys.size());
+  std::vector<rocksdb::Status> statuses;
+  statuses.resize(sub_keys.size());
+  storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), 
keys.size(), keys.data(), values.data(),
+                     statuses.data());
+
+  auto now = util::GetTimeStampMS();
+  for (size_t i = 0; i < keys.size(); i++) {
+    if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i];
+
+    // no such field exists
+    if (statuses[i].IsNotFound()) {
+      ret->emplace_back(-2);
+      continue;
+    }
+
+    // expire with a pass time
+    if (expire_ms <= now) {
+      batch->Delete(sub_keys[i]);
+      ret->emplace_back(2);
+      metadata.size -= 1;
+      continue;
+    }
+
+    auto value = values[i].ToString();
+    uint64_t field_expire = 0;
+    decodeExpireFromValue(metadata, &value, field_expire);
+
+    // if a field has no associated expiration, we treated it expiration is 
infinite
+    auto treated_expire = field_expire == 0 ? UINT64_MAX : field_expire;
+    if (type == HashFieldExpireType::None || (type == HashFieldExpireType::NX 
&& field_expire == 0) ||
+        (type == HashFieldExpireType::XX && field_expire != 0) ||
+        (type == HashFieldExpireType::GT && expire_ms > treated_expire) ||
+        (type == HashFieldExpireType::LT && expire_ms < treated_expire)) {
+      encodeExpireToValue(&value, expire_ms);
+      batch->Put(sub_keys[i], value);
+      // 1 if expiration was updated
+      ret->emplace_back(1);
+    } else {
+      // 0 if condition has not been met
+      ret->emplace_back(0);
+    }
+  }
+
+  // convert rest field encoding
+  if (!metadata.IsFieldExpirationEnabled()) {
+    metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL;
+
+    std::unordered_set<std::string_view> field_set;
+    for (auto field : fields) {
+      if (!field_set.emplace(field.ToStringView()).second) {
+        continue;
+      }
+    }
+
+    std::string prefix_key = InternalKey(ns_key, "", metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+    std::string next_version_prefix_key =
+        InternalKey(ns_key, "", metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
+
+    rocksdb::Slice upper_bound(next_version_prefix_key);
+    read_options.iterate_upper_bound = &upper_bound;
+
+    auto iter = util::UniqueIterator(storage_, read_options);
+    for (iter->Seek(prefix_key); iter->Valid() && 
iter->key().starts_with(prefix_key); iter->Next()) {
+      InternalKey sub_ikey(iter->key(), storage_->IsSlotIdEncoded());
+      auto value = iter->value().ToString();
+      if (field_set.find(sub_ikey.GetSubKey().ToStringView()) == 
field_set.end()) {
+        encodeExpireToValue(&value, 0);
+        batch->Put(sub_ikey.Encode(), value);
+      }
+    }

Review Comment:
   It's not good to do this because it will be too slow if a hash has a lot of 
fields. Even having the old hash key not allow to expire fields is better than 
the current implementation. cc @PragmaTwice @torwig 



##########
src/types/redis_hash.cc:
##########
@@ -418,4 +469,272 @@ rocksdb::Status Hash::RandField(const Slice &user_key, 
int64_t command_count, st
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status Hash::ExpireFields(const Slice &user_key, uint64_t expire_ms, 
const std::vector<Slice> &fields,
+                                   HashFieldExpireType type, 
std::vector<int8_t> *ret) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  HashMetadata metadata(false);
+  LatestSnapShot ss(storage_);
+  rocksdb::Status s = GetMetadata(GetOptions{ss.GetSnapShot()}, ns_key, 
&metadata);
+  if (!s.ok() && !s.IsNotFound()) return s;
+  if (s.IsNotFound()) {
+    ret->resize(fields.size(), -2);
+    return rocksdb::Status::OK();
+  }
+
+  rocksdb::ReadOptions read_options = storage_->DefaultMultiGetOptions();
+  read_options.snapshot = ss.GetSnapShot();
+
+  std::vector<rocksdb::Slice> keys;
+  keys.reserve(fields.size());
+  std::vector<std::string> sub_keys;
+  sub_keys.resize(fields.size());
+  for (size_t i = 0; i < fields.size(); i++) {
+    auto &field = fields[i];
+    sub_keys[i] = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+    keys.emplace_back(sub_keys[i]);
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisHash);
+  batch->PutLogData(log_data.Encode());
+
+  // expire special field
+  std::vector<rocksdb::PinnableSlice> values;
+  values.resize(sub_keys.size());
+  std::vector<rocksdb::Status> statuses;
+  statuses.resize(sub_keys.size());
+  storage_->MultiGet(read_options, storage_->GetDB()->DefaultColumnFamily(), 
keys.size(), keys.data(), values.data(),
+                     statuses.data());
+
+  auto now = util::GetTimeStampMS();
+  for (size_t i = 0; i < keys.size(); i++) {
+    if (!statuses[i].ok() && !statuses[i].IsNotFound()) return statuses[i];
+
+    // no such field exists
+    if (statuses[i].IsNotFound()) {
+      ret->emplace_back(-2);
+      continue;
+    }
+
+    // expire with a pass time
+    if (expire_ms <= now) {
+      batch->Delete(sub_keys[i]);
+      ret->emplace_back(2);
+      metadata.size -= 1;
+      continue;
+    }
+
+    auto value = values[i].ToString();
+    uint64_t field_expire = 0;
+    decodeExpireFromValue(metadata, &value, field_expire);
+
+    // if a field has no associated expiration, we treated it expiration is 
infinite
+    auto treated_expire = field_expire == 0 ? UINT64_MAX : field_expire;
+    if (type == HashFieldExpireType::None || (type == HashFieldExpireType::NX 
&& field_expire == 0) ||
+        (type == HashFieldExpireType::XX && field_expire != 0) ||
+        (type == HashFieldExpireType::GT && expire_ms > treated_expire) ||
+        (type == HashFieldExpireType::LT && expire_ms < treated_expire)) {
+      encodeExpireToValue(&value, expire_ms);
+      batch->Put(sub_keys[i], value);
+      // 1 if expiration was updated
+      ret->emplace_back(1);
+    } else {
+      // 0 if condition has not been met
+      ret->emplace_back(0);
+    }
+  }
+
+  // convert rest field encoding
+  if (!metadata.IsFieldExpirationEnabled()) {
+    metadata.field_encoding = HashSubkeyEncoding::VALUE_WITH_TTL;
+
+    std::unordered_set<std::string_view> field_set;
+    for (auto field : fields) {
+      if (!field_set.emplace(field.ToStringView()).second) {
+        continue;
+      }
+    }
+
+    std::string prefix_key = InternalKey(ns_key, "", metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+    std::string next_version_prefix_key =
+        InternalKey(ns_key, "", metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
+
+    rocksdb::Slice upper_bound(next_version_prefix_key);
+    read_options.iterate_upper_bound = &upper_bound;
+
+    auto iter = util::UniqueIterator(storage_, read_options);
+    for (iter->Seek(prefix_key); iter->Valid() && 
iter->key().starts_with(prefix_key); iter->Next()) {
+      InternalKey sub_ikey(iter->key(), storage_->IsSlotIdEncoded());
+      auto value = iter->value().ToString();
+      if (field_set.find(sub_ikey.GetSubKey().ToStringView()) == 
field_set.end()) {
+        encodeExpireToValue(&value, 0);
+        batch->Put(sub_ikey.Encode(), value);
+      }
+    }
+  }
+
+  std::string bytes;
+  metadata.Encode(&bytes);
+  batch->Put(metadata_cf_handle_, ns_key, bytes);
+
+  return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+}
+
+rocksdb::Status Hash::PersistFields(const Slice &user_key, const 
std::vector<Slice> &fields, std::vector<int8_t> *ret) {

Review Comment:
   I do not quite understand why we need to copy `ExpireFields` to implement 
the persist fields.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to