This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new c9b1e8e6 Use MultiLockGuard to guarantee atomicity for multiple keys 
commands (#1700)
c9b1e8e6 is described below

commit c9b1e8e65258a37bcdbdc24e74b4096976493327
Author: Binbin <[email protected]>
AuthorDate: Sat Aug 26 13:37:40 2023 +0800

    Use MultiLockGuard to guarantee atomicity for multiple keys commands (#1700)
    
    In kvrocks, commands that operate on multiple keys may break atomicity,
    such as `ZUNION`, `ZUNIONSTORE`, `ZINTERSTORE`, `SUNION`, `SUNIONSTORE`,
    `SINTER`, `SINTERSTORE`, `SDIFF` and `SDIFFSTORE`.
    
    For the `SDIFFSTORE destination key1 [key2 ...]` command, kvrocks reads
    key1, key2 ... in order without taking any lock, and then stores the
    diff into the destination key. As a result, some of the keys may be
    changed while we are still reading them, which breaks atomicity, whereas
    Redis guarantees atomicity for this command.
    
    In this PR, we use `MultiLockGuard` to lock these keys before we read
    or write them, so they cannot be changed by other commands while the
    command is executing. This change affects all commands listed previously.
    
    Fixes #1692
---
 src/types/redis_set.cc    | 24 ++++++++++++++++++++++++
 src/types/redis_string.cc |  4 ++--
 src/types/redis_zset.cc   | 16 ++++++++++++++++
 3 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/src/types/redis_set.cc b/src/types/redis_set.cc
index 420904cb..ddfd6c0b 100644
--- a/src/types/redis_set.cc
+++ b/src/types/redis_set.cc
@@ -272,6 +272,14 @@ rocksdb::Status Set::Scan(const Slice &user_key, const 
std::string &cursor, uint
  * DIFF key1 key2 key3 = {b,d}
  */
 rocksdb::Status Set::Diff(const std::vector<Slice> &keys, 
std::vector<std::string> *members) {
+  std::vector<std::string> lock_keys;
+  lock_keys.reserve(keys.size());
+  for (const auto key : keys) {
+    std::string ns_key = AppendNamespacePrefix(key);
+    lock_keys.emplace_back(std::move(ns_key));
+  }
+  MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
+
   members->clear();
   std::vector<std::string> source_members;
   auto s = Members(keys[0], &source_members);
@@ -303,6 +311,14 @@ rocksdb::Status Set::Diff(const std::vector<Slice> &keys, 
std::vector<std::strin
  * UNION key1 key2 key3 = {a,b,c,d,e}
  */
 rocksdb::Status Set::Union(const std::vector<Slice> &keys, 
std::vector<std::string> *members) {
+  std::vector<std::string> lock_keys;
+  lock_keys.reserve(keys.size());
+  for (const auto key : keys) {
+    std::string ns_key = AppendNamespacePrefix(key);
+    lock_keys.emplace_back(std::move(ns_key));
+  }
+  MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
+
   members->clear();
 
   std::map<std::string, bool> union_members;
@@ -329,6 +345,14 @@ rocksdb::Status Set::Union(const std::vector<Slice> &keys, 
std::vector<std::stri
  * INTER key1 key2 key3 = {c}
  */
 rocksdb::Status Set::Inter(const std::vector<Slice> &keys, 
std::vector<std::string> *members) {
+  std::vector<std::string> lock_keys;
+  lock_keys.reserve(keys.size());
+  for (const auto key : keys) {
+    std::string ns_key = AppendNamespacePrefix(key);
+    lock_keys.emplace_back(std::move(ns_key));
+  }
+  MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
+
   members->clear();
 
   std::map<std::string, size_t> member_counters;
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index 844e1ca9..ab07d329 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -369,7 +369,7 @@ rocksdb::Status String::MSet(const std::vector<StringPair> 
&pairs, uint64_t ttl,
     lock_keys.reserve(pairs.size());
     for (const StringPair &pair : pairs) {
       std::string ns_key = AppendNamespacePrefix(pair.key);
-      lock_keys.emplace_back(ns_key);
+      lock_keys.emplace_back(std::move(ns_key));
     }
     guard.emplace(storage_->GetLockManager(), lock_keys);
   }
@@ -400,7 +400,7 @@ rocksdb::Status String::MSetNX(const 
std::vector<StringPair> &pairs, uint64_t tt
 
   for (StringPair pair : pairs) {
     std::string ns_key = AppendNamespacePrefix(pair.key);
-    lock_keys.emplace_back(ns_key);
+    lock_keys.emplace_back(std::move(ns_key));
     keys.emplace_back(pair.key);
   }
 
diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc
index d6ffc58b..09495151 100644
--- a/src/types/redis_zset.cc
+++ b/src/types/redis_zset.cc
@@ -626,6 +626,14 @@ rocksdb::Status ZSet::Overwrite(const Slice &user_key, 
const MemberScores &mscor
 
 rocksdb::Status ZSet::InterStore(const Slice &dst, const 
std::vector<KeyWeight> &keys_weights,
                                  AggregateMethod aggregate_method, uint64_t 
*saved_cnt) {
+  std::vector<std::string> lock_keys;
+  lock_keys.reserve(keys_weights.size());
+  for (const auto &key_weight : keys_weights) {
+    std::string ns_key = AppendNamespacePrefix(key_weight.key);
+    lock_keys.emplace_back(std::move(ns_key));
+  }
+  MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
+
   if (saved_cnt) *saved_cnt = 0;
 
   std::map<std::string, double> dst_zset;
@@ -697,6 +705,14 @@ rocksdb::Status ZSet::UnionStore(const Slice &dst, const 
std::vector<KeyWeight>
 
 rocksdb::Status ZSet::Union(const std::vector<KeyWeight> &keys_weights, 
AggregateMethod aggregate_method,
                             uint64_t *saved_cnt, std::vector<MemberScore> 
*members) {
+  std::vector<std::string> lock_keys;
+  lock_keys.reserve(keys_weights.size());
+  for (const auto &key_weight : keys_weights) {
+    std::string ns_key = AppendNamespacePrefix(key_weight.key);
+    lock_keys.emplace_back(std::move(ns_key));
+  }
+  MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
+
   if (saved_cnt) *saved_cnt = 0;
 
   std::map<std::string, double> dst_zset;

Reply via email to