This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new e52337ba Fix add/delete repeated members error in zset/set/hash types
(#1652)
e52337ba is described below
commit e52337ba82e7d3c6adb95af1acf641fbba8d3b06
Author: ChrisZMF <[email protected]>
AuthorDate: Sat Aug 12 07:19:05 2023 +0800
Fix add/delete repeated members error in zset/set/hash types (#1652)
Co-authored-by: mingfo.zou <[email protected]>
Co-authored-by: mwish <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/types/redis_hash.cc | 17 ++++++++++-----
src/types/redis_set.cc | 8 +++++++
src/types/redis_zset.cc | 45 ++++++++++++++++------------------------
tests/cppunit/types/hash_test.cc | 29 ++++++++++++++++++++++++++
tests/cppunit/types/set_test.cc | 18 ++++++++++++++++
tests/cppunit/types/zset_test.cc | 29 ++++++++++++++++++++++++++
6 files changed, 114 insertions(+), 32 deletions(-)
diff --git a/src/types/redis_hash.cc b/src/types/redis_hash.cc
index b61f8219..e9a48d94 100644
--- a/src/types/redis_hash.cc
+++ b/src/types/redis_hash.cc
@@ -217,7 +217,11 @@ rocksdb::Status Hash::Delete(const Slice &user_key, const
std::vector<Slice> &fi
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;
std::string sub_key, value;
+ std::unordered_set<std::string_view> field_set;
for (const auto &field : fields) {
+ if (!field_set.emplace(field.ToStringView()).second) {
+ continue;
+ }
InternalKey(ns_key, field, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&sub_key);
s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value);
if (s.ok()) {
@@ -250,12 +254,15 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const
std::vector<FieldValue>
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisHash);
batch->PutLogData(log_data.Encode());
+ std::unordered_set<std::string_view> field_set;
+ for (auto it = field_values.rbegin(); it != field_values.rend(); it++) {
+ if (!field_set.insert(it->field).second) {
+ continue;
+ }
- for (const auto &fv : field_values) {
bool exists = false;
-
std::string sub_key;
- InternalKey(ns_key, fv.field, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&sub_key);
+ InternalKey(ns_key, it->field, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&sub_key);
if (metadata.size > 0) {
std::string field_value;
@@ -263,7 +270,7 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const
std::vector<FieldValue>
if (!s.ok() && !s.IsNotFound()) return s;
if (s.ok()) {
- if (nx || field_value == fv.value) continue;
+ if (nx || field_value == it->value) continue;
exists = true;
}
@@ -271,7 +278,7 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const
std::vector<FieldValue>
if (!exists) added++;
- batch->Put(sub_key, fv.value);
+ batch->Put(sub_key, it->value);
}
if (added > 0) {
diff --git a/src/types/redis_set.cc b/src/types/redis_set.cc
index 31970ff1..16a916be 100644
--- a/src/types/redis_set.cc
+++ b/src/types/redis_set.cc
@@ -70,7 +70,11 @@ rocksdb::Status Set::Add(const Slice &user_key, const
std::vector<Slice> &member
WriteBatchLogData log_data(kRedisSet);
batch->PutLogData(log_data.Encode());
std::string sub_key;
+ std::unordered_set<std::string_view> mset;
for (const auto &member : members) {
+ if (!mset.insert(member.ToStringView()).second) {
+ continue;
+ }
InternalKey(ns_key, member, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&sub_key);
s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value);
if (s.ok()) continue;
@@ -101,7 +105,11 @@ rocksdb::Status Set::Remove(const Slice &user_key, const
std::vector<Slice> &mem
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisSet);
batch->PutLogData(log_data.Encode());
+ std::unordered_set<std::string_view> mset;
for (const auto &member : members) {
+ if (!mset.insert(member.ToStringView()).second) {
+ continue;
+ }
InternalKey(ns_key, member, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&sub_key);
s = storage_->Get(rocksdb::ReadOptions(), sub_key, &value);
if (!s.ok()) continue;
diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc
index d0994b06..85e2d0f7 100644
--- a/src/types/redis_zset.cc
+++ b/src/types/redis_zset.cc
@@ -52,24 +52,12 @@ rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags
flags, MemberScores *
WriteBatchLogData log_data(kRedisZSet);
batch->PutLogData(log_data.Encode());
std::string member_key;
- std::set<std::string> added_member_keys;
- for (int i = static_cast<int>(mscores->size() - 1); i >= 0; i--) {
- InternalKey(ns_key, (*mscores)[i].member, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&member_key);
-
- // Fix the corner case that adds the same member which may add the score
- // column family many times and cause problems in the ZRANGE command.
- //
- // For example, we add members with `ZADD mykey 1 a 2 a` and `ZRANGE mykey
0 1`
- // return only one member(`a`) was expected but got the member `a` twice
now.
- //
- // The root cause of this issue was the score key was composed by member
and score,
- // so the last one can't overwrite the previous when the score was
different.
- // A simple workaround was add those members with reversed order and skip
the member if has added.
- if (added_member_keys.find(member_key) != added_member_keys.end()) {
+ std::unordered_set<std::string_view> added_member_keys;
+ for (auto it = mscores->rbegin(); it != mscores->rend(); it++) {
+ if (!added_member_keys.insert(it->member).second) {
continue;
}
- added_member_keys.insert(member_key);
-
+ InternalKey(ns_key, it->member, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&member_key);
if (metadata.size > 0) {
std::string old_score_bytes;
s = storage_->Get(rocksdb::ReadOptions(), member_key, &old_score_bytes);
@@ -80,27 +68,26 @@ rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags
flags, MemberScores *
}
double old_score = DecodeDouble(old_score_bytes.data());
if (flags.HasIncr()) {
- if ((flags.HasLT() && (*mscores)[i].score >= 0) || (flags.HasGT() &&
(*mscores)[i].score <= 0)) {
+ if ((flags.HasLT() && it->score >= 0) || (flags.HasGT() && it->score
<= 0)) {
continue;
}
- (*mscores)[i].score += old_score;
- if (std::isnan((*mscores)[i].score)) {
+ it->score += old_score;
+ if (std::isnan(it->score)) {
return rocksdb::Status::InvalidArgument("resulting score is not a
number (NaN)");
}
}
- if ((*mscores)[i].score != old_score) {
- if ((flags.HasLT() && (*mscores)[i].score >= old_score) ||
- (flags.HasGT() && (*mscores)[i].score <= old_score)) {
+ if (it->score != old_score) {
+ if ((flags.HasLT() && it->score >= old_score) || (flags.HasGT() &&
it->score <= old_score)) {
continue;
}
- old_score_bytes.append((*mscores)[i].member);
+ old_score_bytes.append(it->member);
std::string old_score_key;
InternalKey(ns_key, old_score_bytes, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&old_score_key);
batch->Delete(score_cf_handle_, old_score_key);
std::string new_score_bytes, new_score_key;
- PutDouble(&new_score_bytes, (*mscores)[i].score);
+ PutDouble(&new_score_bytes, it->score);
batch->Put(member_key, new_score_bytes);
- new_score_bytes.append((*mscores)[i].member);
+ new_score_bytes.append(it->member);
InternalKey(ns_key, new_score_bytes, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&new_score_key);
batch->Put(score_cf_handle_, new_score_key, Slice());
changed++;
@@ -112,9 +99,9 @@ rocksdb::Status ZSet::Add(const Slice &user_key, ZAddFlags
flags, MemberScores *
continue;
}
std::string score_bytes, score_key;
- PutDouble(&score_bytes, (*mscores)[i].score);
+ PutDouble(&score_bytes, it->score);
batch->Put(member_key, score_bytes);
- score_bytes.append((*mscores)[i].member);
+ score_bytes.append(it->member);
InternalKey(ns_key, score_bytes, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&score_key);
batch->Put(score_cf_handle_, score_key, Slice());
added++;
@@ -545,7 +532,11 @@ rocksdb::Status ZSet::Remove(const Slice &user_key, const
std::vector<Slice> &me
batch->PutLogData(log_data.Encode());
int removed = 0;
std::string member_key, score_key;
+ std::unordered_set<std::string_view> mset;
for (const auto &member : members) {
+ if (!mset.insert(member.ToStringView()).second) {
+ continue;
+ }
InternalKey(ns_key, member, metadata.version,
storage_->IsSlotIdEncoded()).Encode(&member_key);
std::string score_bytes;
s = storage_->Get(rocksdb::ReadOptions(), member_key, &score_bytes);
diff --git a/tests/cppunit/types/hash_test.cc b/tests/cppunit/types/hash_test.cc
index 51621e84..28e09ece 100644
--- a/tests/cppunit/types/hash_test.cc
+++ b/tests/cppunit/types/hash_test.cc
@@ -86,6 +86,35 @@ TEST_F(RedisHashTest, MGetAndMSet) {
hash_->Del(key_);
}
+TEST_F(RedisHashTest, MSetAndDeleteRepeated) {
+ std::vector<std::string> fields{"f1", "f1", "f2", "f3"};
+ std::vector<std::string> values{"v1", "v11", "v2", "v3"};
+ std::vector<FieldValue> fvs;
+ for (size_t i = 0; i < fields.size(); i++) {
+ fvs.emplace_back(fields[i], values[i]);
+ }
+
+ uint64_t ret = 0;
+ rocksdb::Status s = hash_->MSet(key_, fvs, false, &ret);
+ EXPECT_TRUE(s.ok() && static_cast<uint64_t>(fvs.size() - 1) == ret);
+ std::string got;
+ s = hash_->Get(key_, "f1", &got);
+ EXPECT_EQ("v11", got);
+
+ s = hash_->Size(key_, &ret);
+ EXPECT_TRUE(s.ok() && ret == static_cast<uint64_t>(fvs.size() - 1));
+
+ std::vector<rocksdb::Slice> fields_to_delete{"f1", "f2", "f2"};
+ s = hash_->Delete(key_, fields_to_delete, &ret);
+ EXPECT_TRUE(s.ok() && ret == static_cast<uint64_t>(fields_to_delete.size() -
1));
+ s = hash_->Size(key_, &ret);
+ EXPECT_TRUE(s.ok() && ret == 1);
+ s = hash_->Get(key_, "f3", &got);
+ EXPECT_EQ("v3", got);
+
+ s = hash_->Del(key_);
+}
+
TEST_F(RedisHashTest, MSetSingleFieldAndNX) {
uint64_t ret = 0;
std::vector<FieldValue> values = {{"field-one", "value-one"}};
diff --git a/tests/cppunit/types/set_test.cc b/tests/cppunit/types/set_test.cc
index c6d78b75..3e4e75bf 100644
--- a/tests/cppunit/types/set_test.cc
+++ b/tests/cppunit/types/set_test.cc
@@ -51,6 +51,24 @@ TEST_F(RedisSetTest, AddAndRemove) {
set_->Del(key_);
}
+TEST_F(RedisSetTest, AddAndRemoveRepeated) {
+ std::vector<rocksdb::Slice> allmembers{"m1", "m1", "m2", "m3"};
+ uint64_t ret = 0;
+ rocksdb::Status s = set_->Add(key_, allmembers, &ret);
+ EXPECT_TRUE(s.ok() && (allmembers.size() - 1) == ret);
+ uint64_t card = 0;
+ set_->Card(key_, &card);
+ EXPECT_EQ(card, allmembers.size() - 1);
+
+ std::vector<rocksdb::Slice> remembers{"m1", "m2", "m2"};
+ s = set_->Remove(key_, remembers, &ret);
+ EXPECT_TRUE(s.ok() && (remembers.size() - 1) == ret);
+ set_->Card(key_, &card);
+ EXPECT_EQ(card, allmembers.size() - 1 - ret);
+
+ set_->Del(key_);
+}
+
TEST_F(RedisSetTest, Members) {
uint64_t ret = 0;
rocksdb::Status s = set_->Add(key_, fields_, &ret);
diff --git a/tests/cppunit/types/zset_test.cc b/tests/cppunit/types/zset_test.cc
index 1db9aec6..887fd606 100644
--- a/tests/cppunit/types/zset_test.cc
+++ b/tests/cppunit/types/zset_test.cc
@@ -94,6 +94,35 @@ TEST_F(RedisZSetTest, Remove) {
zset_->Del(key_);
}
+TEST_F(RedisZSetTest, AddAndRemoveRepeated) {
+ std::vector<std::string> members{"m1", "m1", "m2", "m3"};
+ std::vector<double> scores{1.1, 1.11, 2.2, 3.3};
+
+ uint64_t ret = 0;
+ std::vector<MemberScore> mscores;
+ for (size_t i = 0; i < members.size(); i++) {
+ mscores.emplace_back(MemberScore{members[i], scores[i]});
+ }
+ zset_->Add(key_, ZAddFlags::Default(), &mscores, &ret);
+ EXPECT_EQ(mscores.size() - 1, ret);
+ double score = 0.0;
+ zset_->Score(key_, members[0], &score);
+ EXPECT_EQ(scores[1], score);
+ uint64_t card = 0;
+ zset_->Card(key_, &card);
+ EXPECT_EQ(mscores.size() - 1, card);
+
+ std::vector<rocksdb::Slice> members_to_remove{"m1", "m2", "m2"};
+ zset_->Remove(key_, members_to_remove, &ret);
+ EXPECT_EQ(members_to_remove.size() - 1, ret);
+ zset_->Card(key_, &card);
+ EXPECT_EQ(mscores.size() - 1 - ret, card);
+ zset_->Score(key_, members[3], &score);
+ EXPECT_EQ(scores[3], score);
+
+ zset_->Del(key_);
+}
+
TEST_F(RedisZSetTest, Range) {
uint64_t ret = 0;
std::vector<MemberScore> mscores;