This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 1e25be6a refactor: hoist key mutexes to ExecuteCommands (#2620)
1e25be6a is described below
commit 1e25be6a4e20242c6f87c7b561f87666c6359437
Author: SiLe Zhou <[email protected]>
AuthorDate: Tue Nov 5 00:25:14 2024 +0800
refactor: hoist key mutexes to ExecuteCommands (#2620)
Co-authored-by: Twice <[email protected]>
Co-authored-by: hulk <[email protected]>
---
src/commands/blocking_commander.h | 11 ++++-
src/commands/cmd_list.cc | 29 ++++++++++++
src/commands/cmd_stream.cc | 8 ++++
src/commands/cmd_zset.cc | 20 ++++++++
src/server/redis_connection.cc | 18 ++++++-
src/storage/redis_db.cc | 20 ++++----
src/types/redis_bitmap.cc | 10 +---
src/types/redis_bloom_chain.cc | 2 -
src/types/redis_hash.cc | 5 +-
src/types/redis_hyperloglog.cc | 2 -
src/types/redis_json.cc | 21 +--------
src/types/redis_list.cc | 10 +---
src/types/redis_set.cc | 30 ------------
src/types/redis_sortedint.cc | 2 -
src/types/redis_stream.cc | 20 ++------
src/types/redis_string.cc | 36 ++------------
src/types/redis_string.h | 3 +-
src/types/redis_zset.cc | 36 --------------
tests/gocase/unit/protocol/regression_test.go | 3 --
tests/gocase/unit/type/list/list_test.go | 68 ++-------------------------
tests/gocase/unit/type/zset/zset_test.go | 7 ---
21 files changed, 107 insertions(+), 254 deletions(-)
diff --git a/src/commands/blocking_commander.h b/src/commands/blocking_commander.h
index 05883a8a..27496819 100644
--- a/src/commands/blocking_commander.h
+++ b/src/commands/blocking_commander.h
@@ -21,6 +21,7 @@
#pragma once
#include "commander.h"
+#include "common/lock_manager.h"
#include "event_util.h"
#include "server/redis_connection.h"
@@ -44,6 +45,10 @@ class BlockingCommander : public Commander,
// in other words, returning true indicates ending the blocking
virtual bool OnBlockingWrite() = 0;
+ // GetLocks() locks the keys of the BlockingCommander with MultiLockGuard.
+ // When OnWrite() is triggered, BlockingCommander needs to relock the keys.
+ virtual MultiLockGuard GetLocks() = 0;
+
// to start the blocking process
// usually put to the end of the Execute method
Status StartBlocking(int64_t timeout, std::string *output) {
@@ -63,7 +68,11 @@ class BlockingCommander : public Commander,
}
void OnWrite(bufferevent *bev) {
- bool done = OnBlockingWrite();
+ bool done{false};
+ {
+ auto guard = GetLocks();
+ done = OnBlockingWrite();
+ }
if (!done) {
// The connection may be waked up but can't pop from the datatype.
diff --git a/src/commands/cmd_list.cc b/src/commands/cmd_list.cc
index cf588ec0..1a9f5d03 100644
--- a/src/commands/cmd_list.cc
+++ b/src/commands/cmd_list.cc
@@ -313,6 +313,16 @@ class CommandBPop : public BlockingCommander {
return s;
}
+ MultiLockGuard GetLocks() override {
+ std::vector<std::string> lock_keys;
+ lock_keys.reserve(keys_.size());
+ for (const auto &key : keys_) {
+ auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key,
srv_->storage->IsSlotIdEncoded());
+ lock_keys.emplace_back(std::move(ns_key));
+ }
+ return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
+ }
+
bool OnBlockingWrite() override {
engine::Context ctx(srv_->storage);
auto s = TryPopFromList(ctx);
@@ -436,6 +446,16 @@ class CommandBLMPop : public BlockingCommander {
}
}
+ MultiLockGuard GetLocks() override {
+ std::vector<std::string> lock_keys;
+ lock_keys.reserve(keys_.size());
+ for (const auto &key : keys_) {
+ auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key,
srv_->storage->IsSlotIdEncoded());
+ lock_keys.emplace_back(std::move(ns_key));
+ }
+ return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
+ }
+
bool OnBlockingWrite() override {
engine::Context ctx(srv_->storage);
auto s = ExecuteUnblocked(ctx);
@@ -767,6 +787,15 @@ class CommandBLMove : public BlockingCommander {
void UnblockKeys() override { srv_->UnblockOnKey(args_[1], conn_); }
+ MultiLockGuard GetLocks() override {
+ std::vector<std::string> lock_keys{
+ ComposeNamespaceKey(conn_->GetNamespace(), args_[1],
srv_->storage->IsSlotIdEncoded())};
+ if (args_[1] != args_[2]) {
+ lock_keys.emplace_back(ComposeNamespaceKey(conn_->GetNamespace(),
args_[2], srv_->storage->IsSlotIdEncoded()));
+ }
+ return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
+ }
+
bool OnBlockingWrite() override {
redis::List list_db(srv_->storage, conn_->GetNamespace());
std::string elem;
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 521e5ca3..0fe1cd3b 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -1635,6 +1635,14 @@ class CommandXReadGroup : public Commander,
redis::Stream stream_db(srv_->storage, conn_->GetNamespace());
std::vector<StreamReadResult> results;
+
+ std::vector<std::string> lock_keys;
+ lock_keys.reserve(streams_.size());
+ for (auto &stream_name : streams_) {
+ auto ns_key = stream_db.AppendNamespacePrefix(stream_name);
+ lock_keys.emplace_back(std::move(ns_key));
+ }
+ MultiLockGuard guard(srv_->storage->GetLockManager(), lock_keys);
engine::Context ctx(srv_->storage);
for (size_t i = 0; i < streams_.size(); ++i) {
redis::StreamRangeOptions options;
diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc
index 5b1f392f..99285a62 100644
--- a/src/commands/cmd_zset.cc
+++ b/src/commands/cmd_zset.cc
@@ -366,6 +366,16 @@ class CommandBZPop : public BlockingCommander {
conn_->Reply(output);
}
+ MultiLockGuard GetLocks() override {
+ std::vector<std::string> lock_keys;
+ lock_keys.reserve(keys_.size());
+ for (const auto &key : keys_) {
+ auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key,
srv_->storage->IsSlotIdEncoded());
+ lock_keys.emplace_back(std::move(ns_key));
+ }
+ return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
+ }
+
bool OnBlockingWrite() override {
std::string user_key;
std::vector<MemberScore> member_scores;
@@ -548,6 +558,16 @@ class CommandBZMPop : public BlockingCommander {
std::string NoopReply(const Connection *conn) override { return
conn->NilString(); }
+ MultiLockGuard GetLocks() override {
+ std::vector<std::string> lock_keys;
+ lock_keys.reserve(keys_.size());
+ for (const auto &key : keys_) {
+ auto ns_key = ComposeNamespaceKey(conn_->GetNamespace(), key,
srv_->storage->IsSlotIdEncoded());
+ lock_keys.emplace_back(std::move(ns_key));
+ }
+ return MultiLockGuard(srv_->storage->GetLockManager(), lock_keys);
+ }
+
bool OnBlockingWrite() override {
std::string user_key;
std::vector<MemberScore> member_scores;
diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc
index 70abfe70..54899afe 100644
--- a/src/server/redis_connection.cc
+++ b/src/server/redis_connection.cc
@@ -489,9 +489,24 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
SetLastCmd(cmd_name);
{
+ std::optional<MultiLockGuard> guard;
+ if (cmd_flags & kCmdWrite) {
+ std::vector<std::string> lock_keys;
+ attributes->ForEachKeyRange(
+ [&lock_keys, this](const std::vector<std::string> &args, const
CommandKeyRange &key_range) {
+ key_range.ForEachKey(
+ [&, this](const std::string &key) {
+ auto ns_key = ComposeNamespaceKey(ns_, key,
srv_->storage->IsSlotIdEncoded());
+ lock_keys.emplace_back(std::move(ns_key));
+ },
+ args);
+ },
+ cmd_tokens);
+
+ guard.emplace(srv_->storage->GetLockManager(), lock_keys);
+ }
engine::Context ctx(srv_->storage);
- // TODO: transaction support for index recording
std::vector<GlobalIndexer::RecordResult> index_records;
if (!srv_->index_mgr.index_map.empty() && IsCmdForIndexing(cmd_flags,
attributes->category) &&
!config->cluster_enabled) {
@@ -512,7 +527,6 @@ void Connection::ExecuteCommands(std::deque<CommandTokens>
*to_process_cmds) {
}
s = ExecuteCommand(ctx, cmd_name, cmd_tokens, current_cmd.get(), &reply);
- // TODO: transaction support for index updating
for (const auto &record : index_records) {
auto s = GlobalIndexer::Update(ctx, record);
if (!s.IsOK() && !s.Is<Status::TypeMismatched>()) {
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 5eabd8d8..26864f3e 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -112,7 +112,7 @@ rocksdb::Status Database::Expire(engine::Context &ctx,
const Slice &user_key, ui
std::string value;
Metadata metadata(kRedisNone, false);
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(),
metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
@@ -150,7 +150,7 @@ rocksdb::Status Database::Del(engine::Context &ctx, const
Slice &user_key) {
std::string ns_key = AppendNamespacePrefix(user_key);
std::string value;
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
rocksdb::Status s = storage_->Get(ctx, ctx.GetReadOptions(),
metadata_cf_handle_, ns_key, &value);
if (!s.ok()) return s;
Metadata metadata(kRedisNone, false);
@@ -165,13 +165,12 @@ rocksdb::Status Database::Del(engine::Context &ctx, const
Slice &user_key) {
rocksdb::Status Database::MDel(engine::Context &ctx, const std::vector<Slice>
&keys, uint64_t *deleted_cnt) {
*deleted_cnt = 0;
- std::vector<std::string> lock_keys;
- lock_keys.reserve(keys.size());
+ std::vector<std::string> ns_keys;
+ ns_keys.reserve(keys.size());
for (const auto &key : keys) {
std::string ns_key = AppendNamespacePrefix(key);
- lock_keys.emplace_back(std::move(ns_key));
+ ns_keys.emplace_back(std::move(ns_key));
}
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisNone);
@@ -181,8 +180,8 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const
std::vector<Slice> &k
}
std::vector<Slice> slice_keys;
- slice_keys.reserve(lock_keys.size());
- for (const auto &ns_key : lock_keys) {
+ slice_keys.reserve(ns_keys.size());
+ for (const auto &ns_key : ns_keys) {
slice_keys.emplace_back(ns_key);
}
@@ -202,7 +201,7 @@ rocksdb::Status Database::MDel(engine::Context &ctx, const
std::vector<Slice> &k
if (!s.ok()) continue;
if (metadata.Expired()) continue;
- s = batch->Delete(metadata_cf_handle_, lock_keys[i]);
+ s = batch->Delete(metadata_cf_handle_, ns_keys[i]);
if (!s.ok()) return s;
*deleted_cnt += 1;
}
@@ -652,9 +651,6 @@ rocksdb::Status Database::typeInternal(engine::Context
&ctx, const Slice &key, R
rocksdb::Status Database::Copy(engine::Context &ctx, const std::string &key,
const std::string &new_key, bool nx,
bool delete_old, CopyResult *res) {
- std::vector<std::string> lock_keys = {key, new_key};
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
-
RedisType type = kRedisNone;
auto s = typeInternal(ctx, key, &type);
if (!s.ok()) return s;
diff --git a/src/types/redis_bitmap.cc b/src/types/redis_bitmap.cc
index ae9fefcd..e75deea3 100644
--- a/src/types/redis_bitmap.cc
+++ b/src/types/redis_bitmap.cc
@@ -184,7 +184,6 @@ rocksdb::Status Bitmap::SetBit(engine::Context &ctx, const
Slice &user_key, uint
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
BitmapMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -461,7 +460,6 @@ rocksdb::Status Bitmap::BitOp(engine::Context &ctx,
BitOpFlags op_flag, const st
const Slice &user_key, const std::vector<Slice>
&op_keys, int64_t *len) {
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
std::vector<std::pair<std::string, BitmapMetadata>> meta_pairs;
uint64_t max_bitmap_size = 0;
@@ -824,15 +822,9 @@ template <bool ReadOnly>
rocksdb::Status Bitmap::bitfield(engine::Context &ctx, const Slice &user_key,
const std::vector<BitfieldOperation> &ops,
std::vector<std::optional<BitfieldValue>>
*rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
-
- std::optional<LockGuard> guard;
- if constexpr (!ReadOnly) {
- guard = LockGuard(storage_->GetLockManager(), ns_key);
- }
-
BitmapMetadata metadata;
std::string raw_value;
- // TODO(mwish): maintain snapshot for read-only bitfield.
+
auto s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
if (!s.ok() && !s.IsNotFound()) {
return s;
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
index 30c07fe3..7dea8189 100644
--- a/src/types/redis_bloom_chain.cc
+++ b/src/types/redis_bloom_chain.cc
@@ -126,7 +126,6 @@ rocksdb::Status BloomChain::Reserve(engine::Context &ctx,
const Slice &user_key,
uint16_t expansion) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
BloomChainMetadata bloom_chain_metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key,
&bloom_chain_metadata);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -156,7 +155,6 @@ rocksdb::Status BloomChain::InsertCommon(engine::Context
&ctx, const Slice &user
const BloomFilterInsertOptions
&insert_options,
std::vector<BloomFilterAddResult>
*rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &metadata);
diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc
index 8930c9fc..905efdad 100644
--- a/src/types/redis_hash.cc
+++ b/src/types/redis_hash.cc
@@ -66,7 +66,6 @@ rocksdb::Status Hash::IncrBy(engine::Context &ctx, const
Slice &user_key, const
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -117,7 +116,6 @@ rocksdb::Status Hash::IncrByFloat(engine::Context &ctx,
const Slice &user_key, c
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -211,7 +209,7 @@ rocksdb::Status Hash::Delete(engine::Context &ctx, const
Slice &user_key, const
WriteBatchLogData log_data(kRedisHash);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
@@ -245,7 +243,6 @@ rocksdb::Status Hash::MSet(engine::Context &ctx, const
Slice &user_key, const st
*added_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
HashMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
diff --git a/src/types/redis_hyperloglog.cc b/src/types/redis_hyperloglog.cc
index 562a7d6b..b7b2197f 100644
--- a/src/types/redis_hyperloglog.cc
+++ b/src/types/redis_hyperloglog.cc
@@ -112,7 +112,6 @@ rocksdb::Status HyperLogLog::Add(engine::Context &ctx,
const Slice &user_key,
*ret = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
HyperLogLogMetadata metadata{};
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
@@ -238,7 +237,6 @@ rocksdb::Status HyperLogLog::Merge(engine::Context &ctx,
const Slice &dest_user_
}
std::string dest_key = AppendNamespacePrefix(dest_user_key);
- LockGuard guard(storage_->GetLockManager(), dest_key);
std::vector<std::string> registers;
HyperLogLogMetadata metadata;
diff --git a/src/types/redis_json.cc b/src/types/redis_json.cc
index 5120573e..ba331ef9 100644
--- a/src/types/redis_json.cc
+++ b/src/types/redis_json.cc
@@ -124,8 +124,6 @@ rocksdb::Status Json::Set(engine::Context &ctx, const
std::string &user_key, con
const std::string &value) {
auto ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
JsonMetadata metadata;
JsonValue origin;
auto s = read(ctx, ns_key, &metadata, &origin);
@@ -190,8 +188,6 @@ rocksdb::Status Json::ArrAppend(engine::Context &ctx, const
std::string &user_ke
append_values.emplace_back(std::move(value.value));
}
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
JsonMetadata metadata;
JsonValue value;
auto s = read(ctx, ns_key, &metadata, &value);
@@ -248,8 +244,6 @@ rocksdb::Status Json::Merge(engine::Context &ctx, const
std::string &user_key, c
const std::string &merge_value, bool &result) {
auto ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
JsonMetadata metadata;
JsonValue json_val;
@@ -279,8 +273,6 @@ rocksdb::Status Json::Clear(engine::Context &ctx, const
std::string &user_key, c
size_t *result) {
auto ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
JsonValue json_val;
JsonMetadata metadata;
auto s = read(ctx, ns_key, &metadata, &json_val);
@@ -327,8 +319,6 @@ rocksdb::Status Json::ArrInsert(engine::Context &ctx, const
std::string &user_ke
insert_values.emplace_back(std::move(value.value));
}
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
JsonMetadata metadata;
JsonValue value;
auto s = read(ctx, ns_key, &metadata, &value);
@@ -349,8 +339,6 @@ rocksdb::Status Json::Toggle(engine::Context &ctx, const
std::string &user_key,
Optionals<bool> *results) {
auto ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
JsonMetadata metadata;
JsonValue origin;
auto s = read(ctx, ns_key, &metadata, &origin);
@@ -367,8 +355,6 @@ rocksdb::Status Json::ArrPop(engine::Context &ctx, const
std::string &user_key,
std::vector<std::optional<JsonValue>> *results) {
auto ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
JsonMetadata metadata;
JsonValue json_val;
auto s = read(ctx, ns_key, &metadata, &json_val);
@@ -403,8 +389,6 @@ rocksdb::Status Json::ArrTrim(engine::Context &ctx, const
std::string &user_key,
int64_t stop, Optionals<uint64_t> *results) {
auto ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
JsonMetadata metadata;
JsonValue json_val;
auto s = read(ctx, ns_key, &metadata, &json_val);
@@ -424,7 +408,7 @@ rocksdb::Status Json::Del(engine::Context &ctx, const
std::string &user_key, con
*result = 0;
auto ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
JsonValue json_val;
JsonMetadata metadata;
auto s = read(ctx, ns_key, &metadata, &json_val);
@@ -473,8 +457,6 @@ rocksdb::Status Json::numop(engine::Context &ctx,
JsonValue::NumOpEnum op, const
auto s = read(ctx, ns_key, &metadata, &json_val);
if (!s.ok()) return s;
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
auto res = json_val.NumOp(path, number, op, result);
if (!res) {
return rocksdb::Status::InvalidArgument(res.Msg());
@@ -571,7 +553,6 @@ rocksdb::Status Json::MSet(engine::Context &ctx, const
std::vector<std::string>
std::string ns_key = AppendNamespacePrefix(user_key);
ns_keys.emplace_back(std::move(ns_key));
}
- MultiLockGuard guard(storage_->GetLockManager(), ns_keys);
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisJson);
diff --git a/src/types/redis_list.cc b/src/types/redis_list.cc
index e640ffb6..acef1ed9 100644
--- a/src/types/redis_list.cc
+++ b/src/types/redis_list.cc
@@ -63,7 +63,7 @@ rocksdb::Status List::push(engine::Context &ctx, const Slice
&user_key, const st
WriteBatchLogData log_data(kRedisList, {std::to_string(cmd)});
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !(create_if_missing && s.IsNotFound())) {
return s.IsNotFound() ? rocksdb::Status::OK() : s;
@@ -108,7 +108,6 @@ rocksdb::Status List::PopMulti(engine::Context &ctx, const
rocksdb::Slice &user_
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
@@ -177,7 +176,6 @@ rocksdb::Status List::Rem(engine::Context &ctx, const Slice
&user_key, int count
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
@@ -271,7 +269,6 @@ rocksdb::Status List::Insert(engine::Context &ctx, const
Slice &user_key, const
*new_size = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
@@ -462,7 +459,6 @@ rocksdb::Status List::Pos(engine::Context &ctx, const Slice
&user_key, const Sli
rocksdb::Status List::Set(engine::Context &ctx, const Slice &user_key, int
index, Slice elem) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
@@ -501,7 +497,6 @@ rocksdb::Status List::lmoveOnSingleList(engine::Context
&ctx, const rocksdb::Sli
std::string *elem) {
std::string ns_key = AppendNamespacePrefix(src);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
@@ -567,8 +562,6 @@ rocksdb::Status List::lmoveOnTwoLists(engine::Context &ctx,
const rocksdb::Slice
std::string src_ns_key = AppendNamespacePrefix(src);
std::string dst_ns_key = AppendNamespacePrefix(dst);
- std::vector<std::string> lock_keys{src_ns_key, dst_ns_key};
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
ListMetadata src_metadata(false);
auto s = GetMetadata(ctx, src_ns_key, &src_metadata);
if (!s.ok()) {
@@ -636,7 +629,6 @@ rocksdb::Status List::Trim(engine::Context &ctx, const
Slice &user_key, int star
uint32_t trim_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
diff --git a/src/types/redis_set.cc b/src/types/redis_set.cc
index 043e0d36..01849328 100644
--- a/src/types/redis_set.cc
+++ b/src/types/redis_set.cc
@@ -37,7 +37,6 @@ rocksdb::Status Set::GetMetadata(engine::Context &ctx, const
Slice &ns_key, SetM
rocksdb::Status Set::Overwrite(engine::Context &ctx, Slice user_key, const
std::vector<std::string> &members) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
SetMetadata metadata;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisSet);
@@ -62,7 +61,6 @@ rocksdb::Status Set::Add(engine::Context &ctx, const Slice
&user_key, const std:
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
SetMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -100,7 +98,6 @@ rocksdb::Status Set::Remove(engine::Context &ctx, const
Slice &user_key, const s
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
SetMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
@@ -218,9 +215,6 @@ rocksdb::Status Set::Take(engine::Context &ctx, const Slice
&user_key, std::vect
std::string ns_key = AppendNamespacePrefix(user_key);
- std::optional<LockGuard> lock_guard;
- if (pop) lock_guard.emplace(storage_->GetLockManager(), ns_key);
-
SetMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
@@ -290,14 +284,6 @@ rocksdb::Status Set::Scan(engine::Context &ctx, const
Slice &user_key, const std
* DIFF key1 key2 key3 = {b,d}
*/
rocksdb::Status Set::Diff(engine::Context &ctx, const std::vector<Slice>
&keys, std::vector<std::string> *members) {
- std::vector<std::string> lock_keys;
- lock_keys.reserve(keys.size());
- for (const auto key : keys) {
- std::string ns_key = AppendNamespacePrefix(key);
- lock_keys.emplace_back(std::move(ns_key));
- }
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
-
members->clear();
std::vector<std::string> source_members;
auto s = Members(ctx, keys[0], &source_members);
@@ -329,14 +315,6 @@ rocksdb::Status Set::Diff(engine::Context &ctx, const
std::vector<Slice> &keys,
* UNION key1 key2 key3 = {a,b,c,d,e}
*/
rocksdb::Status Set::Union(engine::Context &ctx, const std::vector<Slice>
&keys, std::vector<std::string> *members) {
- std::vector<std::string> lock_keys;
- lock_keys.reserve(keys.size());
- for (const auto key : keys) {
- std::string ns_key = AppendNamespacePrefix(key);
- lock_keys.emplace_back(std::move(ns_key));
- }
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
-
members->clear();
std::map<std::string, bool> union_members;
@@ -363,14 +341,6 @@ rocksdb::Status Set::Union(engine::Context &ctx, const
std::vector<Slice> &keys,
* INTER key1 key2 key3 = {c}
*/
rocksdb::Status Set::Inter(engine::Context &ctx, const std::vector<Slice>
&keys, std::vector<std::string> *members) {
- std::vector<std::string> lock_keys;
- lock_keys.reserve(keys.size());
- for (const auto key : keys) {
- std::string ns_key = AppendNamespacePrefix(key);
- lock_keys.emplace_back(std::move(ns_key));
- }
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
-
members->clear();
std::map<std::string, size_t> member_counters;
diff --git a/src/types/redis_sortedint.cc b/src/types/redis_sortedint.cc
index 98e1f7a5..f29ca7de 100644
--- a/src/types/redis_sortedint.cc
+++ b/src/types/redis_sortedint.cc
@@ -38,7 +38,6 @@ rocksdb::Status Sortedint::Add(engine::Context &ctx, const
Slice &user_key, cons
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
SortedintMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -75,7 +74,6 @@ rocksdb::Status Sortedint::Remove(engine::Context &ctx, const
Slice &user_key, c
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
SortedintMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 009a98c0..df9284c2 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -96,7 +96,6 @@ rocksdb::Status Stream::Add(engine::Context &ctx, const Slice
&stream_name, cons
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -333,7 +332,6 @@ rocksdb::Status Stream::DeletePelEntries(engine::Context
&ctx, const Slice &stre
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
@@ -400,7 +398,7 @@ rocksdb::Status Stream::ClaimPelEntries(engine::Context
&ctx, const Slice &strea
const std::vector<StreamEntryID>
&entry_ids, const StreamClaimOptions &options,
StreamClaimResult *result) {
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
@@ -536,7 +534,6 @@ rocksdb::Status Stream::AutoClaim(engine::Context &ctx,
const Slice &stream_name
std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata(false);
- LockGuard guard(storage_->GetLockManager(), ns_key);
auto s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) { // not found will be caught by outside with no such key or
consumer group
return s;
@@ -692,7 +689,6 @@ rocksdb::Status Stream::CreateGroup(engine::Context &ctx,
const Slice &stream_na
}
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
@@ -745,7 +741,6 @@ rocksdb::Status Stream::DestroyGroup(engine::Context &ctx,
const Slice &stream_n
*delete_cnt = 0;
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
@@ -849,14 +844,14 @@ rocksdb::Status
Stream::createConsumerWithoutLock(engine::Context &ctx, const Sl
rocksdb::Status Stream::CreateConsumer(engine::Context &ctx, const Slice
&stream_name, const std::string &group_name,
const std::string &consumer_name, int
*created_number) {
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
return createConsumerWithoutLock(ctx, stream_name, group_name,
consumer_name, created_number);
}
rocksdb::Status Stream::DestroyConsumer(engine::Context &ctx, const Slice
&stream_name, const std::string &group_name,
const std::string &consumer_name,
uint64_t &deleted_pel) {
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
StreamMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
@@ -923,7 +918,7 @@ rocksdb::Status Stream::DestroyConsumer(engine::Context
&ctx, const Slice &strea
rocksdb::Status Stream::GroupSetId(engine::Context &ctx, const Slice
&stream_name, const std::string &group_name,
const StreamXGroupCreateOptions &options) {
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
StreamMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
@@ -965,7 +960,6 @@ rocksdb::Status Stream::DeleteEntries(engine::Context &ctx,
const Slice &stream_
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
@@ -1211,7 +1205,6 @@ rocksdb::Status Stream::GetStreamInfo(engine::Context
&ctx, const rocksdb::Slice
uint64_t count, StreamInfo *info) {
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
@@ -1452,7 +1445,6 @@ rocksdb::Status Stream::RangeWithPending(engine::Context
&ctx, const Slice &stre
}
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
@@ -1582,8 +1574,6 @@ rocksdb::Status Stream::Trim(engine::Context &ctx, const
Slice &stream_name, con
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
@@ -1696,8 +1686,6 @@ rocksdb::Status Stream::SetId(engine::Context &ctx, const
Slice &stream_name, co
std::string ns_key = AppendNamespacePrefix(stream_name);
- LockGuard guard(storage_->GetLockManager(), ns_key);
-
StreamMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) {
diff --git a/src/types/redis_string.cc b/src/types/redis_string.cc
index 7d785342..210fb697 100644
--- a/src/types/redis_string.cc
+++ b/src/types/redis_string.cc
@@ -118,7 +118,6 @@ rocksdb::Status String::Append(engine::Context &ctx, const
std::string &user_key
*new_size = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
std::string raw_value;
rocksdb::Status s = getRawValue(ctx, ns_key, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -156,7 +155,6 @@ rocksdb::Status String::GetEx(engine::Context &ctx, const
std::string &user_key,
std::optional<uint64_t> expire) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = getValue(ctx, ns_key, value);
if (!s.ok()) return s;
@@ -190,7 +188,6 @@ rocksdb::Status String::GetSet(engine::Context &ctx, const
std::string &user_key
rocksdb::Status String::GetDel(engine::Context &ctx, const std::string
&user_key, std::string *value) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = getValue(ctx, ns_key, value);
if (!s.ok()) return s;
@@ -199,7 +196,7 @@ rocksdb::Status String::GetDel(engine::Context &ctx, const
std::string &user_key
rocksdb::Status String::Set(engine::Context &ctx, const std::string &user_key,
const std::string &value) {
std::vector<StringPair> pairs{StringPair{user_key, value}};
- return MSet(ctx, pairs, /*expire=*/0, /*lock=*/true);
+ return MSet(ctx, pairs, /*expire=*/0);
}
rocksdb::Status String::Set(engine::Context &ctx, const std::string &user_key,
const std::string &value,
@@ -207,7 +204,6 @@ rocksdb::Status String::Set(engine::Context &ctx, const
std::string &user_key, c
uint64_t expire = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
bool need_old_value = args.type != StringSetType::NONE || args.get ||
args.keep_ttl;
if (need_old_value) {
std::string old_value;
@@ -289,7 +285,6 @@ rocksdb::Status String::SetRange(engine::Context &ctx,
const std::string &user_k
const std::string &value, uint64_t *new_size)
{
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
std::string raw_value;
rocksdb::Status s = getRawValue(ctx, ns_key, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -329,7 +324,6 @@ rocksdb::Status String::IncrBy(engine::Context &ctx, const
std::string &user_key
int64_t *new_value) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
std::string raw_value;
rocksdb::Status s = getRawValue(ctx, ns_key, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -366,7 +360,7 @@ rocksdb::Status String::IncrBy(engine::Context &ctx, const
std::string &user_key
rocksdb::Status String::IncrByFloat(engine::Context &ctx, const std::string
&user_key, double increment,
double *new_value) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
+
std::string raw_value;
rocksdb::Status s = getRawValue(ctx, ns_key, &raw_value);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -397,21 +391,7 @@ rocksdb::Status String::IncrByFloat(engine::Context &ctx,
const std::string &use
return updateRawValue(ctx, ns_key, raw_value);
}
-rocksdb::Status String::MSet(engine::Context &ctx, const
std::vector<StringPair> &pairs, uint64_t expire_ms,
- bool lock) {
- // Data race, key string maybe overwrite by other key while didn't lock the
keys here,
- // to improve the set performance
- std::optional<MultiLockGuard> guard;
- if (lock) {
- std::vector<std::string> lock_keys;
- lock_keys.reserve(pairs.size());
- for (const StringPair &pair : pairs) {
- std::string ns_key = AppendNamespacePrefix(pair.key);
- lock_keys.emplace_back(std::move(ns_key));
- }
- guard.emplace(storage_->GetLockManager(), lock_keys);
- }
-
+rocksdb::Status String::MSet(engine::Context &ctx, const
std::vector<StringPair> &pairs, uint64_t expire_ms) {
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisString);
auto s = batch->PutLogData(log_data.Encode());
@@ -434,25 +414,19 @@ rocksdb::Status String::MSetNX(engine::Context &ctx,
const std::vector<StringPai
*flag = false;
int exists = 0;
- std::vector<std::string> lock_keys;
- lock_keys.reserve(pairs.size());
std::vector<Slice> keys;
keys.reserve(pairs.size());
for (StringPair pair : pairs) {
std::string ns_key = AppendNamespacePrefix(pair.key);
- lock_keys.emplace_back(std::move(ns_key));
keys.emplace_back(pair.key);
}
- // Lock these keys before doing anything.
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
-
if (Exists(ctx, keys, &exists).ok() && exists > 0) {
return rocksdb::Status::OK();
}
- rocksdb::Status s = MSet(ctx, pairs, /*expire_ms=*/expire_ms,
/*lock=*/false);
+ rocksdb::Status s = MSet(ctx, pairs, /*expire_ms=*/expire_ms);
if (!s.ok()) return s;
*flag = true;
@@ -471,7 +445,6 @@ rocksdb::Status String::CAS(engine::Context &ctx, const
std::string &user_key, c
std::string current_value;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = getValue(ctx, ns_key, &current_value);
if (!s.ok() && !s.IsNotFound()) {
@@ -507,7 +480,6 @@ rocksdb::Status String::CAD(engine::Context &ctx, const
std::string &user_key, c
std::string current_value;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
rocksdb::Status s = getValue(ctx, ns_key, &current_value);
if (!s.ok() && !s.IsNotFound()) {
diff --git a/src/types/redis_string.h b/src/types/redis_string.h
index b218bae3..e5025d64 100644
--- a/src/types/redis_string.h
+++ b/src/types/redis_string.h
@@ -100,8 +100,7 @@ class String : public Database {
rocksdb::Status IncrByFloat(engine::Context &ctx, const std::string
&user_key, double increment, double *new_value);
std::vector<rocksdb::Status> MGet(engine::Context &ctx, const
std::vector<Slice> &keys,
std::vector<std::string> *values);
- rocksdb::Status MSet(engine::Context &ctx, const std::vector<StringPair>
&pairs, uint64_t expire_ms,
- bool lock = true);
+ rocksdb::Status MSet(engine::Context &ctx, const std::vector<StringPair>
&pairs, uint64_t expire_ms);
rocksdb::Status MSetNX(engine::Context &ctx, const std::vector<StringPair>
&pairs, uint64_t expire_ms, bool *flag);
rocksdb::Status CAS(engine::Context &ctx, const std::string &user_key, const
std::string &old_value,
const std::string &new_value, uint64_t expire_ms, int
*flag);
diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc
index 4474b348..5fa586d6 100644
--- a/src/types/redis_zset.cc
+++ b/src/types/redis_zset.cc
@@ -42,7 +42,6 @@ rocksdb::Status ZSet::Add(engine::Context &ctx, const Slice
&user_key, ZAddFlags
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ZSetMetadata metadata;
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok() && !s.IsNotFound()) return s;
@@ -159,7 +158,6 @@ rocksdb::Status ZSet::Pop(engine::Context &ctx, const Slice
&user_key, int count
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ZSetMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
@@ -224,8 +222,6 @@ rocksdb::Status ZSet::RangeByRank(engine::Context &ctx,
const Slice &user_key, c
std::string ns_key = AppendNamespacePrefix(user_key);
- std::optional<LockGuard> lock_guard;
- if (spec.with_deletion) lock_guard.emplace(storage_->GetLockManager(),
ns_key);
ZSetMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
@@ -306,8 +302,6 @@ rocksdb::Status ZSet::RangeByScore(engine::Context &ctx,
const Slice &user_key,
std::string ns_key = AppendNamespacePrefix(user_key);
- std::optional<LockGuard> lock_guard;
- if (spec.with_deletion) lock_guard.emplace(storage_->GetLockManager(),
ns_key);
ZSetMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
@@ -429,10 +423,6 @@ rocksdb::Status ZSet::RangeByLex(engine::Context &ctx,
const Slice &user_key, co
std::string ns_key = AppendNamespacePrefix(user_key);
- std::optional<LockGuard> lock_guard;
- if (spec.with_deletion) {
- lock_guard.emplace(storage_->GetLockManager(), ns_key);
- }
ZSetMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
@@ -526,7 +516,6 @@ rocksdb::Status ZSet::Remove(engine::Context &ctx, const
Slice &user_key, const
*removed_cnt = 0;
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ZSetMetadata metadata(false);
rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
@@ -621,7 +610,6 @@ rocksdb::Status ZSet::Rank(engine::Context &ctx, const
Slice &user_key, const Sl
rocksdb::Status ZSet::Overwrite(engine::Context &ctx, const Slice &user_key,
const MemberScores &mscores) {
std::string ns_key = AppendNamespacePrefix(user_key);
- LockGuard guard(storage_->GetLockManager(), ns_key);
ZSetMetadata metadata;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisZSet);
@@ -658,14 +646,6 @@ rocksdb::Status ZSet::InterStore(engine::Context &ctx,
const Slice &dst, const s
rocksdb::Status ZSet::Inter(engine::Context &ctx, const std::vector<KeyWeight>
&keys_weights,
AggregateMethod aggregate_method,
std::vector<MemberScore> *members) {
- std::vector<std::string> lock_keys;
- lock_keys.reserve(keys_weights.size());
- for (const auto &key_weight : keys_weights) {
- std::string ns_key = AppendNamespacePrefix(key_weight.key);
- lock_keys.emplace_back(std::move(ns_key));
- }
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
-
std::map<std::string, double> dst_zset;
std::map<std::string, size_t> member_counters;
std::vector<MemberScore> target_mscores;
@@ -723,14 +703,6 @@ rocksdb::Status ZSet::Inter(engine::Context &ctx, const
std::vector<KeyWeight> &
rocksdb::Status ZSet::InterCard(engine::Context &ctx, const
std::vector<std::string> &user_keys, uint64_t limit,
uint64_t *inter_cnt) {
- std::vector<std::string> lock_keys;
- lock_keys.reserve(user_keys.size());
- for (const auto &user_key : user_keys) {
- std::string ns_key = AppendNamespacePrefix(user_key);
- lock_keys.emplace_back(std::move(ns_key));
- }
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
-
std::vector<MemberScores> mscores_list;
mscores_list.reserve(user_keys.size());
RangeScoreSpec spec;
@@ -780,14 +752,6 @@ rocksdb::Status ZSet::UnionStore(engine::Context &ctx,
const Slice &dst, const s
rocksdb::Status ZSet::Union(engine::Context &ctx, const std::vector<KeyWeight>
&keys_weights,
AggregateMethod aggregate_method,
std::vector<MemberScore> *members) {
- std::vector<std::string> lock_keys;
- lock_keys.reserve(keys_weights.size());
- for (const auto &key_weight : keys_weights) {
- std::string ns_key = AppendNamespacePrefix(key_weight.key);
- lock_keys.emplace_back(std::move(ns_key));
- }
- MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
-
std::map<std::string, double> dst_zset;
std::vector<MemberScore> target_mscores;
uint64_t target_size = 0;
diff --git a/tests/gocase/unit/protocol/regression_test.go
b/tests/gocase/unit/protocol/regression_test.go
index 7dd4ea22..091bed26 100644
--- a/tests/gocase/unit/protocol/regression_test.go
+++ b/tests/gocase/unit/protocol/regression_test.go
@@ -23,7 +23,6 @@ import (
"context"
"fmt"
"testing"
- "time"
"github.com/apache/kvrocks/tests/gocase/util"
"github.com/stretchr/testify/require"
@@ -42,8 +41,6 @@ func TestRegression(t *testing.T) {
proto := "*3\r\n$5\r\nBLPOP\r\n$6\r\nhandle\r\n$1\r\n0\r\n"
require.NoError(t, c.Write(fmt.Sprintf("%s%s", proto, proto)))
- // TODO: Remove time.Sleep after fix issue #2473
- time.Sleep(100 * time.Millisecond)
resList := []string{"*2", "$6", "handle", "$1", "a"}
v := rdb.RPush(ctx, "handle", "a")
diff --git a/tests/gocase/unit/type/list/list_test.go
b/tests/gocase/unit/type/list/list_test.go
index 8b7a2c58..f7509300 100644
--- a/tests/gocase/unit/type/list/list_test.go
+++ b/tests/gocase/unit/type/list/list_test.go
@@ -376,22 +376,13 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
createList("blist", []string{"a", "b", large, "c", "d"})
- // TODO: Remove time.Sleep after fix issue #2473
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blpop", "blist", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist", "a"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("brpop", "blist", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist", "d"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blpop", "blist", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist", "b"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("brpop", "blist", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist", "c"})
})
@@ -400,23 +391,15 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
defer func() { require.NoError(t, rd.Close()) }()
createList("blist1", []string{"a", large, "c"})
createList("blist2", []string{"d", large, "f"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blpop", "blist1",
"blist2", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist1", "a"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("brpop", "blist1",
"blist2", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist1", "c"})
require.EqualValues(t, 1, rdb.LLen(ctx, "blist1").Val())
require.EqualValues(t, 3, rdb.LLen(ctx, "blist2").Val())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blpop", "blist2",
"blist2", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist2", "d"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("brpop", "blist2",
"blist2", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist2", "f"})
require.EqualValues(t, 1, rdb.LLen(ctx, "blist1").Val())
require.EqualValues(t, 1, rdb.LLen(ctx, "blist2").Val())
@@ -427,13 +410,9 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist1").Err())
createList("blist2", []string{"d", large, "f"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blpop", "blist1",
"blist2", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist2", "d"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("brpop", "blist1",
"blist2", "1"))
- time.Sleep(100 * time.Millisecond)
rd.MustReadStrings(t, []string{"blist2", "f"})
require.EqualValues(t, 0, rdb.LLen(ctx, "blist1").Val())
require.EqualValues(t, 1, rdb.LLen(ctx, "blist2").Val())
@@ -444,25 +423,17 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "list1", "list2").Err())
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "list1", "list2",
"list2", "list1", "0"))
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.LPush(ctx, "list1", "a").Err())
rd.MustReadStrings(t, []string{"list1", "a"})
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "list1", "list2",
"list2", "list1", "0"))
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.LPush(ctx, "list2", "b").Err())
rd.MustReadStrings(t, []string{"list2", "b"})
require.NoError(t, rdb.LPush(ctx, "list1", "a").Err())
require.NoError(t, rdb.LPush(ctx, "list2", "b").Err())
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "list1", "list2",
"list2", "list1", "0"))
- time.Sleep(time.Millisecond * 100)
rd.MustReadStrings(t, []string{"list1", "a"})
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "list1", "list2",
"list2", "list1", "0"))
- time.Sleep(time.Millisecond * 100)
rd.MustReadStrings(t, []string{"list2", "b"})
})
@@ -470,9 +441,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist", "target").Err())
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("blpop", "blist", "0"))
- time.Sleep(time.Millisecond * 100)
require.EqualValues(t, 2, rdb.LPush(ctx, "blist", "foo",
"bar").Val())
rd.MustReadStrings(t, []string{"blist", "bar"})
require.Equal(t, "foo", rdb.LRange(ctx, "blist", 0,
-1).Val()[0])
@@ -483,9 +452,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist1").Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs(popType, "blist1", "0"))
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rdb.RPush(ctx, "blist1",
"foo").Err())
rd.MustReadStrings(t, []string{"blist1", "foo"})
require.EqualValues(t, 0, rdb.Exists(ctx,
"blist1").Val())
@@ -495,7 +462,6 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rd.WriteArgs(popType, "blist1",
"-1"))
- time.Sleep(100 * time.Millisecond)
rd.MustMatch(t, ".*negative.*")
})
@@ -505,7 +471,6 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rd.WriteArgs(popType, "blist1", "0"))
- time.Sleep(time.Millisecond * 1000)
require.NoError(t, rdb.RPush(ctx, "blist1",
"foo").Err())
rd.MustReadStrings(t, []string{"blist1", "foo"})
})
@@ -515,7 +480,6 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist1",
"blist2").Err())
require.NoError(t, rdb.Set(ctx, "blist2", "nolist",
0).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs(popType, "blist1",
"blist2", "1"))
rd.MustMatch(t, ".*WRONGTYPE.*")
})
@@ -524,7 +488,6 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist1",
"blist2").Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs(popType, "blist1",
"blist2", "1"))
rd.MustMatch(t, "")
})
@@ -533,16 +496,12 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist1",
"blist2").Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs(popType, "blist1",
"blist2", "4"))
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rdb.RPush(ctx, "blist1",
"foo").Err())
rd.MustReadStrings(t, []string{"blist1", "foo"})
require.EqualValues(t, 0, rdb.Exists(ctx,
"blist1").Val())
require.EqualValues(t, 0, rdb.Exists(ctx,
"blist2").Val())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs(popType, "blist1",
"blist2", "1"))
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rdb.RPush(ctx, "blist2",
"foo").Err())
rd.MustReadStrings(t, []string{"blist2", "foo"})
require.EqualValues(t, 0, rdb.Exists(ctx,
"blist1").Val())
@@ -950,9 +909,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
require.NoError(t, rdb.Del(ctx,
"target_key{t}").Err())
require.NoError(t, rdb.RPush(ctx,
"target_key{t}", 1).Err())
createList("list{t}", []string{"a", "b", "c",
"d"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("lmove",
"list{t}", "target_key{t}", from, to))
- time.Sleep(100 * time.Millisecond)
r, err1 := rd.ReadLine()
require.Equal(t, "$1", r)
require.NoError(t, err1)
@@ -998,9 +955,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
require.NoError(t, rdb.Del(ctx,
"target_key{t}").Err())
require.NoError(t, rdb.RPush(ctx,
"target_key{t}", 1).Err())
createList("list{t}", []string{"a", "b", "c",
"d"})
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmove",
"list{t}", "target_key{t}", from, to, "1"))
- time.Sleep(100 * time.Millisecond)
r, err1 := rd.ReadLine()
require.Equal(t, "$1", r)
require.NoError(t, err1)
@@ -1026,9 +981,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, "blist", "target").Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmove", "blist", "target",
"left", "right", "0"))
- time.Sleep(100 * time.Millisecond)
require.EqualValues(t, 2, rdb.LPush(ctx, "blist", "foo",
"bar").Val())
rd.MustRead(t, "$3")
require.Equal(t, "bar", rdb.LRange(ctx, "target", 0,
-1).Val()[0])
@@ -1436,9 +1389,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "1", "1",
key1, direction, "count", "1"))
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.RPush(ctx, key1, "ONE",
"TWO").Err())
if direction == "LEFT" {
rd.MustReadStringsWithKey(t, key1,
[]string{"ONE"})
@@ -1452,9 +1403,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "1", "1",
key1, direction, "count", "2"))
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.RPush(ctx, key1, "ONE",
"TWO").Err())
if direction == "LEFT" {
rd.MustReadStringsWithKey(t, key1,
[]string{"ONE", "TWO"})
@@ -1468,9 +1417,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "1", "1",
key1, direction, "count", "10"))
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.RPush(ctx, key1, "ONE",
"TWO").Err())
if direction == "LEFT" {
rd.MustReadStringsWithKey(t, key1,
[]string{"ONE", "TWO"})
@@ -1484,9 +1431,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "1", "2",
key1, key2, direction, "count", "2"))
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.RPush(ctx, key1, "ONE",
"TWO").Err())
if direction == "LEFT" {
rd.MustReadStringsWithKey(t, key1,
[]string{"ONE", "TWO"})
@@ -1500,9 +1445,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "1", "2",
key1, key2, direction, "count", "2"))
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.RPush(ctx, key2, "one",
"two").Err())
if direction == "LEFT" {
rd.MustReadStringsWithKey(t, key2,
[]string{"one", "two"})
@@ -1516,9 +1459,10 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "1", "2",
key1, key2, direction, "count", "2"))
- time.Sleep(time.Millisecond * 100)
+ // https://github.com/apache/kvrocks/issues/2617
+ // WriteArgs are required to be executed first
+ time.Sleep(100 * time.Millisecond)
require.NoError(t, rdb.RPush(ctx, key2, "one",
"two").Err())
require.NoError(t, rdb.RPush(ctx, key1, "ONE",
"TWO").Err())
if direction == "LEFT" {
@@ -1534,9 +1478,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "1", "2",
key1, key2, direction))
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rdb.RPush(ctx, key2, "one",
"two").Err())
if direction == "LEFT" {
rd.MustReadStringsWithKey(t, key2,
[]string{"one"})
@@ -1551,9 +1493,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "1", "2",
key1, key2, direction))
- time.Sleep(time.Millisecond * 1200)
rd.MustMatch(t, "")
})
@@ -1562,9 +1502,7 @@ func testList(t *testing.T, configs
util.KvrocksServerConfigs) {
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
require.NoError(t, rdb.Del(ctx, key1, key2).Err())
- time.Sleep(100 * time.Millisecond)
require.NoError(t, rd.WriteArgs("blmpop", "0", "2",
key1, key2, direction, "count", "2"))
- time.Sleep(time.Millisecond * 1200)
require.NoError(t, rdb.RPush(ctx, key2, "one",
"two").Err())
if direction == "LEFT" {
rd.MustReadStringsWithKey(t, key2,
[]string{"one", "two"})
diff --git a/tests/gocase/unit/type/zset/zset_test.go
b/tests/gocase/unit/type/zset/zset_test.go
index ad57defa..fce7e96a 100644
--- a/tests/gocase/unit/type/zset/zset_test.go
+++ b/tests/gocase/unit/type/zset/zset_test.go
@@ -332,8 +332,6 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx
context.Context, enabledRES
rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"},
redis.Z{Score: 2, Member: "e"})
require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val())
require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val())
- // TODO: Remove time.Sleep after fix issue #2473
- time.Sleep(time.Millisecond * 100)
resultz := rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
require.Equal(t, redis.Z{Score: 1, Member: "a"}, resultz)
resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
@@ -349,9 +347,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx
context.Context, enabledRES
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("bzpopmin", "zseta", "0"))
- time.Sleep(time.Millisecond * 100)
rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"})
rd.MustReadStrings(t, []string{"zseta", "a", "1"})
})
@@ -363,7 +359,6 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx
context.Context, enabledRES
rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"},
redis.Z{Score: 2, Member: "e"})
require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val())
require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val())
- time.Sleep(time.Millisecond * 100)
resultz := rdb.BZPopMax(ctx, 0, "zseta", "zsetb").Val().Z
require.Equal(t, redis.Z{Score: 3, Member: "c"}, resultz)
resultz = rdb.BZPopMax(ctx, 0, "zseta", "zsetb").Val().Z
@@ -379,9 +374,7 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx
context.Context, enabledRES
rd := srv.NewTCPClient()
defer func() { require.NoError(t, rd.Close()) }()
- time.Sleep(time.Millisecond * 100)
require.NoError(t, rd.WriteArgs("bzpopmax", "zseta", "0"))
- time.Sleep(time.Millisecond * 100)
rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"})
rd.MustReadStrings(t, []string{"zseta", "a", "1"})
})