This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 83aaa753 Add the support of the BF.MEXISTS and BF.CARD Command (#1756)
83aaa753 is described below
commit 83aaa7531fdfdd874a936287fe3f466b184c9e41
Author: Leo <[email protected]>
AuthorDate: Thu Sep 14 10:48:36 2023 +0800
Add the support of the BF.MEXISTS and BF.CARD Command (#1756)
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_bloom_filter.cc | 46 ++++++++++++++++++++++++--
src/server/redis_reply.h | 9 ++++++
src/types/redis_bloom_chain.cc | 52 ++++++++++++++++++------------
src/types/redis_bloom_chain.h | 13 ++++++--
tests/cppunit/types/bloom_chain_test.cc | 6 ++--
tests/gocase/unit/type/bloom/bloom_test.go | 32 +++++++++++++++++-
6 files changed, 128 insertions(+), 30 deletions(-)
diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc
index eb3856f1..61644b34 100644
--- a/src/commands/cmd_bloom_filter.cc
+++ b/src/commands/cmd_bloom_filter.cc
@@ -105,7 +105,7 @@ class CommandBFExists : public Commander {
Status Execute(Server *svr, Connection *conn, std::string *output) override {
redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
int ret = 0;
- auto s = bloom_db.Exist(args_[1], args_[2], &ret);
+ auto s = bloom_db.Exists(args_[1], args_[2], &ret);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
*output = redis::Integer(ret);
@@ -113,6 +113,30 @@ class CommandBFExists : public Commander {
}
};
+/// BF.MEXISTS key item [item ...]
+///
+/// Checks several items against the bloom filter stored at one key and replies with an
+/// array holding one integer per item, in argument order.
+class CommandBFMExists : public Commander {
+ public:
+  /// Collects every argument after the key as an item to look up, then delegates to the
+  /// base-class parser. The items are read from the `args_` member, not from `args`.
+  Status Parse(const std::vector<std::string> &args) override {
+    // NOTE(review): assumes args_ is already populated and holds at least 3 entries at this
+    // point (the command is registered with arity -3) — confirm against the caller of Parse.
+    items_.reserve(args_.size() - 2);
+    items_.insert(items_.end(), args_.begin() + 2, args_.end());
+    return Commander::Parse(args);
+  }
+
+  /// Runs BloomChain::MExists on args_[1] with the collected items and writes the per-item
+  /// results to `output` as a multi-integer reply. A non-ok status becomes a RedisExecErr.
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+
+    std::vector<int> results(items_.size(), 0);
+    auto status = bloom_db.MExists(args_[1], items_, &results);
+    if (!status.ok()) {
+      return {Status::RedisExecErr, status.ToString()};
+    }
+
+    *output = redis::MultiInteger(results);
+    return Status::OK();
+  }
+
+ private:
+  /// Items to test, in the order they appeared on the command line.
+  std::vector<Slice> items_;
+};
+
class CommandBFInfo : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
@@ -184,8 +208,26 @@ class CommandBFInfo : public Commander {
BloomInfoType type_ = BloomInfoType::kAll;
};
+/// BF.CARD key
+///
+/// Replies with the `size` field that BloomChain::Info reports for the key, or with 0 when
+/// the key is not found.
+class CommandBFCard : public Commander {
+ public:
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+    BloomFilterInfo info;
+    auto status = bloom_db.Info(args_[1], &info);
+
+    // A missing key is not an error for BF.CARD: it is reported as cardinality 0.
+    if (status.IsNotFound()) {
+      *output = redis::Integer(0);
+      return Status::OK();
+    }
+    // Every other failure is surfaced to the client as an execution error.
+    if (!status.ok()) {
+      return {Status::RedisExecErr, status.ToString()};
+    }
+
+    *output = redis::Integer(info.size);
+    return Status::OK();
+  }
+};
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4,
"write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1,
1),
MakeCmdAttr<CommandBFExists>("bf.exists", 3,
"read-only", 1, 1, 1),
- MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only",
1, 1, 1), )
+ MakeCmdAttr<CommandBFMExists>("bf.mexists", -3,
"read-only", 1, 1, 1),
+ MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only",
1, 1, 1),
+ MakeCmdAttr<CommandBFCard>("bf.card", 2, "read-only",
1, 1, 1), )
} // namespace redis
diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h
index c3cc1b44..75492c1d 100644
--- a/src/server/redis_reply.h
+++ b/src/server/redis_reply.h
@@ -40,6 +40,15 @@ std::string Integer(T data) {
return ":" + std::to_string(data) + CRLF;
}
+/// Builds an array reply: a "*<count>" header line followed by one Integer() reply for each
+/// element of `multi_data`, in order.
+template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
+std::string MultiInteger(const std::vector<T> &multi_data) {
+  std::string reply = "*";
+  reply += std::to_string(multi_data.size());
+  reply += CRLF;
+  for (const auto &element : multi_data) {
+    reply += Integer(element);
+  }
+  return reply;
+}
+
std::string BulkString(const std::string &data);
std::string NilString();
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
index df0ddb78..a97d77e6 100644
--- a/src/types/redis_bloom_chain.cc
+++ b/src/types/redis_bloom_chain.cc
@@ -86,7 +86,7 @@ void BloomChain::createBloomFilterInBatch(const Slice
&ns_key, BloomChainMetadat
batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
}
-void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
+void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(*bf_data));
@@ -95,7 +95,8 @@ void BloomChain::bloomAdd(const std::string &item,
std::string *bf_data) {
*bf_data = std::move(block_split_bloom_filter).GetData();
}
-rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string
&item, bool *exist) {
+rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const
std::vector<Slice> &items,
+ std::vector<bool> *exists) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
@@ -105,9 +106,14 @@ rocksdb::Status BloomChain::bloomCheck(const Slice
&bf_key, const std::string &i
BlockSplitBloomFilter block_split_bloom_filter;
block_split_bloom_filter.Init(std::move(bf_data));
- uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
- *exist = block_split_bloom_filter.FindHash(h);
-
+ for (size_t i = 0; i < items.size(); ++i) {
+ // This item was already found in another bloom filter of the chain, so it does not need to be checked against this one.
+ if ((*exists)[i]) {
+ continue;
+ }
+ uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
+ (*exists)[i] = block_split_bloom_filter.FindHash(h);
+ }
return rocksdb::Status::OK();
}
@@ -144,21 +150,19 @@ rocksdb::Status BloomChain::Add(const Slice &user_key,
const Slice &item, int *r
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
batch->PutLogData(log_data.Encode());
- std::string item_string = item.ToString();
-
// check
- bool exist = false;
+ std::vector<bool> exist{false}; // TODO: to refine in
BF.MADD
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which
direction for searching is better
- s = bloomCheck(bf_key_list[i], item_string, &exist);
+ s = bloomCheck(bf_key_list[i], {item}, &exist);
if (!s.ok()) return s;
- if (exist) {
+ if (exist[0]) {
*ret = 0;
break;
}
}
// insert
- if (!exist) {
+ if (!exist[0]) { // TODO: to refine in BF.MADD
std::string bf_data;
s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;
@@ -172,7 +176,7 @@ rocksdb::Status BloomChain::Add(const Slice &user_key,
const Slice &item, int *r
return rocksdb::Status::Aborted("filter is full and is nonscaling");
}
}
- bloomAdd(item_string, &bf_data);
+ bloomAdd(item, &bf_data);
batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
@@ -185,13 +189,20 @@ rocksdb::Status BloomChain::Add(const Slice &user_key,
const Slice &item, int *r
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
-rocksdb::Status BloomChain::Exist(const Slice &user_key, const Slice &item,
int *ret) {
+/// Single-item lookup implemented on top of MExists.
+/// `*ret` receives the MExists result for `item`; it is 0 when MExists returns without
+/// writing a result. The status returned by MExists is passed through unchanged.
+rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, int *ret) {
+  std::vector<int> results(1, 0);
+  auto status = MExists(user_key, {item}, &results);
+  *ret = results.front();
+  return status;
+}
+
+/// Checks every entry of `items` against the bloom chain stored at `user_key`.
+///
+/// On success `rets` is sized to items.size() and rets[i] is 1 if items[i] was found in any
+/// filter of the chain, 0 otherwise. A missing key yields all zeros with an OK status.
+/// Any other failure is returned as-is and leaves `rets` untouched.
+rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
if (s.IsNotFound()) {
- *ret = 0;
+ // Size the output here instead of relying on the caller to have pre-sized it.
+ rets->assign(items.size(), 0);
return rocksdb::Status::OK();
}
if (!s.ok()) return s;
@@ -199,17 +210,16 @@ rocksdb::Status BloomChain::Exist(const Slice &user_key,
const Slice &item, int
std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);
- std::string item_string = item.ToString();
// check
- bool exist = false;
+ std::vector<bool> exists(items.size(), false);
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which
direction for searching is better
- s = bloomCheck(bf_key_list[i], item_string, &exist);
+ s = bloomCheck(bf_key_list[i], items, &exists);
if (!s.ok()) return s;
- if (exist) {
- break;
- }
}
- *ret = exist ? 1 : 0;
+
+ // Resize before indexing so a caller-supplied vector of any size is safe to write into.
+ rets->resize(items.size());
+ for (size_t i = 0; i < items.size(); ++i) {
+ (*rets)[i] = exists[i] ? 1 : 0;
+ }
return rocksdb::Status::OK();
}
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
index ad66675f..27264e02 100644
--- a/src/types/redis_bloom_chain.h
+++ b/src/types/redis_bloom_chain.h
@@ -52,7 +52,8 @@ class BloomChain : public Database {
BloomChain(engine::Storage *storage, const std::string &ns) :
Database(storage, ns) {}
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double
error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
- rocksdb::Status Exist(const Slice &user_key, const Slice &item, int *ret);
+ rocksdb::Status Exists(const Slice &user_key, const Slice &item, int *ret);
+ rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice>
&items, std::vector<int> *rets);
rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);
private:
@@ -63,7 +64,13 @@ class BloomChain : public Database {
BloomChainMetadata *metadata);
void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata
*metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>
&batch, std::string *bf_data);
- static void bloomAdd(const std::string &item, std::string *bf_data);
- rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item,
bool *exist);
+
+ /// bf_data: [in/out] The content string of bloomfilter.
+ static void bloomAdd(const Slice &item, std::string *bf_data);
+
+ /// exists: [in/out] Whether each item has already been found in a bloom filter.
+ /// If exists[i] is true on entry, items[i] was already found in another bloom filter,
+ /// so it is not checked against this one.
+ rocksdb::Status bloomCheck(const Slice &bf_key, const std::vector<Slice>
&items, std::vector<bool> *exists);
};
} // namespace redis
diff --git a/tests/cppunit/types/bloom_chain_test.cc
b/tests/cppunit/types/bloom_chain_test.cc
index 9f1b7414..35d789e9 100644
--- a/tests/cppunit/types/bloom_chain_test.cc
+++ b/tests/cppunit/types/bloom_chain_test.cc
@@ -55,7 +55,7 @@ TEST_F(RedisBloomChainTest, Reserve) {
TEST_F(RedisBloomChainTest, BasicAddAndTest) {
int ret = 0;
- auto s = sb_chain_->Exist("no_exist_key", "test_item", &ret);
+ auto s = sb_chain_->Exists("no_exist_key", "test_item", &ret);
EXPECT_EQ(ret, 0);
s = sb_chain_->Del("no_exist_key");
@@ -67,14 +67,14 @@ TEST_F(RedisBloomChainTest, BasicAddAndTest) {
}
for (const auto& insert_item : insert_items) {
- s = sb_chain_->Exist(key_, insert_item, &ret);
+ s = sb_chain_->Exists(key_, insert_item, &ret);
EXPECT_TRUE(s.ok());
EXPECT_EQ(ret, 1);
}
std::string no_insert_items[] = {"item303", "item404", "1", "2", "3"};
for (const auto& no_insert_item : no_insert_items) {
- s = sb_chain_->Exist(key_, no_insert_item, &ret);
+ s = sb_chain_->Exists(key_, no_insert_item, &ret);
EXPECT_TRUE(s.ok());
EXPECT_EQ(ret, 0);
}
diff --git a/tests/gocase/unit/type/bloom/bloom_test.go
b/tests/gocase/unit/type/bloom/bloom_test.go
index f93b84b8..47b55aac 100644
--- a/tests/gocase/unit/type/bloom/bloom_test.go
+++ b/tests/gocase/unit/type/bloom/bloom_test.go
@@ -145,6 +145,21 @@ func TestBloom(t *testing.T) {
require.LessOrEqual(t, float64(falseExist),
fpp*float64(totalCount))
})
+ t.Run("MExists Basic Test", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.Equal(t, []interface{}{int64(0), int64(0), int64(0)},
rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key,
"xxx").Val())
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
+ require.Equal(t, []interface{}{int64(1), int64(0)}, rdb.Do(ctx,
"bf.mexists", key, "xxx", "yyy").Val())
+
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key,
"zzz").Val())
+ require.Equal(t, []interface{}{int64(1), int64(0), int64(1)},
rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key,
"yyy").Val())
+ require.Equal(t, []interface{}{int64(1), int64(1), int64(1)},
rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+ })
+
t.Run("Get info of no exists key ", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
require.ErrorContains(t, rdb.Do(ctx, "bf.info",
"no_exist_key").Err(), "ERR key is not found")
@@ -253,6 +268,21 @@ func TestBloom(t *testing.T) {
require.Equal(t, "MBbloom--", rdb.Type(ctx, key).Val())
})
- // TODO: Add the testcase of get filters of bloom filter after complete
the scaling.
+ t.Run("Get Card of bloom filter", func(t *testing.T) {
+ // BF.CARD on a key that does not exist returns 0
+ require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
+ require.Equal(t, int64(0), rdb.Do(ctx, "bf.card",
"no_exist_key").Val())
+
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02",
"1000", "expansion", "1").Err())
+ require.Equal(t, int64(0), rdb.Do(ctx, "bf.card", key).Val())
+ require.NoError(t, rdb.Do(ctx, "bf.add", key, "item1").Err())
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
+ // Adding a duplicate item returns 0 and leaves the cardinality of the bloom filter unchanged
+ require.Equal(t, int64(0), rdb.Do(ctx, "bf.add", key,
"item1").Val())
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
+ require.NoError(t, rdb.Do(ctx, "bf.add", key, "item2").Err())
+ require.Equal(t, int64(2), rdb.Do(ctx, "bf.card", key).Val())
+ })
}