This is an automated email from the ASF dual-hosted git repository.

PragmaTwice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 13e5cae2e feat(hash): track persistent hash fields & support hash field expiration commands (#3472)
13e5cae2e is described below

commit 13e5cae2ecea271e3edac4e33dea780b0eb4d23d
Author: Twice <[email protected]>
AuthorDate: Fri May 15 23:58:26 2026 +0800

    feat(hash): track persistent hash fields & support hash field expiration commands (#3472)
    
    Proposal #3432. Tracking issue #3436.
    
    Add hash field expiration support based on the new persistent-field
    metadata model.
    
    This PR:
    - Tracks persistent hash fields in metadata for field-expiration
      encoding.
    - Adds `HEXPIRE` and `HPERSIST`.
    - Updates existing hash read/write commands to handle expired fields
      correctly.
    - Filters expired fields from hash read commands without mutating
      metadata.
    - Adds C++ and Go coverage for persistent, TTL, expired, and
      compaction-ghost states.
    
    Assisted-by: Codex/GPT 5.5 xhigh
---
 src/commands/cmd_hash.cc                      | 153 ++++-
 src/commands/cmd_key.cc                       |   2 +-
 src/storage/redis_metadata.cc                 |   6 +-
 src/storage/redis_metadata.h                  |   2 +-
 src/types/redis_hash.cc                       | 554 +++++++++++++++--
 src/types/redis_hash.h                        |  13 +
 tests/cppunit/metadata_test.cc                |   6 +-
 tests/cppunit/types/hash_test.cc              | 575 +++++++++++++++++-
 tests/gocase/unit/kmetadata/kmetadata_test.go |  10 +-
 tests/gocase/unit/type/hash/hash_hfe_test.go  | 843 ++++++++++++++++++++++++++
 tests/gocase/util/kmetadata.go                | 111 ++++
 11 files changed, 2193 insertions(+), 82 deletions(-)

diff --git a/src/commands/cmd_hash.cc b/src/commands/cmd_hash.cc
index 35e7c9856..1b2d86965 100644
--- a/src/commands/cmd_hash.cc
+++ b/src/commands/cmd_hash.cc
@@ -18,6 +18,8 @@
  *
  */
 
+#include <limits>
+
 #include "commander.h"
 #include "commands/command_parser.h"
 #include "error_constants.h"
@@ -27,6 +29,31 @@
 #include "types/redis_hash.h"
 
 namespace redis {
+namespace {
+
+template <typename Parser>
+Status ParseHashFieldListTail(Parser &parser, std::vector<std::string> 
*fields) {
+  if (!parser.Good()) {
+    return {Status::RedisParseErr, errWrongNumOfArguments};
+  }
+
+  auto num_fields = parser.template TakeInt<int64_t>(NumericRange<int64_t>{1, 
std::numeric_limits<int64_t>::max()}, 10);
+  if (!num_fields) {
+    return {Status::RedisParseErr, errValueNotInteger};
+  }
+  if (static_cast<size_t>(*num_fields) != parser.Remains()) {
+    return {Status::RedisParseErr, errWrongNumOfArguments};
+  }
+
+  fields->clear();
+  fields->reserve(static_cast<size_t>(*num_fields));
+  while (parser.Good()) {
+    fields->emplace_back(GET_OR_RET(parser.TakeStr()));
+  }
+  return Status::OK();
+}
+
+}  // namespace
 
 class CommandHGet : public Commander {
  public:
@@ -481,6 +508,128 @@ class CommandHRandField : public Commander {
   bool no_parameters_ = true;
 };
 
+class CommandHExpire : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 2);
+
+    auto seconds = parser.TakeInt<int64_t>();
+    if (!seconds) {
+      return {Status::RedisParseErr, errValueNotInteger};
+    }
+    if (*seconds < 0) {
+      return {Status::RedisParseErr, "invalid expire time, must be >= 0"};
+    }
+    seconds_ = *seconds;
+    condition_ = HashFieldExpireCondition::kNone;
+
+    while (parser.Good()) {
+      if (parser.EatEqICase("FIELDS")) {
+        GET_OR_RET(ParseHashFieldListTail(parser, &fields_));
+        return Commander::Parse(args);
+      }
+
+      HashFieldExpireCondition parsed_condition = 
HashFieldExpireCondition::kNone;
+      if (parser.EatEqICase("NX")) {
+        parsed_condition = HashFieldExpireCondition::kNX;
+      } else if (parser.EatEqICase("XX")) {
+        parsed_condition = HashFieldExpireCondition::kXX;
+      } else if (parser.EatEqICase("GT")) {
+        parsed_condition = HashFieldExpireCondition::kGT;
+      } else if (parser.EatEqICase("LT")) {
+        parsed_condition = HashFieldExpireCondition::kLT;
+      } else {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+      if (condition_ != HashFieldExpireCondition::kNone) {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+      condition_ = parsed_condition;
+    }
+    return {Status::RedisParseErr, errInvalidSyntax};
+  }
+
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    uint64_t expire_at = 0;
+    if (seconds_ > 0) {
+      auto seconds = static_cast<uint64_t>(seconds_);
+      if (seconds > std::numeric_limits<uint64_t>::max() / 1000) {
+        return {Status::RedisExecErr, "expire time overflow"};
+      }
+      uint64_t ttl_ms = seconds * 1000;
+      uint64_t now = util::GetTimeStampMS();
+      if (ttl_ms > std::numeric_limits<uint64_t>::max() - now) {
+        return {Status::RedisExecErr, "expire time overflow"};
+      }
+      expire_at = now + ttl_ms;
+    } else {
+      expire_at = util::GetTimeStampMS();
+    }
+
+    std::vector<int64_t> results;
+    std::vector<Slice> fields;
+    fields.reserve(fields_.size());
+    for (const auto &field : fields_) {
+      fields.emplace_back(field);
+    }
+    redis::Hash hash_db(srv->storage, conn->GetNamespace());
+    auto s = hash_db.ExpireFields(ctx, args_[1], fields, expire_at, 
condition_, &results);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    std::vector<std::string> entries;
+    entries.reserve(results.size());
+    for (auto result : results) {
+      entries.emplace_back(redis::Integer(result));
+    }
+    *output = redis::Array(entries);
+    return Status::OK();
+  }
+
+ private:
+  int64_t seconds_ = 0;
+  HashFieldExpireCondition condition_ = HashFieldExpireCondition::kNone;
+  std::vector<std::string> fields_;
+};
+
+class CommandHPersist : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    CommandParser parser(args, 2);
+    if (!parser.EatEqICase("FIELDS")) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+    GET_OR_RET(ParseHashFieldListTail(parser, &fields_));
+    return Commander::Parse(args);
+  }
+
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
+    std::vector<int64_t> results;
+    std::vector<Slice> fields;
+    fields.reserve(fields_.size());
+    for (const auto &field : fields_) {
+      fields.emplace_back(field);
+    }
+    redis::Hash hash_db(srv->storage, conn->GetNamespace());
+    auto s = hash_db.PersistFields(ctx, args_[1], fields, &results);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+
+    std::vector<std::string> entries;
+    entries.reserve(results.size());
+    for (auto result : results) {
+      entries.emplace_back(redis::Integer(result));
+    }
+    *output = redis::Array(entries);
+    return Status::OK();
+  }
+
+ private:
+  std::vector<std::string> fields_;
+};
+
 REDIS_REGISTER_COMMANDS(Hash, MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 
1, 1, 1),
                         MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1, 
1, 1),
                         MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4, 
"write", 1, 1, 1),
@@ -498,6 +647,8 @@ REDIS_REGISTER_COMMANDS(Hash, 
MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
                         MakeCmdAttr<CommandHGetAll>("hgetall", 2, "read-only 
slow", 1, 1, 1),
                         MakeCmdAttr<CommandHScan>("hscan", -3, "read-only", 1, 
1, 1),
                         MakeCmdAttr<CommandHRangeByLex>("hrangebylex", -4, 
"read-only", 1, 1, 1),
-                        MakeCmdAttr<CommandHRandField>("hrandfield", -2, 
"read-only slow", 1, 1, 1), )
+                        MakeCmdAttr<CommandHRandField>("hrandfield", -2, 
"read-only slow", 1, 1, 1),
+                        MakeCmdAttr<CommandHExpire>("hexpire", -6, "write", 1, 
1, 1),
+                        MakeCmdAttr<CommandHPersist>("hpersist", -5, "write", 
1, 1, 1), )
 
 }  // namespace redis
diff --git a/src/commands/cmd_key.cc b/src/commands/cmd_key.cc
index 2aa647220..584f41293 100644
--- a/src/commands/cmd_key.cc
+++ b/src/commands/cmd_key.cc
@@ -609,7 +609,7 @@ class CommandKMetadata : public Commander {
 
       response.insert({redis::BulkString("mode"), 
redis::BulkString(HashSubkeyEncodingModeName(hash_metadata.mode))});
       if (hash_metadata.IsFieldExpirationEncoding()) {
-        response.insert({redis::BulkString("expsz"), 
redis::Integer(hash_metadata.expsz)});
+        response.insert({redis::BulkString("persist"), 
redis::Integer(hash_metadata.persist)});
         response.insert({redis::BulkString("lower"), 
redis::Integer(hash_metadata.lower)});
         response.insert({redis::BulkString("upper"), 
redis::Integer(hash_metadata.upper)});
       }
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 6945a0a6d..71a279149 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -347,7 +347,7 @@ void HashMetadata::Encode(std::string *dst) const {
   }
 
   PutFixed8(dst, static_cast<uint8_t>(mode));
-  PutFixed64(dst, expsz);
+  PutFixed64(dst, persist);
   PutFixed64(dst, lower);
   PutFixed64(dst, upper);
 }
@@ -359,7 +359,7 @@ rocksdb::Status HashMetadata::Decode(Slice *input) {
 
   if (input->empty()) {
     mode = HashSubkeyEncodingMode::kLegacy;
-    expsz = 0;
+    persist = 0;
     lower = 0;
     upper = 0;
     return rocksdb::Status::OK();
@@ -376,7 +376,7 @@ rocksdb::Status HashMetadata::Decode(Slice *input) {
   }
 
   mode = static_cast<HashSubkeyEncodingMode>(encoded_mode);
-  GetFixed64(input, &expsz);
+  GetFixed64(input, &persist);
   GetFixed64(input, &lower);
   GetFixed64(input, &upper);
   return rocksdb::Status::OK();
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 2e79cf232..abfc60989 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -224,7 +224,7 @@ class HashMetadata : public Metadata {
   static constexpr size_t kFieldExpirationPrefixSize = sizeof(uint64_t);
 
   HashSubkeyEncodingMode mode = HashSubkeyEncodingMode::kLegacy;
-  uint64_t expsz = 0;
+  uint64_t persist = 0;
   uint64_t lower = 0;
   uint64_t upper = 0;
 
diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc
index be481110b..a6bbea72b 100644
--- a/src/types/redis_hash.cc
+++ b/src/types/redis_hash.cc
@@ -26,14 +26,151 @@
 #include <cctype>
 #include <cmath>
 #include <random>
+#include <unordered_map>
+#include <unordered_set>
 #include <utility>
 
 #include "common/string_util.h"
 #include "db_util.h"
 #include "parse_util.h"
 #include "sample_helper.h"
+#include "time_util.h"
 
 namespace redis {
+namespace {
+
+enum class HashFieldStateKind {
+  kMissing,
+  kPersistent,
+  kLiveTTL,
+  kExpiredTTLPhysical,
+};
+
+struct HashFieldState {
+  HashFieldStateKind kind = HashFieldStateKind::kMissing;
+  std::string value;
+  uint64_t expire = 0;
+};
+
+bool IsFieldExpired(uint64_t expire, uint64_t now) { return expire != 0 && 
expire < now; }
+
+bool IsImmediateExpire(uint64_t expire_at, uint64_t now) { return expire_at <= 
now; }
+
+uint64_t SaturatingSub(uint64_t lhs, uint64_t rhs) { return rhs > lhs ? 0 : 
lhs - rhs; }
+
+void ClearBoundsIfNoTtlCandidates(HashMetadata *metadata) {
+  if (metadata->IsFieldExpirationEncoding() && metadata->size == 
metadata->persist) {
+    metadata->lower = 0;
+    metadata->upper = 0;
+  }
+}
+
+void ExpandExpireBounds(HashMetadata *metadata, uint64_t expire_at) {
+  if (!metadata->IsFieldExpirationEncoding()) return;
+  if (metadata->size == metadata->persist) {
+    metadata->lower = expire_at;
+    metadata->upper = expire_at;
+    return;
+  }
+  metadata->lower = std::min(metadata->lower, expire_at);
+  metadata->upper = std::max(metadata->upper, expire_at);
+}
+
+void ApplyMissingToPersistent(HashMetadata *metadata) {
+  metadata->size += 1;
+  if (metadata->IsFieldExpirationEncoding()) {
+    metadata->persist += 1;
+    ClearBoundsIfNoTtlCandidates(metadata);
+  }
+}
+
+void ApplyPersistentToTTL(HashMetadata *metadata, uint64_t expire_at) {
+  ExpandExpireBounds(metadata, expire_at);
+  metadata->persist = SaturatingSub(metadata->persist, 1);
+}
+
+void ApplyTTLToTTL(HashMetadata *metadata, uint64_t expire_at) { 
ExpandExpireBounds(metadata, expire_at); }
+
+void ApplyTTLToPersistent(HashMetadata *metadata) {
+  metadata->persist += 1;
+  if (metadata->persist > metadata->size) {
+    metadata->persist = metadata->size;
+  }
+  ClearBoundsIfNoTtlCandidates(metadata);
+}
+
+void ApplyPersistentToDeleted(HashMetadata *metadata) {
+  metadata->size = SaturatingSub(metadata->size, 1);
+  metadata->persist = SaturatingSub(metadata->persist, 1);
+  ClearBoundsIfNoTtlCandidates(metadata);
+}
+
+void ApplyTTLToDeleted(HashMetadata *metadata) {
+  metadata->size = SaturatingSub(metadata->size, 1);
+  if (metadata->persist > metadata->size) {
+    metadata->persist = metadata->size;
+  }
+  ClearBoundsIfNoTtlCandidates(metadata);
+}
+
+rocksdb::Status DecodeFieldState(const HashMetadata &metadata, Slice 
raw_value, uint64_t now, HashFieldState *state) {
+  state->kind = HashFieldStateKind::kMissing;
+  state->value.clear();
+  state->expire = 0;
+
+  uint64_t expire = 0;
+  auto s = metadata.DecodeSubkeyValue(&raw_value, &expire);
+  if (!s.ok()) return s;
+
+  state->value.assign(raw_value.data(), raw_value.size());
+  state->expire = expire;
+  if (expire == 0) {
+    state->kind = HashFieldStateKind::kPersistent;
+  } else if (IsFieldExpired(expire, now)) {
+    state->kind = HashFieldStateKind::kExpiredTTLPhysical;
+  } else {
+    state->kind = HashFieldStateKind::kLiveTTL;
+  }
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status LoadFieldState(engine::Storage *storage, engine::Context &ctx, 
const HashMetadata &metadata,
+                               const std::string &sub_key, uint64_t now, 
HashFieldState *state) {
+  std::string raw_value;
+  auto s = storage->Get(ctx, ctx.GetReadOptions(), sub_key, &raw_value);
+  if (s.IsNotFound()) {
+    *state = HashFieldState{};
+    return rocksdb::Status::OK();
+  }
+  if (!s.ok()) return s;
+  return DecodeFieldState(metadata, Slice(raw_value), now, state);
+}
+
+bool HExpireConditionPasses(HashFieldExpireCondition condition, const 
HashFieldState &state, uint64_t expire_at) {
+  switch (state.kind) {
+    case HashFieldStateKind::kMissing:
+    case HashFieldStateKind::kExpiredTTLPhysical:
+      return false;
+    case HashFieldStateKind::kPersistent:
+      return condition == HashFieldExpireCondition::kNone || condition == 
HashFieldExpireCondition::kNX ||
+             condition == HashFieldExpireCondition::kLT;
+    case HashFieldStateKind::kLiveTTL:
+      switch (condition) {
+        case HashFieldExpireCondition::kNone:
+        case HashFieldExpireCondition::kXX:
+          return true;
+        case HashFieldExpireCondition::kNX:
+          return false;
+        case HashFieldExpireCondition::kGT:
+          return expire_at > state.expire;
+        case HashFieldExpireCondition::kLT:
+          return expire_at < state.expire;
+      }
+  }
+  return false;
+}
+
+}  // namespace
 
 HashMetadata Hash::createMetadataForWrite(bool generate_version) const {
   return HashMetadata(generate_version, 
storage_->GetConfig()->hash_encoding_mode);
@@ -71,16 +208,21 @@ rocksdb::Status Hash::Get(engine::Context &ctx, const 
Slice &user_key, const Sli
   std::string raw_value;
   s = getRawValue(ctx, sub_key, &raw_value);
   if (!s.ok()) return s;
-  Slice payload(raw_value);
-  s = decodeValue(metadata, &payload);
+  HashFieldState state;
+  s = DecodeFieldState(metadata, Slice(raw_value), util::GetTimeStampMS(), 
&state);
   if (!s.ok()) return s;
-  value->assign(payload.data(), payload.size());
+  if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+    return rocksdb::Status::NotFound();
+  }
+  value->assign(state.value);
   return rocksdb::Status::OK();
 }
 
 rocksdb::Status Hash::IncrBy(engine::Context &ctx, const Slice &user_key, 
const Slice &field, int64_t increment,
                              int64_t *new_value) {
   bool exists = false;
+  bool expired_ttl_to_persistent = false;
+  uint64_t keep_expire = 0;
   int64_t old_value = 0;
 
   std::string ns_key = AppendNamespacePrefix(user_key);
@@ -91,23 +233,23 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const 
Slice &user_key, const
 
   std::string sub_key = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
   if (s.ok()) {
-    std::string raw_value;
-    Slice value_bytes;
-    s = getRawValue(ctx, sub_key, &raw_value);
-    if (!s.ok() && !s.IsNotFound()) return s;
-    if (s.ok()) {
-      value_bytes = Slice(raw_value);
-      s = decodeValue(metadata, &value_bytes);
-      if (!s.ok()) return s;
-      auto parse_result = ParseInt<int64_t>(value_bytes.ToStringView(), 10);
+    HashFieldState state;
+    s = LoadFieldState(storage_, ctx, metadata, sub_key, 
util::GetTimeStampMS(), &state);
+    if (!s.ok()) return s;
+    if (state.kind == HashFieldStateKind::kPersistent || state.kind == 
HashFieldStateKind::kLiveTTL) {
+      auto parse_result = ParseInt<int64_t>(state.value, 10);
       if (!parse_result) {
         return rocksdb::Status::InvalidArgument(parse_result.Msg());
       }
-      if (isspace(value_bytes[0])) {
+      if (!state.value.empty() && isspace(state.value[0])) {
         return rocksdb::Status::InvalidArgument("value is not an integer");
       }
       old_value = *parse_result;
       exists = true;
+      keep_expire = state.expire;
+    } else if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+      exists = true;
+      expired_ttl_to_persistent = true;
     }
   }
   if ((increment < 0 && old_value < 0 && increment < (LLONG_MIN - old_value)) 
||
@@ -120,11 +262,17 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const 
Slice &user_key, const
   WriteBatchLogData log_data(kRedisHash);
   s = batch->PutLogData(log_data.Encode());
   if (!s.ok()) return s;
-  std::string encoded_value = 
metadata.EncodeSubkeyValue(std::to_string(*new_value));
+  std::string encoded_value = 
metadata.EncodeSubkeyValue(std::to_string(*new_value), keep_expire);
   s = batch->Put(sub_key, encoded_value);
   if (!s.ok()) return s;
   if (!exists) {
-    metadata.size += 1;
+    ApplyMissingToPersistent(&metadata);
+    std::string bytes;
+    metadata.Encode(&bytes);
+    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+    if (!s.ok()) return s;
+  } else if (metadata.IsFieldExpirationEncoding() && 
expired_ttl_to_persistent) {
+    ApplyTTLToPersistent(&metadata);
     std::string bytes;
     metadata.Encode(&bytes);
     s = batch->Put(metadata_cf_handle_, ns_key, bytes);
@@ -136,6 +284,8 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const 
Slice &user_key, const
 rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, const Slice &user_key, 
const Slice &field, double increment,
                                   double *new_value) {
   bool exists = false;
+  bool expired_ttl_to_persistent = false;
+  uint64_t keep_expire = 0;
   double old_value = 0;
 
   std::string ns_key = AppendNamespacePrefix(user_key);
@@ -146,20 +296,20 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, 
const Slice &user_key, c
 
   std::string sub_key = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
   if (s.ok()) {
-    std::string raw_value;
-    Slice value_bytes;
-    s = getRawValue(ctx, sub_key, &raw_value);
-    if (!s.ok() && !s.IsNotFound()) return s;
-    if (s.ok()) {
-      value_bytes = Slice(raw_value);
-      s = decodeValue(metadata, &value_bytes);
-      if (!s.ok()) return s;
-      auto value_stat = ParseFloat(value_bytes.ToStringView());
-      if (!value_stat || isspace(value_bytes[0])) {
+    HashFieldState state;
+    s = LoadFieldState(storage_, ctx, metadata, sub_key, 
util::GetTimeStampMS(), &state);
+    if (!s.ok()) return s;
+    if (state.kind == HashFieldStateKind::kPersistent || state.kind == 
HashFieldStateKind::kLiveTTL) {
+      auto value_stat = ParseFloat(state.value);
+      if (!value_stat || (!state.value.empty() && isspace(state.value[0]))) {
         return rocksdb::Status::InvalidArgument("value is not a number");
       }
       old_value = *value_stat;
       exists = true;
+      keep_expire = state.expire;
+    } else if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+      exists = true;
+      expired_ttl_to_persistent = true;
     }
   }
   double n = old_value + increment;
@@ -172,11 +322,17 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx, 
const Slice &user_key, c
   WriteBatchLogData log_data(kRedisHash);
   s = batch->PutLogData(log_data.Encode());
   if (!s.ok()) return s;
-  std::string encoded_value = 
metadata.EncodeSubkeyValue(util::Float2String(*new_value));
+  std::string encoded_value = 
metadata.EncodeSubkeyValue(util::Float2String(*new_value), keep_expire);
   s = batch->Put(sub_key, encoded_value);
   if (!s.ok()) return s;
   if (!exists) {
-    metadata.size += 1;
+    ApplyMissingToPersistent(&metadata);
+    std::string bytes;
+    metadata.Encode(&bytes);
+    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+    if (!s.ok()) return s;
+  } else if (metadata.IsFieldExpirationEncoding() && 
expired_ttl_to_persistent) {
+    ApplyTTLToPersistent(&metadata);
     std::string bytes;
     metadata.Encode(&bytes);
     s = batch->Put(metadata_cf_handle_, ns_key, bytes);
@@ -218,10 +374,15 @@ rocksdb::Status Hash::MGet(engine::Context &ctx, const 
Slice &user_key, const st
   for (size_t i = 0; i < keys.size(); i++) {
     if (!statuses_vector[i].ok() && !statuses_vector[i].IsNotFound()) return 
statuses_vector[i];
     if (statuses_vector[i].ok()) {
-      Slice value(values_vector[i]);
-      s = decodeValue(metadata, &value);
+      HashFieldState state;
+      s = DecodeFieldState(metadata, Slice(values_vector[i]), 
util::GetTimeStampMS(), &state);
       if (!s.ok()) return s;
-      values->emplace_back(value.data(), value.size());
+      if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+        values->emplace_back("");
+        statuses->emplace_back(rocksdb::Status::NotFound());
+        continue;
+      }
+      values->emplace_back(std::move(state.value));
     } else {
       values->emplace_back("");
     }
@@ -249,24 +410,49 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const 
Slice &user_key, const
   s = getMetadata(ctx, ns_key, &metadata);
   if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
 
-  std::string value;
+  uint64_t physical_removed = 0;
+  uint64_t persistent_removed = 0;
   std::unordered_set<std::string_view> field_set;
+  uint64_t now = util::GetTimeStampMS();
   for (const auto &field : fields) {
     if (!field_set.emplace(field.ToStringView()).second) {
       continue;
     }
     std::string sub_key = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
-    s = storage_->Get(ctx, ctx.GetReadOptions(), sub_key, &value);
-    if (s.ok()) {
-      *deleted_cnt += 1;
-      s = batch->Delete(sub_key);
-      if (!s.ok()) return s;
+    HashFieldState state;
+    s = LoadFieldState(storage_, ctx, metadata, sub_key, now, &state);
+    if (!s.ok()) return s;
+    switch (state.kind) {
+      case HashFieldStateKind::kMissing:
+        break;
+      case HashFieldStateKind::kPersistent:
+        persistent_removed += 1;
+        physical_removed += 1;
+        *deleted_cnt += 1;
+        s = batch->Delete(sub_key);
+        if (!s.ok()) return s;
+        break;
+      case HashFieldStateKind::kLiveTTL:
+        physical_removed += 1;
+        *deleted_cnt += 1;
+        s = batch->Delete(sub_key);
+        if (!s.ok()) return s;
+        break;
+      case HashFieldStateKind::kExpiredTTLPhysical:
+        physical_removed += 1;
+        s = batch->Delete(sub_key);
+        if (!s.ok()) return s;
+        break;
     }
   }
-  if (*deleted_cnt == 0) {
+  if (physical_removed == 0) {
     return rocksdb::Status::OK();
   }
-  metadata.size -= *deleted_cnt;
+  metadata.size = SaturatingSub(metadata.size, physical_removed);
+  if (metadata.IsFieldExpirationEncoding()) {
+    metadata.persist = SaturatingSub(metadata.persist, persistent_removed);
+    ClearBoundsIfNoTtlCandidates(&metadata);
+  }
   std::string bytes;
   metadata.Encode(&bytes);
   s = batch->Put(metadata_cf_handle_, ns_key, bytes);
@@ -282,12 +468,14 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const 
Slice &user_key, const st
   HashMetadata metadata = createMetadataForWrite();
   rocksdb::Status s = getMetadata(ctx, ns_key, &metadata);
   if (!s.ok() && !s.IsNotFound()) return s;
+  bool had_existing_fields = s.ok() && metadata.size > 0;
   bool ttl_updated = false;
   if (expire > 0 && metadata.expire != expire) {
     metadata.expire = expire;
     ttl_updated = true;
   }
   int added = 0;
+  bool metadata_changed = ttl_updated;
   auto batch = storage_->GetWriteBatchBase();
   WriteBatchLogData log_data(kRedisHash);
   s = batch->PutLogData(log_data.Encode());
@@ -298,6 +486,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const 
Slice &user_key, const st
   std::vector<std::string> keys_encoded;
   std::vector<std::string_view> values;
   keys.reserve(field_values.size());
+  keys_encoded.reserve(field_values.size());
   values.reserve(field_values.size());
   for (auto it = field_values.rbegin(); it != field_values.rend(); it++) {
     if (!field_set.insert(it->field).second) {
@@ -311,7 +500,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const 
Slice &user_key, const st
 
   std::vector<rocksdb::PinnableSlice> values_vector(keys.size());
   std::vector<rocksdb::Status> statuses_vector(keys.size());
-  if (metadata.size > 0) {
+  if (had_existing_fields) {
     rocksdb::ReadOptions read_options = ctx.DefaultMultiGetOptions();
     storage_->MultiGet(ctx, read_options, 
storage_->GetDB()->DefaultColumnFamily(), keys.size(), keys.data(),
                        values_vector.data(), statuses_vector.data());
@@ -319,29 +508,44 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const 
Slice &user_key, const st
 
   for (size_t field_index = 0; field_index < keys.size(); field_index++) {
     const rocksdb::Slice field_key = keys[field_index];
-    bool exists = false;
+    HashFieldStateKind state_kind = HashFieldStateKind::kMissing;
 
-    if (metadata.size > 0) {
+    if (had_existing_fields) {
       rocksdb::Status &field_status = statuses_vector[field_index];
       if (!field_status.ok() && !field_status.IsNotFound()) {
         return field_status;
       }
       if (field_status.ok()) {
-        if (nx) {
+        HashFieldState state;
+        s = DecodeFieldState(metadata, Slice(values_vector[field_index]), 
util::GetTimeStampMS(), &state);
+        if (!s.ok()) return s;
+        state_kind = state.kind;
+        if (nx && state.kind != HashFieldStateKind::kExpiredTTLPhysical) {
           continue;
         }
-        Slice existing_value(values_vector[field_index]);
-        s = decodeValue(metadata, &existing_value);
-        if (!s.ok()) return s;
-        if (existing_value.ToStringView() == values[field_index]) {
+        if (state.kind == HashFieldStateKind::kPersistent && state.value == 
values[field_index]) {
           continue;
         }
-        exists = true;
       }
     }
 
-    if (!exists) {
-      added++;
+    switch (state_kind) {
+      case HashFieldStateKind::kMissing:
+        ApplyMissingToPersistent(&metadata);
+        added++;
+        metadata_changed = true;
+        break;
+      case HashFieldStateKind::kPersistent:
+        break;
+      case HashFieldStateKind::kLiveTTL:
+        ApplyTTLToPersistent(&metadata);
+        metadata_changed = true;
+        break;
+      case HashFieldStateKind::kExpiredTTLPhysical:
+        ApplyTTLToPersistent(&metadata);
+        added++;
+        metadata_changed = true;
+        break;
     }
 
     std::string encoded_value = 
metadata.EncodeSubkeyValue(values[field_index]);
@@ -349,9 +553,8 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const 
Slice &user_key, const st
     if (!s.ok()) return s;
   }
 
-  if (added > 0 || ttl_updated) {
+  if (metadata_changed) {
     *added_cnt = added;
-    metadata.size += added;
     std::string bytes;
     metadata.Encode(&bytes);
     s = batch->Put(metadata_cf_handle_, ns_key, bytes);
@@ -410,12 +613,14 @@ rocksdb::Status Hash::RangeByLex(engine::Context &ctx, 
const Slice &user_key, co
           (!spec.max_infinite && ikey.GetSubKey().ToString() > spec.max))
         break;
     }
-    if (spec.offset >= 0 && pos++ < spec.offset) continue;
-
-    Slice value(iter->value());
-    s = decodeValue(metadata, &value);
+    HashFieldState state;
+    s = DecodeFieldState(metadata, Slice(iter->value()), 
util::GetTimeStampMS(), &state);
     if (!s.ok()) return s;
-    field_values->emplace_back(ikey.GetSubKey().ToString(), 
std::string(value.data(), value.size()));
+    if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+      continue;
+    }
+    if (spec.offset >= 0 && pos++ < spec.offset) continue;
+    field_values->emplace_back(ikey.GetSubKey().ToString(), 
std::move(state.value));
     if (spec.count > 0 && field_values->size() >= 
static_cast<unsigned>(spec.count)) break;
   }
   return rocksdb::Status::OK();
@@ -440,20 +645,20 @@ rocksdb::Status Hash::GetAll(engine::Context &ctx, const 
Slice &user_key, std::v
 
   auto iter = util::UniqueIterator(ctx, read_options);
   for (iter->Seek(prefix_key); iter->Valid() && 
iter->key().starts_with(prefix_key); iter->Next()) {
+    HashFieldState state;
+    s = DecodeFieldState(metadata, Slice(iter->value()), 
util::GetTimeStampMS(), &state);
+    if (!s.ok()) return s;
+    if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+      continue;
+    }
     if (type == HashFetchType::kOnlyKey) {
       InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
       field_values->emplace_back(ikey.GetSubKey().ToString(), "");
     } else if (type == HashFetchType::kOnlyValue) {
-      Slice value(iter->value());
-      s = decodeValue(metadata, &value);
-      if (!s.ok()) return s;
-      field_values->emplace_back("", std::string(value.data(), value.size()));
+      field_values->emplace_back("", std::move(state.value));
     } else {
       InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
-      Slice value(iter->value());
-      s = decodeValue(metadata, &value);
-      if (!s.ok()) return s;
-      field_values->emplace_back(ikey.GetSubKey().ToString(), 
std::string(value.data(), value.size()));
+      field_values->emplace_back(ikey.GetSubKey().ToString(), 
std::move(state.value));
     }
   }
   return rocksdb::Status::OK();
@@ -462,9 +667,51 @@ rocksdb::Status Hash::GetAll(engine::Context &ctx, const 
Slice &user_key, std::v
 rocksdb::Status Hash::Scan(engine::Context &ctx, const Slice &user_key, const 
std::string &cursor, uint64_t limit,
                            const std::string &field_prefix, 
std::vector<std::string> *fields,
                            std::vector<std::string> *values) {
-  return scanSubkeys<HashMetadata>(
-      ctx, kRedisHash, user_key, cursor, limit, field_prefix, fields, values,
-      [](const HashMetadata &metadata, Slice *value) { return 
metadata.DecodeSubkeyValue(value); });
+  fields->clear();  // Scan: collect non-expired fields after `cursor` whose name starts with field_prefix
+  if (values != nullptr) values->clear();  // values is optional; nullptr collects field names only
+
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  HashMetadata metadata(false);
+  rocksdb::Status s = getMetadata(ctx, ns_key, &metadata);
+  if (!s.ok()) return s;  // includes the not-found status, which is passed to the caller unchanged
+
+  auto iter = util::UniqueIterator(ctx, ctx.DefaultScanOptions());
+  std::string match_prefix_key =
+      InternalKey(ns_key, field_prefix, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+
+  std::string start_key;
+  if (!cursor.empty()) {
+    start_key = InternalKey(ns_key, cursor, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  } else {
+    start_key = match_prefix_key;  // no cursor: seek straight to the prefix
+  }
+
+  uint64_t live_count = 0;  // number of entries emitted so far
+  uint64_t now = util::GetTimeStampMS();  // one timestamp is used for every field of this scan
+  for (iter->Seek(start_key); iter->Valid(); iter->Next()) {
+    if (!cursor.empty() && iter->key() == start_key) {
+      continue;  // the cursor entry itself is not returned again
+    }
+    if (!iter->key().starts_with(match_prefix_key)) {
+      break;  // walked past the keys that share the requested prefix
+    }
+    HashFieldState state;
+    s = DecodeFieldState(metadata, Slice(iter->value()), now, &state);
+    if (!s.ok()) return s;
+    if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+      continue;  // expired entries are skipped and do not count toward limit
+    }
+    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
+    fields->emplace_back(ikey.GetSubKey().ToString());
+    if (values != nullptr) {
+      values->emplace_back(std::move(state.value));
+    }
+    live_count++;
+    if (limit > 0 && live_count >= limit) {  // limit == 0 disables the cap
+      break;
+    }
+  }
+  return iter->status();  // hands back the iterator's own status after the walk
 }
 
 rocksdb::Status Hash::RandField(engine::Context &ctx, const Slice &user_key, 
int64_t command_count,
@@ -489,6 +736,9 @@ rocksdb::Status Hash::RandField(engine::Context &ctx, const 
Slice &user_key, int
   if (!s.ok()) {
     return s;
   }
+  if (field_values->empty()) {
+    return rocksdb::Status::NotFound();
+  }
   switch (type) {
     case HashFetchType::kAll:
       break;
@@ -505,4 +755,174 @@ rocksdb::Status Hash::RandField(engine::Context &ctx, 
const Slice &user_key, int
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status Hash::ExpireFields(engine::Context &ctx, const Slice 
&user_key, const std::vector<Slice> &fields,
+                                   uint64_t expire_at_ms, 
HashFieldExpireCondition condition,
+                                   std::vector<int64_t> *results) {  // applies expire_at_ms per field; one code each
+  results->clear();
+  results->resize(fields.size(), -2);  // -2 by default; returned as-is when the key is not found
+
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  HashMetadata metadata(false);
+  rocksdb::Status s = getMetadata(ctx, ns_key, &metadata);
+  if (s.IsNotFound()) {
+    return rocksdb::Status::OK();  // missing key is not an error: every result stays -2
+  }
+  if (!s.ok()) return s;
+  if (!metadata.IsFieldExpirationEncoding()) {
+    return rocksdb::Status::InvalidArgument("hash field expiration is not 
supported by legacy hash encoding");
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisHash);
+  s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  bool metadata_changed = false;  // the batch is written only if this becomes true
+  uint64_t now = util::GetTimeStampMS();
+  bool immediate = IsImmediateExpire(expire_at_ms, now);  // NOTE(review): presumably expire_at_ms <= now — confirm
+  std::unordered_map<std::string, HashFieldState> state_cache;  // repeated fields reuse the state left earlier in this call
+
+  for (size_t i = 0; i < fields.size(); i++) {
+    std::string field = fields[i].ToString();
+    std::string sub_key = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+
+    auto cache_iter = state_cache.find(field);
+    if (cache_iter == state_cache.end()) {
+      HashFieldState state;
+      s = LoadFieldState(storage_, ctx, metadata, sub_key, now, &state);
+      if (!s.ok()) return s;
+      cache_iter = state_cache.emplace(field, std::move(state)).first;
+    }
+    HashFieldState &state = cache_iter->second;  // reference: updates below are seen by later duplicates
+
+    if (state.kind == HashFieldStateKind::kMissing) {
+      (*results)[i] = -2;  // -2: no such field
+      continue;
+    }
+    if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+      s = batch->Delete(sub_key);  // expired but still stored: remove it and report it as missing
+      if (!s.ok()) return s;
+      ApplyTTLToDeleted(&metadata);
+      metadata_changed = true;
+      state = HashFieldState{};  // NOTE(review): assumes a default-constructed state has kind kMissing — confirm
+      (*results)[i] = -2;
+      continue;
+    }
+    if (!HExpireConditionPasses(condition, state, expire_at_ms)) {
+      (*results)[i] = 0;  // 0: condition not met, field left untouched
+      continue;
+    }
+
+    if (immediate) {
+      s = batch->Delete(sub_key);
+      if (!s.ok()) return s;
+      if (state.kind == HashFieldStateKind::kPersistent) {
+        ApplyPersistentToDeleted(&metadata);
+      } else {
+        ApplyTTLToDeleted(&metadata);
+      }
+      metadata_changed = true;
+      state = HashFieldState{};
+      (*results)[i] = 2;  // 2: field deleted instead of receiving a TTL
+      continue;
+    }
+
+    if (state.kind == HashFieldStateKind::kPersistent) {
+      ApplyPersistentToTTL(&metadata, expire_at_ms);
+    } else {
+      ApplyTTLToTTL(&metadata, expire_at_ms);
+    }
+    metadata_changed = true;
+    s = batch->Put(sub_key, metadata.EncodeSubkeyValue(state.value, 
expire_at_ms));
+    if (!s.ok()) return s;
+    state.kind = HashFieldStateKind::kLiveTTL;
+    state.expire = expire_at_ms;
+    (*results)[i] = 1;  // 1: value rewritten with the new expire time
+  }
+
+  if (metadata_changed) {
+    std::string bytes;
+    metadata.Encode(&bytes);
+    s = batch->Put(metadata_cf_handle_, ns_key, bytes);  // metadata goes into the same batch as the sub-key changes
+    if (!s.ok()) return s;
+    return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+  }
+  return rocksdb::Status::OK();  // nothing changed: the prepared batch is dropped unwritten
+}
+
+rocksdb::Status Hash::PersistFields(engine::Context &ctx, const Slice 
&user_key, const std::vector<Slice> &fields,
+                                    std::vector<int64_t> *results) {  // removes per-field TTLs; one code per field
+  results->clear();
+  results->resize(fields.size(), -2);  // -2 by default; returned as-is when the key is not found
+
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  HashMetadata metadata(false);
+  rocksdb::Status s = getMetadata(ctx, ns_key, &metadata);
+  if (s.IsNotFound()) {
+    return rocksdb::Status::OK();  // missing key is not an error: every result stays -2
+  }
+  if (!s.ok()) return s;
+  if (!metadata.IsFieldExpirationEncoding()) {
+    return rocksdb::Status::InvalidArgument("hash field expiration is not 
supported by legacy hash encoding");
+  }
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisHash);
+  s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  bool metadata_changed = false;  // the batch is written only if this becomes true
+  uint64_t now = util::GetTimeStampMS();
+  std::unordered_map<std::string, HashFieldState> state_cache;  // repeated fields reuse the state left earlier in this call
+
+  for (size_t i = 0; i < fields.size(); i++) {
+    std::string field = fields[i].ToString();
+    std::string sub_key = InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+
+    auto cache_iter = state_cache.find(field);
+    if (cache_iter == state_cache.end()) {
+      HashFieldState state;
+      s = LoadFieldState(storage_, ctx, metadata, sub_key, now, &state);
+      if (!s.ok()) return s;
+      cache_iter = state_cache.emplace(field, std::move(state)).first;
+    }
+    HashFieldState &state = cache_iter->second;  // reference: updates below are seen by later duplicates
+
+    switch (state.kind) {
+      case HashFieldStateKind::kMissing:
+        (*results)[i] = -2;  // -2: no such field
+        break;
+      case HashFieldStateKind::kPersistent:
+        (*results)[i] = -1;  // -1: field exists but carries no TTL
+        break;
+      case HashFieldStateKind::kExpiredTTLPhysical:
+        s = batch->Delete(sub_key);  // expired but still stored: remove it and report it as missing
+        if (!s.ok()) return s;
+        ApplyTTLToDeleted(&metadata);
+        metadata_changed = true;
+        state = HashFieldState{};  // NOTE(review): assumes a default-constructed state has kind kMissing — confirm
+        (*results)[i] = -2;
+        break;
+      case HashFieldStateKind::kLiveTTL:
+        ApplyTTLToPersistent(&metadata);
+        metadata_changed = true;
+        s = batch->Put(sub_key, metadata.EncodeSubkeyValue(state.value));  // re-encode the value without an expire argument
+        if (!s.ok()) return s;
+        state.kind = HashFieldStateKind::kPersistent;
+        state.expire = 0;
+        (*results)[i] = 1;  // 1: TTL removed
+        break;
+    }
+  }
+
+  if (metadata_changed) {
+    std::string bytes;
+    metadata.Encode(&bytes);
+    s = batch->Put(metadata_cf_handle_, ns_key, bytes);  // metadata goes into the same batch as the sub-key changes
+    if (!s.ok()) return s;
+    return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+  }
+  return rocksdb::Status::OK();  // nothing changed: the prepared batch is dropped unwritten
+}
+
 }  // namespace redis
diff --git a/src/types/redis_hash.h b/src/types/redis_hash.h
index 400c2401b..d18906aa7 100644
--- a/src/types/redis_hash.h
+++ b/src/types/redis_hash.h
@@ -39,6 +39,14 @@ struct FieldValue {
 
 enum class HashFetchType { kAll = 0, kOnlyKey = 1, kOnlyValue = 2 };
 
+enum class HashFieldExpireCondition {  // guard passed to Hash::ExpireFields as its `condition` argument
+  kNone,  // NOTE(review): presumably "always apply" — confirm
+  kNX,    // NOTE(review): presumably "only if the field has no TTL yet" — confirm in HExpireConditionPasses
+  kXX,    // NOTE(review): presumably "only if the field already has a TTL" — confirm
+  kGT,    // NOTE(review): presumably "only if the new expire is later than the current one" — confirm
+  kLT,    // NOTE(review): presumably "only if the new expire is earlier than the current one" — confirm
+};
+
 namespace redis {
 
 class Hash : public SubKeyScanner {
@@ -68,6 +76,11 @@ class Hash : public SubKeyScanner {
                        std::vector<std::string> *values = nullptr);
   rocksdb::Status RandField(engine::Context &ctx, const Slice &user_key, 
int64_t command_count,
                             std::vector<FieldValue> *field_values, 
HashFetchType type = HashFetchType::kOnlyKey);
+  rocksdb::Status ExpireFields(engine::Context &ctx, const Slice &user_key, 
const std::vector<Slice> &fields,
+                               uint64_t expire_at_ms, HashFieldExpireCondition 
condition,
+                               std::vector<int64_t> *results);  // results[i]: -2 missing, 0 skipped, 1 TTL set, 2 deleted
+  rocksdb::Status PersistFields(engine::Context &ctx, const Slice &user_key, 
const std::vector<Slice> &fields,
+                                std::vector<int64_t> *results);  // results[i]: -2 missing, -1 no TTL, 1 TTL removed
 
  private:
   [[nodiscard]] HashMetadata createMetadataForWrite(bool generate_version = 
true) const;
diff --git a/tests/cppunit/metadata_test.cc b/tests/cppunit/metadata_test.cc
index dfd8a43bb..146ac4739 100644
--- a/tests/cppunit/metadata_test.cc
+++ b/tests/cppunit/metadata_test.cc
@@ -81,7 +81,7 @@ TEST(HashMetadata, DecodeLegacyMetadataWithoutExtensions) {
   EXPECT_EQ(decoded.version, legacy_md.version);
   EXPECT_EQ(decoded.size, legacy_md.size);
   EXPECT_EQ(decoded.mode, HashSubkeyEncodingMode::kLegacy);
-  EXPECT_EQ(decoded.expsz, 0);
+  EXPECT_EQ(decoded.persist, 0);
   EXPECT_EQ(decoded.lower, 0);
   EXPECT_EQ(decoded.upper, 0);
 }
@@ -91,7 +91,7 @@ TEST(HashMetadata, EncodeAndDecodeWithExtensions) {
   metadata.expire = 123000;
   metadata.version = 9;
   metadata.size = 11;
-  metadata.expsz = 3;
+  metadata.persist = 8;
   metadata.lower = 1000;
   metadata.upper = 2000;
 
@@ -104,7 +104,7 @@ TEST(HashMetadata, EncodeAndDecodeWithExtensions) {
   EXPECT_EQ(decoded.version, metadata.version);
   EXPECT_EQ(decoded.size, metadata.size);
   EXPECT_EQ(decoded.mode, HashSubkeyEncodingMode::kFieldExpiration);
-  EXPECT_EQ(decoded.expsz, metadata.expsz);
+  EXPECT_EQ(decoded.persist, metadata.persist);
   EXPECT_EQ(decoded.lower, metadata.lower);
   EXPECT_EQ(decoded.upper, metadata.upper);
 }
diff --git a/tests/cppunit/types/hash_test.cc b/tests/cppunit/types/hash_test.cc
index e939e5ad5..4332d5d90 100644
--- a/tests/cppunit/types/hash_test.cc
+++ b/tests/cppunit/types/hash_test.cc
@@ -29,9 +29,11 @@
 #include <memory>
 #include <random>
 #include <string>
+#include <vector>
 
 #include "parse_util.h"
 #include "test_base.h"
+#include "time_util.h"
 #include "types/redis_hash.h"
 
 class RedisHashTest : public TestBase {
@@ -100,6 +102,50 @@ class RedisHashFieldExpirationEncodingTest : public 
::testing::Test {
     return raw_value;
   }
 
+  HashMetadata hashMetadata(const std::string &key) {  // test helper: load the stored hash metadata for `key`
+    HashMetadata metadata(false);
+    std::string ns_key = db_->AppendNamespacePrefix(key);
+    auto s = db_->GetMetadata(*ctx_, {kRedisHash}, ns_key, &metadata);
+    EXPECT_TRUE(s.ok()) << s.ToString();  // gtest check: unlike assert(), it is still evaluated under NDEBUG
+    return metadata;
+  }
+
+  rocksdb::Status getHashMetadata(const std::string &key, HashMetadata 
*metadata) {  // test helper: metadata lookup that returns the status to the caller
+    std::string ns_key = db_->AppendNamespacePrefix(key);
+    return db_->GetMetadata(*ctx_, {kRedisHash}, ns_key, metadata);
+  }
+
+  std::string hashSubKey(const std::string &key, const std::string &field, 
const HashMetadata &metadata) {  // test helper: encoded sub-key of `field` for the given metadata version
+    std::string ns_key = db_->AppendNamespacePrefix(key);
+    return InternalKey(ns_key, field, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  }
+
+  rocksdb::Status putRawHashValue(const std::string &key, const std::string 
&field, uint64_t expire,
+                                  const std::string &value) {  // test helper: write a field directly, bypassing Hash
+    HashMetadata metadata = hashMetadata(key);  // needed for the version and the value encoding
+    auto batch = storage_->GetWriteBatchBase();
+    auto s = batch->Put(hashSubKey(key, field, metadata), 
metadata.EncodeSubkeyValue(value, expire));
+    if (!s.ok()) return s;
+    return storage_->Write(*ctx_, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());  // only the sub-key is written; the metadata record is not updated here
+  }
+
+  rocksdb::Status putHashMetadata(const std::string &key, const HashMetadata 
&metadata) {  // test helper: overwrite the stored metadata record with `metadata`
+    std::string bytes;
+    metadata.Encode(&bytes);
+    auto batch = storage_->GetWriteBatchBase();
+    auto s = batch->Put(storage_->GetCFHandle(ColumnFamilyID::Metadata), 
db_->AppendNamespacePrefix(key), bytes);
+    if (!s.ok()) return s;
+    return storage_->Write(*ctx_, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+  }
+
+  rocksdb::Status deleteRawHashValue(const std::string &key, const std::string 
&field) {  // test helper: delete a field's sub-key directly, bypassing Hash
+    HashMetadata metadata = hashMetadata(key);  // only read here, to build the sub-key
+    auto batch = storage_->GetWriteBatchBase();
+    auto s = batch->Delete(hashSubKey(key, field, metadata));
+    if (!s.ok()) return s;
+    return storage_->Write(*ctx_, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());  // the metadata record is left as it was
+  }
+
   Config config_;
   std::unique_ptr<engine::Storage> storage_;
   std::unique_ptr<engine::Context> ctx_;
@@ -251,7 +297,8 @@ TEST_F(RedisHashFieldExpirationEncodingTest, 
StoreAndScanValuesWithModeOneEncodi
   HashMetadata metadata(false);
   std::string raw_value = rawHashValue(key.ToString(), field.ToString(), 
&metadata);
   EXPECT_EQ(metadata.mode, HashSubkeyEncodingMode::kFieldExpiration);
-  EXPECT_EQ(metadata.expsz, 0);
+  EXPECT_EQ(metadata.size, 1);
+  EXPECT_EQ(metadata.persist, 1);
   EXPECT_EQ(raw_value.size(), HashMetadata::kFieldExpirationPrefixSize + 
value.size());
 
   Slice decoded_value(raw_value);
@@ -280,6 +327,532 @@ TEST_F(RedisHashFieldExpirationEncodingTest, 
StoreAndScanValuesWithModeOneEncodi
   EXPECT_EQ(field_values[0].value, "value-1");
 }
 
+TEST_F(RedisHashFieldExpirationEncodingTest, 
PersistentCountTracksPersistentFieldWrites) {  // persist must follow size for plain (no-TTL) writes and deletes
+  const Slice key = "mode-one-persist-count";
+
+  uint64_t ret = 0;
+  auto s = hash_->MSet(*ctx_, key, {{"field-1", "1"}, {"field-2", "2"}}, 
false, &ret);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(ret, 2);
+
+  HashMetadata metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.mode, HashSubkeyEncodingMode::kFieldExpiration);
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 2);
+  EXPECT_EQ(metadata.lower, 0);  // expected: lower/upper stay 0 while no TTL was ever set
+  EXPECT_EQ(metadata.upper, 0);
+
+  int64_t new_int = 0;
+  s = hash_->IncrBy(*ctx_, key, "field-3", 3, &new_int);  // field-3 does not exist yet
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(new_int, 3);
+
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 3);
+  EXPECT_EQ(metadata.persist, 3);
+
+  double new_float = 0;
+  s = hash_->IncrByFloat(*ctx_, key, "field-4", 1.5, &new_float);  // field-4 does not exist yet
+  ASSERT_TRUE(s.ok());
+  EXPECT_DOUBLE_EQ(new_float, 1.5);
+
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 4);
+  EXPECT_EQ(metadata.persist, 4);
+
+  s = hash_->Delete(*ctx_, key, {"field-1", "field-2"}, &ret);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(ret, 2);
+
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 2);  // expected: both counters drop together for persistent fields
+  EXPECT_EQ(metadata.lower, 0);
+  EXPECT_EQ(metadata.upper, 0);
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
ExpireFieldsMaintainsPersistentToTTLAndTTLToTTLMetadata) {  // checks persist/lower/upper after each ExpireFields
+  const Slice key = "hfe-expire-fields-metadata";
+  uint64_t ret = 0;
+  auto s = hash_->MSet(*ctx_, key, {{"a", "1"}, {"b", "2"}}, false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 2);
+
+  HashMetadata metadata = hashMetadata(key.ToString());
+  ASSERT_EQ(metadata.size, 2);
+  ASSERT_EQ(metadata.persist, 2);
+  ASSERT_EQ(metadata.lower, 0);
+  ASSERT_EQ(metadata.upper, 0);
+
+  std::vector<int64_t> results;
+  uint64_t now = util::GetTimeStampMS();
+  uint64_t t10 = now + 10'000;  // all four expire times lie in the future
+  uint64_t t20 = now + 20'000;
+  uint64_t t5 = now + 5'000;
+  uint64_t t30 = now + 30'000;
+
+  s = hash_->ExpireFields(*ctx_, key, {"a"}, t10, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+  ASSERT_EQ(results, std::vector<int64_t>({1}));
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 1);  // "a" left the persistent count
+  EXPECT_EQ(metadata.lower, t10);  // expected: a single TTL sets both bounds
+  EXPECT_EQ(metadata.upper, t10);
+
+  s = hash_->ExpireFields(*ctx_, key, {"b"}, t20, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+  ASSERT_EQ(results, std::vector<int64_t>({1}));
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 0);
+  EXPECT_EQ(metadata.lower, t10);
+  EXPECT_EQ(metadata.upper, t20);
+
+  s = hash_->ExpireFields(*ctx_, key, {"b"}, t5, 
HashFieldExpireCondition::kLT, &results);  // "b": t20 -> t5
+  ASSERT_TRUE(s.ok());
+  ASSERT_EQ(results, std::vector<int64_t>({1}));
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 0);
+  EXPECT_EQ(metadata.lower, t5);
+  EXPECT_EQ(metadata.upper, t20);  // expected: upper is not pulled back although no field expires at t20 any more
+
+  s = hash_->ExpireFields(*ctx_, key, {"a"}, t30, 
HashFieldExpireCondition::kGT, &results);  // "a": t10 -> t30
+  ASSERT_TRUE(s.ok());
+  ASSERT_EQ(results, std::vector<int64_t>({1}));
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 0);
+  EXPECT_EQ(metadata.lower, t5);
+  EXPECT_EQ(metadata.upper, t30);
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
PersistFieldsMaintainsTTLToPersistentMetadata) {  // checks persist/lower/upper after each PersistFields
+  const Slice key = "hfe-persist-fields-metadata";
+  uint64_t ret = 0;
+  auto s = hash_->MSet(*ctx_, key, {{"a", "1"}, {"b", "2"}}, false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 2);
+
+  std::vector<int64_t> results;
+  uint64_t now = util::GetTimeStampMS();
+  uint64_t t10 = now + 10'000;
+  uint64_t t20 = now + 20'000;
+  s = hash_->ExpireFields(*ctx_, key, {"a"}, t10, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+  s = hash_->ExpireFields(*ctx_, key, {"b"}, t20, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+
+  HashMetadata metadata = hashMetadata(key.ToString());  // precondition: both fields carry a TTL
+  ASSERT_EQ(metadata.size, 2);
+  ASSERT_EQ(metadata.persist, 0);
+  ASSERT_EQ(metadata.lower, t10);
+  ASSERT_EQ(metadata.upper, t20);
+
+  s = hash_->PersistFields(*ctx_, key, {"a"}, &results);
+  ASSERT_TRUE(s.ok());
+  ASSERT_EQ(results, std::vector<int64_t>({1}));
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 1);
+  EXPECT_EQ(metadata.lower, t10);  // expected: bounds are kept while "b" still has a TTL
+  EXPECT_EQ(metadata.upper, t20);
+
+  s = hash_->PersistFields(*ctx_, key, {"b"}, &results);
+  ASSERT_TRUE(s.ok());
+  ASSERT_EQ(results, std::vector<int64_t>({1}));
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 2);
+  EXPECT_EQ(metadata.lower, 0);  // expected: bounds reset once every field is persistent again
+  EXPECT_EQ(metadata.upper, 0);
+
+  s = hash_->PersistFields(*ctx_, key, {"a"}, &results);  // "a" is already persistent
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(results, std::vector<int64_t>({-1}));
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 2);
+  EXPECT_EQ(metadata.lower, 0);
+  EXPECT_EQ(metadata.upper, 0);
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
ExpiredTTLPhysicalIsMissingForReadsAndDoesNotMutateMetadata) {  // reads hide a stored-but-expired field
+  const Slice key = "hfe-expired-ttl-read";
+  uint64_t ret = 0;
+  auto s = hash_->MSet(*ctx_, key, {{"a", "1"}, {"b", "2"}, {"c", "3"}}, 
false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 3);
+
+  HashMetadata metadata = hashMetadata(key.ToString());
+  metadata.persist = 2;  // hand-built state: "a" is the one TTL field, with an expire 1s in the past
+  metadata.lower = util::GetTimeStampMS() - 1000;
+  metadata.upper = metadata.lower;
+  s = putHashMetadata(key.ToString(), metadata);
+  ASSERT_TRUE(s.ok());
+  s = putRawHashValue(key.ToString(), "a", metadata.lower, "1");  // "a" stays stored, but with the past expire
+  ASSERT_TRUE(s.ok());
+
+  HashMetadata before = hashMetadata(key.ToString());  // snapshot to compare against after the reads
+  std::string got;
+  s = hash_->Get(*ctx_, key, "a", &got);
+  EXPECT_TRUE(s.IsNotFound());
+
+  std::vector<std::string> values;
+  std::vector<rocksdb::Status> statuses;
+  s = hash_->MGet(*ctx_, key, {"a", "b"}, &values, &statuses);
+  ASSERT_TRUE(s.ok());
+  EXPECT_TRUE(statuses[0].IsNotFound());
+  EXPECT_EQ(values[1], "2");
+
+  std::vector<FieldValue> field_values;
+  s = hash_->GetAll(*ctx_, key, &field_values);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(field_values.size(), 2);  // expected: only "b" and "c" are visible
+
+  RangeLexSpec spec;
+  spec.min = "a";
+  spec.max = "z";
+  spec.count = INT_MAX;
+  s = hash_->RangeByLex(*ctx_, key, spec, &field_values);
+  ASSERT_TRUE(s.ok());
+  ASSERT_EQ(field_values.size(), 2);
+
+  std::vector<std::string> fields;
+  s = hash_->Scan(*ctx_, key, "", 10, "", &fields, &values);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(fields.size(), 2);
+
+  HashMetadata after = hashMetadata(key.ToString());
+  EXPECT_EQ(after.size, before.size);  // expected: none of the reads above rewrote the metadata
+  EXPECT_EQ(after.persist, before.persist);
+  EXPECT_EQ(after.lower, before.lower);
+  EXPECT_EQ(after.upper, before.upper);
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
DuplicateFieldsUseCommandLocalState) {  // a field repeated in one call must see its own earlier update
+  const Slice key = "hfe-duplicate-fields";
+  uint64_t ret = 0;
+  auto s = hash_->MSet(*ctx_, key, {{"a", "1"}, {"b", "2"}, {"c", "3"}}, 
false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 3);
+
+  std::vector<int64_t> results;
+  uint64_t future = util::GetTimeStampMS() + 60'000;
+  s = hash_->ExpireFields(*ctx_, key, {"a", "a"}, future, 
HashFieldExpireCondition::kNX, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(results, std::vector<int64_t>({1, 0}));  // expected: the second "a" already has a TTL, so NX rejects it
+  HashMetadata metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 3);
+  EXPECT_EQ(metadata.persist, 2);  // expected: counted once despite the duplicate
+
+  s = hash_->PersistFields(*ctx_, key, {"a", "a"}, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(results, std::vector<int64_t>({1, -1}));  // expected: the second "a" is already persistent
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 3);
+  EXPECT_EQ(metadata.persist, 3);
+  EXPECT_EQ(metadata.lower, 0);
+  EXPECT_EQ(metadata.upper, 0);
+
+  s = hash_->ExpireFields(*ctx_, key, {"b", "b"}, util::GetTimeStampMS(), 
HashFieldExpireCondition::kNone, &results);  // expire time "now": test expects an immediate delete
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(results, std::vector<int64_t>({2, -2}));  // expected: deleted first, then seen as missing
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);
+  EXPECT_EQ(metadata.persist, 2);
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
CompactionGhostDoesNotDecrementMetadataOnMissingSubkey) {  // "ghost": sub-key gone, metadata still counts it
+  const Slice key = "hfe-compaction-ghost";
+  uint64_t ret = 0;
+  auto s = hash_->MSet(*ctx_, key, {{"field1", "1"}}, false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 1);
+
+  std::vector<int64_t> results;
+  uint64_t future = util::GetTimeStampMS() + 60'000;
+  s = hash_->ExpireFields(*ctx_, key, {"field1"}, future, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(results, std::vector<int64_t>({1}));
+
+  HashMetadata before = hashMetadata(key.ToString());
+  ASSERT_EQ(before.size, 1);
+  ASSERT_EQ(before.persist, 0);
+  s = deleteRawHashValue(key.ToString(), "field1");  // remove only the sub-key; metadata keeps size == 1
+  ASSERT_TRUE(s.ok()) << s.ToString();
+
+  s = hash_->PersistFields(*ctx_, key, {"field1"}, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(results, std::vector<int64_t>({-2}));  // expected: the ghost reads as a missing field
+  HashMetadata metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, before.size);  // expected: a missing sub-key never decrements the counters
+  EXPECT_EQ(metadata.persist, before.persist);
+  EXPECT_EQ(metadata.lower, before.lower);
+  EXPECT_EQ(metadata.upper, before.upper);
+
+  s = hash_->ExpireFields(*ctx_, key, {"field1"}, future + 10'000, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(results, std::vector<int64_t>({-2}));
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, before.size);
+  EXPECT_EQ(metadata.persist, before.persist);
+  EXPECT_EQ(metadata.lower, before.lower);
+  EXPECT_EQ(metadata.upper, before.upper);
+
+  s = hash_->Set(*ctx_, key, "field1", "new", &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(ret, 1);
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 2);  // expected: re-adding the ghost counts as a new field on top of the stale count
+  EXPECT_EQ(metadata.persist, 1);
+  EXPECT_EQ(metadata.lower, before.lower);
+  EXPECT_EQ(metadata.upper, before.upper);
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
DeleteHandlesPersistentLiveExpiredMissingAndDuplicateFields) {  // one Delete call over every field state
+  const Slice key = "hfe-delete-state-matrix";
+  uint64_t ret = 0;
+  auto s =
+      hash_->MSet(*ctx_, key, {{"persistent", "1"}, {"live", "2"}, {"expired", 
"3"}, {"keeper", "4"}}, false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 4);
+
+  std::vector<int64_t> results;
+  uint64_t now = util::GetTimeStampMS();
+  s = hash_->ExpireFields(*ctx_, key, {"live"}, now + 60'000, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  s = hash_->ExpireFields(*ctx_, key, {"expired"}, now, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(results, std::vector<int64_t>({2}));  // code 2: "expired" was deleted by the call above
+
+  HashMetadata metadata = hashMetadata(key.ToString());
+  ASSERT_EQ(metadata.size, 3);
+  ASSERT_EQ(metadata.persist, 2);
+
+  s = putRawHashValue(key.ToString(), "expired", now - 1, "3");  // put it back as stored-but-expired
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  metadata.size = 4;  // hand-edit the metadata so it accounts for the raw write above
+  metadata.persist = 2;
+  metadata.lower = now - 1;
+  metadata.upper = now + 60'000;
+  s = putHashMetadata(key.ToString(), metadata);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+
+  s = hash_->Delete(*ctx_, key, {"persistent", "live", "expired", "missing", 
"persistent"}, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(ret, 2);  // presumably "persistent" and "live"; expired, missing and the duplicate are not counted
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 1);
+  EXPECT_EQ(metadata.persist, 1);
+  EXPECT_EQ(metadata.lower, 0);  // expected: bounds reset because no TTL field is left
+  EXPECT_EQ(metadata.upper, 0);
+
+  std::vector<FieldValue> fields;
+  s = hash_->GetAll(*ctx_, key, &fields);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(fields.size(), 1);
+  EXPECT_EQ(fields[0].field, "keeper");
+  EXPECT_EQ(fields[0].value, "4");
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
MSetHandlesPersistentLiveExpiredAndGhostFields) {  // overwriting MSet (nx == false) over every field state
+  const Slice key = "hfe-mset-state-matrix";
+  uint64_t ret = 0;
+  auto s = hash_->MSet(*ctx_, key, {{"persistent", "1"}, {"live", "2"}, 
{"expired", "3"}, {"ghost", "4"}}, false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 4);
+
+  std::vector<int64_t> results;
+  uint64_t now = util::GetTimeStampMS();
+  s = hash_->ExpireFields(*ctx_, key, {"live"}, now + 60'000, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  s = hash_->ExpireFields(*ctx_, key, {"expired"}, now, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  s = hash_->ExpireFields(*ctx_, key, {"ghost"}, now + 120'000, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+
+  HashMetadata before = hashMetadata(key.ToString());
+  ASSERT_EQ(before.size, 3);
+  ASSERT_EQ(before.persist, 1);
+  s = putRawHashValue(key.ToString(), "expired", now - 1, "3");  // put "expired" back as stored-but-expired
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  before.size = 4;  // hand-edit the metadata so it accounts for the raw write above
+  before.lower = now - 1;
+  before.upper = now + 120'000;
+  s = putHashMetadata(key.ToString(), before);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  s = deleteRawHashValue(key.ToString(), "ghost");  // "ghost": sub-key removed, still counted in metadata
+  ASSERT_TRUE(s.ok()) << s.ToString();
+
+  s = hash_->MSet(*ctx_, key,
+                  {{"persistent", "11"}, {"live", "22"}, {"expired", "33"}, 
{"ghost", "44"}, {"missing", "55"}}, false,
+                  &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(ret, 3);  // presumably "expired", "ghost" and "missing" count as newly added
+  HashMetadata metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 6);  // 4 before + "ghost" + "missing"; the expired slot nets to zero
+  EXPECT_EQ(metadata.persist, 5);  // expected: all five written fields end up persistent, "live" included
+  EXPECT_EQ(metadata.lower, before.lower);
+  EXPECT_EQ(metadata.upper, before.upper);
+
+  std::string value;
+  s = hash_->Get(*ctx_, key, "persistent", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "11");
+  s = hash_->Get(*ctx_, key, "live", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "22");
+  s = hash_->Get(*ctx_, key, "expired", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "33");
+  s = hash_->Get(*ctx_, key, "ghost", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "44");
+  s = hash_->Get(*ctx_, key, "missing", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "55");
+
+  s = hash_->PersistFields(*ctx_, key, {"ghost"}, &results);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(results, std::vector<int64_t>({-1}));  // expected: the rewritten "ghost" carries no TTL
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 6);
+  EXPECT_EQ(metadata.persist, 5);
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
SetNXHandlesPersistentLiveExpiredAndGhostFields) {  // MSet with nx == true over every field state
+  const Slice key = "hfe-msetnx-state-matrix";
+  uint64_t ret = 0;
+  auto s = hash_->MSet(*ctx_, key, {{"persistent", "1"}, {"live", "2"}, 
{"expired", "3"}, {"ghost", "4"}}, false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 4);
+
+  std::vector<int64_t> results;
+  uint64_t now = util::GetTimeStampMS();
+  s = hash_->ExpireFields(*ctx_, key, {"live"}, now + 60'000, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+  s = hash_->ExpireFields(*ctx_, key, {"expired"}, now, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+  s = hash_->ExpireFields(*ctx_, key, {"ghost"}, now + 120'000, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+
+  HashMetadata before = hashMetadata(key.ToString());
+  s = putRawHashValue(key.ToString(), "expired", now - 1, "3");  // put "expired" back as stored-but-expired
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  before.size = 4;  // hand-edit the metadata so it accounts for the raw write above
+  before.lower = now - 1;
+  before.upper = now + 120'000;
+  s = putHashMetadata(key.ToString(), before);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  s = deleteRawHashValue(key.ToString(), "ghost");  // "ghost": sub-key removed, still counted in metadata
+  ASSERT_TRUE(s.ok()) << s.ToString();
+
+  s = hash_->MSet(*ctx_, key,
+                  {{"persistent", "11"}, {"live", "22"}, {"expired", "33"}, 
{"ghost", "44"}, {"missing", "55"}}, true,
+                  &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  EXPECT_EQ(ret, 3);  // presumably "expired", "ghost" and "missing"; the two visible fields are kept
+  HashMetadata metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 6);
+  EXPECT_EQ(metadata.persist, 4);  // expected: one less than the overwriting MSet case, "live" keeps its TTL
+  EXPECT_EQ(metadata.lower, before.lower);
+  EXPECT_EQ(metadata.upper, before.upper);
+
+  std::string value;
+  s = hash_->Get(*ctx_, key, "persistent", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "1");  // unchanged: NX does not overwrite
+  s = hash_->Get(*ctx_, key, "live", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "2");  // unchanged: NX does not overwrite
+  s = hash_->Get(*ctx_, key, "expired", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "33");  // expected: an expired field is treated as absent, so NX writes it
+  s = hash_->Get(*ctx_, key, "ghost", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "44");
+  s = hash_->Get(*ctx_, key, "missing", &value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(value, "55");
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest, 
IncrementsKeepLiveTTLAndTreatExpiredPhysicalAndGhostAsZero) {  // IncrBy/IncrByFloat over every field state
+  const Slice key = "hfe-incr-state-matrix";
+  uint64_t ret = 0;
+  auto s =
+      hash_->MSet(*ctx_, key, {{"persistent", "10"}, {"live", "20"}, 
{"expired", "30"}, {"ghost", "40"}}, false, &ret);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  ASSERT_EQ(ret, 4);
+
+  std::vector<int64_t> results;
+  uint64_t now = util::GetTimeStampMS();
+  s = hash_->ExpireFields(*ctx_, key, {"live"}, now + 60'000, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+  s = hash_->ExpireFields(*ctx_, key, {"expired"}, now, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+  s = hash_->ExpireFields(*ctx_, key, {"ghost"}, now + 120'000, 
HashFieldExpireCondition::kNone, &results);
+  ASSERT_TRUE(s.ok());
+
+  HashMetadata metadata = hashMetadata(key.ToString());
+  s = putRawHashValue(key.ToString(), "expired", now - 1, "30");  // put "expired" back as stored-but-expired
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  metadata.size = 4;  // hand-edit the metadata so it accounts for the raw write above
+  metadata.lower = now - 1;
+  metadata.upper = now + 120'000;
+  s = putHashMetadata(key.ToString(), metadata);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+  s = deleteRawHashValue(key.ToString(), "ghost");  // "ghost": sub-key removed, still counted in metadata
+  ASSERT_TRUE(s.ok()) << s.ToString();
+
+  int64_t int_value = 0;
+  s = hash_->IncrBy(*ctx_, key, "persistent", 1, &int_value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(int_value, 11);
+  s = hash_->IncrBy(*ctx_, key, "live", 1, &int_value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(int_value, 21);
+  metadata = hashMetadata(key.ToString());
+  std::string raw_value = rawHashValue(key.ToString(), "live", &metadata);
+  Slice decoded_value(raw_value);
+  uint64_t live_expire = 0;
+  ASSERT_TRUE(metadata.DecodeSubkeyValue(&decoded_value, &live_expire).ok());
+  EXPECT_EQ(decoded_value.ToStringView(), "21");
+  EXPECT_EQ(live_expire, now + 60'000);  // expected: the increment keeps the field's TTL
+  s = hash_->IncrBy(*ctx_, key, "expired", 1, &int_value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(int_value, 1);  // expected: the stored "30" is ignored because the field has expired
+  s = hash_->IncrBy(*ctx_, key, "ghost", 1, &int_value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(int_value, 1);
+  s = hash_->IncrBy(*ctx_, key, "missing", 1, &int_value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_EQ(int_value, 1);
+
+  metadata = hashMetadata(key.ToString());
+  EXPECT_EQ(metadata.size, 6);  // 4 before + "ghost" + "missing"; the expired slot nets to zero
+  EXPECT_EQ(metadata.persist, 4);
+  EXPECT_EQ(metadata.lower, now - 1);
+  EXPECT_EQ(metadata.upper, now + 120'000);
+
+  double float_value = 0;
+  s = hash_->IncrByFloat(*ctx_, key, "live", 0.5, &float_value);
+  ASSERT_TRUE(s.ok());
+  EXPECT_DOUBLE_EQ(float_value, 21.5);
+  metadata = hashMetadata(key.ToString());
+  raw_value = rawHashValue(key.ToString(), "live", &metadata);
+  decoded_value = Slice(raw_value);
+  live_expire = 0;
+  ASSERT_TRUE(metadata.DecodeSubkeyValue(&decoded_value, &live_expire).ok());
+  EXPECT_EQ(decoded_value.ToStringView(), "21.5");
+  EXPECT_EQ(live_expire, now + 60'000);  // expected: the float increment keeps the TTL as well
+  EXPECT_EQ(metadata.size, 6);
+  EXPECT_EQ(metadata.persist, 4);
+  EXPECT_EQ(metadata.lower, now - 1);
+  EXPECT_EQ(metadata.upper, now + 120'000);
+}
+
 TEST_F(RedisHashTest, HIncr) {
   int64_t value = 0;
   Slice field("hash-incrby-invalid-field");
diff --git a/tests/gocase/unit/kmetadata/kmetadata_test.go 
b/tests/gocase/unit/kmetadata/kmetadata_test.go
index 96fdbed12..5e3188ece 100644
--- a/tests/gocase/unit/kmetadata/kmetadata_test.go
+++ b/tests/gocase/unit/kmetadata/kmetadata_test.go
@@ -38,7 +38,7 @@ type kMetadataResponse struct {
        version int64  `redis:"version"`
        mode    string `redis:"mode"`
        format  string `redis:"format"`
-       expsz   int64  `redis:"expsz"`
+       persist int64  `redis:"persist"`
        lower   int64  `redis:"lower"`
        upper   int64  `redis:"upper"`
        head    int64  `redis:"head"`
@@ -72,7 +72,7 @@ func ExtractKMetadataResponse(result interface{}) 
(*kMetadataResponse, error) {
                "size":    &response.size,
                "flags":   &response.flags,
                "version": &response.version,
-               "expsz":   &response.expsz,
+               "persist": &response.persist,
                "lower":   &response.lower,
                "upper":   &response.upper,
                "head":    &response.head,
@@ -172,14 +172,14 @@ var testKMetadata = func(t *testing.T, configs 
util.KvrocksServerConfigs) {
                require.Equal(t, int64(2), metaResponse.size)
                require.Equal(t, configs["hash-encoding-mode"], 
metaResponse.mode)
                if configs["hash-encoding-mode"] == "field-expiration" {
-                       require.Equal(t, int64(0), metaResponse.expsz)
+                       require.Equal(t, int64(2), metaResponse.persist)
                        require.Equal(t, int64(0), metaResponse.lower)
                        require.Equal(t, int64(0), metaResponse.upper)
-                       require.Contains(t, resultMap, "expsz")
+                       require.Contains(t, resultMap, "persist")
                        require.Contains(t, resultMap, "lower")
                        require.Contains(t, resultMap, "upper")
                } else {
-                       require.NotContains(t, resultMap, "expsz")
+                       require.NotContains(t, resultMap, "persist")
                        require.NotContains(t, resultMap, "lower")
                        require.NotContains(t, resultMap, "upper")
                }
diff --git a/tests/gocase/unit/type/hash/hash_hfe_test.go 
b/tests/gocase/unit/type/hash/hash_hfe_test.go
new file mode 100644
index 000000000..e53217990
--- /dev/null
+++ b/tests/gocase/unit/type/hash/hash_hfe_test.go
@@ -0,0 +1,843 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package hash
+
+import (
+       "context"
+       "errors"
+       "sort"
+       "testing"
+       "time"
+
+       "github.com/redis/go-redis/v9"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+)
+
+// Field names and the TTL shared by the hash field expiration (HFE) tests.
+// createHashFieldStates uses them to build a hash with a known mix of field
+// states.
+const (
+       // Written without a TTL by createHashFieldStates.
+       hfePersistentField = "a-persistent"
+       // Written, then given a TTL of hfeLiveTTLSeconds via HEXPIRE.
+       hfeLiveField       = "b-live"
+       // Written, given a 1-second TTL, and waited on until HGET reports it gone.
+       hfeExpiredField    = "c-expired"
+       // Never written by createHashFieldStates.
+       hfeMissingField    = "d-missing"
+       // A second field written without a TTL by createHashFieldStates.
+       hfeKeeperField     = "z-keeper"
+       // TTL, in seconds, applied to hfeLiveField (presumably long enough to
+       // outlive a single test run).
+       hfeLiveTTLSeconds  = 300
+)
+
+// runWithFieldExpirationHash starts a kvrocks server configured with
+// hash-encoding-mode=field-expiration and resp3-enabled=yes, opens a client
+// against it, and calls fn with that client and a background context.
+// On return the client is closed first (its Close error is asserted), then the
+// server is shut down.
+func runWithFieldExpirationHash(t *testing.T, fn func(t *testing.T, rdb *redis.Client, ctx context.Context)) {
+       t.Helper()
+
+       configs := util.KvrocksServerConfigs{
+               "hash-encoding-mode": "field-expiration",
+               "resp3-enabled":      "yes",
+       }
+       server := util.StartServer(t, configs)
+       defer server.Close()
+
+       background := context.Background()
+       client := server.NewClient()
+       defer func() {
+               require.NoError(t, client.Close())
+       }()
+
+       fn(t, client, background)
+}
+
+// requireHashMetadata asserts that meta describes a field-expiration hash with
+// the given size and persist counters, and that persist never exceeds size.
+// When every field is persistent (size == persist) both expiration bounds must
+// be zero; otherwise lower must be positive and upper must not be below lower.
+func requireHashMetadata(t *testing.T, meta util.KMetadataResponse, size, persist int64) {
+       t.Helper()
+
+       require.Equal(t, "hash", meta.Type)
+       require.Equal(t, "field-expiration", meta.Mode)
+       require.Equal(t, size, meta.Size)
+       require.Equal(t, persist, meta.Persist)
+       require.LessOrEqual(t, meta.Persist, meta.Size)
+
+       hasTTLFields := meta.Size != meta.Persist
+       if hasTTLFields {
+               require.Greater(t, meta.Lower, int64(0))
+               require.GreaterOrEqual(t, meta.Upper, meta.Lower)
+               return
+       }
+       require.Equal(t, int64(0), meta.Lower)
+       require.Equal(t, int64(0), meta.Upper)
+}
+
+func waitHashFieldExpired(t *testing.T, rdb *redis.Client, ctx 
context.Context, key, field string) {
+       t.Helper()
+
+       require.Eventually(t, func() bool {
+               err := rdb.HGet(ctx, key, field).Err()
+               return errors.Is(err, redis.Nil)
+       }, 3*time.Second, 50*time.Millisecond)
+}
+
+// requireIntArray asserts that got is a []interface{} with the same length as
+// want and that each element equals the int64 at the same position in want.
+func requireIntArray(t *testing.T, got interface{}, want []int64) {
+       t.Helper()
+
+       elems, isSlice := got.([]interface{})
+       require.Truef(t, isSlice, "expected []interface{}, got %T", got)
+       require.Len(t, elems, len(want))
+       for idx := range elems {
+               require.Equal(t, want[idx], elems[idx])
+       }
+}
+
+// createHashFieldStates fills key with four fields and drives them into the
+// states the HFE tests rely on:
+//   - hfePersistentField and hfeKeeperField are left without a TTL,
+//   - hfeLiveField gets a TTL of hfeLiveTTLSeconds,
+//   - hfeExpiredField gets a 1-second TTL and is waited on until it expires.
+//
+// It finishes by asserting the metadata still reports size 4 and persist 2.
+func createHashFieldStates(t *testing.T, rdb *redis.Client, ctx context.Context, key string) {
+       t.Helper()
+
+       created := rdb.HSet(ctx, key,
+               hfePersistentField, "10",
+               hfeLiveField, "20",
+               hfeExpiredField, "30",
+               hfeKeeperField, "40").Val()
+       require.Equal(t, int64(4), created)
+
+       liveReply := rdb.Do(ctx, "hexpire", key, hfeLiveTTLSeconds, "FIELDS", 1, hfeLiveField).Val()
+       requireIntArray(t, liveReply, []int64{1})
+
+       expiringReply := rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 1, hfeExpiredField).Val()
+       requireIntArray(t, expiringReply, []int64{1})
+
+       waitHashFieldExpired(t, rdb, ctx, key, hfeExpiredField)
+
+       meta := util.GetKMetadata(t, rdb, ctx, key)
+       requireHashMetadata(t, meta, 4, 2)
+}
+
+// scanPairsToMap turns a flat [field, value, field, value, ...] slice into a
+// map. The test fails if the slice has an odd number of elements; a field that
+// appears more than once keeps its last value.
+func scanPairsToMap(t *testing.T, pairs []string) map[string]string {
+       t.Helper()
+
+       require.Equal(t, 0, len(pairs)%2)
+       pairCount := len(pairs) / 2
+       byField := make(map[string]string, pairCount)
+       for n := 0; n < pairCount; n++ {
+               field, value := pairs[2*n], pairs[2*n+1]
+               byField[field] = value
+       }
+       return byField
+}
+
+// requireHashValues asserts that every field listed in want can be read with
+// HGET on key and holds the expected value.
+//
+// The command error is checked explicitly instead of being dropped by Val():
+// a missing field (redis.Nil) or a transport error now fails with the real
+// cause rather than surfacing as a "" mismatch, or passing silently when the
+// expected value happens to be "". The field name is attached to both
+// assertions as the failure message.
+func requireHashValues(t *testing.T, rdb *redis.Client, ctx context.Context, key string, want map[string]string) {
+       t.Helper()
+
+       for field, value := range want {
+               got, err := rdb.HGet(ctx, key, field).Result()
+               require.NoError(t, err, field)
+               require.Equal(t, value, got, field)
+       }
+}
+
+// TestHashFieldExpirationMetadataLifecycle walks a two-field hash through a
+// series of HEXPIRE and HPERSIST calls and checks, after each step, the
+// size/persist counters and the lower/upper bounds returned by
+// util.GetKMetadata.
+func TestHashFieldExpirationMetadataLifecycle(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               key := "hfe-lifecycle"
+               // Fresh hash: both fields are persistent.
+               require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1", "b", 
"2").Val())
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 2, 
2)
+
+               // First TTL field: with a single expiring field the two bounds
+               // are expected to be equal.
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 60, "FIELDS", 1, 
"a").Val(), []int64{1})
+               m1 := util.GetKMetadata(t, rdb, ctx, key)
+               requireHashMetadata(t, m1, 2, 1)
+               require.Equal(t, m1.Lower, m1.Upper)
+
+               // Second field gets a longer TTL: lower stays, upper grows.
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 120, "FIELDS", 
1, "b").Val(), []int64{1})
+               m2 := util.GetKMetadata(t, rdb, ctx, key)
+               requireHashMetadata(t, m2, 2, 0)
+               require.Equal(t, m1.Lower, m2.Lower)
+               require.Greater(t, m2.Upper, m1.Upper)
+
+               // Shortening b with LT moves lower down; upper is expected to
+               // keep its previous value.
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 30, "LT", 
"FIELDS", 1, "b").Val(), []int64{1})
+               m3 := util.GetKMetadata(t, rdb, ctx, key)
+               requireHashMetadata(t, m3, 2, 0)
+               require.Less(t, m3.Lower, m2.Lower)
+               require.Equal(t, m2.Upper, m3.Upper)
+
+               // Persisting b raises the persist counter; both bounds are
+               // expected to be left as they were.
+               requireIntArray(t, rdb.Do(ctx, "hpersist", key, "FIELDS", 1, 
"b").Val(), []int64{1})
+               m4 := util.GetKMetadata(t, rdb, ctx, key)
+               requireHashMetadata(t, m4, 2, 1)
+               require.Equal(t, m3.Lower, m4.Lower)
+               require.Equal(t, m3.Upper, m4.Upper)
+
+               // Persisting the last TTL field: size == persist, so
+               // requireHashMetadata also checks that both bounds are zero.
+               requireIntArray(t, rdb.Do(ctx, "hpersist", key, "FIELDS", 1, 
"a").Val(), []int64{1})
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 2, 
2)
+       })
+}
+
+// TestHashFieldExpirationFiltersReadsWithoutMutatingMetadata expires field "a"
+// of a three-field hash and then runs the hash read commands, asserting that
+// none of them returns "a" and that the key metadata captured before the expiry
+// is identical to the metadata read afterwards.
+func TestHashFieldExpirationFiltersReadsWithoutMutatingMetadata(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               key := "hfe-read-filter"
+               require.Equal(t, int64(3), rdb.HSet(ctx, key, "a", "1", "b", 
"2", "c", "3").Val())
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 1, 
"a").Val(), []int64{1})
+               // Snapshot taken while "a" still has a live TTL.
+               before := util.GetKMetadata(t, rdb, ctx, key)
+               requireHashMetadata(t, before, 3, 2)
+               waitHashFieldExpired(t, rdb, ctx, key, "a")
+
+               // Point lookups treat the expired field as absent.
+               require.ErrorIs(t, rdb.HGet(ctx, key, "a").Err(), redis.Nil)
+               require.False(t, rdb.HExists(ctx, key, "a").Val())
+               require.Equal(t, int64(0), rdb.HStrLen(ctx, key, "a").Val())
+               require.Equal(t, []interface{}{nil, "2"}, rdb.HMGet(ctx, key, 
"a", "b").Val())
+
+               // Whole-hash and iterating reads skip it as well.
+               all := rdb.HGetAll(ctx, key).Val()
+               require.NotContains(t, all, "a")
+               keys := rdb.HKeys(ctx, key).Val()
+               require.NotContains(t, keys, "a")
+               values := rdb.HVals(ctx, key).Val()
+               require.ElementsMatch(t, []string{"2", "3"}, values)
+               scanned, _, err := rdb.HScan(ctx, key, 0, "", 10).Result()
+               require.NoError(t, err)
+               require.NotContains(t, scanned, "a")
+               // With COUNT 1 the first page is expected to be the live field
+               // "b", not the expired "a", and the cursor must be non-zero.
+               scanned, cursor, err := rdb.HScan(ctx, key, 0, "", 1).Result()
+               require.NoError(t, err)
+               require.Equal(t, []string{"b", "2"}, scanned)
+               require.NotZero(t, cursor)
+               rangeByLex := rdb.Do(ctx, "hrangebylex", key, "[a", "[zz", 
"LIMIT", 0, 10).Val()
+               require.NotContains(t, rangeByLex, "a")
+               randField := rdb.HRandField(ctx, key, 10).Val()
+               require.NotContains(t, randField, "a")
+               // HLEN is still expected to report 3 here, i.e. it counts the
+               // expired field.
+               require.Equal(t, int64(3), rdb.HLen(ctx, key).Val())
+
+               // None of the reads above may have changed the metadata.
+               after := util.GetKMetadata(t, rdb, ctx, key)
+               require.Equal(t, before, after)
+       })
+}
+
+// TestHashFieldExpirationWriteCleanupMetadata checks, one write command per
+// subtest, what happens when the command touches a field that has already
+// expired: the reply the command gives and the size/persist counters left in
+// the metadata afterwards.
+func TestHashFieldExpirationWriteCleanupMetadata(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               // makeExpired creates key with fields "a" (set to value) and
+               // "b", gives "a" a 1-second TTL and waits until it has expired.
+               makeExpired := func(t *testing.T, key, value string) {
+                       t.Helper()
+                       require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", 
value, "b", "2").Val())
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, 
"FIELDS", 1, "a").Val(), []int64{1})
+                       waitHashFieldExpired(t, rdb, ctx, key, "a")
+               }
+
+               // HDEL reports 0 deleted fields, yet size drops to 1.
+               t.Run("hdel", func(t *testing.T) {
+                       key := "hfe-cleanup-hdel"
+                       makeExpired(t, key, "1")
+                       require.Equal(t, int64(0), rdb.HDel(ctx, key, 
"a").Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 1, 1)
+               })
+
+               // HPERSIST replies -2 and size drops to 1.
+               t.Run("hpersist", func(t *testing.T) {
+                       key := "hfe-cleanup-hpersist"
+                       makeExpired(t, key, "1")
+                       requireIntArray(t, rdb.Do(ctx, "hpersist", key, 
"FIELDS", 1, "a").Val(), []int64{-2})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 1, 1)
+               })
+
+               // HEXPIRE replies -2 and size drops to 1.
+               t.Run("hexpire", func(t *testing.T) {
+                       key := "hfe-cleanup-hexpire"
+                       makeExpired(t, key, "1")
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 60, 
"FIELDS", 1, "a").Val(), []int64{-2})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 1, 1)
+               })
+
+               // HSET counts the expired field as newly added (reply 1) and it
+               // ends up persistent.
+               t.Run("hset", func(t *testing.T) {
+                       key := "hfe-cleanup-hset"
+                       makeExpired(t, key, "1")
+                       require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", 
"new").Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 2, 2)
+                       require.Equal(t, "new", rdb.HGet(ctx, key, "a").Val())
+               })
+
+               // HSETNX succeeds on the expired field.
+               t.Run("hsetnx", func(t *testing.T) {
+                       key := "hfe-cleanup-hsetnx"
+                       makeExpired(t, key, "1")
+                       require.Equal(t, true, rdb.HSetNX(ctx, key, "a", 
"new").Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 2, 2)
+               })
+
+               // The expired value is the non-numeric "bad"; HINCRBY is still
+               // expected to succeed and return the bare increment.
+               t.Run("hincrby", func(t *testing.T) {
+                       key := "hfe-cleanup-hincrby"
+                       makeExpired(t, key, "bad")
+                       require.Equal(t, int64(2), rdb.HIncrBy(ctx, key, "a", 
2).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 2, 2)
+               })
+
+               // Same as hincrby, for the float variant.
+               t.Run("hincrbyfloat", func(t *testing.T) {
+                       key := "hfe-cleanup-hincrbyfloat"
+                       makeExpired(t, key, "bad")
+                       require.Equal(t, 1.5, rdb.HIncrByFloat(ctx, key, "a", 
1.5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 2, 2)
+               })
+       })
+}
+
+// TestHashFieldExpirationReadCommandsAcrossFieldStates builds the hash produced
+// by createHashFieldStates (persistent, live-TTL, expired and keeper fields,
+// plus a name that was never written) and checks every hash read command
+// against those states. It ends by asserting that the metadata is unchanged
+// by all the reads.
+func TestHashFieldExpirationReadCommandsAcrossFieldStates(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               key := "hfe-read-state-matrix"
+               createHashFieldStates(t, rdb, ctx, key)
+               before := util.GetKMetadata(t, rdb, ctx, key)
+
+               // HGET: expired and missing fields both read as nil.
+               require.Equal(t, "10", rdb.HGet(ctx, key, 
hfePersistentField).Val())
+               require.Equal(t, "20", rdb.HGet(ctx, key, hfeLiveField).Val())
+               require.ErrorIs(t, rdb.HGet(ctx, key, hfeExpiredField).Err(), 
redis.Nil)
+               require.ErrorIs(t, rdb.HGet(ctx, key, hfeMissingField).Err(), 
redis.Nil)
+
+               // HEXISTS
+               require.True(t, rdb.HExists(ctx, key, hfePersistentField).Val())
+               require.True(t, rdb.HExists(ctx, key, hfeLiveField).Val())
+               require.False(t, rdb.HExists(ctx, key, hfeExpiredField).Val())
+               require.False(t, rdb.HExists(ctx, key, hfeMissingField).Val())
+
+               // HSTRLEN
+               require.Equal(t, int64(2), rdb.HStrLen(ctx, key, 
hfePersistentField).Val())
+               require.Equal(t, int64(2), rdb.HStrLen(ctx, key, 
hfeLiveField).Val())
+               require.Equal(t, int64(0), rdb.HStrLen(ctx, key, 
hfeExpiredField).Val())
+               require.Equal(t, int64(0), rdb.HStrLen(ctx, key, 
hfeMissingField).Val())
+
+               // HMGET, then HLEN, which is expected to still count the
+               // expired field (4, not 3).
+               require.Equal(t, []interface{}{"10", "20", nil, nil},
+                       rdb.HMGet(ctx, key, hfePersistentField, hfeLiveField, 
hfeExpiredField, hfeMissingField).Val())
+               require.Equal(t, int64(4), rdb.HLen(ctx, key).Val())
+
+               // HGETALL / HKEYS / HVALS return only the three readable fields.
+               require.Equal(t, map[string]string{
+                       hfePersistentField: "10",
+                       hfeLiveField:       "20",
+                       hfeKeeperField:     "40",
+               }, rdb.HGetAll(ctx, key).Val())
+               require.ElementsMatch(t, []string{hfePersistentField, 
hfeLiveField, hfeKeeperField}, rdb.HKeys(ctx, key).Val())
+               require.ElementsMatch(t, []string{"10", "20", "40"}, 
rdb.HVals(ctx, key).Val())
+
+               // HSCAN with and without values completes in a single page.
+               scanned, cursor, err := rdb.HScan(ctx, key, 0, "", 100).Result()
+               require.NoError(t, err)
+               require.Zero(t, cursor)
+               require.Equal(t, map[string]string{
+                       hfePersistentField: "10",
+                       hfeLiveField:       "20",
+                       hfeKeeperField:     "40",
+               }, scanPairsToMap(t, scanned))
+
+               scannedKeys, cursor, err := rdb.HScanNoValues(ctx, key, 0, "", 
100).Result()
+               require.NoError(t, err)
+               require.Zero(t, cursor)
+               require.ElementsMatch(t, []string{hfePersistentField, 
hfeLiveField, hfeKeeperField}, scannedKeys)
+
+               // HRANGEBYLEX: forward order, LIMIT offset 1 (the expired field
+               // must not consume the offset), and reverse order.
+               rangeByLex := rdb.Do(ctx, "hrangebylex", key, "[a", "[zz", 
"LIMIT", 0, 10).Val()
+               require.Equal(t, []interface{}{
+                       hfePersistentField, "10",
+                       hfeLiveField, "20",
+                       hfeKeeperField, "40",
+               }, rangeByLex)
+               require.Equal(t, []interface{}{hfeLiveField, "20"},
+                       rdb.Do(ctx, "hrangebylex", key, "[a", "[zz", "LIMIT", 
1, 1).Val())
+               require.Equal(t, []interface{}{
+                       hfeKeeperField, "40",
+                       hfeLiveField, "20",
+                       hfePersistentField, "10",
+               }, rdb.Do(ctx, "hrangebylex", key, "[zz", "[a", "REV", "LIMIT", 
0, 10).Val())
+
+               // HRANDFIELD: a positive count larger than the hash yields each
+               // readable field once; a negative count may repeat fields, so
+               // only membership is checked.
+               randFields := rdb.HRandField(ctx, key, 20).Val()
+               require.ElementsMatch(t, []string{hfePersistentField, 
hfeLiveField, hfeKeeperField}, randFields)
+               randFields = rdb.HRandField(ctx, key, -20).Val()
+               require.NotContains(t, randFields, hfeExpiredField)
+               require.NotContains(t, randFields, hfeMissingField)
+               for _, field := range randFields {
+                       require.Contains(t, []string{hfePersistentField, 
hfeLiveField, hfeKeeperField}, field)
+               }
+               randWithValues := rdb.HRandFieldWithValues(ctx, key, 20).Val()
+               gotRandValues := map[string]string{}
+               for _, kv := range randWithValues {
+                       gotRandValues[kv.Key] = kv.Value
+               }
+               require.Equal(t, map[string]string{
+                       hfePersistentField: "10",
+                       hfeLiveField:       "20",
+                       hfeKeeperField:     "40",
+               }, gotRandValues)
+
+               // Reads must not have modified the metadata.
+               after := util.GetKMetadata(t, rdb, ctx, key)
+               require.Equal(t, before, after)
+       })
+}
+
+// TestHashFieldExpirationWriteCommandsAcrossFieldStates runs each hash write
+// command against the mixed-state hash built by createHashFieldStates, one
+// subtest and one key per command, and checks both the command reply and the
+// resulting size/persist counters.
+func TestHashFieldExpirationWriteCommandsAcrossFieldStates(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               t.Run("hdel mixed fields", func(t *testing.T) {
+                       key := "hfe-write-hdel-mixed"
+                       createHashFieldStates(t, rdb, ctx, key)
+                       // Only the persistent and live fields count as deleted;
+                       // the expired, missing and repeated names add nothing.
+                       require.Equal(t, int64(2),
+                               rdb.HDel(ctx, key, hfePersistentField, 
hfeLiveField, hfeExpiredField, hfeMissingField, hfePersistentField).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 1, 1)
+                       require.Equal(t, map[string]string{hfeKeeperField: 
"40"}, rdb.HGetAll(ctx, key).Val())
+               })
+
+               t.Run("hset clears ttl and treats expired as new", func(t 
*testing.T) {
+                       key := "hfe-write-hset-mixed"
+                       createHashFieldStates(t, rdb, ctx, key)
+                       // Reply 2: only the expired and missing fields are new.
+                       require.Equal(t, int64(2), rdb.HSet(ctx, key,
+                               hfePersistentField, "11",
+                               hfeLiveField, "21",
+                               hfeExpiredField, "31",
+                               hfeMissingField, "41").Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 5)
+                       requireHashValues(t, rdb, ctx, key, map[string]string{
+                               hfePersistentField: "11",
+                               hfeLiveField:       "21",
+                               hfeExpiredField:    "31",
+                               hfeMissingField:    "41",
+                               hfeKeeperField:     "40",
+                       })
+                       // HPERSIST replies -1 for every field, confirming none
+                       // of them carries a TTL any more.
+                       requireIntArray(t, rdb.Do(ctx, "hpersist", key, 
"FIELDS", 4,
+                               hfePersistentField, hfeLiveField, 
hfeExpiredField, hfeMissingField).Val(), []int64{-1, -1, -1, -1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 5)
+               })
+
+               t.Run("hmset clears ttl and returns ok", func(t *testing.T) {
+                       key := "hfe-write-hmset-mixed"
+                       createHashFieldStates(t, rdb, ctx, key)
+                       require.True(t, rdb.HMSet(ctx, key,
+                               hfePersistentField, "11",
+                               hfeLiveField, "21",
+                               hfeExpiredField, "31",
+                               hfeMissingField, "41").Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 5)
+                       requireHashValues(t, rdb, ctx, key, map[string]string{
+                               hfePersistentField: "11",
+                               hfeLiveField:       "21",
+                               hfeExpiredField:    "31",
+                               hfeMissingField:    "41",
+                               hfeKeeperField:     "40",
+                       })
+               })
+
+               t.Run("hsetnx writes only missing and expired fields", func(t 
*testing.T) {
+                       key := "hfe-write-hsetnx-mixed"
+                       createHashFieldStates(t, rdb, ctx, key)
+                       // Sent through Do with four field/value pairs; the
+                       // expected reply of 2 counts the expired and missing
+                       // fields.
+                       require.Equal(t, int64(2), rdb.Do(ctx, "hsetnx", key,
+                               hfePersistentField, "11",
+                               hfeLiveField, "21",
+                               hfeExpiredField, "31",
+                               hfeMissingField, "41").Val())
+                       // persist is 4 of 5: the live field keeps its TTL.
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 4)
+                       require.Equal(t, "10", rdb.HGet(ctx, key, 
hfePersistentField).Val())
+                       require.Equal(t, "20", rdb.HGet(ctx, key, 
hfeLiveField).Val())
+                       require.Equal(t, "31", rdb.HGet(ctx, key, 
hfeExpiredField).Val())
+                       require.Equal(t, "41", rdb.HGet(ctx, key, 
hfeMissingField).Val())
+                       requireIntArray(t, rdb.Do(ctx, "hpersist", key, 
"FIELDS", 1, hfeLiveField).Val(), []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 5)
+               })
+
+               t.Run("hincrby keeps live ttl and ignores expired value", 
func(t *testing.T) {
+                       key := "hfe-write-hincrby-mixed"
+                       createHashFieldStates(t, rdb, ctx, key)
+                       // Persistent and live fields: counters stay at 4/2.
+                       require.Equal(t, int64(15), rdb.HIncrBy(ctx, key, 
hfePersistentField, 5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 4, 2)
+                       require.Equal(t, int64(25), rdb.HIncrBy(ctx, key, 
hfeLiveField, 5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 4, 2)
+                       // Expired field: starts again from 0, size is unchanged
+                       // and persist goes up by one.
+                       require.Equal(t, int64(5), rdb.HIncrBy(ctx, key, 
hfeExpiredField, 5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 4, 3)
+                       // Missing field: both size and persist go up by one.
+                       require.Equal(t, int64(5), rdb.HIncrBy(ctx, key, 
hfeMissingField, 5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 4)
+                       // HPERSIST replying 1 shows the live field still had
+                       // its TTL after the increment.
+                       requireIntArray(t, rdb.Do(ctx, "hpersist", key, 
"FIELDS", 1, hfeLiveField).Val(), []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 5)
+               })
+
+               // Mirrors the hincrby subtest for the float variant.
+               t.Run("hincrbyfloat keeps live ttl and ignores expired value", 
func(t *testing.T) {
+                       key := "hfe-write-hincrbyfloat-mixed"
+                       createHashFieldStates(t, rdb, ctx, key)
+                       require.Equal(t, 10.5, rdb.HIncrByFloat(ctx, key, 
hfePersistentField, 0.5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 4, 2)
+                       require.Equal(t, 20.5, rdb.HIncrByFloat(ctx, key, 
hfeLiveField, 0.5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 4, 2)
+                       require.Equal(t, 0.5, rdb.HIncrByFloat(ctx, key, 
hfeExpiredField, 0.5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 4, 3)
+                       require.Equal(t, 0.5, rdb.HIncrByFloat(ctx, key, 
hfeMissingField, 0.5).Val())
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 4)
+                       requireIntArray(t, rdb.Do(ctx, "hpersist", key, 
"FIELDS", 1, hfeLiveField).Val(), []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 5, 5)
+               })
+       })
+}
+
+// TestHashFieldExpirationSetExpireAcrossFieldStates issues HSETEXPIRE with a
+// 60-second expiry over persistent, live, expired and missing fields of the
+// hash built by createHashFieldStates. It asserts that all five fields end up
+// persistent at field level (size 5, persist 5), that the key itself reports a
+// positive TTL, and that HPERSIST replies -1 for each written field.
+func TestHashFieldExpirationSetExpireAcrossFieldStates(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               key := "hfe-hsetexpire-mixed"
+               createHashFieldStates(t, rdb, ctx, key)
+
+               require.Equal(t, "OK", rdb.Do(ctx, "hsetexpire", key, 60,
+                       hfePersistentField, "11",
+                       hfeLiveField, "21",
+                       hfeExpiredField, "31",
+                       hfeMissingField, "41").Val())
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 5, 
5)
+               requireHashValues(t, rdb, ctx, key, map[string]string{
+                       hfePersistentField: "11",
+                       hfeLiveField:       "21",
+                       hfeExpiredField:    "31",
+                       hfeMissingField:    "41",
+                       hfeKeeperField:     "40",
+               })
+               // The expiry is visible through the key-level TTL command.
+               require.Greater(t, rdb.TTL(ctx, key).Val(), time.Duration(0))
+               // No field carries its own TTL, so HPERSIST has nothing to do.
+               requireIntArray(t, rdb.Do(ctx, "hpersist", key, "FIELDS", 4,
+                       hfePersistentField, hfeLiveField, hfeExpiredField, 
hfeMissingField).Val(), []int64{-1, -1, -1, -1})
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 5, 
5)
+       })
+}
+
+// TestHashFieldExpirationExpireAndPersistAcrossFieldStates exercises HEXPIRE
+// (plain, NX, XX, GT/LT and zero-second) and HPERSIST against the mixed-state
+// hash built by createHashFieldStates, using a fresh key per subtest.
+//
+// Per-field reply codes the assertions rely on:
+//   - HEXPIRE:  1 TTL applied, 0 condition not met, 2 field deleted because the
+//     expiry is immediate, -2 field not found (expired or missing).
+//   - HPERSIST: 1 TTL removed, -1 field has no TTL, -2 field not found.
+func TestHashFieldExpirationExpireAndPersistAcrossFieldStates(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               t.Run("hexpire mixed field states", func(t *testing.T) {
+                       key := "hfe-hexpire-mixed"
+                       createHashFieldStates(t, rdb, ctx, key)
+
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 600, 
"FIELDS", 4,
+                               hfePersistentField, hfeLiveField, 
hfeExpiredField, hfeMissingField).Val(), []int64{1, 1, -2, -2})
+                       // Size drops to 3 (the expired field is no longer
+                       // counted); only the keeper field remains persistent.
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 3, 1)
+                       require.Equal(t, map[string]string{
+                               hfePersistentField: "10",
+                               hfeLiveField:       "20",
+                               hfeKeeperField:     "40",
+                       }, rdb.HGetAll(ctx, key).Val())
+               })
+
+               t.Run("hexpire nx only persistent", func(t *testing.T) {
+                       key := "hfe-hexpire-nx"
+                       createHashFieldStates(t, rdb, ctx, key)
+
+                       // NX applies only to the field without a TTL.
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 600, 
"NX", "FIELDS", 4,
+                               hfePersistentField, hfeLiveField, 
hfeExpiredField, hfeMissingField).Val(), []int64{1, 0, -2, -2})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 3, 1)
+                       requireIntArray(t, rdb.Do(ctx, "hpersist", key, 
"FIELDS", 3,
+                               hfePersistentField, hfeLiveField, 
hfeKeeperField).Val(), []int64{1, 1, -1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 3, 3)
+               })
+
+               t.Run("hexpire xx only live ttl", func(t *testing.T) {
+                       key := "hfe-hexpire-xx"
+                       createHashFieldStates(t, rdb, ctx, key)
+
+                       // XX applies only to the field that already has a TTL.
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 600, 
"XX", "FIELDS", 4,
+                               hfePersistentField, hfeLiveField, 
hfeExpiredField, hfeMissingField).Val(), []int64{0, 1, -2, -2})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 3, 2)
+                       requireIntArray(t, rdb.Do(ctx, "hpersist", key, 
"FIELDS", 1, hfeLiveField).Val(), []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 3, 3)
+               })
+
+               t.Run("hexpire gt and lt compare against current ttl", func(t 
*testing.T) {
+                       key := "hfe-hexpire-gt-lt"
+                       createHashFieldStates(t, rdb, ctx, key)
+
+                       // GT with a shorter TTL is rejected, with a longer one
+                       // accepted.
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 
hfeLiveTTLSeconds-60, "GT", "FIELDS", 1, hfeLiveField).Val(),
+                               []int64{0})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 4, 2)
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 
hfeLiveTTLSeconds+600, "GT", "FIELDS", 1, hfeLiveField).Val(),
+                               []int64{1})
+                       afterGT := util.GetKMetadata(t, rdb, ctx, key)
+                       requireHashMetadata(t, afterGT, 4, 2)
+
+                       // LT with a longer TTL is rejected and must leave the
+                       // metadata untouched; a shorter one is accepted.
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 
hfeLiveTTLSeconds+1200, "LT", "FIELDS", 1, hfeLiveField).Val(),
+                               []int64{0})
+                       require.Equal(t, afterGT, util.GetKMetadata(t, rdb, 
ctx, key))
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 
hfeLiveTTLSeconds, "LT", "FIELDS", 1, hfeLiveField).Val(),
+                               []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 4, 2)
+               })
+
+               t.Run("hexpire immediate mixed field states", func(t 
*testing.T) {
+                       key := "hfe-hexpire-immediate"
+                       createHashFieldStates(t, rdb, ctx, key)
+
+                       // A zero-second expiry deletes the readable fields
+                       // right away (reply 2); only the keeper field is left.
+                       requireIntArray(t, rdb.Do(ctx, "hexpire", key, 0, 
"FIELDS", 4,
+                               hfePersistentField, hfeLiveField, 
hfeExpiredField, hfeMissingField).Val(), []int64{2, 2, -2, -2})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 1, 1)
+                       require.Equal(t, map[string]string{hfeKeeperField: 
"40"}, rdb.HGetAll(ctx, key).Val())
+               })
+
+               t.Run("hpersist mixed field states", func(t *testing.T) {
+                       key := "hfe-hpersist-mixed"
+                       createHashFieldStates(t, rdb, ctx, key)
+
+                       requireIntArray(t, rdb.Do(ctx, "hpersist", key, 
"FIELDS", 4,
+                               hfePersistentField, hfeLiveField, 
hfeExpiredField, hfeMissingField).Val(), []int64{-1, 1, -2, -2})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, 
key), 3, 3)
+                       require.Equal(t, map[string]string{
+                               hfePersistentField: "10",
+                               hfeLiveField:       "20",
+                               hfeKeeperField:     "40",
+                       }, rdb.HGetAll(ctx, key).Val())
+               })
+       })
+}
+
+func TestHashFieldExpirationOptionsAndDuplicates(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               key := "hfe-options"
+               require.Equal(t, int64(3), rdb.HSet(ctx, key, "a", "1", "b", 
"2", "c", "3").Val())
+
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 10, "NX", 
"FIELDS", 2, "a", "a").Val(), []int64{1, 0})
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 3, 
2)
+               requireIntArray(t, rdb.Do(ctx, "hpersist", key, "FIELDS", 2, 
"a", "a").Val(), []int64{1, -1})
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 3, 
3)
+
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 0, "GT", 
"FIELDS", 1, "b").Val(), []int64{0})
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 3, 
3)
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 0, "LT", 
"FIELDS", 1, "b").Val(), []int64{2})
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 2, 
2)
+
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 0, "FIELDS", 2, 
"c", "c").Val(), []int64{2, -2})
+               require.Equal(t, int64(1), rdb.HLen(ctx, key).Val())
+       })
+}
+
+func TestHashFieldExpirationHLenMetadataSize(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               key := "hfe-hlen"
+               require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1", "b", 
"2").Val())
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 1, 
"a").Val(), []int64{1})
+               waitHashFieldExpired(t, rdb, ctx, key, "a")
+               require.Equal(t, int64(2), rdb.HLen(ctx, key).Val())
+               require.Equal(t, map[string]string{"b": "2"}, rdb.HGetAll(ctx, 
key).Val())
+       })
+}
+
+func TestHashFieldExpirationLegacyRejectsFieldTTLCommands(t *testing.T) {
+       srv := util.StartServer(t, util.KvrocksServerConfigs{
+               "hash-encoding-mode": "legacy",
+               "resp3-enabled":      "yes",
+       })
+       defer srv.Close()
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       key := "hfe-legacy"
+       require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", "1").Val())
+       require.Error(t, rdb.Do(ctx, "hexpire", key, 10, "FIELDS", 1, 
"a").Err())
+       require.Error(t, rdb.Do(ctx, "hpersist", key, "FIELDS", 1, "a").Err())
+       require.Equal(t, "1", rdb.HGet(ctx, key, "a").Val())
+}
+
+// TestHashFieldExpirationParseErrors sends malformed HEXPIRE and HPERSIST
+// invocations and checks that each one is rejected with an error containing
+// the expected fragment. One subtest runs per table row, in table order.
+func TestHashFieldExpirationParseErrors(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx context.Context) {
+               key := "hfe-parse"
+               require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", "1").Val())
+
+               // Error fragments shared by many rows.
+               const (
+                       wrongArity = "wrong number of arguments"
+                       notInteger = "integer"
+                       badSyntax  = "syntax"
+               )
+               // argv packs a command line into the slice form rdb.Do expects.
+               argv := func(parts ...interface{}) []interface{} { return parts }
+
+               type parseCase struct {
+                       name        string
+                       args        []interface{}
+                       errContains string
+               }
+               cases := []parseCase{
+                       // HEXPIRE: shape of the FIELDS clause.
+                       {"hexpire missing fields clause", argv("hexpire", key, 10), wrongArity},
+                       {"hexpire missing fields clause after option", argv("hexpire", key, 10, "NX"), wrongArity},
+                       {"hexpire missing numfields", argv("hexpire", key, 10, "FIELDS"), wrongArity},
+                       {"hexpire numfields is zero", argv("hexpire", key, 10, "FIELDS", 0, "a"), notInteger},
+                       {"hexpire numfields is negative", argv("hexpire", key, 10, "FIELDS", -1, "a"), notInteger},
+                       {"hexpire numfields is not an integer", argv("hexpire", key, 10, "FIELDS", "not-int", "a"), notInteger},
+                       {"hexpire numfields is out of range", argv("hexpire", key, 10, "FIELDS", "9223372036854775808", "a"), notInteger},
+                       {"hexpire has too few fields", argv("hexpire", key, 10, "FIELDS", 2, "a"), wrongArity},
+                       {"hexpire has too many fields", argv("hexpire", key, 10, "FIELDS", 1, "a", "b"), wrongArity},
+                       // HEXPIRE: condition options.
+                       {"hexpire option after fields", argv("hexpire", key, 10, "FIELDS", 1, "a", "NX"), wrongArity},
+                       {"hexpire unknown option", argv("hexpire", key, 10, "UNKNOWN", "FIELDS", 1, "a"), badSyntax},
+                       {"hexpire duplicate option", argv("hexpire", key, 10, "NX", "NX", "FIELDS", 1, "a"), badSyntax},
+                       {"hexpire mutually exclusive options", argv("hexpire", key, 10, "NX", "XX", "FIELDS", 1, "a"), badSyntax},
+                       // HEXPIRE: TTL argument.
+                       {"hexpire ttl is not an integer", argv("hexpire", key, "not-int", "FIELDS", 1, "a"), notInteger},
+                       {"hexpire ttl is negative", argv("hexpire", key, -1, "FIELDS", 1, "a"), "invalid expire time"},
+                       {"hexpire ttl has trailing characters", argv("hexpire", key, "10ms", "FIELDS", 1, "a"), notInteger},
+                       {"hexpire ttl is out of int64 range", argv("hexpire", key, "9223372036854775808", "FIELDS", 1, "a"), notInteger},
+                       // HPERSIST.
+                       {"hpersist missing fields clause", argv("hpersist", key), wrongArity},
+                       {"hpersist wrong fields keyword", argv("hpersist", key, "FIELD", 1, "a"), badSyntax},
+                       {"hpersist missing numfields", argv("hpersist", key, "FIELDS"), wrongArity},
+                       {"hpersist numfields is zero", argv("hpersist", key, "FIELDS", 0, "a"), notInteger},
+                       {"hpersist numfields is negative", argv("hpersist", key, "FIELDS", -1, "a"), notInteger},
+                       {"hpersist numfields is not an integer", argv("hpersist", key, "FIELDS", "not-int", "a"), notInteger},
+                       {"hpersist numfields is out of range", argv("hpersist", key, "FIELDS", "9223372036854775808", "a"), notInteger},
+                       {"hpersist has too few fields", argv("hpersist", key, "FIELDS", 2, "a"), wrongArity},
+                       {"hpersist has too many fields", argv("hpersist", key, "FIELDS", 1, "a", "b"), wrongArity},
+               }
+
+               for _, tc := range cases {
+                       t.Run(tc.name, func(t *testing.T) {
+                               err := rdb.Do(ctx, tc.args...).Err()
+                               require.ErrorContains(t, err, tc.errContains)
+                       })
+               }
+       })
+}
+
+// TestHashFieldExpirationInputCornerCases covers unusual but well-formed
+// HEXPIRE/HPERSIST inputs: zero TTL, a missing key, a TTL that overflows, a key
+// of the wrong type, mixed-case keywords and an empty field name. Each subtest
+// works on its own key.
+func TestHashFieldExpirationInputCornerCases(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx context.Context) {
+               // reply and failure send a raw command and return its value or its error.
+               reply := func(args ...interface{}) interface{} { return rdb.Do(ctx, args...).Val() }
+               failure := func(args ...interface{}) error { return rdb.Do(ctx, args...).Err() }
+
+               t.Run("hexpire zero ttl deletes immediately", func(t *testing.T) {
+                       const key = "hfe-zero-ttl"
+                       created := rdb.HSet(ctx, key, "a", "1", "b", "2", "keeper", "3").Val()
+                       require.Equal(t, int64(3), created)
+
+                       got := reply("hexpire", key, 0, "FIELDS", 3, "a", "b", "missing")
+                       requireIntArray(t, got, []int64{2, 2, -2})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 1, 1)
+
+                       require.Equal(t, map[string]string{"keeper": "3"}, rdb.HGetAll(ctx, key).Val())
+                       require.ErrorIs(t, rdb.HGet(ctx, key, "a").Err(), redis.Nil)
+                       require.ErrorIs(t, rdb.HGet(ctx, key, "b").Err(), redis.Nil)
+               })
+
+               t.Run("hexpire and hpersist return missing for missing key", func(t *testing.T) {
+                       const key = "hfe-missing-key"
+                       allMissing := []int64{-2, -2}
+
+                       require.Equal(t, int64(0), rdb.Exists(ctx, key).Val())
+                       requireIntArray(t, reply("hexpire", key, 0, "FIELDS", 2, "a", "b"), allMissing)
+                       requireIntArray(t, reply("hpersist", key, "FIELDS", 2, "a", "b"), allMissing)
+                       // Neither command may have created the key.
+                       require.Equal(t, int64(0), rdb.Exists(ctx, key).Val())
+               })
+
+               t.Run("hexpire ttl overflow leaves field and metadata unchanged", func(t *testing.T) {
+                       const key = "hfe-ttl-overflow"
+                       require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", "1").Val())
+                       before := util.GetKMetadata(t, rdb, ctx, key)
+
+                       err := failure("hexpire", key, "9223372036854775807", "FIELDS", 1, "a")
+                       require.ErrorContains(t, err, "overflow")
+
+                       require.Equal(t, before, util.GetKMetadata(t, rdb, ctx, key))
+                       require.Equal(t, "1", rdb.HGet(ctx, key, "a").Val())
+               })
+
+               t.Run("hexpire and hpersist reject wrong type", func(t *testing.T) {
+                       const key = "hfe-wrong-type"
+                       require.NoError(t, rdb.Set(ctx, key, "value", 0).Err())
+
+                       require.ErrorContains(t, failure("hexpire", key, 10, "FIELDS", 1, "a"), "WRONGTYPE")
+                       require.ErrorContains(t, failure("hpersist", key, "FIELDS", 1, "a"), "WRONGTYPE")
+
+                       require.Equal(t, "value", rdb.Get(ctx, key).Val())
+               })
+
+               t.Run("keywords and command name are case insensitive", func(t *testing.T) {
+                       const key = "hfe-case-insensitive"
+                       require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1", "b", "2").Val())
+
+                       requireIntArray(t, reply("hExPiRe", key, 60, "nX", "fIeLdS", 1, "a"), []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 2, 1)
+
+                       requireIntArray(t, reply("hPeRsIsT", key, "fIeLdS", 1, "a"), []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 2, 2)
+               })
+
+               t.Run("empty field name is valid", func(t *testing.T) {
+                       const key = "hfe-empty-field"
+                       created := rdb.HSet(ctx, key, "", "empty", "normal", "value").Val()
+                       require.Equal(t, int64(2), created)
+
+                       // Set a TTL on the empty-named field, clear it, then expire it at once.
+                       requireIntArray(t, reply("hexpire", key, 60, "FIELDS", 1, ""), []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 2, 1)
+                       requireIntArray(t, reply("hpersist", key, "FIELDS", 1, ""), []int64{1})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 2, 2)
+                       requireIntArray(t, reply("hexpire", key, 0, "FIELDS", 1, ""), []int64{2})
+                       requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 1, 1)
+
+                       require.Equal(t, map[string]string{"normal": "value"}, rdb.HGetAll(ctx, key).Val())
+               })
+       })
+}
+
+func TestHashFieldExpirationReadCommandSet(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               key := "hfe-read-command-set"
+               require.Equal(t, int64(4), rdb.HSet(ctx, key, "a", "1", "b", 
"2", "c", "3", "d", "4").Val())
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 2, 
"a", "c").Val(), []int64{1, 1})
+               waitHashFieldExpired(t, rdb, ctx, key, "a")
+               waitHashFieldExpired(t, rdb, ctx, key, "c")
+
+               keys := rdb.HKeys(ctx, key).Val()
+               sort.Strings(keys)
+               require.Equal(t, []string{"b", "d"}, keys)
+               require.ElementsMatch(t, []string{"2", "4"}, rdb.HVals(ctx, 
key).Val())
+               require.Equal(t, []interface{}{"b", "2", "d", "4"}, rdb.Do(ctx, 
"hrangebylex", key, "[a", "[z").Val())
+               require.Equal(t, []interface{}{"d", "4"}, rdb.Do(ctx, 
"hrangebylex", key, "[a", "[z", "LIMIT", 1, 1).Val())
+       })
+}
+
+func TestHashFieldExpirationRandFieldAllExpired(t *testing.T) {
+       runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx 
context.Context) {
+               key := "hfe-rand-all-expired"
+               require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1", "b", 
"2").Val())
+               requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 2, 
"a", "b").Val(), []int64{1, 1})
+               waitHashFieldExpired(t, rdb, ctx, key, "a")
+               waitHashFieldExpired(t, rdb, ctx, key, "b")
+
+               require.Nil(t, rdb.Do(ctx, "hrandfield", key).Val())
+               require.Equal(t, []interface{}{}, rdb.Do(ctx, "hrandfield", 
key, 10).Val())
+               requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 2, 
0)
+       })
+}
diff --git a/tests/gocase/util/kmetadata.go b/tests/gocase/util/kmetadata.go
new file mode 100644
index 000000000..892198dc3
--- /dev/null
+++ b/tests/gocase/util/kmetadata.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package util
+
+import (
+       "context"
+       "fmt"
+       "testing"
+
+       "github.com/redis/go-redis/v9"
+       "github.com/stretchr/testify/require"
+)
+
+// KMetadataResponse is the typed form of a "kmetadata" reply as decoded by
+// ExtractKMetadataResponse. A reply key that is absent leaves the matching
+// field at its zero value.
+type KMetadataResponse struct {
+       // Integer reply keys "expire" and "size".
+       Expire, Size int64
+       // String reply key "type".
+       Type string
+       // Integer reply keys "flags" and "version".
+       Flags, Version int64
+       // String reply keys "mode" and "format".
+       Mode, Format string
+       // Integer reply keys "persist", "lower", "upper", "head" and "tail".
+       Persist, Lower, Upper, Head, Tail int64
+}
+
+// kMetadataToInt64 converts one decoded reply value to int64.
+//
+// int64 and int values are returned as-is. A float64 is accepted only when it
+// holds an integral value inside the int64 range: fractional values would be
+// silently truncated, and for NaN, infinities or out-of-range values the Go
+// conversion result is implementation-dependent, so all of those are reported
+// as errors. Any other dynamic type yields an error naming that type.
+func kMetadataToInt64(val interface{}) (int64, error) {
+       switch v := val.(type) {
+       case int64:
+               return v, nil
+       case int:
+               return int64(v), nil
+       case float64:
+               // 2^63 is exactly representable as a float64; valid inputs lie in
+               // [-2^63, 2^63). NaN fails both comparisons and is rejected too.
+               const bound = 1 << 63
+               if !(v >= -bound && v < bound) || v != float64(int64(v)) {
+                       return 0, fmt.Errorf("value is not an integer, got %v", v)
+               }
+               return int64(v), nil
+       default:
+               return 0, fmt.Errorf("value is not a number, got %T", val)
+       }
+}
+
+func ExtractKMetadataResponse(result interface{}) (*KMetadataResponse, error) {
+       resultMap, ok := result.(map[interface{}]interface{})
+       if !ok {
+               return nil, fmt.Errorf("expected map[interface{}]interface{}, 
got %T", result)
+       }
+
+       response := &KMetadataResponse{}
+       for field, target := range map[string]*int64{
+               "expire":  &response.Expire,
+               "size":    &response.Size,
+               "flags":   &response.Flags,
+               "version": &response.Version,
+               "persist": &response.Persist,
+               "lower":   &response.Lower,
+               "upper":   &response.Upper,
+               "head":    &response.Head,
+               "tail":    &response.Tail,
+       } {
+               if val, ok := resultMap[field]; ok {
+                       converted, err := kMetadataToInt64(val)
+                       if err != nil {
+                               return nil, fmt.Errorf("%s: %v", field, err)
+                       }
+                       *target = converted
+               }
+       }
+
+       for field, target := range map[string]*string{
+               "type":   &response.Type,
+               "mode":   &response.Mode,
+               "format": &response.Format,
+       } {
+               if val, ok := resultMap[field]; ok {
+                       strVal, ok := val.(string)
+                       if !ok {
+                               return nil, fmt.Errorf("%s is not a string, got 
%T", field, val)
+                       }
+                       *target = strVal
+               }
+       }
+
+       return response, nil
+}
+
+// GetKMetadata sends "kmetadata <key>" through rdb and returns the decoded
+// reply by value. A command error or a decoding error is reported on t through
+// require.NoError. The function marks itself as a test helper.
+func GetKMetadata(t testing.TB, rdb *redis.Client, ctx context.Context, key string) KMetadataResponse {
+       t.Helper()
+
+       reply := rdb.Do(ctx, "kmetadata", key)
+       require.NoError(t, reply.Err())
+
+       decoded, decodeErr := ExtractKMetadataResponse(reply.Val())
+       require.NoError(t, decodeErr)
+       return *decoded
+}

Reply via email to