This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 11a01401 Add the support of SCALING for bloom filter (#1721)
11a01401 is described below
commit 11a01401d2695ef12e3931fec95465a2f5207337
Author: Leo <[email protected]>
AuthorDate: Tue Sep 12 13:57:36 2023 +0800
Add the support of SCALING for bloom filter (#1721)
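
    With this change, a Bloom filter created with a non-zero expansion
    factor scales instead of failing once full: BF.ADD appends a new
    sub-filter whose capacity is base_capacity * expansion^n_filters, so a
    filter reserved with capacity 50 and expansion 2 grows from a total
    capacity of 50 to 150 (50 + 100) and then to 350 (150 + 200). Filters
    reserved with the NONSCALING flag (expansion == 0) keep rejecting
    inserts once full, now with the error "filter is full and is
    nonscaling".
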
Co-authored-by: Twice <[email protected]>
Co-authored-by: hulk <[email protected]>
Co-authored-by: mwish <[email protected]>
---
src/storage/redis_metadata.h | 5 +--
src/types/bloom_filter.h | 4 +-
src/types/redis_bloom_chain.cc | 60 ++++++++++++++++++++----------
src/types/redis_bloom_chain.h | 4 +-
tests/gocase/unit/type/bloom/bloom_test.go | 45 ++++++++++++++++++++++
5 files changed, 94 insertions(+), 24 deletions(-)
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 0512d1f9..33760aac 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -250,8 +250,7 @@ class BloomChainMetadata : public Metadata {
using Metadata::Decode;
rocksdb::Status Decode(Slice *bytes) override;
-  /// Get the total capacity of the bloom chain (the sum capacity of all sub-filters)
- ///
- /// @return the total capacity value
uint32_t GetCapacity() const;
+
+ bool IsScaling() const { return expansion != 0; };
};
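
As the removed doc comment noted, GetCapacity() returns the summed capacity of
all sub-filters, and the new IsScaling() helper relies on the NONSCALING
reserve option being encoded as expansion == 0. For a scaling chain that sum
is a geometric series: base_capacity 50 with expansion 2 gives
50 + 100 + 200 = 350 after two scale-outs, which is what the new tests below
observe via bf.info.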
diff --git a/src/types/bloom_filter.h b/src/types/bloom_filter.h
index d5f29ee3..effe68e4 100644
--- a/src/types/bloom_filter.h
+++ b/src/types/bloom_filter.h
@@ -154,7 +154,9 @@ class BlockSplitBloomFilter {
/// Get the plain bitset value from the Bloom filter bitset.
///
/// @return bitset value;
- const std::string& GetData() { return data_; }
+ const std::string& GetData() const& { return data_; }
+
+ std::string&& GetData() && { return std::move(data_); }
/// Compute hash for string value by using its plain encoding result.
///
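
For reference, the ref-qualified GetData() overloads above let a caller move
the bitset out of a filter object that is about to be discarded instead of
copying it. A minimal sketch of the intended usage (the calling code here is
illustrative, not part of this commit):

    BlockSplitBloomFilter filter;
    filter.Init(1024);  // allocate a bitset by size in bytes, as createBloomChain does

    // lvalue call: binds to the const& overload and returns a reference, no copy
    const std::string &bits = filter.GetData();

    // rvalue call: binds to the && overload and moves the bitset out, which is
    // how createBloomFilterInBatch and bloomAdd hand the data back without a copy
    std::string owned = std::move(filter).GetData();
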
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
index b15f5de7..df0ddb78 100644
--- a/src/types/redis_bloom_chain.cc
+++ b/src/types/redis_bloom_chain.cc
@@ -56,12 +56,12 @@ rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_r
block_split_bloom_filter.Init(metadata->bloom_bytes);
auto batch = storage_->GetWriteBatchBase();
- WriteBatchLogData log_data(kRedisBloomFilter, {"createSBChain"});
+ WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomChain"});
batch->PutLogData(log_data.Encode());
- std::string sb_chain_meta_bytes;
- metadata->Encode(&sb_chain_meta_bytes);
- batch->Put(metadata_cf_handle_, ns_key, sb_chain_meta_bytes);
+ std::string bloom_chain_meta_bytes;
+ metadata->Encode(&bloom_chain_meta_bytes);
+ batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
batch->Put(bf_key, block_split_bloom_filter.GetData());
@@ -69,18 +69,30 @@ rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_r
   return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
-rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key, const std::string &item) {
- std::string bf_data;
- rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), bf_key, &bf_data);
- if (!s.ok()) return s;
+void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
+                                          ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data) {
+  uint32_t bloom_filter_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(
+      static_cast<uint32_t>(metadata->base_capacity * pow(metadata->expansion, metadata->n_filters)),
+ metadata->error_rate);
+ metadata->n_filters += 1;
+ metadata->bloom_bytes += bloom_filter_bytes;
+
BlockSplitBloomFilter block_split_bloom_filter;
- block_split_bloom_filter.Init(std::move(bf_data));
+ block_split_bloom_filter.Init(bloom_filter_bytes);
+ *bf_data = std::move(block_split_bloom_filter).GetData();
+
+ std::string bloom_chain_meta_bytes;
+ metadata->Encode(&bloom_chain_meta_bytes);
+ batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
+}
+
+void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
+ BlockSplitBloomFilter block_split_bloom_filter;
+ block_split_bloom_filter.Init(std::move(*bf_data));
uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
block_split_bloom_filter.InsertHash(h);
- auto batch = storage_->GetWriteBatchBase();
- batch->Put(bf_key, block_split_bloom_filter.GetData());
-  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+ *bf_data = std::move(block_split_bloom_filter).GetData();
}
 rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string &item, bool *exist) {
@@ -147,18 +159,28 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
// insert
if (!exist) {
-    if (metadata.size + 1 > metadata.GetCapacity()) {  // TODO: scaling would be supported later
- return rocksdb::Status::Aborted("filter is full");
- }
- s = bloomAdd(bf_key_list.back(), item_string);
+ std::string bf_data;
+ s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
if (!s.ok()) return s;
+
+ if (metadata.size + 1 > metadata.GetCapacity()) {
+ if (metadata.IsScaling()) {
+ batch->Put(bf_key_list.back(), bf_data);
+ createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
+        bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
+ } else {
+ return rocksdb::Status::Aborted("filter is full and is nonscaling");
+ }
+ }
+ bloomAdd(item_string, &bf_data);
+ batch->Put(bf_key_list.back(), bf_data);
*ret = 1;
metadata.size += 1;
}
- std::string sb_chain_metadata_bytes;
- metadata.Encode(&sb_chain_metadata_bytes);
- batch->Put(metadata_cf_handle_, ns_key, sb_chain_metadata_bytes);
+ std::string bloom_chain_metadata_bytes;
+ metadata.Encode(&bloom_chain_metadata_bytes);
+ batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);
   return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
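
Two properties of the rewritten Add() path are worth noting. First, the flush
of the full sub-filter, the creation of the new one in
createBloomFilterInBatch, and the final metadata update all go into the same
write batch, so an insert that triggers scaling is committed atomically by the
single storage_->Write() call. Second, each new sub-filter is sized by
OptimalNumOfBytes for its grown capacity, so the byte sizes grow with the
capacities: in the tests below the capacity-50, capacity-100, and capacity-200
sub-filters take 256, 512, and 1024 bytes, giving the cumulative "Size" values
256, 768, and 1792 that bf.info reports.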
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
index 359a4237..ad66675f 100644
--- a/src/types/redis_bloom_chain.h
+++ b/src/types/redis_bloom_chain.h
@@ -61,7 +61,9 @@ class BloomChain : public Database {
   rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
   rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
                                    BloomChainMetadata *metadata);
- rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item);
+  void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
+                                ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data);
+ static void bloomAdd(const std::string &item, std::string *bf_data);
   rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, bool *exist);
};
} // namespace redis
diff --git a/tests/gocase/unit/type/bloom/bloom_test.go b/tests/gocase/unit/type/bloom/bloom_test.go
index 96ecb460..f93b84b8 100644
--- a/tests/gocase/unit/type/bloom/bloom_test.go
+++ b/tests/gocase/unit/type/bloom/bloom_test.go
@@ -203,6 +203,50 @@ func TestBloom(t *testing.T) {
 		require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key, "items").Val())
})
+ t.Run("Bloom filter full and nonscaling", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "nonscaling").Err())
+
+ // insert items, suppose false positives is 0
+ for i := 0; i < 50; i++ {
+ buf := util.RandString(7, 8, util.Alpha)
+ Add := rdb.Do(ctx, "bf.add", key, buf)
+ require.NoError(t, Add.Err())
+ }
+		require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
+		require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, "xxx").Err(), "filter is full and is nonscaling")
+		require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
+ })
+
+ t.Run("Bloom filter full and scaling", func(t *testing.T) {
+ require.NoError(t, rdb.Del(ctx, key).Err())
+		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "expansion", "2").Err())
+
+ // insert items, suppose false positives is 0
+ for i := 0; i < 50; i++ {
+ buf := util.RandString(7, 8, util.Alpha)
+ Add := rdb.Do(ctx, "bf.add", key, buf)
+ require.NoError(t, Add.Err())
+ }
+ require.Equal(t, []interface{}{"Capacity", int64(50), "Size",
int64(256), "Number of filters", int64(1), "Number of items inserted",
int64(50), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+
+ // bloom filter is full and scaling
+ require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxx").Err())
+ require.Equal(t, []interface{}{"Capacity", int64(150), "Size",
int64(768), "Number of filters", int64(2), "Number of items inserted",
int64(51), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+
+ // insert items, suppose false positives is 0
+ for i := 0; i < 99; i++ {
+ buf := util.RandString(7, 8, util.Alpha)
+ Add := rdb.Do(ctx, "bf.add", key, buf)
+ require.NoError(t, Add.Err())
+ }
+ require.Equal(t, []interface{}{"Capacity", int64(150), "Size",
int64(768), "Number of filters", int64(2), "Number of items inserted",
int64(150), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+
+ // bloom filter is full and scaling
+ require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxxx").Err())
+ require.Equal(t, []interface{}{"Capacity", int64(350), "Size",
int64(1792), "Number of filters", int64(3), "Number of items inserted",
int64(151), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+ })
+
t.Run("Get type of bloom filter", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, key).Err())
require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02",
"1000").Err())
@@ -210,4 +254,5 @@ func TestBloom(t *testing.T) {
})
 	// TODO: Add the testcase of get filters of bloom filter after complete the scaling.
+
}