This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new f7c5688f feat(hash): add the support of HSETEXPIRE command (HSET +
EXPIRE) (#2750)
f7c5688f is described below
commit f7c5688fe4030f148195e7043208ae659df48ea7
Author: Luigi Tagliamonte <[email protected]>
AuthorDate: Thu Jan 30 05:11:49 2025 +0100
feat(hash): add the support of HSETEXPIRE command (HSET + EXPIRE) (#2750)
---
src/commands/cmd_hash.cc | 32 ++++++++
src/types/redis_hash.cc | 10 ++-
src/types/redis_hash.h | 2 +-
tests/gocase/unit/type/hash/hash_test.go | 134 ++++++++++++++++++++++++++++++-
4 files changed, 173 insertions(+), 5 deletions(-)
diff --git a/src/commands/cmd_hash.cc b/src/commands/cmd_hash.cc
index 30d6eb28..13d13de8 100644
--- a/src/commands/cmd_hash.cc
+++ b/src/commands/cmd_hash.cc
@@ -23,6 +23,7 @@
#include "error_constants.h"
#include "scan_base.h"
#include "server/server.h"
+#include "time_util.h"
#include "types/redis_hash.h"
namespace redis {
@@ -258,6 +259,36 @@ class CommandHMSet : public Commander {
std::vector<FieldValue> field_values_;
};
+class CommandHSetExpire : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ ttl_ = GET_OR_RET(ParseInt<uint64_t>(args[2], 10));
+ if ((args.size() - 3) % 2 != 0) {
+ return {Status::RedisParseErr, "Invalid number of arguments: field-value
pairs must be complete"};
+ }
+ for (size_t i = 3; i < args_.size(); i += 2) {
+ field_values_.emplace_back(args_[i], args_[i + 1]);
+ }
+ return Commander::Parse(args);
+ }
+
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
+ uint64_t ret = 0;
+ redis::Hash hash_db(srv->storage, conn->GetNamespace());
+
+ auto s = hash_db.MSet(ctx, args_[1], field_values_, false, &ret, ttl_ *
1000 + util::GetTimeStampMS());
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+ *output = redis::RESP_OK;
+ return Status::OK();
+ }
+
+ private:
+ std::vector<FieldValue> field_values_;
+ uint64_t ttl_ = 0;
+};
+
class CommandHKeys : public Commander {
public:
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
@@ -448,6 +479,7 @@ REDIS_REGISTER_COMMANDS(Hash,
MakeCmdAttr<CommandHGet>("hget", 3, "read-only", 1
MakeCmdAttr<CommandHIncrBy>("hincrby", 4, "write", 1,
1, 1),
MakeCmdAttr<CommandHIncrByFloat>("hincrbyfloat", 4,
"write", 1, 1, 1),
MakeCmdAttr<CommandHMSet>("hset", -4, "write", 1, 1,
1),
+ MakeCmdAttr<CommandHSetExpire>("hsetexpire", -5,
"write", 1, 1, 1),
MakeCmdAttr<CommandHSetNX>("hsetnx", -4, "write", 1,
1, 1),
MakeCmdAttr<CommandHDel>("hdel", -3, "write
no-dbsize-check", 1, 1, 1),
MakeCmdAttr<CommandHStrlen>("hstrlen", 3, "read-only",
1, 1, 1),
diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc
index 905efdad..f509bcea 100644
--- a/src/types/redis_hash.cc
+++ b/src/types/redis_hash.cc
@@ -239,14 +239,18 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const
Slice &user_key, const
}
rocksdb::Status Hash::MSet(engine::Context &ctx, const Slice &user_key, const
std::vector<FieldValue> &field_values,
- bool nx, uint64_t *added_cnt) {
+ bool nx, uint64_t *added_cnt, uint64_t expire) {
*added_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
-
+ bool ttl_updated = false;
+ if (expire > 0 && metadata.expire != expire) {
+ metadata.expire = expire;
+ ttl_updated = true;
+ }
int added = 0;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisHash);
@@ -279,7 +283,7 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const
Slice &user_key, const st
if (!s.ok()) return s;
}
- if (added > 0) {
+ if (added > 0 || ttl_updated) {
*added_cnt = added;
metadata.size += added;
std::string bytes;
diff --git a/src/types/redis_hash.h b/src/types/redis_hash.h
index e5779c7c..94f722c4 100644
--- a/src/types/redis_hash.h
+++ b/src/types/redis_hash.h
@@ -56,7 +56,7 @@ class Hash : public SubKeyScanner {
rocksdb::Status IncrByFloat(engine::Context &ctx, const Slice &user_key,
const Slice &field, double increment,
double *new_value);
rocksdb::Status MSet(engine::Context &ctx, const Slice &user_key, const
std::vector<FieldValue> &field_values,
- bool nx, uint64_t *added_cnt);
+ bool nx, uint64_t *added_cnt, uint64_t expire = 0);
rocksdb::Status RangeByLex(engine::Context &ctx, const Slice &user_key,
const RangeLexSpec &spec,
std::vector<FieldValue> *field_values);
rocksdb::Status MGet(engine::Context &ctx, const Slice &user_key, const
std::vector<Slice> &fields,
diff --git a/tests/gocase/unit/type/hash/hash_test.go
b/tests/gocase/unit/type/hash/hash_test.go
index caab802a..29db4747 100644
--- a/tests/gocase/unit/type/hash/hash_test.go
+++ b/tests/gocase/unit/type/hash/hash_test.go
@@ -30,8 +30,10 @@ import (
"testing"
"time"
- "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
)
func getKeys(hash map[string]string) []string {
@@ -118,6 +120,136 @@ var testHash = func(t *testing.T, configs
util.KvrocksServerConfigs) {
require.Equal(t, int64(1), rdb.HSet(ctx, "hmsetmulti", "key1",
"val1", "key3", "val3").Val())
})
+ t.Run("HSETEXPIRE wrong number of args", func(t *testing.T) {
+ pattern := ".*wrong number.*"
+ ttlStr := "3600"
+ testKey := "hsetKey"
+ r := rdb.Do(ctx, "hsetexpire", testKey, ttlStr)
+ util.ErrorRegexp(t, r.Err(), pattern)
+ })
+
+ t.Run("HSETEXPIRE incomplete pairs", func(t *testing.T) {
+ pattern := ".*field-value pairs must be complete.*"
+ ttlStr := "3600"
+ testKey := "hsetKey"
+ r := rdb.Do(ctx, "hsetexpire", testKey, ttlStr, "key1", "val1",
"key2")
+ util.ErrorRegexp(t, r.Err(), pattern)
+ })
+
+ t.Run("HSET/HSETEXPIRE/HSETEXPIRE/persist update expire time", func(t
*testing.T) {
+ ttlStr := "3600"
+ testKey := "hsetKeyUpdateTime"
+ // create an hash without expiration
+ r := rdb.Do(ctx, "hset", testKey, "key1", "val1", "key2",
"val2")
+ require.NoError(t, r.Err())
+ noExp := rdb.ExpireTime(ctx, testKey)
+ // make sure there is not exp set on the key
+ assert.Equal(t, -1*time.Nanosecond, noExp.Val())
+ // validate we inserted the key/vals
+ values := rdb.HGetAll(ctx, testKey)
+ assert.Equal(t, 2, len(values.Val()))
+
+ // update the hash and add expiration
+ r = rdb.Do(ctx, "hsetexpire", testKey, ttlStr, "key3", "val3")
+ require.NoError(t, r.Err())
+ assert.Equal(t, "OK", r.Val())
+ firstExp := rdb.ExpireTime(ctx, testKey)
+ firstExpireTime := time.Date(1970, 1, 1, 0, 0, 0, 0,
time.UTC).Add(firstExp.Val()).Unix()
+ // validate there is exp set on the key
+ assert.NotEqual(t, -1, firstExpireTime)
+ assert.Greater(t, firstExpireTime, time.Now().Unix())
+ // validate we updated the key/vals
+ values = rdb.HGetAll(ctx, testKey)
+ assert.Equal(t, 3, len(values.Val()))
+
+ // update the has and expiration
+ time.Sleep(1 * time.Second)
+ r = rdb.Do(ctx, "hsetexpire", testKey, ttlStr, "key4", "val4")
+ require.NoError(t, r.Err())
+ assert.Equal(t, "OK", r.Val())
+ // validate there is exp set on the key and it is new
+ secondExp := rdb.ExpireTime(ctx, testKey)
+ secondExpireTime := time.Date(1970, 1, 1, 0, 0, 0, 0,
time.UTC).Add(secondExp.Val()).Unix()
+ assert.NotEqual(t, -1, secondExpireTime)
+ assert.Greater(t, secondExpireTime, time.Now().Unix())
+ assert.Greater(t, secondExpireTime, firstExpireTime)
+ // validate we updated the key/vals
+ values = rdb.HGetAll(ctx, testKey)
+ assert.Equal(t, 4, len(values.Val()))
+
+ //remove expiration on the key and verify
+ r = rdb.Do(ctx, "persist", testKey)
+ require.NoError(t, r.Err())
+ persist := rdb.ExpireTime(ctx, testKey)
+ assert.Equal(t, -1*time.Nanosecond, persist.Val())
+ // validate we still have the correct number of key/vals
+ values = rdb.HGetAll(ctx, testKey)
+ assert.Equal(t, 4, len(values.Val()))
+ })
+
+ t.Run("HSETEXPIRE/HLEN/EXPIRETIME - Small hash creation", func(t
*testing.T) {
+ ttlStr := "3600"
+ testKey := "hsetexsmallhash"
+ hsetExSmallHash := make(map[string]string)
+ for i := 0; i < 8; i++ {
+ key := "__avoid_collisions__" + util.RandString(0, 8,
util.Alpha)
+ val := "__avoid_collisions__" + util.RandString(0, 8,
util.Alpha)
+ if _, ok := hsetExSmallHash[key]; ok {
+ i--
+ }
+ rdb.Do(ctx, "hsetexpire", testKey, ttlStr, key, val)
+ hsetExSmallHash[key] = val
+ }
+ require.Equal(t, int64(8), rdb.HLen(ctx, testKey).Val())
+ val := rdb.ExpireTime(ctx, testKey).Val()
+ expireTime := time.Date(1970, 1, 1, 0, 0, 0, 0,
time.UTC).Add(val).Unix()
+ require.Greater(t, expireTime, time.Now().Unix())
+ })
+
+ t.Run("HSETEXPIRE/HLEN/EXPIRETIME - Big hash creation", func(t
*testing.T) {
+ ttlStr := "3600"
+ testKey := "hsetexbighash"
+ hsetExBigHash := make(map[string]string)
+ for i := 0; i < 1024; i++ {
+ key := "__avoid_collisions__" + util.RandString(0, 8,
util.Alpha)
+ val := "__avoid_collisions__" + util.RandString(0, 8,
util.Alpha)
+ if _, ok := hsetExBigHash[key]; ok {
+ i--
+ }
+ rdb.Do(ctx, "hsetexpire", testKey, ttlStr, key, val)
+ hsetExBigHash[key] = val
+ }
+ require.Equal(t, int64(1024), rdb.HLen(ctx, testKey).Val())
+ val := rdb.ExpireTime(ctx, testKey).Val()
+ expireTime := time.Date(1970, 1, 1, 0, 0, 0, 0,
time.UTC).Add(val).Unix()
+ require.Greater(t, expireTime, time.Now().Unix())
+ })
+
+ t.Run("HSETEXPIRE/HLEN/EXPIRETIME - Multi field-value pairs creation",
func(t *testing.T) {
+ ttlStr := "3600"
+ testKey := "hsetexbighashPair"
+ hsetExBigHash := make(map[string]string)
+ cmd := []string{"hsetexpire", testKey, ttlStr}
+ for i := 0; i < 10; i++ {
+ key := "__avoid_collisions__" + util.RandString(0, 8,
util.Alpha)
+ val := "__avoid_collisions__" + util.RandString(0, 8,
util.Alpha)
+ if _, ok := hsetExBigHash[key]; ok {
+ i--
+ }
+ cmd = append(cmd, key, val)
+ hsetExBigHash[key] = val
+ }
+ args := make([]interface{}, len(cmd))
+ for i, v := range cmd {
+ args[i] = v
+ }
+ rdb.Do(ctx, args...)
+ require.Equal(t, int64(10), rdb.HLen(ctx, testKey).Val())
+ val := rdb.ExpireTime(ctx, testKey).Val()
+ expireTime := time.Date(1970, 1, 1, 0, 0, 0, 0,
time.UTC).Add(val).Unix()
+ require.Greater(t, expireTime, time.Now().Unix())
+ })
+
t.Run("HGET against the small hash", func(t *testing.T) {
var err error
for key, val := range smallhash {