This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 11a01401 Add the support of SCALING for bloom filter (#1721)
11a01401 is described below

commit 11a01401d2695ef12e3931fec95465a2f5207337
Author: Leo <[email protected]>
AuthorDate: Tue Sep 12 13:57:36 2023 +0800

    Add the support of SCALING for bloom filter (#1721)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: hulk <[email protected]>
    Co-authored-by: mwish <[email protected]>
---
 src/storage/redis_metadata.h               |  5 +--
 src/types/bloom_filter.h                   |  4 +-
 src/types/redis_bloom_chain.cc             | 60 ++++++++++++++++++++----------
 src/types/redis_bloom_chain.h              |  4 +-
 tests/gocase/unit/type/bloom/bloom_test.go | 45 ++++++++++++++++++++++
 5 files changed, 94 insertions(+), 24 deletions(-)

diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 0512d1f9..33760aac 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -250,8 +250,7 @@ class BloomChainMetadata : public Metadata {
   using Metadata::Decode;
   rocksdb::Status Decode(Slice *bytes) override;
 
-  /// Get the total capacity of the bloom chain (the sum capacity of all 
sub-filters)
-  ///
-  /// @return the total capacity value
   uint32_t GetCapacity() const;
+
+  bool IsScaling() const { return expansion != 0; };
 };
diff --git a/src/types/bloom_filter.h b/src/types/bloom_filter.h
index d5f29ee3..effe68e4 100644
--- a/src/types/bloom_filter.h
+++ b/src/types/bloom_filter.h
@@ -154,7 +154,9 @@ class BlockSplitBloomFilter {
   /// Get the plain bitset value from the Bloom filter bitset.
   ///
   /// @return bitset value;
-  const std::string& GetData() { return data_; }
+  const std::string& GetData() const& { return data_; }
+
+  std::string&& GetData() && { return std::move(data_); }
 
   /// Compute hash for string value by using its plain encoding result.
   ///
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
index b15f5de7..df0ddb78 100644
--- a/src/types/redis_bloom_chain.cc
+++ b/src/types/redis_bloom_chain.cc
@@ -56,12 +56,12 @@ rocksdb::Status BloomChain::createBloomChain(const Slice 
&ns_key, double error_r
   block_split_bloom_filter.Init(metadata->bloom_bytes);
 
   auto batch = storage_->GetWriteBatchBase();
-  WriteBatchLogData log_data(kRedisBloomFilter, {"createSBChain"});
+  WriteBatchLogData log_data(kRedisBloomFilter, {"createBloomChain"});
   batch->PutLogData(log_data.Encode());
 
-  std::string sb_chain_meta_bytes;
-  metadata->Encode(&sb_chain_meta_bytes);
-  batch->Put(metadata_cf_handle_, ns_key, sb_chain_meta_bytes);
+  std::string bloom_chain_meta_bytes;
+  metadata->Encode(&bloom_chain_meta_bytes);
+  batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
 
   std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
   batch->Put(bf_key, block_split_bloom_filter.GetData());
@@ -69,18 +69,30 @@ rocksdb::Status BloomChain::createBloomChain(const Slice 
&ns_key, double error_r
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
-rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key, const std::string 
&item) {
-  std::string bf_data;
-  rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), bf_key, &bf_data);
-  if (!s.ok()) return s;
+void BloomChain::createBloomFilterInBatch(const Slice &ns_key, 
BloomChainMetadata *metadata,
+                                          
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data) {
+  uint32_t bloom_filter_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(
+      static_cast<uint32_t>(metadata->base_capacity * pow(metadata->expansion, 
metadata->n_filters)),
+      metadata->error_rate);
+  metadata->n_filters += 1;
+  metadata->bloom_bytes += bloom_filter_bytes;
+
   BlockSplitBloomFilter block_split_bloom_filter;
-  block_split_bloom_filter.Init(std::move(bf_data));
+  block_split_bloom_filter.Init(bloom_filter_bytes);
+  *bf_data = std::move(block_split_bloom_filter).GetData();
+
+  std::string bloom_chain_meta_bytes;
+  metadata->Encode(&bloom_chain_meta_bytes);
+  batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
+}
+
+void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
+  BlockSplitBloomFilter block_split_bloom_filter;
+  block_split_bloom_filter.Init(std::move(*bf_data));
 
   uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
   block_split_bloom_filter.InsertHash(h);
-  auto batch = storage_->GetWriteBatchBase();
-  batch->Put(bf_key, block_split_bloom_filter.GetData());
-  return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
+  *bf_data = std::move(block_split_bloom_filter).GetData();
 }
 
 rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string 
&item, bool *exist) {
@@ -147,18 +159,28 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, 
const Slice &item, int *r
 
   // insert
   if (!exist) {
-    if (metadata.size + 1 > metadata.GetCapacity()) {  // TODO: scaling would 
be supported later
-      return rocksdb::Status::Aborted("filter is full");
-    }
-    s = bloomAdd(bf_key_list.back(), item_string);
+    std::string bf_data;
+    s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
     if (!s.ok()) return s;
+
+    if (metadata.size + 1 > metadata.GetCapacity()) {
+      if (metadata.IsScaling()) {
+        batch->Put(bf_key_list.back(), bf_data);
+        createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
+        bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 
1));
+      } else {
+        return rocksdb::Status::Aborted("filter is full and is nonscaling");
+      }
+    }
+    bloomAdd(item_string, &bf_data);
+    batch->Put(bf_key_list.back(), bf_data);
     *ret = 1;
     metadata.size += 1;
   }
 
-  std::string sb_chain_metadata_bytes;
-  metadata.Encode(&sb_chain_metadata_bytes);
-  batch->Put(metadata_cf_handle_, ns_key, sb_chain_metadata_bytes);
+  std::string bloom_chain_metadata_bytes;
+  metadata.Encode(&bloom_chain_metadata_bytes);
+  batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);
 
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
index 359a4237..ad66675f 100644
--- a/src/types/redis_bloom_chain.h
+++ b/src/types/redis_bloom_chain.h
@@ -61,7 +61,9 @@ class BloomChain : public Database {
   rocksdb::Status getBloomChainMetadata(const Slice &ns_key, 
BloomChainMetadata *metadata);
   rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, 
uint32_t capacity, uint16_t expansion,
                                    BloomChainMetadata *metadata);
-  rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item);
+  void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata 
*metadata,
+                                ObserverOrUniquePtr<rocksdb::WriteBatchBase> 
&batch, std::string *bf_data);
+  static void bloomAdd(const std::string &item, std::string *bf_data);
   rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, 
bool *exist);
 };
 }  // namespace redis
diff --git a/tests/gocase/unit/type/bloom/bloom_test.go 
b/tests/gocase/unit/type/bloom/bloom_test.go
index 96ecb460..f93b84b8 100644
--- a/tests/gocase/unit/type/bloom/bloom_test.go
+++ b/tests/gocase/unit/type/bloom/bloom_test.go
@@ -203,6 +203,50 @@ func TestBloom(t *testing.T) {
                require.Equal(t, int64(1), rdb.Do(ctx, "bf.info", key, 
"items").Val())
        })
 
+       t.Run("Bloom filter full and nonscaling", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", 
"50", "nonscaling").Err())
+
+               // insert items, suppose false positives is 0
+               for i := 0; i < 50; i++ {
+                       buf := util.RandString(7, 8, util.Alpha)
+                       Add := rdb.Do(ctx, "bf.add", key, buf)
+                       require.NoError(t, Add.Err())
+               }
+               require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, 
"items").Val())
+               require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, 
"xxx").Err(), "filter is full and is nonscaling")
+               require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, 
"items").Val())
+       })
+
+       t.Run("Bloom filter full and scaling", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", 
"50", "expansion", "2").Err())
+
+               // insert items, suppose false positives is 0
+               for i := 0; i < 50; i++ {
+                       buf := util.RandString(7, 8, util.Alpha)
+                       Add := rdb.Do(ctx, "bf.add", key, buf)
+                       require.NoError(t, Add.Err())
+               }
+               require.Equal(t, []interface{}{"Capacity", int64(50), "Size", 
int64(256), "Number of filters", int64(1), "Number of items inserted", 
int64(50), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+
+               // bloom filter is full and scaling
+               require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxx").Err())
+               require.Equal(t, []interface{}{"Capacity", int64(150), "Size", 
int64(768), "Number of filters", int64(2), "Number of items inserted", 
int64(51), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+
+               // insert items, suppose false positives is 0
+               for i := 0; i < 99; i++ {
+                       buf := util.RandString(7, 8, util.Alpha)
+                       Add := rdb.Do(ctx, "bf.add", key, buf)
+                       require.NoError(t, Add.Err())
+               }
+               require.Equal(t, []interface{}{"Capacity", int64(150), "Size", 
int64(768), "Number of filters", int64(2), "Number of items inserted", 
int64(150), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+
+               // bloom filter is full and scaling
+               require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxxx").Err())
+               require.Equal(t, []interface{}{"Capacity", int64(350), "Size", 
int64(1792), "Number of filters", int64(3), "Number of items inserted", 
int64(151), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+       })
+
        t.Run("Get type of bloom filter", func(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, key).Err())
                require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", 
"1000").Err())
@@ -210,4 +254,5 @@ func TestBloom(t *testing.T) {
        })
 
        // TODO: Add the testcase of get filters of bloom filter after complete 
the scaling.
+
 }

Reply via email to