This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 01a2f1db Add the support of basic scalable bloom (#1699)
01a2f1db is described below
commit 01a2f1db2c66dce5efefbdd0e39cabbdd783d5b1
Author: Leo <[email protected]>
AuthorDate: Tue Aug 29 23:58:53 2023 +0800
Add the support of basic scalable bloom (#1699)
Co-authored-by: mwish <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/commands/cmd_bloom_filter.cc | 117 +++++++++++++++++
src/storage/batch_extractor.cc | 1 +
src/storage/redis_db.cc | 3 +-
src/storage/redis_metadata.cc | 56 +++++++-
src/storage/redis_metadata.h | 46 +++++++
src/types/redis_bloom_chain.cc | 198 +++++++++++++++++++++++++++++
src/types/redis_bloom_chain.h | 49 +++++++
tests/cppunit/types/bloom_chain_test.cc | 83 ++++++++++++
tests/gocase/unit/type/bloom/bloom_test.go | 119 +++++++++++++++++
9 files changed, 670 insertions(+), 2 deletions(-)
diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc
new file mode 100644
index 00000000..46ffc9c9
--- /dev/null
+++ b/src/commands/cmd_bloom_filter.cc
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "command_parser.h"
+#include "commander.h"
+#include "error_constants.h"
+#include "server/server.h"
+#include "types/redis_bloom_chain.h"
+
+namespace redis {
+
+class CommandBFReserve : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ auto parse_error_rate = ParseFloat<double>(args[2]);
+ if (!parse_error_rate) {
+ return {Status::RedisParseErr, errValueIsNotFloat};
+ }
+ error_rate_ = *parse_error_rate;
+ if (error_rate_ >= 1 || error_rate_ <= 0) {
+ return {Status::RedisParseErr, "error rate should be between 0 and 1"};
+ }
+
+ auto parse_capacity = ParseInt<uint32_t>(args[3], 10);
+ if (!parse_capacity) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+ capacity_ = *parse_capacity;
+ if (capacity_ <= 0) {
+ return {Status::RedisParseErr, "capacity should be larger than 0"};
+ }
+
+ CommandParser parser(args, 4);
+ bool nonscaling = false;
+ while (parser.Good()) {
+ if (parser.EatEqICase("nonscaling")) {
+ nonscaling = true;
+ } else if (parser.EatEqICase("expansion")) {
+ expansion_ = GET_OR_RET(parser.TakeInt<uint16_t>());
+ if (expansion_ < 1) {
+ return {Status::RedisParseErr, "expansion should be greater or equal
to 1"};
+ }
+ } else {
+ return {Status::RedisParseErr, errInvalidSyntax};
+ }
+ }
+
+ // if nonscaling is true, expansion should be 0
+ if (nonscaling && expansion_ != 0) {
+ return {Status::RedisParseErr, "nonscaling filters cannot expand"};
+ }
+
+ return Commander::Parse(args);
+ }
+
+ Status Execute(Server *svr, Connection *conn, std::string *output) override {
+ redis::BloomChain bloomfilter_db(svr->storage, conn->GetNamespace());
+ auto s = bloomfilter_db.Reserve(args_[1], capacity_, error_rate_,
expansion_);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+ *output = redis::SimpleString("OK");
+ return Status::OK();
+ }
+
+ private:
+ double error_rate_;
+ uint32_t capacity_;
+ uint16_t expansion_ = 0;
+};
+
+class CommandBFAdd : public Commander {
+ public:
+ Status Execute(Server *svr, Connection *conn, std::string *output) override {
+ redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+ int ret = 0;
+ auto s = bloom_db.Add(args_[1], args_[2], &ret);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+ *output = redis::Integer(ret);
+ return Status::OK();
+ }
+};
+
+class CommandBFExists : public Commander {
+ public:
+ Status Execute(Server *svr, Connection *conn, std::string *output) override {
+ redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+ int ret = 0;
+ auto s = bloom_db.Exist(args_[1], args_[2], &ret);
+ if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+ *output = redis::Integer(ret);
+ return Status::OK();
+ }
+};
+
// Register the BF.* command family: reserve/add mutate the keyspace ("write"),
// exists is "read-only". All operate on exactly one key (first/last/step = 1).
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
                        MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
                        MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1), )
}  // namespace redis
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 7430e1ce..61952832 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -244,6 +244,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t
column_family_id, const Slic
}
break;
}
+ // TODO: to implement the case of kRedisBloomFilter
default:
break;
}
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 14d6a352..b4904949 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -55,7 +55,8 @@ rocksdb::Status Database::GetMetadata(RedisType type, const
Slice &ns_key, Metad
metadata->Decode(old_metadata);
return rocksdb::Status::InvalidArgument(kErrMsgWrongType);
}
- if (metadata->size == 0 && type != kRedisStream) { // stream is allowed to
be empty
+ if (metadata->size == 0 && type != kRedisStream &&
+ type != kRedisBloomFilter) { // stream and bloom is allowed to be empty
metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no elements");
}
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 42288329..65a41038 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -302,7 +302,7 @@ timeval Metadata::Time() const {
}
bool Metadata::ExpireAt(uint64_t expired_ts) const {
- if (Type() != kRedisString && Type() != kRedisStream && size == 0) {
+ if (Type() != kRedisString && Type() != kRedisStream && Type() !=
kRedisBloomFilter && size == 0) {
return true;
}
if (expire == 0) {
@@ -398,3 +398,57 @@ rocksdb::Status StreamMetadata::Decode(Slice input) {
return rocksdb::Status::OK();
}
+
// Serialize bloom-chain metadata: the common Metadata header first, then the
// chain-specific fields. The field order here is the on-disk format and must
// mirror BloomChainMetadata::Decode exactly.
void BloomChainMetadata::Encode(std::string *dst) {
  Metadata::Encode(dst);

  PutFixed16(dst, n_filters);
  PutFixed16(dst, expansion);

  PutFixed32(dst, base_capacity);
  PutDouble(dst, error_rate);
  PutFixed32(dst, bloom_bytes);
}
+
// Deserialize metadata produced by BloomChainMetadata::Encode.
// Field order must match Encode exactly; each length check guards the group
// of reads that follows it.
rocksdb::Status BloomChainMetadata::Decode(Slice bytes) {
  Slice input(bytes);
  if (!GetFixed8(&input, &flags)) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }
  if (!GetExpire(&input)) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }

  // Common tail of the base Metadata: 8-byte version + CommonEncodedSize()
  // bytes for `size`.
  if (input.size() < 8 + CommonEncodedSize()) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }

  GetFixed64(&input, &version);
  GetFixedCommon(&input, &size);

  // Chain-specific fields: 2 + 2 + 4 + 8 + 4 = 20 bytes.
  if (input.size() < 20) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }

  GetFixed16(&input, &n_filters);
  GetFixed16(&input, &expansion);

  GetFixed32(&input, &base_capacity);
  GetDouble(&input, &error_rate);
  GetFixed32(&input, &bloom_bytes);

  return rocksdb::Status::OK();
}
+
+uint32_t BloomChainMetadata::GetCapacity() const {
+ // non-scaling
+ if (expansion == 0) {
+ return base_capacity;
+ }
+
+ // the sum of Geometric progression
+ if (expansion == 1) {
+ return base_capacity * n_filters;
+ }
+ return static_cast<uint32_t>(base_capacity * (1 - pow(expansion, n_filters))
/ 1 - expansion);
+}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 6c76496c..d1530d19 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -43,6 +43,7 @@ enum RedisType {
kRedisBitmap = 6,
kRedisSortedint = 7,
kRedisStream = 8,
+ kRedisBloomFilter = 9,
};
enum RedisCommand {
@@ -146,6 +147,7 @@ class Metadata {
virtual void Encode(std::string *dst);
virtual rocksdb::Status Decode(Slice input);
bool operator==(const Metadata &that) const;
+ virtual ~Metadata() = default;
private:
static uint64_t generateVersion();
@@ -200,3 +202,47 @@ class StreamMetadata : public Metadata {
void Encode(std::string *dst) override;
rocksdb::Status Decode(Slice input) override;
};
+
/// Metadata for a scalable bloom filter ("bloom chain"): one logical key
/// backed by a sequence of sub-filters.
class BloomChainMetadata : public Metadata {
 public:
  /// The number of sub-filters
  uint16_t n_filters;

  /// Adding an element to a Bloom filter never fails due to the data structure "filling up". Instead the error rate
  /// starts to grow. To keep the error close to the one set on filter initialisation - the bloom filter will
  /// auto-scale, meaning when capacity is reached an additional sub-filter will be created.
  ///
  /// The capacity of the new sub-filter is the capacity of the last sub-filter multiplied by expansion.
  ///
  /// The default expansion value is 2.
  ///
  /// For non-scaling, expansion should be set to 0
  uint16_t expansion;

  /// The number of entries intended to be added to the filter. If your filter allows scaling, the capacity of the last
  /// sub-filter should be: base_capacity -> base_capacity * expansion -> base_capacity * expansion^2...
  ///
  /// The default base_capacity value is 100.
  uint32_t base_capacity;

  /// The desired probability for false positives.
  ///
  /// The rate is a decimal value between 0 and 1. For example, for a desired false positive rate of 0.1% (1 in 1000),
  /// error_rate should be set to 0.001.
  ///
  /// The default error_rate value is 0.01.
  double error_rate;

  /// The total number of bytes allocated for all sub-filters.
  uint32_t bloom_bytes;

  explicit BloomChainMetadata(bool generate_version = true) : Metadata(kRedisBloomFilter, generate_version) {}

  /// Serialize: common Metadata header followed by the chain-specific fields.
  void Encode(std::string *dst) override;
  /// Deserialize; must mirror Encode's field order exactly.
  rocksdb::Status Decode(Slice bytes) override;

  /// Get the total capacity of the bloom chain (the sum capacity of all sub-filters)
  ///
  /// @return the total capacity value
  uint32_t GetCapacity() const;
};
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
new file mode 100644
index 00000000..4c6cffcc
--- /dev/null
+++ b/src/types/redis_bloom_chain.cc
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "redis_bloom_chain.h"
+
+namespace redis {
+
+std::string BloomChain::getBFKey(const Slice &ns_key, const BloomChainMetadata
&metadata, uint16_t filters_index) {
+ std::string sub_key;
+ PutFixed16(&sub_key, filters_index);
+ std::string bf_key = InternalKey(ns_key, sub_key, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ return bf_key;
+}
+
+void BloomChain::getBFKeyList(const Slice &ns_key, const BloomChainMetadata
&metadata,
+ std::vector<std::string> *bf_key_list) {
+ bf_key_list->reserve(metadata.n_filters);
+ for (uint16_t i = 0; i < metadata.n_filters; ++i) {
+ std::string bf_key = getBFKey(ns_key, metadata, i);
+ bf_key_list->push_back(std::move(bf_key));
+ }
+}
+
// Fetch and decode this key's metadata, enforcing that the stored value is a
// bloom filter (wrong types surface as InvalidArgument from GetMetadata).
rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
  return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
}
+
// Initialize a brand-new bloom chain under `ns_key` with a single, empty
// sub-filter, persisting the metadata and the sub-filter's bit array in one
// atomic write batch. `metadata` is filled in for the caller.
rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity,
                                             uint16_t expansion, BloomChainMetadata *metadata) {
  metadata->n_filters = 1;
  metadata->expansion = expansion;
  metadata->size = 0;

  metadata->error_rate = error_rate;
  metadata->base_capacity = capacity;
  // Size the bit array for the requested capacity / false-positive-rate pair.
  metadata->bloom_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(capacity, error_rate);

  BlockSplitBloomFilter block_split_bloom_filter;
  block_split_bloom_filter.Init(metadata->bloom_bytes);

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisBloomFilter, {"createSBChain"});
  batch->PutLogData(log_data.Encode());

  std::string sb_chain_meta_bytes;
  metadata->Encode(&sb_chain_meta_bytes);
  batch->Put(metadata_cf_handle_, ns_key, sb_chain_meta_bytes);

  // The first (and only) sub-filter lives at index n_filters - 1 == 0.
  std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
  batch->Put(bf_key, block_split_bloom_filter.GetData());

  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
+
+rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key, const std::string
&item, int *ret) {
+ std::string bf_data;
+ rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), bf_key, &bf_data);
+ if (!s.ok()) return s;
+ BlockSplitBloomFilter block_split_bloom_filter;
+ block_split_bloom_filter.Init(std::move(bf_data));
+
+ uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
+ bool found = block_split_bloom_filter.FindHash(h);
+
+ *ret = 0;
+ if (!found) {
+ *ret = 1;
+ block_split_bloom_filter.InsertHash(h);
+ auto batch = storage_->GetWriteBatchBase();
+ batch->Put(bf_key, block_split_bloom_filter.GetData());
+ return storage_->Write(storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
+ }
+
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string
&item, int *ret) {
+ LatestSnapShot ss(storage_);
+ rocksdb::ReadOptions read_options;
+ read_options.snapshot = ss.GetSnapShot();
+ std::string bf_data;
+ rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
+ if (!s.ok()) return s;
+ BlockSplitBloomFilter block_split_bloom_filter;
+ block_split_bloom_filter.Init(std::move(bf_data));
+
+ uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
+ bool found = block_split_bloom_filter.FindHash(h);
+ *ret = found ? 1 : 0;
+
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity,
double error_rate, uint16_t expansion) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ LockGuard guard(storage_->GetLockManager(), ns_key);
+ BloomChainMetadata bloom_chain_metadata;
+ rocksdb::Status s = getBloomChainMetadata(ns_key, &bloom_chain_metadata);
+ if (!s.ok() && !s.IsNotFound()) return s;
+ if (!s.IsNotFound()) {
+ return rocksdb::Status::InvalidArgument("the key already exists");
+ }
+
+ return createBloomChain(ns_key, error_rate, capacity, expansion,
&bloom_chain_metadata);
+}
+
// Add `item` to the bloom chain at `user_key`, auto-creating the chain with
// default parameters when it does not exist yet.
// *ret is 1 if the item was newly added, 0 if it was (probably) already
// present in some sub-filter.
rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *ret) {
  std::string ns_key = AppendNamespacePrefix(user_key);
  LockGuard guard(storage_->GetLockManager(), ns_key);

  BloomChainMetadata metadata;
  rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);

  if (s.IsNotFound()) {
    // First BF.ADD on a missing key implicitly reserves a default filter.
    s = createBloomChain(ns_key, kBFDefaultErrorRate, kBFDefaultInitCapacity, kBFDefaultExpansion, &metadata);
  }
  if (!s.ok()) {
    return s;
  }

  std::vector<std::string> bf_key_list;
  getBFKeyList(ns_key, metadata, &bf_key_list);

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
  batch->PutLogData(log_data.Encode());

  int check_index = metadata.n_filters - 1;
  std::string item_string = item.ToString();

  // Probe sub-filters newest-to-oldest; stop at the first (probable) hit, in
  // which case *ret is 1 and nothing is inserted.
  // check
  for (; check_index >= 0; --check_index) {  // TODO: to test which direction for searching is better
    s = bloomCheck(bf_key_list[check_index], item_string, ret);
    if (!s.ok()) return s;
    if (*ret == 1) {
      break;
    }
  }

  // Not found in any sub-filter: insert into the newest one.
  // insert
  if (check_index < 0) {
    if (metadata.size + 1 > metadata.GetCapacity()) {  // TODO: scaling would be supported later
      return rocksdb::Status::Aborted("filter is full");
    }
    s = bloomAdd(bf_key_list.back(), item_string, ret);
    if (!s.ok()) return s;
    // bloomAdd reports 1 only when the hash was actually new.
    metadata.size += *ret;
  }

  // NOTE(review): bloomAdd persists the filter bits in its own write batch,
  // so the filter update and this metadata update are two separate writes —
  // presumably they should share one batch for atomicity; TODO confirm.
  std::string sb_chain_metadata_bytes;
  metadata.Encode(&sb_chain_metadata_bytes);
  batch->Put(metadata_cf_handle_, ns_key, sb_chain_metadata_bytes);

  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
+
+rocksdb::Status BloomChain::Exist(const Slice &user_key, const Slice &item,
int *ret) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+
+ BloomChainMetadata metadata;
+ rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
+ if (s.IsNotFound()) return rocksdb::Status::NotFound("key is not found");
+ if (!s.ok()) return s;
+
+ std::vector<std::string> bf_key_list;
+ getBFKeyList(ns_key, metadata, &bf_key_list);
+
+ std::string item_string = item.ToString();
+ // check
+ for (int i = metadata.n_filters - 1; i >= 0; --i) { // TODO: to test which
direction for searching is better
+ s = bloomCheck(bf_key_list[i], item_string, ret);
+ if (!s.ok()) return s;
+ if (*ret == 1) {
+ break;
+ }
+ }
+
+ return rocksdb::Status::OK();
+}
+} // namespace redis
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
new file mode 100644
index 00000000..7a2923dc
--- /dev/null
+++ b/src/types/redis_bloom_chain.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "bloom_filter.h"
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+
namespace redis {

// Defaults applied when BF.ADD implicitly creates a filter.
const uint32_t kBFDefaultInitCapacity = 100;
const double kBFDefaultErrorRate = 0.01;
const uint16_t kBFDefaultExpansion = 2;

/// A scalable bloom filter ("bloom chain"): one logical key backed by a
/// sequence of sub-filters. Metadata lives in the metadata column family;
/// each sub-filter's bit array is stored under a per-index sub-key.
class BloomChain : public Database {
 public:
  BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
  /// Create a new, empty chain; fails if the key already exists.
  rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
  /// Add an item; *ret is 1 if newly added, 0 if probably already present.
  rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
  /// Membership test; *ret is 1 for a (probable) hit, 0 for a definite miss.
  rocksdb::Status Exist(const Slice &user_key, const Slice &item, int *ret);

 private:
  // Storage key of the sub-filter at `filters_index`.
  std::string getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index);
  // Storage keys of all sub-filters, oldest first.
  void getBFKeyList(const Slice &ns_key, const BloomChainMetadata &metadata, std::vector<std::string> *bf_key_list);
  rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
  rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
                                   BloomChainMetadata *metadata);
  // Insert into / probe a single sub-filter.
  rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item, int *ret);
  rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, int *ret);
};
}  // namespace redis
diff --git a/tests/cppunit/types/bloom_chain_test.cc
b/tests/cppunit/types/bloom_chain_test.cc
new file mode 100644
index 00000000..664be011
--- /dev/null
+++ b/tests/cppunit/types/bloom_chain_test.cc
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "test_base.h"
+#include "types/redis_bloom_chain.h"
+
// Test fixture: one BloomChain instance in its own namespace per test, with a
// shared test key set up before each case.
class RedisBloomChainTest : public TestBase {
 protected:
  explicit RedisBloomChainTest() { sb_chain_ = std::make_unique<redis::BloomChain>(storage_, "sb_chain_ns"); }
  ~RedisBloomChainTest() override = default;

  void SetUp() override { key_ = "test_sb_chain_key"; }
  void TearDown() override {}

  std::unique_ptr<redis::BloomChain> sb_chain_;
};
+
TEST_F(RedisBloomChainTest, Reserve) {
  uint32_t capacity = 1000;
  double error_rate = 0.02;
  uint16_t expansion = 0;

  // First reservation on a fresh key must succeed.
  auto s = sb_chain_->Reserve(key_, capacity, error_rate, expansion);
  EXPECT_TRUE(s.ok());

  // Reserving again must fail because the key already exists.
  s = sb_chain_->Reserve(key_, capacity, error_rate, expansion);
  EXPECT_FALSE(s.ok());
  EXPECT_EQ(s.ToString(), "Invalid argument: the key already exists");

  sb_chain_->Del(key_);
}
+
TEST_F(RedisBloomChainTest, BasicAddAndTest) {
  int ret = 0;

  // Existence check on a missing key reports NotFound rather than ret == 0.
  auto s = sb_chain_->Exist("no_exist_key", "test_item", &ret);
  EXPECT_FALSE(s.ok());
  EXPECT_EQ(s.ToString(), "NotFound: key is not found");
  sb_chain_->Del("no_exist_key");

  std::string insert_items[] = {"item1", "item2", "item3", "item101", "item202", "303"};
  // First insertion of each item must report "newly added" (ret == 1).
  for (const auto& insert_item : insert_items) {
    s = sb_chain_->Add(key_, insert_item, &ret);
    EXPECT_TRUE(s.ok());
    EXPECT_EQ(ret, 1);
  }

  // Every inserted item must be found — bloom filters have no false negatives.
  for (const auto& insert_item : insert_items) {
    s = sb_chain_->Exist(key_, insert_item, &ret);
    EXPECT_TRUE(s.ok());
    EXPECT_EQ(ret, 1);
  }

  // Items never inserted should miss (these specific strings are assumed not
  // to collide as false positives at this capacity/error rate).
  std::string no_insert_items[] = {"item303", "item404", "1", "2", "3"};
  for (const auto& no_insert_item : no_insert_items) {
    s = sb_chain_->Exist(key_, no_insert_item, &ret);
    EXPECT_TRUE(s.ok());
    EXPECT_EQ(ret, 0);
  }
  sb_chain_->Del(key_);
}
diff --git a/tests/gocase/unit/type/bloom/bloom_test.go
b/tests/gocase/unit/type/bloom/bloom_test.go
new file mode 100644
index 00000000..86f02199
--- /dev/null
+++ b/tests/gocase/unit/type/bloom/bloom_test.go
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package bloom
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/require"
+)
+
// TestBloom exercises the BF.RESERVE / BF.ADD / BF.EXISTS commands end-to-end
// against a live kvrocks server: argument validation, option conflicts, and
// the basic add/exists contract including the false-positive bound.
func TestBloom(t *testing.T) {
	srv := util.StartServer(t, map[string]string{})
	defer srv.Close()
	ctx := context.Background()
	rdb := srv.NewClient()
	defer func() { require.NoError(t, rdb.Close()) }()

	key := "test_bloom_key"
	t.Run("Reserve a bloom filter", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000").Err())
	})

	// error_rate must be a float strictly between 0 and 1.
	t.Run("Reserve a bloom filter with wrong error_rate", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "abc", "1000").Err(), "ERR value is not a valid float")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "-0.03", "1000").Err(), "ERR error rate should be between 0 and 1")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "1", "1000").Err(), "ERR error rate should be between 0 and 1")
	})

	t.Run("Reserve a bloom filter with wrong capacity", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "qwe").Err(), "ERR value is not an integer")
		// capacity stored in uint32_t, if input is negative, the parser will make an error.
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "-1000").Err(), "ERR value is not an integer or out of range")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "0").Err(), "ERR capacity should be larger than 0")
	})

	t.Run("Reserve a bloom filter with nonscaling", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "nonscaling").Err())
	})

	t.Run("Reserve a bloom filter with expansion", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "expansion", "1").Err())
	})

	// EXPANSION needs an integer argument >= 1.
	t.Run("Reserve a bloom filter with wrong expansion", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion").Err(), "ERR no more item to parse")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "0").Err(), "ERR expansion should be greater or equal to 1")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "asd").Err(), "ERR not started as an integer")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "-1").Err(), "ERR out of range of integer type")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "1.5").Err(), "ERR encounter non-integer characters")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "123asd").Err(), "ERR encounter non-integer characters")
	})

	// NONSCALING and EXPANSION are mutually exclusive, in either order.
	t.Run("Reserve a bloom filter with nonscaling and expansion", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "1", "nonscaling").Err(), "ERR nonscaling filters cannot expand")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "nonscaling", "expansion", "1").Err(), "ERR nonscaling filters cannot expand")
	})

	t.Run("Check no exists key", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.exists", "no_exist_key", "item1").Err(), "ERR NotFound: key is not found")
		require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
	})

	t.Run("BasicAddAndCheck", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		var totalCount = 10000
		var fpp = 0.01
		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, fpp, totalCount).Err())

		// Every add of a fresh random string must report "newly added" (1).
		var insertItems []string
		for i := 0; i < totalCount; i++ {
			buf := util.RandString(1, 5, util.Alpha)
			insertItems = append(insertItems, buf)
			require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, buf).Val())
		}

		// No false negatives: every inserted item must be reported present.
		for i := 0; i < totalCount; i++ {
			index := util.RandomInt(int64(totalCount))
			require.Equal(t, int64(1), rdb.Do(ctx, "bf.exists", key, insertItems[index]).Val())
		}

		// False-positive rate over never-inserted strings must not exceed fpp.
		var exist = 0
		for i := 0; i < totalCount; i++ {
			buf := util.RandString(5, 10, util.Alpha)
			check := rdb.Do(ctx, "bf.exists", key, buf)
			require.NoError(t, check.Err())
			if check.Val() == int64(1) {
				exist += 1
			}
		}
		require.LessOrEqual(t, float64(exist), fpp*float64(totalCount))
	})
}