This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 9dbd4dedf perf(string): reduce the unnecessary reads in MSETEX (#3322)
9dbd4dedf is described below

commit 9dbd4dedff263d513d9a2ca7cf5746d6c4f5b0ef
Author: Byeonggyu Park <[email protected]>
AuthorDate: Sat Jan 3 22:59:50 2026 +0900

    perf(string): reduce the unnecessary reads in MSETEX (#3322)
    
    When running `MSETEX` command with `NX|XX` and `KEEPTTL`, it access 2*N
    times for read.
    - Reduce read access from 2*N to N.
    - Change `Exists` ret type from `int` to `uint32_t`.
---
 src/commands/cmd_key.cc                        |  4 +-
 src/commands/cmd_server.cc                     |  4 +-
 src/storage/redis_db.cc                        |  8 ++--
 src/storage/redis_db.h                         |  5 ++-
 src/types/redis_string.cc                      | 59 ++++++++++++--------------
 tests/gocase/unit/type/strings/strings_test.go | 12 ++++++
 6 files changed, 51 insertions(+), 41 deletions(-)

diff --git a/src/commands/cmd_key.cc b/src/commands/cmd_key.cc
index 093e0cc2b..476dc15a0 100644
--- a/src/commands/cmd_key.cc
+++ b/src/commands/cmd_key.cc
@@ -55,7 +55,7 @@ class CommandMove : public Commander {
   }
 
   Status Execute(engine::Context &ctx, Server *srv, Connection *conn, 
std::string *output) override {
-    int count = 0;
+    uint32_t count = 0;
     redis::Database redis(srv->storage, conn->GetNamespace());
 
     rocksdb::Status s = redis.Exists(ctx, {args_[1]}, &count);
@@ -163,7 +163,7 @@ class CommandExists : public Commander {
       keys.emplace_back(args_[i]);
     }
 
-    int cnt = 0;
+    uint32_t cnt = 0;
     redis::Database redis(srv->storage, conn->GetNamespace());
 
     auto s = redis.Exists(ctx, keys, &cnt);
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index 04c30ea68..be05d8f90 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1190,7 +1190,7 @@ class CommandRestore : public Commander {
     redis::Database redis(srv->storage, conn->GetNamespace());
 
     if (!replace_) {
-      int count = 0;
+      uint32_t count = 0;
       db_status = redis.Exists(ctx, {args_[1]}, &count);
       if (!db_status.ok()) {
         return {Status::RedisExecErr, db_status.ToString()};
@@ -1352,7 +1352,7 @@ class CommandDump : public Commander {
     rocksdb::Status db_status;
     std::string &key = args_[1];
     redis::Database redis(srv->storage, conn->GetNamespace());
-    int count = 0;
+    uint32_t count = 0;
 
     db_status = redis.Exists(ctx, {key}, &count);
     if (!db_status.ok()) {
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index c12bb00ec..99b3d96a3 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -207,7 +207,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const 
std::vector<Slice> &k
   return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
-rocksdb::Status Database::Exists(engine::Context &ctx, const 
std::vector<Slice> &keys, int *ret) {
+rocksdb::Status Database::Exists(engine::Context &ctx, const 
std::vector<Slice> &keys, uint32_t *ret) {
   std::vector<std::string> ns_keys;
   ns_keys.reserve(keys.size());
   for (const auto &key : keys) {
@@ -543,7 +543,7 @@ rocksdb::Status 
Database::ClearKeysOfSlotRange(engine::Context &ctx, const rocks
 }
 
 rocksdb::Status Database::KeyExist(engine::Context &ctx, const std::string 
&key) {
-  int cnt = 0;
+  uint32_t cnt = 0;
   std::vector<rocksdb::Slice> keys{key};
   auto s = Exists(ctx, keys, &cnt);
   if (!s.ok()) {
@@ -621,7 +621,7 @@ Status WriteBatchLogData::Decode(const rocksdb::Slice 
&blob) {
   return Status::OK();
 }
 
-rocksdb::Status Database::existsInternal(engine::Context &ctx, const 
std::vector<std::string> &keys, int *ret) {
+rocksdb::Status Database::existsInternal(engine::Context &ctx, const 
std::vector<std::string> &keys, uint32_t *ret) {
   *ret = 0;
   rocksdb::Status s;
   std::string value;
@@ -666,7 +666,7 @@ rocksdb::Status Database::Copy(engine::Context &ctx, const 
std::string &key, con
   }
 
   if (nx) {
-    int exist = 0;
+    uint32_t exist = 0;
     if (s = existsInternal(ctx, {new_key}, &exist), !s.ok()) return s;
     if (exist > 0) {
       *res = CopyResult::KEY_ALREADY_EXIST;
diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h
index 16d98dcd3..506dc250b 100644
--- a/src/storage/redis_db.h
+++ b/src/storage/redis_db.h
@@ -111,7 +111,7 @@ class Database {
   [[nodiscard]] rocksdb::Status Expire(engine::Context &ctx, const Slice 
&user_key, uint64_t timestamp);
   [[nodiscard]] rocksdb::Status Del(engine::Context &ctx, const Slice 
&user_key);
   [[nodiscard]] rocksdb::Status MDel(engine::Context &ctx, const 
std::vector<Slice> &keys, uint64_t *deleted_cnt);
-  [[nodiscard]] rocksdb::Status Exists(engine::Context &ctx, const 
std::vector<Slice> &keys, int *ret);
+  [[nodiscard]] rocksdb::Status Exists(engine::Context &ctx, const 
std::vector<Slice> &keys, uint32_t *ret);
   [[nodiscard]] rocksdb::Status TTL(engine::Context &ctx, const Slice 
&user_key, int64_t *ttl);
   [[nodiscard]] rocksdb::Status GetExpireTime(engine::Context &ctx, const 
Slice &user_key, uint64_t *timestamp);
   [[nodiscard]] rocksdb::Status Type(engine::Context &ctx, const Slice &key, 
RedisType *type);
@@ -155,7 +155,8 @@ class Database {
 
  private:
   // Already internal keys
-  [[nodiscard]] rocksdb::Status existsInternal(engine::Context &ctx, const 
std::vector<std::string> &keys, int *ret);
+  [[nodiscard]] rocksdb::Status existsInternal(engine::Context &ctx, const 
std::vector<std::string> &keys,
+                                               uint32_t *ret);
   [[nodiscard]] rocksdb::Status typeInternal(engine::Context &ctx, const Slice 
&key, RedisType *type);
 
   /// lookupKeyByPattern is a helper function of `Sort` to support `GET` and 
`BY` fields.
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index f7b267288..4bd728535 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -431,61 +431,58 @@ rocksdb::Status String::IncrByFloat(engine::Context &ctx, 
const std::string &use
 }
 
 rocksdb::Status String::MSet(engine::Context &ctx, const 
std::vector<StringPair> &pairs, StringMSetArgs args,
-                             bool *flag) {
-  if (flag != nullptr) {
-    *flag = false;
-  }
-
-  if (args.type != StringSetType::NONE) {
-    int exists = 0;
-    int key_count = static_cast<int>(pairs.size());
-    std::vector<Slice> keys;
-    keys.reserve(pairs.size());
-    for (const auto &pair : pairs) {
-      keys.emplace_back(pair.key);
-    }
-    auto s = Exists(ctx, keys, &exists);
-    if (!s.ok()) return s;
-    if ((args.type == StringSetType::NX && exists > 0) || (args.type == 
StringSetType::XX && exists < key_count)) {
-      return rocksdb::Status::OK();
+                             bool *flag = nullptr) {
+  if (flag) *flag = false;
+
+  std::vector<uint64_t> expires;
+  if (args.type != StringSetType::NONE || args.keep_ttl) {
+    expires.resize(pairs.size(), 0);
+    for (size_t i = 0; i < pairs.size(); i++) {
+      Metadata old_metadata(kRedisNone, false);
+      std::string ns_key = AppendNamespacePrefix(pairs[i].key);
+      auto s = GetMetadata(ctx, RedisTypes::All(), ns_key, &old_metadata);
+      if (!s.ok() && !s.IsNotFound()) return s;
+      if (s.ok() && !old_metadata.Expired()) {
+        if (args.type == StringSetType::NX) {
+          return rocksdb::Status::OK();
+        }
+        expires[i] = old_metadata.expire;
+      } else if (args.type == StringSetType::XX) {
+        return rocksdb::Status::OK();
+      }
     }
   }
 
   auto batch = storage_->GetWriteBatchBase();
   WriteBatchLogData log_data(kRedisString);
-  rocksdb::Status s = batch->PutLogData(log_data.Encode());
+  auto s = batch->PutLogData(log_data.Encode());
   if (!s.ok()) return s;
 
-  for (const auto &pair : pairs) {
-    std::string bytes;
+  for (size_t i = 0; i < pairs.size(); i++) {
     Metadata metadata(kRedisString, false);
     if (args.keep_ttl) {
-      uint64_t old_expire = 0;
-      rocksdb::Status s = GetExpireTime(ctx, pair.key, &old_expire);
-      if (s.ok() && old_expire != 0) {
-        metadata.expire = old_expire;
+      if (expires[i] != 0) {
+        metadata.expire = expires[i];
       }
     } else {
       metadata.expire = args.expire;
     }
+    std::string bytes;
     metadata.Encode(&bytes);
-    bytes.append(pair.value.data(), pair.value.size());
-    std::string ns_key = AppendNamespacePrefix(pair.key);
+    bytes.append(pairs[i].value.data(), pairs[i].value.size());
+    std::string ns_key = AppendNamespacePrefix(pairs[i].key);
     s = batch->Put(metadata_cf_handle_, ns_key, bytes);
     if (!s.ok()) return s;
   }
   s = storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
   if (!s.ok()) return s;
 
-  if (flag != nullptr) {
-    *flag = true;
-  }
-
+  if (flag) *flag = true;
   return rocksdb::Status::OK();
 }
 
 rocksdb::Status String::MSet(engine::Context &ctx, const 
std::vector<StringPair> &pairs) {
-  return MSet(ctx, pairs, {/*expire=*/0, StringSetType::NONE, 
/*keep_ttl=*/false}, nullptr);
+  return MSet(ctx, pairs, {/*expire=*/0, StringSetType::NONE, 
/*keep_ttl=*/false});
 }
 
 rocksdb::Status String::MSetEX(engine::Context &ctx, const 
std::vector<StringPair> &pairs, StringMSetArgs args,
diff --git a/tests/gocase/unit/type/strings/strings_test.go 
b/tests/gocase/unit/type/strings/strings_test.go
index c1a684847..3c2515210 100644
--- a/tests/gocase/unit/type/strings/strings_test.go
+++ b/tests/gocase/unit/type/strings/strings_test.go
@@ -480,6 +480,18 @@ func testString(t *testing.T, configs 
util.KvrocksServerConfigs) {
                require.EqualValues(t, -1, rdb.TTL(ctx, "k2").Val())
        })
 
+       t.Run("MSETEX with TXN", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, "k1", "k2").Err())
+               res := rdb.MSetEX(ctx, redis.MSetEXArgs{}, "k1", "v1")
+               require.EqualValues(t, 1, res.Val())
+               txn := rdb.TxPipeline()
+               txn.MSetEX(ctx, redis.MSetEXArgs{Condition: redis.XX}, "k1", 
"v10", "k2", "v20")
+               _, err := txn.Exec(ctx)
+               require.NoError(t, err)
+               require.Equal(t, "v1", rdb.Get(ctx, "k1").Val())
+               require.Equal(t, "", rdb.Get(ctx, "k2").Val())
+       })
+
        t.Run("MSETNX with already existent key", func(t *testing.T) {
                r := rdb.MSetNX(ctx, map[string]interface{}{
                        "x1": "xxx",

Reply via email to