This is an automated email from the ASF dual-hosted git repository.
PragmaTwice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 5264d5922 feat(hash): implement hlen scan repair (#3501)
5264d5922 is described below
commit 5264d5922189cecd1371d2a713244e6a7891950b
Author: Twice <[email protected]>
AuthorDate: Tue May 26 18:31:12 2026 +0800
feat(hash): implement hlen scan repair (#3501)
Implements the HLEN fast-path algorithms and SCAN_AND_REPAIR lazy
deletion for hash field expiration, including APPROX/REPAIR handling and
coverage for expired physical fields and compaction ghosts.
Follows the HFE proposal in #3432 and addresses the HLEN/SCAN_AND_REPAIR
task from tracking issue #3436.
Assisted-by: Codex/GPT5.5 xhigh
---
src/commands/cmd_hash.cc | 42 +++-
src/types/redis_hash.cc | 106 +++++++++-
src/types/redis_hash.h | 3 +
tests/cppunit/types/hash_test.cc | 84 ++++++++
tests/gocase/unit/type/hash/hash_hfe_test.go | 279 ++++++++++++++++++++++++++-
5 files changed, 502 insertions(+), 12 deletions(-)
diff --git a/src/commands/cmd_hash.cc b/src/commands/cmd_hash.cc
index 1b2d86965..a8489b3ca 100644
--- a/src/commands/cmd_hash.cc
+++ b/src/commands/cmd_hash.cc
@@ -19,6 +19,7 @@
*/
#include <limits>
+#include <optional>
#include "commander.h"
#include "commands/command_parser.h"
@@ -53,6 +54,22 @@ Status ParseHashFieldListTail(Parser &parser,
std::vector<std::string> *fields)
return Status::OK();
}
+uint64_t GenerateHLenFlags(uint64_t flags, const std::vector<std::string>
&args, const Config &config) {
+ bool needs_repair = false;
+ if (args.size() == 2) {
+ needs_repair = config.hash_encoding_mode ==
HashSubkeyEncodingMode::kFieldExpiration &&
+ config.hash_length_mode == HashLengthMode::kAccurate;
+ } else if (args.size() == 3) {
+ needs_repair = util::EqualICase(args[2], "REPAIR");
+ }
+
+ if (!needs_repair) {
+ return flags;
+ }
+
+ return (flags | kCmdWrite | kCmdNoDBSizeCheck) & ~kCmdReadOnly;
+}
+
} // namespace
class CommandHGet : public Commander {
@@ -155,11 +172,29 @@ class CommandHExists : public Commander {
class CommandHLen : public Commander {
public:
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() == 2) {
+ length_mode_ = std::nullopt;
+ return Commander::Parse(args);
+ }
+ if (args.size() != 3) {
+ return {Status::RedisParseErr, errWrongNumOfArguments};
+ }
+ if (util::EqualICase(args[2], "APPROX")) {
+ length_mode_ = HashLengthMode::kApproximate;
+ } else if (util::EqualICase(args[2], "REPAIR")) {
+ length_mode_ = HashLengthMode::kAccurate;
+ } else {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+ return Commander::Parse(args);
+ }
+
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
uint64_t count = 0;
redis::Hash hash_db(srv->storage, conn->GetNamespace());
- auto s = hash_db.Size(ctx, args_[1], &count);
+ auto s = length_mode_ ? hash_db.Size(ctx, args_[1], &count, *length_mode_)
: hash_db.Size(ctx, args_[1], &count);
if (!s.ok() && !s.IsNotFound()) {
return {Status::RedisExecErr, s.ToString()};
}
@@ -167,6 +202,9 @@ class CommandHLen : public Commander {
*output = s.IsNotFound() ? redis::Integer(0) : redis::Integer(count);
return Status::OK();
}
+
+ private:
+ std::optional<HashLengthMode> length_mode_;
};
class CommandHIncrBy : public Commander {
@@ -639,7 +677,7 @@ REDIS_REGISTER_COMMANDS(Hash,
MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
MakeCmdAttr<CommandHDel>("hdel", -3, "write
no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandHStrlen>("hstrlen", 3, "read-only",
1, 1, 1),
MakeCmdAttr<CommandHExists>("hexists", 3, "read-only",
1, 1, 1),
- MakeCmdAttr<CommandHLen>("hlen", 2, "read-only", 1, 1,
1),
+ MakeCmdAttr<CommandHLen>("hlen", -2, "read-only", 1,
1, 1, GenerateHLenFlags),
MakeCmdAttr<CommandHMGet>("hmget", -3, "read-only", 1,
1, 1),
MakeCmdAttr<CommandHMSet>("hmset", -4, "write", 1, 1,
1),
MakeCmdAttr<CommandHKeys>("hkeys", 2, "read-only
slow", 1, 1, 1),
diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc
index a6bbea72b..2bf8ff5e4 100644
--- a/src/types/redis_hash.cc
+++ b/src/types/redis_hash.cc
@@ -189,13 +189,117 @@ rocksdb::Status Hash::decodeValue(const HashMetadata
&metadata, Slice *value, ui
}
rocksdb::Status Hash::Size(engine::Context &ctx, const Slice &user_key,
uint64_t *size) {
+ return Size(ctx, user_key, size, storage_->GetConfig()->hash_length_mode);
+}
+
+rocksdb::Status Hash::Size(engine::Context &ctx, const Slice &user_key,
uint64_t *size, HashLengthMode length_mode) {
*size = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
HashMetadata metadata(false);
rocksdb::Status s = getMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
- *size = metadata.size;
+
+ if (metadata.IsLegacySubkeyEncoding() || length_mode ==
HashLengthMode::kApproximate) {
+ *size = metadata.size;
+ return rocksdb::Status::OK();
+ }
+
+ if (metadata.persist > metadata.size) {
+ return scanAndRepair(ctx, ns_key, &metadata, util::GetTimeStampMS(), size);
+ }
+
+ uint64_t ttl_candidates = metadata.size - metadata.persist;
+ if (ttl_candidates == 0) {
+ *size = metadata.size;
+ return rocksdb::Status::OK();
+ }
+
+ uint64_t now = util::GetTimeStampMS();
+ if (metadata.lower != 0 && now < metadata.lower) {
+ *size = metadata.size;
+ return rocksdb::Status::OK();
+ }
+ if (metadata.upper != 0 && now > metadata.upper && metadata.persist == 0) {
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisHash);
+ s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+ s = batch->Delete(metadata_cf_handle_, ns_key);
+ if (!s.ok()) return s;
+ s = storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+ if (!s.ok()) return s;
+ *size = 0;
+ return rocksdb::Status::OK();
+ }
+
+ return scanAndRepair(ctx, ns_key, &metadata, now, size);
+}
+
+rocksdb::Status Hash::scanAndRepair(engine::Context &ctx, const Slice &ns_key,
HashMetadata *metadata, uint64_t now,
+ uint64_t *size) {
+ *size = metadata->size;
+ if (!metadata->IsFieldExpirationEncoding()) {
+ return rocksdb::Status::OK();
+ }
+
+ HashMetadata repaired_metadata = *metadata;
+ repaired_metadata.size = 0;
+ repaired_metadata.persist = 0;
+ repaired_metadata.lower = 0;
+ repaired_metadata.upper = 0;
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisHash);
+ auto s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ std::string prefix_key = InternalKey(ns_key, "", metadata->version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string next_version_prefix_key =
+ InternalKey(ns_key, "", metadata->version + 1,
storage_->IsSlotIdEncoded()).Encode();
+
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice lower_bound(prefix_key);
+ rocksdb::Slice upper_bound(next_version_prefix_key);
+ read_options.iterate_lower_bound = &lower_bound;
+ read_options.iterate_upper_bound = &upper_bound;
+
+ auto iter = util::UniqueIterator(ctx, read_options);
+ for (iter->Seek(prefix_key); iter->Valid() &&
iter->key().starts_with(prefix_key); iter->Next()) {
+ HashFieldState state;
+ s = DecodeFieldState(*metadata, Slice(iter->value()), now, &state);
+ if (!s.ok()) return s;
+ if (state.kind == HashFieldStateKind::kExpiredTTLPhysical) {
+ s = batch->Delete(iter->key());
+ if (!s.ok()) return s;
+ continue;
+ }
+ repaired_metadata.size += 1;
+ if (state.kind == HashFieldStateKind::kPersistent) {
+ repaired_metadata.persist += 1;
+ } else if (state.kind == HashFieldStateKind::kLiveTTL) {
+ if (repaired_metadata.lower == 0 || state.expire <
repaired_metadata.lower) {
+ repaired_metadata.lower = state.expire;
+ }
+ repaired_metadata.upper = std::max(repaired_metadata.upper,
state.expire);
+ }
+ }
+ s = iter->status();
+ if (!s.ok()) return s;
+
+ *size = repaired_metadata.size;
+ if (repaired_metadata.size == 0) {
+ s = batch->Delete(metadata_cf_handle_, ns_key);
+ } else {
+ std::string bytes;
+ repaired_metadata.Encode(&bytes);
+ s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+ }
+ if (!s.ok()) return s;
+
+ s = storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+ if (!s.ok()) return s;
+ *metadata = repaired_metadata;
return rocksdb::Status::OK();
}
diff --git a/src/types/redis_hash.h b/src/types/redis_hash.h
index d18906aa7..ad6c94e2c 100644
--- a/src/types/redis_hash.h
+++ b/src/types/redis_hash.h
@@ -54,6 +54,7 @@ class Hash : public SubKeyScanner {
Hash(engine::Storage *storage, const std::string &ns) :
SubKeyScanner(storage, ns) {}
rocksdb::Status Size(engine::Context &ctx, const Slice &user_key, uint64_t
*size);
+ rocksdb::Status Size(engine::Context &ctx, const Slice &user_key, uint64_t
*size, HashLengthMode length_mode);
rocksdb::Status Get(engine::Context &ctx, const Slice &user_key, const Slice
&field, std::string *value);
rocksdb::Status Set(engine::Context &ctx, const Slice &user_key, const Slice
&field, const Slice &value,
uint64_t *added_cnt);
@@ -87,6 +88,8 @@ class Hash : public SubKeyScanner {
rocksdb::Status getMetadata(engine::Context &ctx, const Slice &ns_key,
HashMetadata *metadata);
rocksdb::Status getRawValue(engine::Context &ctx, const std::string
&sub_key, std::string *value);
static rocksdb::Status decodeValue(const HashMetadata &metadata, Slice
*value, uint64_t *expire = nullptr);
+ rocksdb::Status scanAndRepair(engine::Context &ctx, const Slice &ns_key,
HashMetadata *metadata, uint64_t now,
+ uint64_t *size);
friend struct FieldValueRetriever;
};
diff --git a/tests/cppunit/types/hash_test.cc b/tests/cppunit/types/hash_test.cc
index 4332d5d90..ddfd684e9 100644
--- a/tests/cppunit/types/hash_test.cc
+++ b/tests/cppunit/types/hash_test.cc
@@ -611,6 +611,90 @@ TEST_F(RedisHashFieldExpirationEncodingTest,
CompactionGhostDoesNotDecrementMeta
EXPECT_EQ(metadata.upper, before.upper);
}
+TEST_F(RedisHashFieldExpirationEncodingTest,
SizeRepairsExpiredPhysicalAndGhostMetadata) {
+ const Slice key = "hfe-size-repair";
+ uint64_t ret = 0;
+ auto s = hash_->MSet(*ctx_, key, {{"persistent", "1"}, {"live", "2"},
{"expired", "3"}, {"ghost", "4"}}, false, &ret);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ ASSERT_EQ(ret, 4);
+
+ std::vector<int64_t> results;
+ uint64_t now = util::GetTimeStampMS();
+ uint64_t live_expire = now + 60'000;
+ uint64_t expired_at = now - 1;
+ s = hash_->ExpireFields(*ctx_, key, {"live"}, live_expire,
HashFieldExpireCondition::kNone, &results);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ s = hash_->ExpireFields(*ctx_, key, {"ghost"}, live_expire + 60'000,
HashFieldExpireCondition::kNone, &results);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+
+ HashMetadata metadata = hashMetadata(key.ToString());
+ s = putRawHashValue(key.ToString(), "expired", expired_at, "3");
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ s = deleteRawHashValue(key.ToString(), "ghost");
+ ASSERT_TRUE(s.ok()) << s.ToString();
+
+ metadata.size = 4;
+ metadata.persist = 1;
+ metadata.lower = expired_at;
+ metadata.upper = live_expire + 60'000;
+ s = putHashMetadata(key.ToString(), metadata);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+
+ s = hash_->Size(*ctx_, key, &ret);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ EXPECT_EQ(ret, 2);
+
+ metadata = hashMetadata(key.ToString());
+ EXPECT_EQ(metadata.size, 2);
+ EXPECT_EQ(metadata.persist, 1);
+ EXPECT_EQ(metadata.lower, live_expire);
+ EXPECT_EQ(metadata.upper, live_expire);
+
+ std::string value;
+ s = hash_->Get(*ctx_, key, "expired", &value);
+ EXPECT_TRUE(s.IsNotFound());
+ s = hash_->Get(*ctx_, key, "ghost", &value);
+ EXPECT_TRUE(s.IsNotFound());
+ s = hash_->Get(*ctx_, key, "persistent", &value);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ EXPECT_EQ(value, "1");
+ s = hash_->Get(*ctx_, key, "live", &value);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ EXPECT_EQ(value, "2");
+}
+
+TEST_F(RedisHashFieldExpirationEncodingTest,
SizeDeletesHashWhenAllTtlCandidatesExpired) {
+ const Slice key = "hfe-size-delete-all-expired";
+ uint64_t ret = 0;
+ auto s = hash_->MSet(*ctx_, key, {{"a", "1"}, {"b", "2"}}, false, &ret);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ ASSERT_EQ(ret, 2);
+
+ uint64_t now = util::GetTimeStampMS();
+ uint64_t lower = now - 2'000;
+ uint64_t upper = now - 1'000;
+ s = putRawHashValue(key.ToString(), "a", lower, "1");
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ s = putRawHashValue(key.ToString(), "b", upper, "2");
+ ASSERT_TRUE(s.ok()) << s.ToString();
+
+ HashMetadata metadata = hashMetadata(key.ToString());
+ metadata.size = 2;
+ metadata.persist = 0;
+ metadata.lower = lower;
+ metadata.upper = upper;
+ s = putHashMetadata(key.ToString(), metadata);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+
+ s = hash_->Size(*ctx_, key, &ret);
+ ASSERT_TRUE(s.ok()) << s.ToString();
+ EXPECT_EQ(ret, 0);
+
+ metadata = HashMetadata(false);
+ s = getHashMetadata(key.ToString(), &metadata);
+ EXPECT_TRUE(s.IsNotFound());
+}
+
TEST_F(RedisHashFieldExpirationEncodingTest,
DeleteHandlesPersistentLiveExpiredMissingAndDuplicateFields) {
const Slice key = "hfe-delete-state-matrix";
uint64_t ret = 0;
diff --git a/tests/gocase/unit/type/hash/hash_hfe_test.go
b/tests/gocase/unit/type/hash/hash_hfe_test.go
index e53217990..7484e2cce 100644
--- a/tests/gocase/unit/type/hash/hash_hfe_test.go
+++ b/tests/gocase/unit/type/hash/hash_hfe_test.go
@@ -44,10 +44,20 @@ const (
func runWithFieldExpirationHash(t *testing.T, fn func(t *testing.T, rdb
*redis.Client, ctx context.Context)) {
t.Helper()
- srv := util.StartServer(t, util.KvrocksServerConfigs{
+ runWithFieldExpirationHashConfigs(t, nil, fn)
+}
+
+func runWithFieldExpirationHashConfigs(t *testing.T, configs
util.KvrocksServerConfigs, fn func(t *testing.T, rdb *redis.Client, ctx
context.Context)) {
+ t.Helper()
+
+ serverConfigs := util.KvrocksServerConfigs{
"hash-encoding-mode": "field-expiration",
"resp3-enabled": "yes",
- })
+ }
+ for k, v := range configs {
+ serverConfigs[k] = v
+ }
+ srv := util.StartServer(t, serverConfigs)
defer srv.Close()
ctx := context.Background()
@@ -73,13 +83,25 @@ func requireHashMetadata(t *testing.T, meta
util.KMetadataResponse, size, persis
}
}
+func requireHLenCommandInfoFlags(t *testing.T, rdb *redis.Client, ctx
context.Context, want []interface{}) {
+ t.Helper()
+
+ info, err := rdb.Do(ctx, "command", "info", "hlen").Slice()
+ require.NoError(t, err)
+ require.Len(t, info, 1)
+ hlenInfo := info[0].([]interface{})
+ require.Len(t, hlenInfo, 6)
+ require.Equal(t, "hlen", hlenInfo[0])
+ require.Equal(t, want, hlenInfo[2])
+}
+
func waitHashFieldExpired(t *testing.T, rdb *redis.Client, ctx
context.Context, key, field string) {
t.Helper()
require.Eventually(t, func() bool {
err := rdb.HGet(ctx, key, field).Err()
return errors.Is(err, redis.Nil)
- }, 3*time.Second, 50*time.Millisecond)
+ }, 5*time.Second, 50*time.Millisecond)
}
func requireIntArray(t *testing.T, got interface{}, want []int64) {
@@ -191,8 +213,6 @@ func
TestHashFieldExpirationFiltersReadsWithoutMutatingMetadata(t *testing.T) {
require.NotContains(t, rangeByLex, "a")
randField := rdb.HRandField(ctx, key, 10).Val()
require.NotContains(t, randField, "a")
- require.Equal(t, int64(3), rdb.HLen(ctx, key).Val())
-
after := util.GetKMetadata(t, rdb, ctx, key)
require.Equal(t, before, after)
})
@@ -282,7 +302,6 @@ func TestHashFieldExpirationReadCommandsAcrossFieldStates(t
*testing.T) {
require.Equal(t, []interface{}{"10", "20", nil, nil},
rdb.HMGet(ctx, key, hfePersistentField, hfeLiveField,
hfeExpiredField, hfeMissingField).Val())
- require.Equal(t, int64(4), rdb.HLen(ctx, key).Val())
require.Equal(t, map[string]string{
hfePersistentField: "10",
@@ -573,17 +592,259 @@ func TestHashFieldExpirationOptionsAndDuplicates(t
*testing.T) {
})
}
-func TestHashFieldExpirationHLenMetadataSize(t *testing.T) {
+func TestHashFieldExpirationHLenFastPathAndRepair(t *testing.T) {
runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx
context.Context) {
- key := "hfe-hlen"
+ key := "hfe-hlen-repair"
require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1", "b",
"2").Val())
requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 1,
"a").Val(), []int64{1})
- waitHashFieldExpired(t, rdb, ctx, key, "a")
+ before := util.GetKMetadata(t, rdb, ctx, key)
+ requireHashMetadata(t, before, 2, 1)
+
require.Equal(t, int64(2), rdb.HLen(ctx, key).Val())
+ require.Equal(t, before, util.GetKMetadata(t, rdb, ctx, key))
+
+ waitHashFieldExpired(t, rdb, ctx, key, "a")
+ require.Equal(t, int64(2), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, before, util.GetKMetadata(t, rdb, ctx, key))
+ require.Equal(t, int64(1), rdb.HLen(ctx, key).Val())
+ requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 1,
1)
require.Equal(t, map[string]string{"b": "2"}, rdb.HGetAll(ctx,
key).Val())
+
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 1,
"b").Val(), []int64{1})
+ waitHashFieldExpired(t, rdb, ctx, key, "b")
+ require.Equal(t, int64(0), rdb.Do(ctx, "hlen", key,
"REPAIR").Val())
+ require.Equal(t, int64(0), rdb.Exists(ctx, key).Val())
+ require.Error(t, rdb.Do(ctx, "kmetadata", key).Err())
+ })
+}
+
+func TestHashFieldExpirationHLenApproximateConfig(t *testing.T) {
+ runWithFieldExpirationHashConfigs(t, util.KvrocksServerConfigs{
+ "hash-length-mode": "approximate",
+ }, func(t *testing.T, rdb *redis.Client, ctx context.Context) {
+ key := "hfe-hlen-approx-config"
+ require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1", "b",
"2").Val())
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 1,
"a").Val(), []int64{1})
+ before := util.GetKMetadata(t, rdb, ctx, key)
+ waitHashFieldExpired(t, rdb, ctx, key, "a")
+
+ require.Equal(t, int64(2), rdb.HLen(ctx, key).Val())
+ require.Equal(t, before, util.GetKMetadata(t, rdb, ctx, key))
+ require.Equal(t, int64(1), rdb.Do(ctx, "hlen", key,
"REPAIR").Val())
+ requireHashMetadata(t, util.GetKMetadata(t, rdb, ctx, key), 1,
1)
+ })
+}
+
+func TestHashFieldExpirationHLenProposalFastPathTimeline(t *testing.T) {
+ runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx
context.Context) {
+ key := "hfe-hlen-proposal-timeline"
+ require.Equal(t, int64(2), rdb.HSet(ctx, key, "field1",
"value1", "field2", "value2").Val())
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 1,
"field1").Val(), []int64{1})
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 4, "FIELDS", 1,
"field2").Val(), []int64{1})
+
+ initial := util.GetKMetadata(t, rdb, ctx, key)
+ requireHashMetadata(t, initial, 2, 0)
+ require.Less(t, initial.Lower, initial.Upper)
+ require.Equal(t, int64(2), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, int64(2), rdb.HLen(ctx, key).Val())
+ require.Equal(t, initial, util.GetKMetadata(t, rdb, ctx, key))
+
+ waitHashFieldExpired(t, rdb, ctx, key, "field1")
+ require.Equal(t, "value2", rdb.HGet(ctx, key, "field2").Val())
+ require.Equal(t, int64(2), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, initial, util.GetKMetadata(t, rdb, ctx, key))
+
+ require.Equal(t, int64(1), rdb.HLen(ctx, key).Val())
+ afterRepair := util.GetKMetadata(t, rdb, ctx, key)
+ requireHashMetadata(t, afterRepair, 1, 0)
+ require.Equal(t, initial.Upper, afterRepair.Lower)
+ require.Equal(t, initial.Upper, afterRepair.Upper)
+ require.Equal(t, int64(1), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+
+ require.Equal(t, int64(1), rdb.HLen(ctx, key).Val())
+ require.Equal(t, afterRepair, util.GetKMetadata(t, rdb, ctx,
key))
+
+ waitHashFieldExpired(t, rdb, ctx, key, "field2")
+ require.Equal(t, int64(1), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, afterRepair, util.GetKMetadata(t, rdb, ctx,
key))
+ require.Equal(t, int64(0), rdb.HLen(ctx, key).Val())
+ require.Equal(t, int64(0), rdb.Exists(ctx, key).Val())
+ require.Error(t, rdb.Do(ctx, "kmetadata", key).Err())
+ })
+}
+
+func TestHashFieldExpirationHLenMetadataEffectsByPath(t *testing.T) {
+ runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx
context.Context) {
+ t.Run("no ttl candidates fast path does not mutate metadata",
func(t *testing.T) {
+ key := "hfe-hlen-effect-persistent"
+ require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1",
"b", "2").Val())
+ before := util.GetKMetadata(t, rdb, ctx, key)
+ requireHashMetadata(t, before, 2, 2)
+
+ require.Equal(t, int64(2), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, int64(2), rdb.HLen(ctx, key).Val())
+ require.Equal(t, before, util.GetKMetadata(t, rdb, ctx,
key))
+ })
+
+ t.Run("future ttl lower bound fast path does not mutate
metadata", func(t *testing.T) {
+ key := "hfe-hlen-effect-future"
+ require.Equal(t, int64(2), rdb.HSet(ctx, key, "ttl",
"1", "persist", "2").Val())
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 300,
"FIELDS", 1, "ttl").Val(), []int64{1})
+ before := util.GetKMetadata(t, rdb, ctx, key)
+ requireHashMetadata(t, before, 2, 1)
+
+ require.Equal(t, int64(2), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, int64(2), rdb.HLen(ctx, key).Val())
+ require.Equal(t, before, util.GetKMetadata(t, rdb, ctx,
key))
+ })
+
+ t.Run("slow repair removes expired ttl candidates and rewrites
metadata", func(t *testing.T) {
+ key := "hfe-hlen-effect-repair"
+ require.Equal(t, int64(3), rdb.HSet(ctx, key,
"persist", "1", "expired", "2", "live", "3").Val())
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1,
"FIELDS", 1, "expired").Val(), []int64{1})
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 300,
"FIELDS", 1, "live").Val(), []int64{1})
+ before := util.GetKMetadata(t, rdb, ctx, key)
+ requireHashMetadata(t, before, 3, 1)
+ require.Less(t, before.Lower, before.Upper)
+ waitHashFieldExpired(t, rdb, ctx, key, "expired")
+
+ require.Equal(t, int64(3), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, before, util.GetKMetadata(t, rdb, ctx,
key))
+ require.Equal(t, int64(2), rdb.HLen(ctx, key).Val())
+ afterRepair := util.GetKMetadata(t, rdb, ctx, key)
+ requireHashMetadata(t, afterRepair, 2, 1)
+ require.Equal(t, before.Upper, afterRepair.Lower)
+ require.Equal(t, before.Upper, afterRepair.Upper)
+ require.Equal(t, int64(2), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, map[string]string{"persist": "1",
"live": "3"}, rdb.HGetAll(ctx, key).Val())
+ })
+
+ t.Run("all ttl candidates expired fast delete removes
metadata", func(t *testing.T) {
+ key := "hfe-hlen-effect-fast-delete"
+ require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1",
"b", "2").Val())
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1,
"FIELDS", 2, "a", "b").Val(), []int64{1, 1})
+ before := util.GetKMetadata(t, rdb, ctx, key)
+ requireHashMetadata(t, before, 2, 0)
+ waitHashFieldExpired(t, rdb, ctx, key, "a")
+ waitHashFieldExpired(t, rdb, ctx, key, "b")
+
+ require.Equal(t, int64(2), rdb.Do(ctx, "hlen", key,
"APPROX").Val())
+ require.Equal(t, before, util.GetKMetadata(t, rdb, ctx,
key))
+ require.Equal(t, int64(0), rdb.HLen(ctx, key).Val())
+ require.Equal(t, int64(0), rdb.Exists(ctx, key).Val())
+ require.Error(t, rdb.Do(ctx, "kmetadata", key).Err())
+ })
+ })
+}
+
+func TestHashFieldExpirationHLenParseErrors(t *testing.T) {
+ runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx
context.Context) {
+ key := "hfe-hlen-parse"
+ require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", "1").Val())
+
+ require.ErrorContains(t, rdb.Do(ctx, "hlen", key, "BAD").Err(),
"syntax")
+ require.ErrorContains(t, rdb.Do(ctx, "hlen", key, "APPROX",
"REPAIR").Err(), "wrong number")
+ })
+}
+
+func TestHashFieldExpirationHLenReadonlyAndRepairFlags(t *testing.T) {
+ runWithFieldExpirationHash(t, func(t *testing.T, rdb *redis.Client, ctx
context.Context) {
+ requireHLenCommandInfoFlags(t, rdb, ctx,
[]interface{}{"readonly"})
+
+ key := "hfe-hlen-flags"
+ require.Equal(t, int64(2), rdb.HSet(ctx, key, "a", "1", "b",
"2").Val())
+ requireIntArray(t, rdb.Do(ctx, "hexpire", key, 1, "FIELDS", 1,
"a").Val(), []int64{1})
+ waitHashFieldExpired(t, rdb, ctx, key, "a")
+
+ require.ErrorContains(t,
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen',
KEYS[1])`, 1, key).Err(),
+ "Write commands are not allowed from read-only scripts")
+ require.Equal(t, int64(2), rdb.Do(ctx, "eval_ro", `return
redis.call('hlen', KEYS[1], 'APPROX')`, 1, key).Val())
+ require.ErrorContains(t,
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen',
KEYS[1], 'REPAIR')`, 1, key).Err(),
+ "Write commands are not allowed from read-only scripts")
})
}
+func TestHashFieldExpirationHLenAccurateConfigDynamicFlagsInEvalRO(t
*testing.T) {
+ runWithFieldExpirationHashConfigs(t, util.KvrocksServerConfigs{
+ "hash-length-mode": "accurate",
+ }, func(t *testing.T, rdb *redis.Client, ctx context.Context) {
+ requireHLenCommandInfoFlags(t, rdb, ctx,
[]interface{}{"readonly"})
+ key := "hfe-hlen-dynamic-accurate"
+ require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", "1").Val())
+
+ require.ErrorContains(t,
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen',
KEYS[1])`, 1, key).Err(),
+ "Write commands are not allowed from read-only scripts")
+ require.Equal(t, int64(1),
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen',
KEYS[1], 'APPROX')`, 1, key).Val())
+ require.ErrorContains(t,
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen',
KEYS[1], 'REPAIR')`, 1, key).Err(),
+ "Write commands are not allowed from read-only scripts")
+ })
+}
+
+func TestHashFieldExpirationHLenApproximateConfigDynamicFlagsInEvalRO(t
*testing.T) {
+ runWithFieldExpirationHashConfigs(t, util.KvrocksServerConfigs{
+ "hash-length-mode": "approximate",
+ }, func(t *testing.T, rdb *redis.Client, ctx context.Context) {
+ requireHLenCommandInfoFlags(t, rdb, ctx,
[]interface{}{"readonly"})
+ key := "hfe-hlen-dynamic-approx"
+ require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", "1").Val())
+
+ require.Equal(t, int64(1),
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen',
KEYS[1])`, 1, key).Val())
+ require.Equal(t, int64(1),
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen',
KEYS[1], 'APPROX')`, 1, key).Val())
+ require.ErrorContains(t,
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen',
KEYS[1], 'REPAIR')`, 1, key).Err(),
+ "Write commands are not allowed from read-only scripts")
+ })
+}
+
+func TestHashFieldExpirationHLenLegacyAccurateConfigDynamicFlagsInEvalRO(t
*testing.T) {
+ srv := util.StartServer(t, util.KvrocksServerConfigs{
+ "hash-encoding-mode": "legacy",
+ "hash-length-mode": "accurate",
+ "resp3-enabled": "yes",
+ })
+ defer srv.Close()
+
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ requireHLenCommandInfoFlags(t, rdb, ctx, []interface{}{"readonly"})
+ key := "hfe-hlen-dynamic-legacy"
+ require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", "1").Val())
+ require.Equal(t, int64(1), rdb.Do(ctx, "eval_ro", `return
redis.call('hlen', KEYS[1])`, 1, key).Val())
+ require.Equal(t, int64(1), rdb.Do(ctx, "eval_ro", `return
redis.call('hlen', KEYS[1], 'APPROX')`, 1, key).Val())
+ require.ErrorContains(t,
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen', KEYS[1],
'REPAIR')`, 1, key).Err(),
+ "Write commands are not allowed from read-only scripts")
+}
+
+func TestHashFieldExpirationHLenLegacyConfigDefaultStaysReadonly(t *testing.T)
{
+ srv := util.StartServer(t, util.KvrocksServerConfigs{
+ "hash-encoding-mode": "legacy",
+ "hash-length-mode": "accurate",
+ "resp3-enabled": "yes",
+ })
+ defer srv.Close()
+
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ key := "hfe-hlen-legacy-flags"
+ require.Equal(t, int64(1), rdb.HSet(ctx, key, "a", "1").Val())
+ require.Equal(t, int64(1), rdb.Do(ctx, "eval_ro", `return
redis.call('hlen', KEYS[1])`, 1, key).Val())
+ require.ErrorContains(t,
+ rdb.Do(ctx, "eval_ro", `return redis.call('hlen', KEYS[1],
'REPAIR')`, 1, key).Err(),
+ "Write commands are not allowed from read-only scripts")
+}
+
func TestHashFieldExpirationLegacyRejectsFieldTTLCommands(t *testing.T) {
srv := util.StartServer(t, util.KvrocksServerConfigs{
"hash-encoding-mode": "legacy",