This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 01a2f1db Add the support of basic scalable bloom (#1699)
01a2f1db is described below

commit 01a2f1db2c66dce5efefbdd0e39cabbdd783d5b1
Author: Leo <[email protected]>
AuthorDate: Tue Aug 29 23:58:53 2023 +0800

    Add the support of basic scalable bloom (#1699)
    
    Co-authored-by: mwish <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/commands/cmd_bloom_filter.cc           | 117 +++++++++++++++++
 src/storage/batch_extractor.cc             |   1 +
 src/storage/redis_db.cc                    |   3 +-
 src/storage/redis_metadata.cc              |  56 +++++++-
 src/storage/redis_metadata.h               |  46 +++++++
 src/types/redis_bloom_chain.cc             | 198 +++++++++++++++++++++++++++++
 src/types/redis_bloom_chain.h              |  49 +++++++
 tests/cppunit/types/bloom_chain_test.cc    |  83 ++++++++++++
 tests/gocase/unit/type/bloom/bloom_test.go | 119 +++++++++++++++++
 9 files changed, 670 insertions(+), 2 deletions(-)

diff --git a/src/commands/cmd_bloom_filter.cc b/src/commands/cmd_bloom_filter.cc
new file mode 100644
index 00000000..46ffc9c9
--- /dev/null
+++ b/src/commands/cmd_bloom_filter.cc
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "command_parser.h"
+#include "commander.h"
+#include "error_constants.h"
+#include "server/server.h"
+#include "types/redis_bloom_chain.h"
+
+namespace redis {
+
+class CommandBFReserve : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_error_rate = ParseFloat<double>(args[2]);
+    if (!parse_error_rate) {
+      return {Status::RedisParseErr, errValueIsNotFloat};
+    }
+    error_rate_ = *parse_error_rate;
+    if (error_rate_ >= 1 || error_rate_ <= 0) {
+      return {Status::RedisParseErr, "error rate should be between 0 and 1"};
+    }
+
+    auto parse_capacity = ParseInt<uint32_t>(args[3], 10);
+    if (!parse_capacity) {
+      return {Status::RedisParseErr, errValueNotInteger};
+    }
+    capacity_ = *parse_capacity;
+    if (capacity_ <= 0) {
+      return {Status::RedisParseErr, "capacity should be larger than 0"};
+    }
+
+    CommandParser parser(args, 4);
+    bool nonscaling = false;
+    while (parser.Good()) {
+      if (parser.EatEqICase("nonscaling")) {
+        nonscaling = true;
+      } else if (parser.EatEqICase("expansion")) {
+        expansion_ = GET_OR_RET(parser.TakeInt<uint16_t>());
+        if (expansion_ < 1) {
+          return {Status::RedisParseErr, "expansion should be greater or equal 
to 1"};
+        }
+      } else {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+    }
+
+    // if nonscaling is true, expansion should be 0
+    if (nonscaling && expansion_ != 0) {
+      return {Status::RedisParseErr, "nonscaling filters cannot expand"};
+    }
+
+    return Commander::Parse(args);
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::BloomChain bloomfilter_db(svr->storage, conn->GetNamespace());
+    auto s = bloomfilter_db.Reserve(args_[1], capacity_, error_rate_, 
expansion_);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    *output = redis::SimpleString("OK");
+    return Status::OK();
+  }
+
+ private:
+  double error_rate_;
+  uint32_t capacity_;
+  uint16_t expansion_ = 0;
+};
+
+class CommandBFAdd : public Commander {
+ public:
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+    int ret = 0;
+    auto s = bloom_db.Add(args_[1], args_[2], &ret);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    *output = redis::Integer(ret);
+    return Status::OK();
+  }
+};
+
+class CommandBFExists : public Commander {
+ public:
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::BloomChain bloom_db(svr->storage, conn->GetNamespace());
+    int ret = 0;
+    auto s = bloom_db.Exist(args_[1], args_[2], &ret);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    *output = redis::Integer(ret);
+    return Status::OK();
+  }
+};
+
// Register the bloom-filter commands: BF.RESERVE takes at least 3 arguments
// (-4 means "4 or more including the command name"); BF.ADD/BF.EXISTS take
// exactly 2. The trailing (1, 1, 1) triples are first-key/last-key/key-step.
REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandBFReserve>("bf.reserve", -4, "write", 1, 1, 1),
                        MakeCmdAttr<CommandBFAdd>("bf.add", 3, "write", 1, 1, 1),
                        MakeCmdAttr<CommandBFExists>("bf.exists", 3, "read-only", 1, 1, 1), )
}  // namespace redis
diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc
index 7430e1ce..61952832 100644
--- a/src/storage/batch_extractor.cc
+++ b/src/storage/batch_extractor.cc
@@ -244,6 +244,7 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t 
column_family_id, const Slic
         }
         break;
       }
+        // TODO: to implement the case of kRedisBloomFilter
       default:
         break;
     }
diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc
index 14d6a352..b4904949 100644
--- a/src/storage/redis_db.cc
+++ b/src/storage/redis_db.cc
@@ -55,7 +55,8 @@ rocksdb::Status Database::GetMetadata(RedisType type, const 
Slice &ns_key, Metad
     metadata->Decode(old_metadata);
     return rocksdb::Status::InvalidArgument(kErrMsgWrongType);
   }
-  if (metadata->size == 0 && type != kRedisStream) {  // stream is allowed to 
be empty
+  if (metadata->size == 0 && type != kRedisStream &&
+      type != kRedisBloomFilter) {  // stream and bloom is allowed to be empty
     metadata->Decode(old_metadata);
     return rocksdb::Status::NotFound("no elements");
   }
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 42288329..65a41038 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -302,7 +302,7 @@ timeval Metadata::Time() const {
 }
 
 bool Metadata::ExpireAt(uint64_t expired_ts) const {
-  if (Type() != kRedisString && Type() != kRedisStream && size == 0) {
+  if (Type() != kRedisString && Type() != kRedisStream && Type() != 
kRedisBloomFilter && size == 0) {
     return true;
   }
   if (expire == 0) {
@@ -398,3 +398,57 @@ rocksdb::Status StreamMetadata::Decode(Slice input) {
 
   return rocksdb::Status::OK();
 }
+
// Serializes the bloom-chain metadata: first the common Metadata header
// (via the base-class Encode), then the chain-specific fields in the exact
// fixed order that Decode() reads them back.
void BloomChainMetadata::Encode(std::string *dst) {
  Metadata::Encode(dst);

  PutFixed16(dst, n_filters);
  PutFixed16(dst, expansion);

  PutFixed32(dst, base_capacity);
  PutDouble(dst, error_rate);
  PutFixed32(dst, bloom_bytes);
}
+
// Deserializes the metadata written by Encode(). Layout after flags/expire:
// version (8B) | size (CommonEncodedSize) | n_filters (2B) | expansion (2B) |
// base_capacity (4B) | error_rate (8B double) | bloom_bytes (4B).
rocksdb::Status BloomChainMetadata::Decode(Slice bytes) {
  Slice input(bytes);
  if (!GetFixed8(&input, &flags)) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }
  if (!GetExpire(&input)) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }

  // version (8 bytes) plus the common size field must still be available.
  if (input.size() < 8 + CommonEncodedSize()) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }

  GetFixed64(&input, &version);
  GetFixedCommon(&input, &size);

  // Chain-specific tail is 2 + 2 + 4 + 8 + 4 = 20 bytes.
  if (input.size() < 20) {
    return rocksdb::Status::InvalidArgument(kErrMetadataTooShort);
  }

  GetFixed16(&input, &n_filters);
  GetFixed16(&input, &expansion);

  GetFixed32(&input, &base_capacity);
  GetDouble(&input, &error_rate);
  GetFixed32(&input, &bloom_bytes);

  return rocksdb::Status::OK();
}
+
+uint32_t BloomChainMetadata::GetCapacity() const {
+  // non-scaling
+  if (expansion == 0) {
+    return base_capacity;
+  }
+
+  // the sum of Geometric progression
+  if (expansion == 1) {
+    return base_capacity * n_filters;
+  }
+  return static_cast<uint32_t>(base_capacity * (1 - pow(expansion, n_filters)) 
/ 1 - expansion);
+}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 6c76496c..d1530d19 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -43,6 +43,7 @@ enum RedisType {
   kRedisBitmap = 6,
   kRedisSortedint = 7,
   kRedisStream = 8,
+  kRedisBloomFilter = 9,
 };
 
 enum RedisCommand {
@@ -146,6 +147,7 @@ class Metadata {
   virtual void Encode(std::string *dst);
   virtual rocksdb::Status Decode(Slice input);
   bool operator==(const Metadata &that) const;
+  virtual ~Metadata() = default;
 
  private:
   static uint64_t generateVersion();
@@ -200,3 +202,47 @@ class StreamMetadata : public Metadata {
   void Encode(std::string *dst) override;
   rocksdb::Status Decode(Slice input) override;
 };
+
+class BloomChainMetadata : public Metadata {
+ public:
+  /// The number of sub-filters
+  uint16_t n_filters;
+
+  /// Adding an element to a Bloom filter never fails due to the data 
structure "filling up". Instead the error rate
+  /// starts to grow. To keep the error close to the one set on filter 
initialisation - the bloom filter will
+  /// auto-scale, meaning when capacity is reached an additional sub-filter 
will be created.
+  ///
+  /// The capacity of the new sub-filter is the capacity of the last 
sub-filter multiplied by expansion.
+  ///
+  /// The default expansion value is 2.
+  ///
+  /// For non-scaling, expansion should be set to 0
+  uint16_t expansion;
+
+  /// The number of entries intended to be added to the filter. If your filter 
allows scaling, the capacity of the last
+  /// sub-filter should be: base_capacity -> base_capacity * expansion -> 
base_capacity * expansion^2...
+  ///
+  /// The default base_capacity value is 100.
+  uint32_t base_capacity;
+
+  /// The desired probability for false positives.
+  ///
+  /// The rate is a decimal value between 0 and 1. For example, for a desired 
false positive rate of 0.1% (1 in 1000),
+  /// error_rate should be set to 0.001.
+  ///
+  /// The default error_rate value is 0.01.
+  double error_rate;
+
+  /// The total number of bytes allocated for all sub-filters.
+  uint32_t bloom_bytes;
+
+  explicit BloomChainMetadata(bool generate_version = true) : 
Metadata(kRedisBloomFilter, generate_version) {}
+
+  void Encode(std::string *dst) override;
+  rocksdb::Status Decode(Slice bytes) override;
+
+  /// Get the total capacity of the bloom chain (the sum capacity of all 
sub-filters)
+  ///
+  /// @return the total capacity value
+  uint32_t GetCapacity() const;
+};
diff --git a/src/types/redis_bloom_chain.cc b/src/types/redis_bloom_chain.cc
new file mode 100644
index 00000000..4c6cffcc
--- /dev/null
+++ b/src/types/redis_bloom_chain.cc
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "redis_bloom_chain.h"
+
+namespace redis {
+
+std::string BloomChain::getBFKey(const Slice &ns_key, const BloomChainMetadata 
&metadata, uint16_t filters_index) {
+  std::string sub_key;
+  PutFixed16(&sub_key, filters_index);
+  std::string bf_key = InternalKey(ns_key, sub_key, metadata.version, 
storage_->IsSlotIdEncoded()).Encode();
+  return bf_key;
+}
+
+void BloomChain::getBFKeyList(const Slice &ns_key, const BloomChainMetadata 
&metadata,
+                              std::vector<std::string> *bf_key_list) {
+  bf_key_list->reserve(metadata.n_filters);
+  for (uint16_t i = 0; i < metadata.n_filters; ++i) {
+    std::string bf_key = getBFKey(ns_key, metadata, i);
+    bf_key_list->push_back(std::move(bf_key));
+  }
+}
+
// Loads the chain metadata stored under ns_key, delegating to the generic
// Database::GetMetadata with the bloom-filter type tag (which rejects keys
// holding a different Redis type).
rocksdb::Status BloomChain::getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata) {
  return Database::GetMetadata(kRedisBloomFilter, ns_key, metadata);
}
+
// Creates a brand-new bloom chain under ns_key with a single empty
// sub-filter, persisting the metadata and the filter bytes in one batch.
// `metadata` is filled in as a side effect for the caller.
rocksdb::Status BloomChain::createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity,
                                             uint16_t expansion, BloomChainMetadata *metadata) {
  // The chain starts with exactly one sub-filter and no items.
  metadata->n_filters = 1;
  metadata->expansion = expansion;
  metadata->size = 0;

  metadata->error_rate = error_rate;
  metadata->base_capacity = capacity;
  // Size the bit array so `capacity` items stay under the requested false-positive rate.
  metadata->bloom_bytes = BlockSplitBloomFilter::OptimalNumOfBytes(capacity, error_rate);

  BlockSplitBloomFilter block_split_bloom_filter;
  block_split_bloom_filter.Init(metadata->bloom_bytes);

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisBloomFilter, {"createSBChain"});
  batch->PutLogData(log_data.Encode());

  std::string sb_chain_meta_bytes;
  metadata->Encode(&sb_chain_meta_bytes);
  batch->Put(metadata_cf_handle_, ns_key, sb_chain_meta_bytes);

  // The first (and only) sub-filter lives at index n_filters - 1 == 0.
  std::string bf_key = getBFKey(ns_key, *metadata, metadata->n_filters - 1);
  batch->Put(bf_key, block_split_bloom_filter.GetData());

  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
+
// Inserts `item` into the single sub-filter stored at bf_key.
// *ret becomes 1 when the item was newly inserted (filter bytes rewritten),
// 0 when the filter already (probably) contained it, in which case nothing
// is written.
// NOTE(review): this issues its own write, separate from the metadata batch
// built by the caller (Add) — confirm whether both should share one batch to
// stay crash-atomic.
rocksdb::Status BloomChain::bloomAdd(const Slice &bf_key, const std::string &item, int *ret) {
  std::string bf_data;
  rocksdb::Status s = storage_->Get(rocksdb::ReadOptions(), bf_key, &bf_data);
  if (!s.ok()) return s;
  BlockSplitBloomFilter block_split_bloom_filter;
  block_split_bloom_filter.Init(std::move(bf_data));

  uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
  bool found = block_split_bloom_filter.FindHash(h);

  *ret = 0;
  if (!found) {
    *ret = 1;
    block_split_bloom_filter.InsertHash(h);
    auto batch = storage_->GetWriteBatchBase();
    batch->Put(bf_key, block_split_bloom_filter.GetData());
    return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  }

  return rocksdb::Status::OK();
}
+
+rocksdb::Status BloomChain::bloomCheck(const Slice &bf_key, const std::string 
&item, int *ret) {
+  LatestSnapShot ss(storage_);
+  rocksdb::ReadOptions read_options;
+  read_options.snapshot = ss.GetSnapShot();
+  std::string bf_data;
+  rocksdb::Status s = storage_->Get(read_options, bf_key, &bf_data);
+  if (!s.ok()) return s;
+  BlockSplitBloomFilter block_split_bloom_filter;
+  block_split_bloom_filter.Init(std::move(bf_data));
+
+  uint64_t h = BlockSplitBloomFilter::Hash(item.data(), item.size());
+  bool found = block_split_bloom_filter.FindHash(h);
+  *ret = found ? 1 : 0;
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status BloomChain::Reserve(const Slice &user_key, uint32_t capacity, 
double error_rate, uint16_t expansion) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  LockGuard guard(storage_->GetLockManager(), ns_key);
+  BloomChainMetadata bloom_chain_metadata;
+  rocksdb::Status s = getBloomChainMetadata(ns_key, &bloom_chain_metadata);
+  if (!s.ok() && !s.IsNotFound()) return s;
+  if (!s.IsNotFound()) {
+    return rocksdb::Status::InvalidArgument("the key already exists");
+  }
+
+  return createBloomChain(ns_key, error_rate, capacity, expansion, 
&bloom_chain_metadata);
+}
+
// Adds `item` to the chain under user_key, auto-creating the chain with the
// default parameters when it does not exist yet (unlike Exist).
// *ret becomes 1 when the item was newly inserted, 0 when it was (probably)
// already present.
rocksdb::Status BloomChain::Add(const Slice &user_key, const Slice &item, int *ret) {
  std::string ns_key = AppendNamespacePrefix(user_key);
  LockGuard guard(storage_->GetLockManager(), ns_key);

  BloomChainMetadata metadata;
  rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);

  if (s.IsNotFound()) {
    s = createBloomChain(ns_key, kBFDefaultErrorRate, kBFDefaultInitCapacity, kBFDefaultExpansion, &metadata);
  }
  if (!s.ok()) {
    return s;
  }

  std::vector<std::string> bf_key_list;
  getBFKeyList(ns_key, metadata, &bf_key_list);

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
  batch->PutLogData(log_data.Encode());

  int check_index = metadata.n_filters - 1;
  std::string item_string = item.ToString();

  // check: probe the sub-filters from newest to oldest; stop at the first hit
  for (; check_index >= 0; --check_index) {  // TODO: to test which direction for searching is better
    s = bloomCheck(bf_key_list[check_index], item_string, ret);
    if (!s.ok()) return s;
    if (*ret == 1) {
      break;
    }
  }

  // insert: only when no sub-filter reported the item
  if (check_index < 0) {
    if (metadata.size + 1 > metadata.GetCapacity()) {  // TODO: scaling would be supported later
      return rocksdb::Status::Aborted("filter is full");
    }
    // NOTE(review): bloomAdd performs its own storage write, separate from the
    // metadata batch below — confirm whether they should be one atomic batch.
    s = bloomAdd(bf_key_list.back(), item_string, ret);
    if (!s.ok()) return s;
    metadata.size += *ret;
  }

  std::string sb_chain_metadata_bytes;
  metadata.Encode(&sb_chain_metadata_bytes);
  batch->Put(metadata_cf_handle_, ns_key, sb_chain_metadata_bytes);

  return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
+
+rocksdb::Status BloomChain::Exist(const Slice &user_key, const Slice &item, 
int *ret) {
+  std::string ns_key = AppendNamespacePrefix(user_key);
+
+  BloomChainMetadata metadata;
+  rocksdb::Status s = getBloomChainMetadata(ns_key, &metadata);
+  if (s.IsNotFound()) return rocksdb::Status::NotFound("key is not found");
+  if (!s.ok()) return s;
+
+  std::vector<std::string> bf_key_list;
+  getBFKeyList(ns_key, metadata, &bf_key_list);
+
+  std::string item_string = item.ToString();
+  // check
+  for (int i = metadata.n_filters - 1; i >= 0; --i) {  // TODO: to test which 
direction for searching is better
+    s = bloomCheck(bf_key_list[i], item_string, ret);
+    if (!s.ok()) return s;
+    if (*ret == 1) {
+      break;
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+}  // namespace redis
diff --git a/src/types/redis_bloom_chain.h b/src/types/redis_bloom_chain.h
new file mode 100644
index 00000000..7a2923dc
--- /dev/null
+++ b/src/types/redis_bloom_chain.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#pragma once
+
+#include "bloom_filter.h"
+#include "storage/redis_db.h"
+#include "storage/redis_metadata.h"
+
+namespace redis {
+
/// Default capacity when BF.ADD auto-creates a filter on a missing key.
const uint32_t kBFDefaultInitCapacity = 100;
/// Default false-positive probability.
const double kBFDefaultErrorRate = 0.01;
/// Default growth factor for scaling filters.
const uint16_t kBFDefaultExpansion = 2;

/// BloomChain implements a scalable bloom filter as a chain of block-split
/// bloom sub-filters, each stored as its own entry under the key's metadata.
class BloomChain : public Database {
 public:
  BloomChain(engine::Storage *storage, const std::string &ns) : Database(storage, ns) {}
  /// Creates a new, empty chain; fails when the key already exists.
  rocksdb::Status Reserve(const Slice &user_key, uint32_t capacity, double error_rate, uint16_t expansion);
  /// Adds `item`; *ret is 1 if newly inserted, 0 if (probably) already present.
  rocksdb::Status Add(const Slice &user_key, const Slice &item, int *ret);
  /// Tests membership; *ret is 1 on a (possible) hit, 0 on a definite miss.
  rocksdb::Status Exist(const Slice &user_key, const Slice &item, int *ret);

 private:
  /// Builds the storage key of the filters_index-th sub-filter.
  std::string getBFKey(const Slice &ns_key, const BloomChainMetadata &metadata, uint16_t filters_index);
  /// Collects the storage keys of all sub-filters, oldest first.
  void getBFKeyList(const Slice &ns_key, const BloomChainMetadata &metadata, std::vector<std::string> *bf_key_list);
  /// Loads the chain metadata stored under ns_key.
  rocksdb::Status getBloomChainMetadata(const Slice &ns_key, BloomChainMetadata *metadata);
  /// Creates a fresh chain with one empty sub-filter and persists it.
  rocksdb::Status createBloomChain(const Slice &ns_key, double error_rate, uint32_t capacity, uint16_t expansion,
                                   BloomChainMetadata *metadata);
  /// Inserts `item` into the sub-filter at bf_key; *ret reports whether it was new.
  rocksdb::Status bloomAdd(const Slice &bf_key, const std::string &item, int *ret);
  /// Checks `item` against the sub-filter at bf_key; *ret is 1 on hit, 0 on miss.
  rocksdb::Status bloomCheck(const Slice &bf_key, const std::string &item, int *ret);
};
}  // namespace redis
diff --git a/tests/cppunit/types/bloom_chain_test.cc 
b/tests/cppunit/types/bloom_chain_test.cc
new file mode 100644
index 00000000..664be011
--- /dev/null
+++ b/tests/cppunit/types/bloom_chain_test.cc
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "test_base.h"
+#include "types/redis_bloom_chain.h"
+
// Test fixture owning a BloomChain client bound to a dedicated namespace;
// storage_ is provided by TestBase.
class RedisBloomChainTest : public TestBase {
 protected:
  explicit RedisBloomChainTest() { sb_chain_ = std::make_unique<redis::BloomChain>(storage_, "sb_chain_ns"); }
  ~RedisBloomChainTest() override = default;

  // key_ (from TestBase) is the key every test operates on.
  void SetUp() override { key_ = "test_sb_chain_key"; }
  void TearDown() override {}

  std::unique_ptr<redis::BloomChain> sb_chain_;
};
+
// Reserve succeeds on a fresh key and fails when the key already exists.
TEST_F(RedisBloomChainTest, Reserve) {
  uint32_t capacity = 1000;
  double error_rate = 0.02;
  uint16_t expansion = 0;

  auto s = sb_chain_->Reserve(key_, capacity, error_rate, expansion);
  EXPECT_TRUE(s.ok());

  // Reserving the same key again must fail because it already exists.
  s = sb_chain_->Reserve(key_, capacity, error_rate, expansion);
  EXPECT_FALSE(s.ok());
  EXPECT_EQ(s.ToString(), "Invalid argument: the key already exists");

  sb_chain_->Del(key_);
}
+
// Exercises auto-creation via Add, exact membership for inserted items,
// and definite misses for items that were never inserted.
TEST_F(RedisBloomChainTest, BasicAddAndTest) {
  int ret = 0;

  // Exist on a missing key reports NotFound instead of auto-creating.
  auto s = sb_chain_->Exist("no_exist_key", "test_item", &ret);
  EXPECT_FALSE(s.ok());
  EXPECT_EQ(s.ToString(), "NotFound: key is not found");
  sb_chain_->Del("no_exist_key");

  std::string insert_items[] = {"item1", "item2", "item3", "item101", "item202", "303"};
  // Each distinct item should be reported as newly inserted (ret == 1).
  for (const auto& insert_item : insert_items) {
    s = sb_chain_->Add(key_, insert_item, &ret);
    EXPECT_TRUE(s.ok());
    EXPECT_EQ(ret, 1);
  }

  // A bloom filter never produces false negatives for inserted items.
  for (const auto& insert_item : insert_items) {
    s = sb_chain_->Exist(key_, insert_item, &ret);
    EXPECT_TRUE(s.ok());
    EXPECT_EQ(ret, 1);
  }

  // These particular non-inserted items are expected to miss.
  std::string no_insert_items[] = {"item303", "item404", "1", "2", "3"};
  for (const auto& no_insert_item : no_insert_items) {
    s = sb_chain_->Exist(key_, no_insert_item, &ret);
    EXPECT_TRUE(s.ok());
    EXPECT_EQ(ret, 0);
  }
  sb_chain_->Del(key_);
}
diff --git a/tests/gocase/unit/type/bloom/bloom_test.go 
b/tests/gocase/unit/type/bloom/bloom_test.go
new file mode 100644
index 00000000..86f02199
--- /dev/null
+++ b/tests/gocase/unit/type/bloom/bloom_test.go
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package bloom
+
+import (
+       "context"
+       "testing"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/require"
+)
+
// TestBloom covers BF.RESERVE argument validation plus basic BF.ADD/BF.EXISTS
// behaviour against a live server, including an empirical false-positive check.
func TestBloom(t *testing.T) {
	srv := util.StartServer(t, map[string]string{})
	defer srv.Close()
	ctx := context.Background()
	rdb := srv.NewClient()
	defer func() { require.NoError(t, rdb.Close()) }()

	key := "test_bloom_key"
	t.Run("Reserve a bloom filter", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000").Err())
	})

	t.Run("Reserve a bloom filter with wrong error_rate", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "abc", "1000").Err(), "ERR value is not a valid float")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "-0.03", "1000").Err(), "ERR error rate should be between 0 and 1")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "1", "1000").Err(), "ERR error rate should be between 0 and 1")
	})

	t.Run("Reserve a bloom filter with wrong capacity", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "qwe").Err(), "ERR value is not an integer")
		// capacity stored in uint32_t, if input is negative, the parser will make an error.
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "-1000").Err(), "ERR value is not an integer or out of range")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "0").Err(), "ERR capacity should be larger than 0")
	})

	t.Run("Reserve a bloom filter with nonscaling", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "nonscaling").Err())
	})

	t.Run("Reserve a bloom filter with expansion", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, "0.02", "1000", "expansion", "1").Err())
	})

	t.Run("Reserve a bloom filter with wrong expansion", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion").Err(), "ERR no more item to parse")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "0").Err(), "ERR expansion should be greater or equal to 1")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "asd").Err(), "ERR not started as an integer")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "-1").Err(), "ERR out of range of integer type")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "1.5").Err(), "ERR encounter non-integer characters")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "123asd").Err(), "ERR encounter non-integer characters")
	})

	t.Run("Reserve a bloom filter with nonscaling and expansion", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "expansion", "1", "nonscaling").Err(), "ERR nonscaling filters cannot expand")
		require.ErrorContains(t, rdb.Do(ctx, "bf.reserve", key, "0.01", "1000", "nonscaling", "expansion", "1").Err(), "ERR nonscaling filters cannot expand")
	})

	t.Run("Check no exists key", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		// BF.EXISTS on a missing key is an error, not a 0 reply.
		require.ErrorContains(t, rdb.Do(ctx, "bf.exists", "no_exist_key", "item1").Err(), "ERR NotFound: key is not found")
		require.NoError(t, rdb.Del(ctx, "no_exist_key").Err())
	})

	t.Run("BasicAddAndCheck", func(t *testing.T) {
		require.NoError(t, rdb.Del(ctx, key).Err())
		var totalCount = 10000
		var fpp = 0.01
		require.NoError(t, rdb.Do(ctx, "bf.reserve", key, fpp, totalCount).Err())

		// NOTE(review): bf.add is asserted to return 1 for every random string;
		// this relies on the draws being distinct and not colliding in the
		// filter — confirm RandString's range makes duplicates unlikely enough.
		var insertItems []string
		for i := 0; i < totalCount; i++ {
			buf := util.RandString(1, 5, util.Alpha)
			insertItems = append(insertItems, buf)
			require.Equal(t, int64(1), rdb.Do(ctx, "bf.add", key, buf).Val())
		}

		// Inserted items must always be reported present (no false negatives).
		for i := 0; i < totalCount; i++ {
			index := util.RandomInt(int64(totalCount))
			require.Equal(t, int64(1), rdb.Do(ctx, "bf.exists", key, insertItems[index]).Val())
		}

		// Items never inserted should hit at no more than roughly the fpp rate.
		var exist = 0
		for i := 0; i < totalCount; i++ {
			buf := util.RandString(5, 10, util.Alpha)
			check := rdb.Do(ctx, "bf.exists", key, buf)
			require.NoError(t, check.Err())
			if check.Val() == int64(1) {
				exist += 1
			}
		}
		require.LessOrEqual(t, float64(exist), fpp*float64(totalCount))
	})
}

Reply via email to