This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 83aaa753 Add the support of the BF.MEXISTS and BF.CARD Command (#1756)
83aaa753 is described below

commit 83aaa7531fdfdd874a936287fe3f466b184c9e41
Author: Leo <[email protected]>
AuthorDate: Thu Sep 14 10:48:36 2023 +0800

    Add the support of the BF.MEXISTS and BF.CARD Command (#1756)
    
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_bloom_filter.cc           | 46 ++++++++++++++++++++++++--
 src/server/redis_reply.h                   |  9 ++++++
 src/types/redis_bloom_chain.cc             | 52 ++++++++++++++++++------------
 src/types/redis_bloom_chain.h              | 13 ++++++--
 tests/cppunit/types/bloom_chain_test.cc    |  6 ++--
 tests/gocase/unit/type/bloom/bloom_test.go | 32 +++++++++++++++++-
 6 files changed, 128 insertions(+), 30 deletions(-)

diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc
index eb3856f1..61644b34 100644
--- a/src/commands/cmd_bloom_filter.cc
+++ b/src/commands/cmd_bloom_filter.cc
@@ -105,7 +105,7 @@ class CommandBFExists : public Commander {
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
     redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
     int ret = 0;
-    auto s = bloom_db.Exist(args_[1], args_[2], &ret);
+    auto s = bloom_db.Exists(args_[1], args_[2], &ret);
     if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
 
     *output = redis::Integer(ret);
@@ -113,6 +113,30 @@ class CommandBFExists : public Commander {
   }
 };
 
+class CommandBFMExists : public Commander {  // BF.MEXISTS handler: tests several items against one bloom-filter key
+ public:
+  Status Parse(const std::vector<std::string> &args) override {  // gathers the item arguments, then delegates to Commander::Parse
+    items_.reserve(args_.size() - 2);  // reads the args_ member rather than the `args` parameter — presumably args_ is already populated when Parse runs; TODO confirm
+    for (size_t i = 2; i < args_.size(); ++i) {  // indexes 0 and 1 are skipped; Execute passes args_[1] as the key
+      items_.emplace_back(args_[i]);
+    }
+    return Commander::Parse(args);
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {  // writes one integer per requested item into *output
+    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+    std::vector<int> rets(items_.size(), 0);  // one result slot per item, all starting at 0
+    auto s = bloom_db.MExists(args_[1], items_, &rets);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};  // any non-OK storage status becomes an execution error
+
+    *output = redis::MultiInteger(rets);
+    return Status::OK();
+  }
+
+ private:
+  std::vector<Slice> items_;  // items to test, built from args_[2..]; presumably non-owning views into args_ — TODO confirm Slice semantics
+};
+
 class CommandBFInfo : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
@@ -184,8 +208,26 @@ class CommandBFInfo : public Commander {
   BloomInfoType type_ = BloomInfoType::kAll;
 };
 
+class CommandBFCard : public Commander {  // BF.CARD handler: replies with the size field that BloomChain::Info reports for the key
+ public:
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+    BloomFilterInfo info;
+    auto s = bloom_db.Info(args_[1], &info);  // args_[1] is the key being queried
+    if (!s.ok() && !s.IsNotFound()) return {Status::RedisExecErr, 
s.ToString()};  // only failures other than NotFound are reported as errors
+    if (s.IsNotFound()) {  // a missing key is not an error: the reply is 0
+      *output = redis::Integer(0);
+    } else {
+      *output = redis::Integer(info.size);
+    }
+    return Status::OK();
+  }
+};
+
 REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, 
"write", 1, 1, 1),
                         MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 
1),
                         MakeCmdAttr<CommandBFExists>("bf.exists", 3, 
"read-only", 1, 1, 1),
-                        MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 
1, 1, 1), )
+                        MakeCmdAttr<CommandBFMExists>("bf.mexists", -3, 
"read-only", 1, 1, 1),
+                        MakeCmdAttr<CommandBFInfo>("bf.info", -2, "read-only", 
1, 1, 1),
+                        MakeCmdAttr<CommandBFCard>("bf.card", 2, "read-only", 
1, 1, 1), )
 }  // namespace redis
diff --git a/src/server/redis_reply.h b/src/server/redis_reply.h
index c3cc1b44..75492c1d 100644
--- a/src/server/redis_reply.h
+++ b/src/server/redis_reply.h
@@ -40,6 +40,15 @@ std::string Integer(T data) {
   return ":" + std::to_string(data) + CRLF;
 }
 
+template <typename T, std::enable_if_t<std::is_integral_v<T>, int> = 0>  // overload is only enabled for integral element types
+std::string MultiInteger(const std::vector<T> &multi_data) {  // builds "*<count>" + CRLF followed by one Integer() reply per element
+  std::string result = "*" + std::to_string(multi_data.size()) + CRLF;  // header line carrying the element count
+  for (const auto &data : multi_data) {
+    result += Integer(data);  // elements are appended in input order
+  }
+  return result;
+}
+
 std::string BulkString(const std::string &data);
 std::string NilString();
 
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
index df0ddb78..a97d77e6 100644
--- a/src/types/redis_bloom_chain.cc
+++ b/src/types/redis_bloom_chain.cc
@@ -86,7 +86,7 @@ void BloomChain::createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadat
   batch->Put(metadata_cf_handle_, ns_key, bloom_chain_meta_bytes);
 }
 
-void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
+void BloomChain::bloomAdd(const Slice &item, std::string *bf_data) {
   BlockSplitBloomFilter block_split_bloom_filter;
   block_split_bloom_filter.Init(std::move(*bf_data));
 
@@ -95,7 +95,8 @@ void BloomChain::bloomAdd(const std::string &item, std::string *bf_data) {
   *bf_data = std::move(block_split_bloom_filter).GetData();
 }
 
-rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string 
&item, bool *exist) {
+rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const 
std::vector<Slice> &items,
+                                       std::vector<bool> *exists) {
   LatestSnapShot ss(storage_);
   rocksdb::ReadOptions read_options;
   read_options.snapshot = ss.GetSnapShot();
@@ -105,9 +106,14 @@ rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string &i
   BlockSplitBloomFilter block_split_bloom_filter;
   block_split_bloom_filter.Init(std::move(bf_data));
 
-  uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
-  *exist = block_split_bloom_filter.FindHash(h);
-
+  for (size_t i = 0; i < items.size(); ++i) {
+    // this item exists in other bloomfilter already, and it's not necessary 
to check in this bloomfilter.
+    if ((*exists)[i]) {
+      continue;
+    }
+    uint64_t h = BlockSplitBloomFilter::Hash(items[i].data(), items[i].size());
+    (*exists)[i] = block_split_bloom_filter.FindHash(h);
+  }
   return rocksdb::Status::OK();
 }
 
@@ -144,21 +150,19 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
   WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
   batch->PutLogData(log_data.Encode());
 
-  std::string item_string = item.ToString();
-
   // check
-  bool exist = false;
+  std::vector<bool> exist{false};                      // TODO: to refine in 
BF.MADD
   for (int i = metadata.n_filters - 1; i >= 0; --i) {  // TODO: to test which 
direction for searching is better
-    s = bloomCheck(bf_key_list[i], item_string, &exist);
+    s = bloomCheck(bf_key_list[i], {item}, &exist);
     if (!s.ok()) return s;
-    if (exist) {
+    if (exist[0]) {
       *ret = 0;
       break;
     }
   }
 
   // insert
-  if (!exist) {
+  if (!exist[0]) {  // TODO: to refine in BF.MADD
     std::string bf_data;
     s = storage_->Get(rocksdb::ReadOptions(), bf_key_list.back(), &bf_data);
     if (!s.ok()) return s;
@@ -172,7 +176,7 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
         return rocksdb::Status::Aborted("filter is full and is nonscaling");
       }
     }
-    bloomAdd(item_string, &bf_data);
+    bloomAdd(item, &bf_data);
     batch->Put(bf_key_list.back(), bf_data);
     *ret = 1;
     metadata.size += 1;
@@ -185,13 +189,20 @@ rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *r
   return storage_->Write(storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
-rocksdb::Status BloomChain::Exist(const Slice &user_key, const Slice &item, 
int *ret) {
+rocksdb::Status BloomChain::Exists(const Slice &user_key, const Slice &item, 
int *ret) {  // single-item wrapper: forwards to MExists with a one-element item list
+  std::vector<int> tmp{0};  // one result slot, initialised to 0
+  rocksdb::Status s = MExists(user_key, {item}, &tmp);
+  *ret = tmp[0];  // copied out regardless of whether s is OK
+  return s;
+}
+
+rocksdb::Status BloomChain::MExists(const Slice &user_key, const 
std::vector<Slice> &items, std::vector<int> *rets) {
   std::string ns_key = AppendNamespacePrefix(user_key);
 
   BloomChainMetadata metadata;
   rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
   if (s.IsNotFound()) {
-    *ret = 0;
+    std::fill(rets->begin(), rets->end(), 0);
     return rocksdb::Status::OK();
   }
   if (!s.ok()) return s;
@@ -199,17 +210,16 @@ rocksdb::Status BloomChain::Exist(const Slice &user_key, const Slice &item, int
   std::vector<std::string> bf_key_list;
   getBFKeyList(ns_key, metadata, &bf_key_list);
 
-  std::string item_string = item.ToString();
   // check
-  bool exist = false;
+  std::vector<bool> exists(items.size(), false);
   for (int i = metadata.n_filters - 1; i >= 0; --i) {  // TODO: to test which 
direction for searching is better
-    s = bloomCheck(bf_key_list[i], item_string, &exist);
+    s = bloomCheck(bf_key_list[i], items, &exists);
     if (!s.ok()) return s;
-    if (exist) {
-      break;
-    }
   }
-  *ret = exist ? 1 : 0;
+
+  for (size_t i = 0; i < items.size(); ++i) {
+    (*rets)[i] = exists[i] ? 1 : 0;
+  }
 
   return rocksdb::Status::OK();
 }
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
index ad66675f..27264e02 100644
--- a/src/types/redis_bloom_chain.h
+++ b/src/types/redis_bloom_chain.h
@@ -52,7 +52,8 @@ class BloomChain : public Database {
   BloomChain(engine::Storage *storage, const std::string &ns) : 
Database(storage, ns) {}
   rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double 
error_rate, uint16_t expansion);
   rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
-  rocksdb::Status Exist(const Slice &user_key, const Slice &item, int *ret);
+  rocksdb::Status Exists(const Slice &user_key, const Slice &item, int *ret);
+  rocksdb::Status MExists(const Slice &user_key, const std::vector<Slice> 
&items, std::vector<int> *rets);
   rocksdb::Status Info(const Slice &user_key, BloomFilterInfo *info);
 
  private:
@@ -63,7 +64,13 @@ class BloomChain : public Database {
                                    BloomChainMetadata *metadata);
   void createBloomFilterInBatch(const Slice &ns_key, BloomChainMetadata 
*metadata,
                                 ObserverOrUniquePtr<rocksdb::WriteBatchBase> 
&batch, std::string *bf_data);
-  static void bloomAdd(const std::string &item, std::string *bf_data);
-  rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, 
bool *exist);
+
+  /// bf_data: [in/out] The content string of bloomfilter.
+  static void bloomAdd(const Slice &item, std::string *bf_data);
+
+  /// exists: [in/out] The items exist in bloomfilter already or not.
+  /// exists[i] is true means items[i] exists in other bloomfilter already, 
and it's not necessary to check in this
+  /// bloomfilter.
+  rocksdb::Status bloomCheck(const Slice &bf_key, const std::vector<Slice> 
&items, std::vector<bool> *exists);
 };
 }  // namespace redis
diff --git a/tests/cppunit/types/bloom_chain_test.cc b/tests/cppunit/types/bloom_chain_test.cc
index 9f1b7414..35d789e9 100644
--- a/tests/cppunit/types/bloom_chain_test.cc
+++ b/tests/cppunit/types/bloom_chain_test.cc
@@ -55,7 +55,7 @@ TEST_F(RedisBloomChainTest, Reserve) {
 TEST_F(RedisBloomChainTest, BasicAddAndTest) {
   int ret = 0;
 
-  auto s = sb_chain_->Exist("no_exist_key", "test_item", &ret);
+  auto s = sb_chain_->Exists("no_exist_key", "test_item", &ret);
   EXPECT_EQ(ret, 0);
   s = sb_chain_->Del("no_exist_key");
 
@@ -67,14 +67,14 @@ TEST_F(RedisBloomChainTest, BasicAddAndTest) {
   }
 
   for (const auto& insert_item : insert_items) {
-    s = sb_chain_->Exist(key_, insert_item, &ret);
+    s = sb_chain_->Exists(key_, insert_item, &ret);
     EXPECT_TRUE(s.ok());
     EXPECT_EQ(ret, 1);
   }
 
   std::string no_insert_items[] = {"item303", "item404", "1", "2", "3"};
   for (const auto& no_insert_item : no_insert_items) {
-    s = sb_chain_->Exist(key_, no_insert_item, &ret);
+    s = sb_chain_->Exists(key_, no_insert_item, &ret);
     EXPECT_TRUE(s.ok());
     EXPECT_EQ(ret, 0);
   }
diff --git a/tests/gocase/unit/type/bloom/bloom_test.go b/tests/gocase/unit/type/bloom/bloom_test.go
index f93b84b8..47b55aac 100644
--- a/tests/gocase/unit/type/bloom/bloom_test.go
+++ b/tests/gocase/unit/type/bloom/bloom_test.go
@@ -145,6 +145,21 @@ func TestBloom(t *testing.T) {
                require.LessOrEqual(t, float64(falseExist), 
fpp*float64(totalCount))
        })
 
+       t.Run("MExists Basic Test", func(t *testing.T) {
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.Equal(t, []interface{}{int64(0), int64(0), int64(0)}, 
rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+
+               require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, 
"xxx").Val())
+               require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
+               require.Equal(t, []interface{}{int64(1), int64(0)}, rdb.Do(ctx, 
"bf.mexists", key, "xxx", "yyy").Val())
+
+               require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, 
"zzz").Val())
+               require.Equal(t, []interface{}{int64(1), int64(0), int64(1)}, 
rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+
+               require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, 
"yyy").Val())
+               require.Equal(t, []interface{}{int64(1), int64(1), int64(1)}, 
rdb.Do(ctx, "bf.mexists", key, "xxx", "yyy", "zzz").Val())
+       })
+
        t.Run("Get info of no exists key ", func(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
                require.ErrorContains(t, rdb.Do(ctx, "bf.info", 
"no_exist_key").Err(), "ERR key is not found")
@@ -253,6 +268,21 @@ func TestBloom(t *testing.T) {
                require.Equal(t, "MBbloom--", rdb.Type(ctx, key).Val())
        })
 
-       // TODO: Add the testcase of get filters of bloom filter after complete 
the scaling.
+       t.Run("Get Card of bloom filter", func(t *testing.T) {
+               // if bf.card no exist key, it would return 0
+               require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
+               require.Equal(t, int64(0), rdb.Do(ctx, "bf.card", 
"no_exist_key").Val())
+
+               require.NoError(t, rdb.Del(ctx, key).Err())
+               require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", 
"1000", "expansion", "1").Err())
+               require.Equal(t, int64(0), rdb.Do(ctx, "bf.card", key).Val())
+               require.NoError(t, rdb.Do(ctx, "bf.add", key, "item1").Err())
+               require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
+               // insert the duplicate key, insert would return 0 and the card 
of bloom filter would not change
+               require.Equal(t, int64(0), rdb.Do(ctx, "bf.add", key, 
"item1").Val())
+               require.Equal(t, int64(1), rdb.Do(ctx, "bf.card", key).Val())
+               require.NoError(t, rdb.Do(ctx, "bf.add", key, "item2").Err())
+               require.Equal(t, int64(2), rdb.Do(ctx, "bf.card", key).Val())
+       })
 
 }

Reply via email to