This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 088eae04 Add the support of the bloom BF.INFO command (#1710)
088eae04 is described below
commit 088eae04abaaa74bfdb4473a6655f5753a63242b
Author: Leo <[email protected]>
AuthorDate: Thu Aug 31 19:25:21 2023 +0800
Add the support of the bloom BF.INFO command (#1710)
---
src/commands/cmd_bloom_filter.cc | 86 ++++++++++++++++++++--
src/storage/redis_metadata.cc | 2 +-
src/types/redis_bloom_chain.cc | 71 +++++++++++--------
src/types/redis_bloom_chain.h | 22 +++++-
tests/cppunit/types/bloom_chain_test.cc | 3 +-
tests/gocase/unit/type/bloom/bloom_test.go | 110 ++++++++++++++++++++++++++---
6 files changed, 243 insertions(+), 51 deletions(-)
diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc
index 46ffc9c9..eb3856f1 100644
--- a/src/commands/cmd_bloom_filter.cc
+++ b/src/commands/cmd_bloom_filter.cc
@@ -48,11 +48,14 @@ class CommandBFReserve : public Commander {
}
CommandParser parser(args, 4);
- bool nonscaling = false;
+ bool is_nonscaling = false;
+ bool has_expansion = false;
while (parser.Good()) {
if (parser.EatEqICase("nonscaling")) {
- nonscaling = true;
+ is_nonscaling = true;
+ expansion_ = 0;
} else if (parser.EatEqICase("expansion")) {
+ has_expansion = true;
expansion_ = GET_OR_RET(parser.TakeInt<uint16_t>());
if (expansion_ < 1) {
return {Status::RedisParseErr, "expansion should be greater or equal
to 1"};
@@ -62,8 +65,7 @@ class CommandBFReserve : public Commander {
}
}
- // if nonscaling is true, expansion should be 0
- if (nonscaling && expansion_ != 0) {
+ if (is_nonscaling && has_expansion) {
return {Status::RedisParseErr, "nonscaling filters cannot expand"};
}
@@ -82,7 +84,7 @@ class CommandBFReserve : public Commander {
private:
double error_rate_;
uint32_t capacity_;
- uint16_t expansion_ = 0;
+ uint16_t expansion_ = kBFDefaultExpansion;
};
class CommandBFAdd : public Commander {
@@ -111,7 +113,79 @@ class CommandBFExists : public Commander {
}
};
+// BF.INFO key [CAPACITY | SIZE | FILTERS | ITEMS | EXPANSION]
+//
+// Without a selector, replies with an array of five name/value pairs; with a
+// selector, replies with that single value. The expansion rate is reported as
+// nil when the stored expansion is 0, which is how BF.RESERVE ... NONSCALING
+// records a nonscaling filter.
+class CommandBFInfo : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    // At most one optional field selector may follow the key.
+    if (args.size() > 3) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    CommandParser parser(args, 2);
+    if (parser.Good()) {
+      if (parser.EatEqICase("capacity")) {
+        type_ = BloomInfoType::kCapacity;
+      } else if (parser.EatEqICase("size")) {
+        type_ = BloomInfoType::kSize;
+      } else if (parser.EatEqICase("filters")) {
+        type_ = BloomInfoType::kFilters;
+      } else if (parser.EatEqICase("items")) {
+        type_ = BloomInfoType::kItems;
+      } else if (parser.EatEqICase("expansion")) {
+        type_ = BloomInfoType::kExpansion;
+      } else {
+        return {Status::RedisParseErr, "Invalid info argument"};
+      }
+    }
+
+    return Commander::Parse(args);
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+    // Value-initialized so no field can ever be read with an indeterminate value.
+    BloomFilterInfo info{};
+    auto s = bloom_db.Info(args_[1], &info);
+    if (s.IsNotFound()) return {Status::RedisExecErr, "key is not found"};
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    // Shared by the full reply and the EXPANSION selector: 0 means nonscaling -> nil.
+    auto expansion_reply = [&info] {
+      return info.expansion == 0 ? redis::NilString() : redis::Integer(info.expansion);
+    };
+
+    switch (type_) {
+      case BloomInfoType::kAll:
+        *output = redis::MultiLen(2 * 5);
+        *output += redis::SimpleString("Capacity");
+        *output += redis::Integer(info.capacity);
+        *output += redis::SimpleString("Size");
+        *output += redis::Integer(info.bloom_bytes);
+        *output += redis::SimpleString("Number of filters");
+        *output += redis::Integer(info.n_filters);
+        *output += redis::SimpleString("Number of items inserted");
+        *output += redis::Integer(info.size);
+        *output += redis::SimpleString("Expansion rate");
+        *output += expansion_reply();
+        break;
+      case BloomInfoType::kCapacity:
+        *output = redis::Integer(info.capacity);
+        break;
+      case BloomInfoType::kSize:
+        *output = redis::Integer(info.bloom_bytes);
+        break;
+      case BloomInfoType::kFilters:
+        *output = redis::Integer(info.n_filters);
+        break;
+      case BloomInfoType::kItems:
+        *output = redis::Integer(info.size);
+        break;
+      case BloomInfoType::kExpansion:
+        *output = expansion_reply();
+        break;
+    }
+
+    return Status::OK();
+  }
+
+ private:
+  // Field chosen by the optional selector; kAll when no selector is given.
+  BloomInfoType type_ = BloomInfoType::kAll;
+};
+
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
- MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1), )
+ MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
+ // bf.info: key plus an optional field selector (the upper bound is enforced in CommandBFInfo::Parse).
+ MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1), )
} // namespace redis
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 65a41038..46d64f95 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -450,5 +450,5 @@ uint32_t BloomChainMetadata::GetCapacity() const {
if (expansion == 1) {
return base_capacity * n_filters;
}
- return static_cast<uint32_t>(base_capacity * (1 - pow(expansion, n_filters))
/ 1 - expansion);
+ return static_cast<uint32_t>(base_capacity * (1 - pow(expansion, n_filters))
/ (1 - expansion));
}
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
index 4c6cffcc..b15f5de7 100644
--- a/src/types/redis_bloom_chain.cc
+++ b/src/types/redis_bloom_chain.cc
@@ -69,7 +69,7 @@ rocksdb::Status BloomChain::createBloomChain(const Slice
&ns_key, double error_r
return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
-rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key, const std::string
&item, int *ret) {
+rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key, const std::string
&item) {
std::string bf_data;
rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), bf_key, &bf_data);
if (!s.ok()) return s;
@@ -77,21 +77,13 @@ rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key,
const std::string &ite
block_split_bloom_filter.Init(std::move(bf_data));
uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
- bool found = block_split_bloom_filter.FindHash(h);
-
- *ret = 0;
- if (!found) {
- *ret = 1;
- block_split_bloom_filter.InsertHash(h);
- auto batch = storage_->GetWriteBatchBase();
- batch->Put(bf_key, block_split_bloom_filter.GetData());
- return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
- }
-
- return rocksdb::Status::OK();
+ block_split_bloom_filter.InsertHash(h);
+ auto batch = storage_->GetWriteBatchBase();
+ batch->Put(bf_key, block_split_bloom_filter.GetData());
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
-rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string
&item, int *ret) {
+rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string
&item, bool *exist) {
LatestSnapShot ss(storage_);
rocksdb::ReadOptions read_options;
read_options.snapshot = ss.GetSnapShot();
@@ -102,8 +94,7 @@ rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key,
const std::string &i
block_split_bloom_filter.Init(std::move(bf_data));
uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
- bool found = block_split_bloom_filter.FindHash(h);
- *ret = found ? 1 : 0;
+ *exist = block_split_bloom_filter.FindHash(h);
return rocksdb::Status::OK();
}
@@ -132,9 +123,7 @@ rocksdb::Status BloomChain::Add(const Slice &user_key,
const Slice &item, int *r
if (s.IsNotFound()) {
s = createBloomChain(ns_key, kBFDefaultErrorRate, kBFDefaultInitCapacity,
kBFDefaultExpansion, &metadata);
}
- if (!s.ok()) {
- return s;
- }
+ if (!s.ok()) return s;
std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);
@@ -143,26 +132,28 @@ rocksdb::Status BloomChain::Add(const Slice &user_key,
const Slice &item, int *r
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
batch->PutLogData(log_data.Encode());
- int check_index = metadata.n_filters - 1;
std::string item_string = item.ToString();
// check
- for (; check_index >= 0; --check_index) { // TODO: to test which direction
for searching is better
- s = bloomCheck(bf_key_list[check_index], item_string, ret);
+ bool exist = false;
+ for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which
direction for searching is better
+ s = bloomCheck(bf_key_list[i], item_string, &exist);
if (!s.ok()) return s;
- if (*ret == 1) {
+ if (exist) {
+ *ret = 0;
break;
}
}
// insert
- if (check_index < 0) {
+ if (!exist) {
if (metadata.size + 1 > metadata.GetCapacity()) { // TODO: scaling would
be supported later
return rocksdb::Status::Aborted("filter is full");
}
- s = bloomAdd(bf_key_list.back(), item_string, ret);
+ s = bloomAdd(bf_key_list.back(), item_string);
if (!s.ok()) return s;
- metadata.size += *ret;
+ *ret = 1;
+ metadata.size += 1;
}
std::string sb_chain_metadata_bytes;
@@ -177,7 +168,10 @@ rocksdb::Status BloomChain::Exist(const Slice &user_key,
const Slice &item, int
BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
- if (s.IsNotFound()) return rocksdb::Status::NotFound("key is not found");
+ if (s.IsNotFound()) {
+ *ret = 0;
+ return rocksdb::Status::OK();
+ }
if (!s.ok()) return s;
std::vector<std::string> bf_key_list;
@@ -185,14 +179,33 @@ rocksdb::Status BloomChain::Exist(const Slice &user_key,
const Slice &item, int
std::string item_string = item.ToString();
// check
+ bool exist = false;
for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which
direction for searching is better
- s = bloomCheck(bf_key_list[i], item_string, ret);
+ s = bloomCheck(bf_key_list[i], item_string, &exist);
if (!s.ok()) return s;
- if (*ret == 1) {
+ if (exist) {
break;
}
}
+ *ret = exist ? 1 : 0;
+
+ return rocksdb::Status::OK();
+}
+
+// Fills `*info` from the metadata of the bloom chain stored at `user_key`.
+// Any non-OK status from the metadata lookup (e.g. NotFound for a missing key)
+// is returned unchanged, in which case `*info` is left untouched.
+rocksdb::Status BloomChain::Info(const Slice &user_key, BloomFilterInfo *info) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  BloomChainMetadata metadata;
+  rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
+  if (!s.ok()) return s;
+
+  // Capacity is derived from the chain (base capacity, expansion, filter count), not stored directly.
+  info->capacity = metadata.GetCapacity();
+  info->bloom_bytes = metadata.bloom_bytes;
+  info->n_filters = metadata.n_filters;
+  info->size = metadata.size;
+  info->expansion = metadata.expansion;
return rocksdb::Status::OK();
}
+
} // namespace redis
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
index 7a2923dc..359a4237 100644
--- a/src/types/redis_bloom_chain.h
+++ b/src/types/redis_bloom_chain.h
@@ -30,12 +30,30 @@ const uint32_t kBFDefaultInitCapacity = 100;
const double kBFDefaultErrorRate = 0.01;
const uint16_t kBFDefaultExpansion = 2;
+// Field selector accepted by BF.INFO; kAll requests every field.
+enum class BloomInfoType {
+  kAll,
+  kCapacity,
+  kSize,
+  kFilters,
+  kItems,
+  kExpansion,
+};
+
+// Metadata of a bloom chain as reported by BF.INFO.
+// Members default to zero so a default-constructed instance never holds
+// indeterminate values; the type remains an aggregate.
+struct BloomFilterInfo {
+  uint32_t capacity = 0;     // total capacity across all filters in the chain
+  uint32_t bloom_bytes = 0;  // reported as "Size" by BF.INFO
+  uint16_t n_filters = 0;    // number of filters in the chain
+  uint64_t size = 0;         // number of items inserted
+  uint16_t expansion = 0;    // expansion rate; 0 for a nonscaling chain
+};
+
class BloomChain : public Database {
public:
BloomChain(engine::Storage *storage, const std::string &ns) :
Database(storage, ns) {}
rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double
error_rate, uint16_t expansion);
rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
rocksdb::Status Exist(const Slice &user_key, const Slice &item, int *ret);
+ rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);
private:
std::string getBFKey(const Slice &ns_key, const BloomChainMetadata
&metadata, uint16_t filters_index);
@@ -43,7 +61,7 @@ class BloomChain : public Database {
rocksdb::Status getBloomChainMetadata(const Slice &ns_key,
BloomChainMetadata *metadata);
rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate,
uint32_t capacity, uint16_t expansion,
BloomChainMetadata *metadata);
- rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item, int
*ret);
- rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, int
*ret);
+ rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item);
+ rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item,
bool *exist);
};
} // namespace redis
diff --git a/tests/cppunit/types/bloom_chain_test.cc
b/tests/cppunit/types/bloom_chain_test.cc
index 664be011..e3666554 100644
--- a/tests/cppunit/types/bloom_chain_test.cc
+++ b/tests/cppunit/types/bloom_chain_test.cc
@@ -56,8 +56,7 @@ TEST_F(RedisBloomChainTest, BasicAddAndTest) {
int ret = 0;
auto s = sb_chain_->Exist("no_exist_key", "test_item", &ret);
- EXPECT_FALSE(s.ok());
- EXPECT_EQ(s.ToString(), "NotFound: key is not found");
+ EXPECT_EQ(ret, 0);
sb_chain_->Del("no_exist_key");
std::string insert_items[] = {"item1", "item2", "item3", "item101",
"item202", "303"};
diff --git a/tests/gocase/unit/type/bloom/bloom_test.go
b/tests/gocase/unit/type/bloom/bloom_test.go
index 86f02199..10ddd2c5 100644
--- a/tests/gocase/unit/type/bloom/bloom_test.go
+++ b/tests/gocase/unit/type/bloom/bloom_test.go
@@ -24,6 +24,7 @@ import (
"testing"
"github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
)
@@ -81,10 +82,26 @@ func TestBloom(t *testing.T) {
require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01",
"1000", "nonscaling", "expansion", "1").Err(), "ERR nonscaling filters cannot
expand")
})
- t.Run("Check no exists key", func(t *testing.T) {
- require.NoError(t, rdb.Del(ctx, key).Err())
- require.ErrorContains(t, rdb.Do(ctx, "bf.exists",
"no_exist_key", "item1").Err(), "ERR NotFound: key is not found")
+ t.Run("Check no exist key and no exist item", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
+ require.Equal(t, int64(0), rdb.Do(ctx, "bf.exists",
"no_exist_key", "item1").Val())
require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
+
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02",
"1000").Err())
+ require.Equal(t, int64(0), rdb.Do(ctx, "bf.exists", key,
"item1").Val())
+ })
+
+ t.Run("Add the same value", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key,
"xxx").Val())
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key,
"items").Val())
+ // Add the same value would return 0
+ require.Equal(t, int64(0), rdb.Do(ctx, "bf.add", key,
"xxx").Val())
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key,
"items").Val())
+ // Add the distinct value would return 1
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key,
"yyy").Val())
+ require.Equal(t, int64(2), rdb.Do(ctx, "bf.info", key,
"items").Val())
})
t.Run("BasicAddAndCheck", func(t *testing.T) {
@@ -93,27 +110,98 @@ func TestBloom(t *testing.T) {
var fpp = 0.01
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, fpp,
totalCount).Err())
+ // insert items
var insertItems []string
+ var falseExist = 0
for i := 0; i < totalCount; i++ {
- buf := util.RandString(1, 5, util.Alpha)
- insertItems = append(insertItems, buf)
- require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key,
buf).Val())
+ buf := util.RandString(7, 8, util.Alpha)
+ Add := rdb.Do(ctx, "bf.add", key, buf)
+ require.NoError(t, Add.Err())
+ if Add.Val() == int64(0) {
+ falseExist += 1
+ } else {
+ insertItems = append(insertItems, buf)
+ }
}
+ require.Equal(t, int64(totalCount-falseExist), rdb.Do(ctx,
"bf.info", key, "items").Val())
+ require.LessOrEqual(t, float64(falseExist),
fpp*float64(totalCount))
+ // check exist items
for i := 0; i < totalCount; i++ {
- index := util.RandomInt(int64(totalCount))
+ index := util.RandomInt(int64(totalCount - falseExist))
require.Equal(t, int64(1), rdb.Do(ctx, "bf.exists",
key, insertItems[index]).Val())
}
- var exist = 0
+ // check no exist items
+ falseExist = 0
for i := 0; i < totalCount; i++ {
- buf := util.RandString(5, 10, util.Alpha)
+ buf := util.RandString(9, 10, util.Alpha)
check := rdb.Do(ctx, "bf.exists", key, buf)
require.NoError(t, check.Err())
if check.Val() == int64(1) {
- exist += 1
+ falseExist += 1
}
}
- require.LessOrEqual(t, float64(exist), fpp*float64(totalCount))
+ require.LessOrEqual(t, float64(falseExist),
fpp*float64(totalCount))
})
+
+ t.Run("Get info of no exists key ", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
+ require.ErrorContains(t, rdb.Do(ctx, "bf.info",
"no_exist_key").Err(), "ERR key is not found")
+ require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
+ })
+
+ t.Run("Get info but wrong arguments", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.01",
"2000", "nonscaling").Err())
+ require.ErrorContains(t, rdb.Do(ctx, "bf.info", key,
"xxx").Err(), "Invalid info argument")
+ require.ErrorContains(t, rdb.Do(ctx, "bf.info", key,
"capacity", "items").Err(), "wrong number of arguments")
+ })
+
+ t.Run("Get all info of bloom filter", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02",
"1000", "expansion", "3").Err())
+ require.Equal(t, []interface{}{"Capacity", int64(1000), "Size",
int64(2048), "Number of filters", int64(1), "Number of items inserted",
int64(0), "Expansion rate", int64(3)}, rdb.Do(ctx, "bf.info", key).Val())
+ })
+
+ t.Run("Get capacity of bloom filter", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.01",
"2000").Err())
+ require.Equal(t, int64(2000), rdb.Do(ctx, "bf.info", key,
"capacity").Val())
+ })
+
+ t.Run("Get expansion of bloom filter", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.01",
"2000", "expansion", "1").Err())
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key,
"expansion").Val())
+ })
+
+ t.Run("Get reserve default expansion", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02",
"1000").Err())
+ // if not specified expansion, the default expansion value is 2.
+ require.Equal(t, int64(2), rdb.Do(ctx, "bf.info", key,
"expansion").Val())
+ })
+
+ t.Run("Get expansion of nonscaling bloom filter", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.01",
"2000", "nonscaling").Err())
+ require.Equal(t, redis.Nil, rdb.Do(ctx, "bf.info", key,
"expansion").Err())
+ })
+
+ t.Run("Get size of bloom filter", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02",
"1000", "expansion", "1").Err())
+ require.Equal(t, int64(2048), rdb.Do(ctx, "bf.info", key,
"size").Val())
+ })
+
+ t.Run("Get items of bloom filter", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+ require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02",
"1000", "expansion", "1").Err())
+ require.Equal(t, int64(0), rdb.Do(ctx, "bf.info", key,
"items").Val())
+ require.NoError(t, rdb.Do(ctx, "bf.add", key, "item").Err())
+ require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key,
"items").Val())
+ })
+
+ // TODO: Add the testcase of get filters of bloom filter after complete
the scaling.
}