This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 3092081a Add the support of the BF.MADD command (#1758)
3092081a is described below

commit 3092081a4857d681173a752a15b5b5b95e68758a
Author: Leo <[email protected]>
AuthorDate: Tue Sep 19 14:42:08 2023 +0800

    Add the support of the BF.MADD command (#1758)
    
    Co-authored-by: hulk <[email protected]>
    Co-authored-by: mwish <[email protected]>
---
 src/commands/cmd_bloom_filter.cc           |  67 ++++++++++--
 src/server/redis_reply.h                   |   9 --
 src/types/redis_bloom_chain.cc             | 165 +++++++++++++++++------------
 src/types/redis_bloom_chain.h              |  25 +++--
 tests/cppunit/types/bloom_chain_test.cc    |  17 +--
 tests/gocase/unit/type/bloom/bloom_test.go | 101 ++++++++++++++++--
 6 files changed, 274 insertions(+), 110 deletions(-)

diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc
index 61644b34..ded31857 100644
--- a/src/commands/cmd_bloom_filter.cc
+++ b/src/commands/cmd_bloom_filter.cc
@@ -91,24 +91,71 @@ class CommandBFAdd : public Commander {
  public:
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
-    int ret = 0;
+    BloomFilterAddResult ret = BloomFilterAddResult::kOk;
     auto s = bloom_db.Add(args_[1], args_[2], &ret);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = redis::Integer(ret);
+    switch (ret) {
+      case BloomFilterAddResult::kOk:
+        *output = redis::Integer(1);
+        break;
+      case BloomFilterAddResult::kExist:
+        *output = redis::Integer(0);
+        break;
+      case BloomFilterAddResult::kFull:
+        *output = redis::Error("ERR nonscaling filter is full");
+        break;
+    }
     return Status::OK();
   }
 };
 
+class CommandBFMAdd : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    items_.reserve(args_.size() - 2);
+    for (size_t i = 2; i < args_.size(); ++i) {
+      items_.emplace_back(args_[i]);
+    }
+    return Commander::Parse(args);
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+    std::vector<BloomFilterAddResult> rets(items_.size(), BloomFilterAddResult::kOk);
+    auto s = bloom_db.MAdd(args_[1], items_, &rets);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    *output = redis::MultiLen(items_.size());
+    for (size_t i = 0; i < items_.size(); ++i) {
+      switch (rets[i]) {
+        case BloomFilterAddResult::kOk:
+          *output += redis::Integer(1);
+          break;
+        case BloomFilterAddResult::kExist:
+          *output += redis::Integer(0);
+          break;
+        case BloomFilterAddResult::kFull:
+          *output += redis::Error("ERR nonscaling filter is full");
+          break;
+      }
+    }
+    return Status::OK();
+  }
+
+ private:
+  std::vector<Slice> items_;
+};
+
 class CommandBFExists : public Commander {
  public:
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
-    int ret = 0;
-    auto s = bloom_db.Exists(args_[1], args_[2], &ret);
+    bool exist = false;
+    auto s = bloom_db.Exists(args_[1], args_[2], &exist);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = redis::Integer(ret);
+    *output = redis::Integer(exist ? 1 : 0);
     return Status::OK();
   }
 };
@@ -125,11 +172,14 @@ class CommandBFMExists : public Commander {
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
-    std::vector<int> rets(items_.size(), 0);
-    auto s = bloom_db.MExists(args_[1], items_, &rets);
+    std::vector<bool> exists(items_.size(), false);
+    auto s = bloom_db.MExists(args_[1], items_, &exists);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
-    *output = redis::MultiInteger(rets);
+    *output = redis::MultiLen(items_.size());
+    for (size_t i = 0; i < items_.size(); ++i) {
+      *output += Integer(exists[i] ? 1 : 0);
+    }
     return Status::OK();
   }
 
@@ -226,6 +276,7 @@ class CommandBFCard : public Commander {
 
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
                         MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandBFMAdd>("bf.madd", -3, "write", 1, 1, 1),
                         MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1),
                         MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, "read-only", 1, 1, 1),
                         MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 1, 1, 1),
diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h
index 75492c1d..c3cc1b44 100644
--- a/src/server/redis_reply.h
+++ b/src/server/redis_reply.h
@@ -40,15 +40,6 @@ std::string Integer(T data) {
   return ":" + std::to_string(data) + CRLF;
 }
 
-template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>
-std::string MultiInteger(const std::vector<T> &multi_data) {
-  std::string result = "*" + std::to_string(multi_data.size()) + CRLF;
-  for (const auto &data : multi_data) {
-    result += Integer(data);
-  }
-  return result;
-}
-
 std::string BulkString(const std::string &data);
 std::string NilString();
 
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
index d092c0d2..592b6eec 100644
--- a/src/types/redis_bloom_chain.cc
+++ b/src/types/redis_bloom_chain.cc
@@ -24,6 +24,10 @@
 
 namespace redis {
 
+rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
+  return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
+}
+
 std::string BloomChain::getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index) {
   std::string sub_key;
   PutFixed16(&sub_key, filters_index);
@@ -40,8 +44,27 @@ void BloomChain::getBFKeyList(const Slice &ns_key, const BloomChainMetadata &met
   }
 }
 
-rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
-  return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
+rocksdb::Status BloomChain::getBFDataList(const std::vector<std::string> &bf_key_list,
+                                          std::vector<std::string> *bf_data_list) {
+  LatestSnapShot ss(storage_);
+  rocksdb::ReadOptions read_options;
+  read_options.snapshot = ss.GetSnapShot();
+
+  bf_data_list->reserve(bf_key_list.size());
+  for (const auto &bf_key : bf_key_list) {
+    std::string bf_data;
+    rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
+    if (!s.ok()) return s;
+    bf_data_list->push_back(std::move(bf_data));
+  }
+  return rocksdb::Status::OK();
+}
+
+void BloomChain::getItemHashList(const std::vector<Slice> &items, std::vector<uint64_t> *item_hash_list) {
+  item_hash_list->reserve(items.size());
+  for (const auto &item : items) {
+    item_hash_list->push_back(BlockSplitBloomFilter::Hash(item.data(), item.size()));
+  }
 }
 
 rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity,
@@ -85,32 +108,14 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadat
   batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
 }
 
-void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
-  BlockSplitBloomFilter block_split_bloom_filter(*bf_data);
-
-  uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
-  block_split_bloom_filter.InsertHash(h);
-}
-
-rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::vector<Slice> &items,
-                                       std::vector<bool> *exists) {
-  LatestSnapShot ss(storage_);
-  rocksdb::ReadOptions read_options;
-  read_options.snapshot = ss.GetSnapShot();
-  std::string bf_data;
-  rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
-  if (!s.ok()) return s;
+void BloomChain::bloomAdd(uint64_t item_hash, std::string &bf_data) {
   BlockSplitBloomFilter block_split_bloom_filter(bf_data);
+  block_split_bloom_filter.InsertHash(item_hash);
+}
 
-  for (size_t i = 0; i < items.size(); ++i) {
-    // this item exists in other bloomfilter already, and it's not necessary to check in this bloomfilter.
-    if ((*exists)[i]) {
-      continue;
-    }
-    uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
-    (*exists)[i] = block_split_bloom_filter.FindHash(h);
-  }
-  return rocksdb::Status::OK();
+bool BloomChain::bloomCheck(uint64_t item_hash, std::string &bf_data) {
+  const BlockSplitBloomFilter block_split_bloom_filter(bf_data);
+  return block_split_bloom_filter.FindHash(item_hash);
 }
 
 rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion) {
@@ -127,7 +132,15 @@ rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, do
   return createBloomChain(ns_key, error_rate, capacity, expansion, &bloom_chain_metadata);
 }
 
-rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *ret) {
+rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret) {
+  std::vector<BloomFilterAddResult> tmp{BloomFilterAddResult::kOk};
+  rocksdb::Status s = MAdd(user_key, {item}, &tmp);
+  *ret = tmp[0];
+  return s;
+}
+
+rocksdb::Status BloomChain::MAdd(const Slice &user_key, const std::vector<Slice> &items,
+                                 std::vector<BloomFilterAddResult> *rets) {
   std::string ns_key = AppendNamespacePrefix(user_key);
   LockGuard guard(storage_->GetLockManager(), ns_key);
 
@@ -142,63 +155,72 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
   std::vector<std::string> bf_key_list;
   getBFKeyList(ns_key, metadata, &bf_key_list);
 
+  std::vector<std::string> bf_data_list;
+  s = getBFDataList(bf_key_list, &bf_data_list);
+  if (!s.ok()) return s;
+
+  std::vector<uint64_t> item_hash_list;
+  getItemHashList(items, &item_hash_list);
+
+  uint64_t origin_size = metadata.size;
   auto batch = storage_->GetWriteBatchBase();
   WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
   batch->PutLogData(log_data.Encode());
 
-  // check
-  std::vector<bool> exist{false};                      // TODO: to refine in BF.MADD
-  for (int i = metadata.n_filters - 1; i >= 0; --i) {  // TODO: to test which direction for searching is better
-    s = bloomCheck(bf_key_list[i], {item}, &exist);
-    if (!s.ok()) return s;
-    if (exist[0]) {
-      *ret = 0;
-      break;
+  for (size_t i = 0; i < items.size(); ++i) {
+    // check
+    bool exist = false;
+    // TODO: to test which direction for searching is better
+    for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
+      exist = bloomCheck(item_hash_list[i], bf_data_list[ii]);
+      if (exist) break;
     }
-  }
-
-  // insert
-  if (!exist[0]) {  // TODO: to refine in BF.MADD
-    std::string bf_data;
-    s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
-    if (!s.ok()) return s;
 
-    if (metadata.size + 1 > metadata.GetCapacity()) {
-      if (metadata.IsScaling()) {
-        batch->Put(bf_key_list.back(), bf_data);
-        createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
-        bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
-      } else {
-        return rocksdb::Status::Aborted("filter is full and is nonscaling");
+    // insert
+    if (exist) {
+      (*rets)[i] = BloomFilterAddResult::kExist;
+    } else {
+      if (metadata.size + 1 > metadata.GetCapacity()) {
+        if (metadata.IsScaling()) {
+          batch->Put(bf_key_list.back(), bf_data_list.back());
+          std::string bf_data;
+          createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
+          bf_data_list.push_back(std::move(bf_data));
+          bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
+        } else {
+          (*rets)[i] = BloomFilterAddResult::kFull;
+          continue;
+        }
       }
+      bloomAdd(item_hash_list[i], bf_data_list.back());
+      (*rets)[i] = BloomFilterAddResult::kOk;
+      metadata.size += 1;
     }
-    bloomAdd(item, &bf_data);
-    batch->Put(bf_key_list.back(), bf_data);
-    *ret = 1;
-    metadata.size += 1;
   }
 
-  std::string bloom_chain_metadata_bytes;
-  metadata.Encode(&bloom_chain_metadata_bytes);
-  batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);
-
+  if (metadata.size != origin_size) {
+    std::string bloom_chain_metadata_bytes;
+    metadata.Encode(&bloom_chain_metadata_bytes);
+    batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);
+    batch->Put(bf_key_list.back(), bf_data_list.back());
+  }
   return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
 }
 
-rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, int *ret) {
-  std::vector<int> tmp{0};
+rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, bool *exist) {
+  std::vector<bool> tmp{false};
   rocksdb::Status s = MExists(user_key, {item}, &tmp);
-  *ret = tmp[0];
+  *exist = tmp[0];
   return s;
 }
 
-rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets) {
+rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists) {
   std::string ns_key = AppendNamespacePrefix(user_key);
 
   BloomChainMetadata metadata;
   rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
   if (s.IsNotFound()) {
-    std::fill(rets->begin(), rets->end(), 0);
+    std::fill(exists->begin(), exists->end(), false);
     return rocksdb::Status::OK();
   }
   if (!s.ok()) return s;
@@ -206,15 +228,20 @@ rocksdb::Status BloomChain::MExists(const Slice &user_key, const std::vector<Sli
   std::vector<std::string> bf_key_list;
   getBFKeyList(ns_key, metadata, &bf_key_list);
 
-  // check
-  std::vector<bool> exists(items.size(), false);
-  for (int i = metadata.n_filters - 1; i >= 0; --i) {  // TODO: to test which direction for searching is better
-    s = bloomCheck(bf_key_list[i], items, &exists);
-    if (!s.ok()) return s;
-  }
+  std::vector<std::string> bf_data_list;
+  s = getBFDataList(bf_key_list, &bf_data_list);
+  if (!s.ok()) return s;
+
+  std::vector<uint64_t> item_hash_list;
+  getItemHashList(items, &item_hash_list);
 
   for (size_t i = 0; i < items.size(); ++i) {
-    (*rets)[i] = exists[i] ? 1 : 0;
+    // check
+    // TODO: to test which direction for searching is better
+    for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
+      (*exists)[i] = bloomCheck(item_hash_list[i], bf_data_list[ii]);
+      if ((*exists)[i]) break;
+    }
   }
 
   return rocksdb::Status::OK();
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
index 27264e02..b2d53774 100644
--- a/src/types/redis_bloom_chain.h
+++ b/src/types/redis_bloom_chain.h
@@ -39,6 +39,12 @@ enum class BloomInfoType {
   kExpansion,
 };
 
+enum class BloomFilterAddResult {
+  kOk,
+  kExist,
+  kFull,
+};
+
 struct BloomFilterInfo {
   uint32_t capacity;
   uint32_t bloom_bytes;
@@ -51,26 +57,27 @@ class BloomChain : public Database {
  public:
   BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
   rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
-  rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
-  rocksdb::Status Exists(const Slice &user_key, const Slice &item, int *ret);
-  rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<int> *rets);
+  rocksdb::Status Add(const Slice &user_key, const Slice &item, BloomFilterAddResult *ret);
+  rocksdb::Status MAdd(const Slice &user_key, const std::vector<Slice> &items, std::vector<BloomFilterAddResult> *rets);
+  rocksdb::Status Exists(const Slice &user_key, const Slice &item, bool *exist);
+  rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> &items, std::vector<bool> *exists);
   rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);
 
  private:
+  rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
   std::string getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index);
   void getBFKeyList(const Slice &ns_key, const BloomChainMetadata &metadata, std::vector<std::string> *bf_key_list);
-  rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
+  rocksdb::Status getBFDataList(const std::vector<std::string> &bf_key_list, std::vector<std::string> *bf_data_list);
+  static void getItemHashList(const std::vector<Slice> &items, std::vector<uint64_t> *item_hash_list);
+
   rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
                                    BloomChainMetadata *metadata);
   void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata *metadata,
                                 ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, std::string *bf_data);
 
   /// bf_data: [in/out] The content string of bloomfilter.
-  static void bloomAdd(const Slice &item, std::string *bf_data);
+  static void bloomAdd(uint64_t item_hash, std::string &bf_data);
 
-  /// exists: [in/out] The items exist in bloomfilter already or not.
-  /// exists[i] is true means items[i] exists in other bloomfilter already, and it's not necessary to check in this
-  /// bloomfilter.
-  rocksdb::Status bloomCheck(const Slice &bf_key, const std::vector<Slice> &items, std::vector<bool> *exists);
+  static bool bloomCheck(uint64_t item_hash, std::string &bf_data);
 };
 }  // namespace redis
diff --git a/tests/cppunit/types/bloom_chain_test.cc b/tests/cppunit/types/bloom_chain_test.cc
index 35d789e9..904d3fbd 100644
--- a/tests/cppunit/types/bloom_chain_test.cc
+++ b/tests/cppunit/types/bloom_chain_test.cc
@@ -53,30 +53,31 @@ TEST_F(RedisBloomChainTest, Reserve) {
 }
 
 TEST_F(RedisBloomChainTest, BasicAddAndTest) {
-  int ret = 0;
+  redis::BloomFilterAddResult ret = redis::BloomFilterAddResult::kOk;
+  bool exist = false;
 
-  auto s = sb_chain_->Exists("no_exist_key", "test_item", &ret);
-  EXPECT_EQ(ret, 0);
+  auto s = sb_chain_->Exists("no_exist_key", "test_item", &exist);
+  EXPECT_EQ(exist, 0);
   s = sb_chain_->Del("no_exist_key");
 
   std::string insert_items[] = {"item1", "item2", "item3", "item101", "item202", "303"};
   for (const auto& insert_item : insert_items) {
     s = sb_chain_->Add(key_, insert_item, &ret);
     EXPECT_TRUE(s.ok());
-    EXPECT_EQ(ret, 1);
+    EXPECT_EQ(ret, redis::BloomFilterAddResult::kOk);
   }
 
   for (const auto& insert_item : insert_items) {
-    s = sb_chain_->Exists(key_, insert_item, &ret);
+    s = sb_chain_->Exists(key_, insert_item, &exist);
     EXPECT_TRUE(s.ok());
-    EXPECT_EQ(ret, 1);
+    EXPECT_EQ(exist, true);
   }
 
   std::string no_insert_items[] = {"item303", "item404", "1", "2", "3"};
   for (const auto& no_insert_item : no_insert_items) {
-    s = sb_chain_->Exists(key_, no_insert_item, &ret);
+    s = sb_chain_->Exists(key_, no_insert_item, &exist);
     EXPECT_TRUE(s.ok());
-    EXPECT_EQ(ret, 0);
+    EXPECT_EQ(exist, false);
   }
   s = sb_chain_->Del(key_);
 }
diff --git a/tests/gocase/unit/type/bloom/bloom_test.go b/tests/gocase/unit/type/bloom/bloom_test.go
index 47b55aac..153296d2 100644
--- a/tests/gocase/unit/type/bloom/bloom_test.go
+++ b/tests/gocase/unit/type/bloom/bloom_test.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/kvrocks/tests/gocase/util"
        "github.com/redis/go-redis/v9"
+       "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 )
 
@@ -145,6 +146,87 @@ func TestBloom(t *testing.T) {
                require.LessOrEqual(t, float64(falseExist), fpp*float64(totalCount))
        })
 
+       t.Run("MAdd Basic Test", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+
+               require.Equal(t, []interface{}{int64(1), int64(1)}, rdb.Do(ctx, "bf.madd", key, "xxx", "zzz").Val())
+               require.Equal(t, int64(2), rdb.Do(ctx, "bf.card", key).Val())
+               require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+
+               // add the existed value
+               require.Equal(t, []interface{}{int64(0)}, rdb.Do(ctx, "bf.madd", key, "zzz").Val())
+               require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+
+               // add the same value
+               require.Equal(t, []interface{}{int64(1), int64(0)}, rdb.Do(ctx, "bf.madd", key, "yyy", "yyy").Val())
+               require.Equal(t, []interface{}{int64(1), int64(1), int64(1)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+       })
+
+       t.Run("MAdd nonscaling Test", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "25", "nonscaling").Err())
+
+               // insert items
+               var insertNum int64 = 0
+               require.Equal(t, []interface{}{int64(1), int64(1), int64(1), int64(1)}, rdb.Do(ctx, "bf.madd", key, "x", "y", "z", "k").Val())
+               for insertNum < 24 {
+                       buf := util.RandString(7, 8, util.Alpha)
+                       Add := rdb.Do(ctx, "bf.madd", key, buf, buf+"xx")
+                       require.NoError(t, Add.Err())
+                       insertNum = rdb.Do(ctx, "bf.card", key).Val().(int64)
+               }
+               require.Equal(t, int64(24), rdb.Do(ctx, "bf.card", key).Val())
+
+               Add := rdb.Do(ctx, "bf.madd", key, "a", "x", "xxx", "y", "z").Val()
+               ret := make([]interface{}, 0, 5)
+               for _, value := range Add.([]interface{}) {
+                       switch v := value.(type) {
+                       case int64:
+                               ret = append(ret, v)
+                       case error:
+                               ret = append(ret, v.Error())
+                       default:
+                       }
+               }
+               assert.Equal(t, []interface{}{int64(1), int64(0), "ERR nonscaling filter is full", int64(0), int64(0)}, ret)
+               require.Equal(t, int64(25), rdb.Do(ctx, "bf.card", key).Val())
+       })
+
+       t.Run("MAdd scaling Test", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "30", "expansion", "2").Err())
+
+               // insert items
+               var insertNum int64 = 0
+               for insertNum < 30 {
+                       buf := util.RandString(7, 8, util.Alpha)
+                       Add := rdb.Do(ctx, "bf.madd", key, buf, buf+"xx", buf+"yy")
+                       require.NoError(t, Add.Err())
+                       insertNum = rdb.Do(ctx, "bf.card", key).Val().(int64)
+               }
+               require.Equal(t, []interface{}{"Capacity", int64(30), "Size", int64(128), "Number of filters", int64(1), "Number of items inserted", int64(30), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+
+               // bloom filter is full and scaling
+               require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxx").Err())
+               require.Equal(t, []interface{}{"Capacity", int64(90), "Size", int64(384), "Number of filters", int64(2), "Number of items inserted", int64(31), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+
+               // insert items
+               for insertNum < 90 {
+                       buf := util.RandString(7, 8, util.Alpha)
+                       Add := rdb.Do(ctx, "bf.add", key, buf)
+                       require.NoError(t, Add.Err())
+                       insertNum = rdb.Do(ctx, "bf.card", key).Val().(int64)
+               }
+               require.Equal(t, []interface{}{"Capacity", int64(90), "Size", int64(384), "Number of filters", int64(2), "Number of items inserted", int64(90), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+               // add the existed value would not scaling
+               require.NoError(t, rdb.Do(ctx, "bf.madd", key, "xxx").Err())
+               require.Equal(t, []interface{}{"Capacity", int64(90), "Size", int64(384), "Number of filters", int64(2), "Number of items inserted", int64(90), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+               // bloom filter is full and scaling
+               require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxxx").Err())
+               require.Equal(t, []interface{}{"Capacity", int64(210), "Size", int64(896), "Number of filters", int64(3), "Number of items inserted", int64(91), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
+       })
+
        t.Run("MExists Basic Test", func(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, key).Err())
                require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
@@ -222,14 +304,16 @@ func TestBloom(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, key).Err())
                require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "nonscaling").Err())
 
-               // insert items, suppose false positives is 0
-               for i := 0; i < 50; i++ {
+               // insert items
+               var insertNum int64 = 0
+               for insertNum < 50 {
                        buf := util.RandString(7, 8, util.Alpha)
                        Add := rdb.Do(ctx, "bf.add", key, buf)
                        require.NoError(t, Add.Err())
+                       insertNum = rdb.Do(ctx, "bf.card", key).Val().(int64)
                }
                require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
-               require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, "xxx").Err(), "filter is full and is nonscaling")
+               require.ErrorContains(t, rdb.Do(ctx, "bf.add", key, "xxx").Err(), "ERR nonscaling filter is full")
                require.Equal(t, int64(50), rdb.Do(ctx, "bf.info", key, "items").Val())
        })
 
@@ -237,11 +321,13 @@ func TestBloom(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, key).Err())
                require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.0001", "50", "expansion", "2").Err())
 
-               // insert items, suppose false positives is 0
-               for i := 0; i < 50; i++ {
+               // insert items
+               var insertNum int64 = 0
+               for insertNum < 50 {
                        buf := util.RandString(7, 8, util.Alpha)
                        Add := rdb.Do(ctx, "bf.add", key, buf)
                        require.NoError(t, Add.Err())
+                       insertNum = rdb.Do(ctx, "bf.card", key).Val().(int64)
                }
                require.Equal(t, []interface{}{"Capacity", int64(50), "Size", int64(256), "Number of filters", int64(1), "Number of items inserted", int64(50), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
 
@@ -249,11 +335,12 @@ func TestBloom(t *testing.T) {
                require.NoError(t, rdb.Do(ctx, "bf.add", key, "xxx").Err())
                require.Equal(t, []interface{}{"Capacity", int64(150), "Size", int64(768), "Number of filters", int64(2), "Number of items inserted", int64(51), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
 
-               // insert items, suppose false positives is 0
-               for i := 0; i < 99; i++ {
+               // insert items
+               for insertNum < 150 {
                        buf := util.RandString(7, 8, util.Alpha)
                        Add := rdb.Do(ctx, "bf.add", key, buf)
                        require.NoError(t, Add.Err())
+                       insertNum = rdb.Do(ctx, "bf.card", key).Val().(int64)
                }
                require.Equal(t, []interface{}{"Capacity", int64(150), "Size", int64(768), "Number of filters", int64(2), "Number of items inserted", int64(150), "Expansion rate", int64(2)}, rdb.Do(ctx, "bf.info", key).Val())
 

Reply via email to