This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 9dbd4dedf perf(string): reduce the unnecessary reads in MSETEX (#3322)
9dbd4dedf is described below
commit 9dbd4dedff263d513d9a2ca7cf5746d6c4f5b0ef
Author: Byeonggyu Park <[email protected]>
AuthorDate: Sat Jan 3 22:59:50 2026 +0900
perf(string): reduce the unnecessary reads in MSETEX (#3322)
When running `MSETEX` command with `NX|XX` and `KEEPTTL`, it access 2*N
times for read.
- Reduce read access from 2*N to N.
- Change `Exists` ret type from `int` to `uint32_t`.
---
src/commands/cmd_key.cc | 4 +-
src/commands/cmd_server.cc | 4 +-
src/storage/redis_db.cc | 8 ++--
src/storage/redis_db.h | 5 ++-
src/types/redis_string.cc | 59 ++++++++++++--------------
tests/gocase/unit/type/strings/strings_test.go | 12 ++++++
6 files changed, 51 insertions(+), 41 deletions(-)
diff --git a/src/commands/cmd_key.cc b/src/commands/cmd_key.cc
index 093e0cc2b..476dc15a0 100644
--- a/src/commands/cmd_key.cc
+++ b/src/commands/cmd_key.cc
@@ -55,7 +55,7 @@ class CommandMove : public Commander {
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn,
std::string *output) override {
- int count = 0;
+ uint32_t count = 0;
redis::Database redis(srv->storage, conn->GetNamespace());
rocksdb::Status s = redis.Exists(ctx, {args_[1]}, &count);
@@ -163,7 +163,7 @@ class CommandExists : public Commander {
keys.emplace_back(args_[i]);
}
- int cnt = 0;
+ uint32_t cnt = 0;
redis::Database redis(srv->storage, conn->GetNamespace());
auto s = redis.Exists(ctx, keys, &cnt);
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 04c30ea68..be05d8f90 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1190,7 +1190,7 @@ class CommandRestore : public Commander {
redis::Database redis(srv->storage, conn->GetNamespace());
if (!replace_) {
- int count = 0;
+ uint32_t count = 0;
db_status = redis.Exists(ctx, {args_[1]}, &count);
if (!db_status.ok()) {
return {Status::RedisExecErr, db_status.ToString()};
@@ -1352,7 +1352,7 @@ class CommandDump : public Commander {
rocksdb::Status db_status;
std::string &key = args_[1];
redis::Database redis(srv->storage, conn->GetNamespace());
- int count = 0;
+ uint32_t count = 0;
db_status = redis.Exists(ctx, {key}, &count);
if (!db_status.ok()) {
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index c12bb00ec..99b3d96a3 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -207,7 +207,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const
std::vector<Slice> &k
return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
-rocksdb::Status Database::Exists(engine::Context &ctx, const
std::vector<Slice> &keys, int *ret) {
+rocksdb::Status Database::Exists(engine::Context &ctx, const
std::vector<Slice> &keys, uint32_t *ret) {
std::vector<std::string> ns_keys;
ns_keys.reserve(keys.size());
for (const auto &key : keys) {
@@ -543,7 +543,7 @@ rocksdb::Status
Database::ClearKeysOfSlotRange(engine::Context &ctx, const rocks
}
rocksdb::Status Database::KeyExist(engine::Context &ctx, const std::string
&key) {
- int cnt = 0;
+ uint32_t cnt = 0;
std::vector<rocksdb::Slice> keys{key};
auto s = Exists(ctx, keys, &cnt);
if (!s.ok()) {
@@ -621,7 +621,7 @@ Status WriteBatchLogData::Decode(const rocksdb::Slice
&blob) {
return Status::OK();
}
-rocksdb::Status Database::existsInternal(engine::Context &ctx, const
std::vector<std::string> &keys, int *ret) {
+rocksdb::Status Database::existsInternal(engine::Context &ctx, const
std::vector<std::string> &keys, uint32_t *ret) {
*ret = 0;
rocksdb::Status s;
std::string value;
@@ -666,7 +666,7 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const
std::string &key, con
}
if (nx) {
- int exist = 0;
+ uint32_t exist = 0;
if (s = existsInternal(ctx, {new_key}, &exist), !s.ok()) return s;
if (exist > 0) {
*res = CopyResult::KEY_ALREADY_EXIST;
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 16d98dcd3..506dc250b 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -111,7 +111,7 @@ class Database {
[[nodiscard]] rocksdb::Status Expire(engine::Context &ctx, const Slice
&user_key, uint64_t timestamp);
[[nodiscard]] rocksdb::Status Del(engine::Context &ctx, const Slice
&user_key);
[[nodiscard]] rocksdb::Status MDel(engine::Context &ctx, const
std::vector<Slice> &keys, uint64_t *deleted_cnt);
- [[nodiscard]] rocksdb::Status Exists(engine::Context &ctx, const
std::vector<Slice> &keys, int *ret);
+ [[nodiscard]] rocksdb::Status Exists(engine::Context &ctx, const
std::vector<Slice> &keys, uint32_t *ret);
[[nodiscard]] rocksdb::Status TTL(engine::Context &ctx, const Slice
&user_key, int64_t *ttl);
[[nodiscard]] rocksdb::Status GetExpireTime(engine::Context &ctx, const
Slice &user_key, uint64_t *timestamp);
[[nodiscard]] rocksdb::Status Type(engine::Context &ctx, const Slice &key,
RedisType *type);
@@ -155,7 +155,8 @@ class Database {
private:
// Already internal keys
- [[nodiscard]] rocksdb::Status existsInternal(engine::Context &ctx, const
std::vector<std::string> &keys, int *ret);
+ [[nodiscard]] rocksdb::Status existsInternal(engine::Context &ctx, const
std::vector<std::string> &keys,
+ uint32_t *ret);
[[nodiscard]] rocksdb::Status typeInternal(engine::Context &ctx, const Slice
&key, RedisType *type);
/// lookupKeyByPattern is a helper function of `Sort` to support `GET` and
`BY` fields.
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index f7b267288..4bd728535 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -431,61 +431,58 @@ rocksdb::Status String::IncrByFloat(engine::Context &ctx,
const std::string &use
}
rocksdb::Status String::MSet(engine::Context &ctx, const
std::vector<StringPair> &pairs, StringMSetArgs args,
- bool *flag) {
- if (flag != nullptr) {
- *flag = false;
- }
-
- if (args.type != StringSetType::NONE) {
- int exists = 0;
- int key_count = static_cast<int>(pairs.size());
- std::vector<Slice> keys;
- keys.reserve(pairs.size());
- for (const auto &pair : pairs) {
- keys.emplace_back(pair.key);
- }
- auto s = Exists(ctx, keys, &exists);
- if (!s.ok()) return s;
- if ((args.type == StringSetType::NX && exists > 0) || (args.type ==
StringSetType::XX && exists < key_count)) {
- return rocksdb::Status::OK();
+ bool *flag = nullptr) {
+ if (flag) *flag = false;
+
+ std::vector<uint64_t> expires;
+ if (args.type != StringSetType::NONE || args.keep_ttl) {
+ expires.resize(pairs.size(), 0);
+ for (size_t i = 0; i < pairs.size(); i++) {
+ Metadata old_metadata(kRedisNone, false);
+ std::string ns_key = AppendNamespacePrefix(pairs[i].key);
+ auto s = GetMetadata(ctx, RedisTypes::All(), ns_key, &old_metadata);
+ if (!s.ok() && !s.IsNotFound()) return s;
+ if (s.ok() && !old_metadata.Expired()) {
+ if (args.type == StringSetType::NX) {
+ return rocksdb::Status::OK();
+ }
+ expires[i] = old_metadata.expire;
+ } else if (args.type == StringSetType::XX) {
+ return rocksdb::Status::OK();
+ }
}
}
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisString);
- rocksdb::Status s = batch->PutLogData(log_data.Encode());
+ auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
- for (const auto &pair : pairs) {
- std::string bytes;
+ for (size_t i = 0; i < pairs.size(); i++) {
Metadata metadata(kRedisString, false);
if (args.keep_ttl) {
- uint64_t old_expire = 0;
- rocksdb::Status s = GetExpireTime(ctx, pair.key, &old_expire);
- if (s.ok() && old_expire != 0) {
- metadata.expire = old_expire;
+ if (expires[i] != 0) {
+ metadata.expire = expires[i];
}
} else {
metadata.expire = args.expire;
}
+ std::string bytes;
metadata.Encode(&bytes);
- bytes.append(pair.value.data(), pair.value.size());
- std::string ns_key = AppendNamespacePrefix(pair.key);
+ bytes.append(pairs[i].value.data(), pairs[i].value.size());
+ std::string ns_key = AppendNamespacePrefix(pairs[i].key);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
}
s = storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
if (!s.ok()) return s;
- if (flag != nullptr) {
- *flag = true;
- }
-
+ if (flag) *flag = true;
return rocksdb::Status::OK();
}
rocksdb::Status String::MSet(engine::Context &ctx, const
std::vector<StringPair> &pairs) {
- return MSet(ctx, pairs, {/*expire=*/0, StringSetType::NONE,
/*keep_ttl=*/false}, nullptr);
+ return MSet(ctx, pairs, {/*expire=*/0, StringSetType::NONE,
/*keep_ttl=*/false});
}
rocksdb::Status String::MSetEX(engine::Context &ctx, const
std::vector<StringPair> &pairs, StringMSetArgs args,
diff --git a/tests/gocase/unit/type/strings/strings_test.go
b/tests/gocase/unit/type/strings/strings_test.go
index c1a684847..3c2515210 100644
--- a/tests/gocase/unit/type/strings/strings_test.go
+++ b/tests/gocase/unit/type/strings/strings_test.go
@@ -480,6 +480,18 @@ func testString(t *testing.T, configs
util.KvrocksServerConfigs) {
require.EqualValues(t, -1, rdb.TTL(ctx, "k2").Val())
})
+ t.Run("MSETEX with TXN", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "k1", "k2").Err())
+ res := rdb.MSetEX(ctx, redis.MSetEXArgs{}, "k1", "v1")
+ require.EqualValues(t, 1, res.Val())
+ txn := rdb.TxPipeline()
+ txn.MSetEX(ctx, redis.MSetEXArgs{Condition: redis.XX}, "k1",
"v10", "k2", "v20")
+ _, err := txn.Exec(ctx)
+ require.NoError(t, err)
+ require.Equal(t, "v1", rdb.Get(ctx, "k1").Val())
+ require.Equal(t, "", rdb.Get(ctx, "k2").Val())
+ })
+
t.Run("MSETNX with already existent key", func(t *testing.T) {
r := rdb.MSetNX(ctx, map[string]interface{}{
"x1": "xxx",