This is an automated email from the ASF dual-hosted git repository.

maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 9592b507 Add splited bloom filter for RedisBloom support  (#1693)
9592b507 is described below

commit 9592b5079b3b3c90d5998292cd5c1ba8274222d9
Author: Leo <[email protected]>
AuthorDate: Thu Aug 24 15:46:13 2023 +0800

    Add splited bloom filter for RedisBloom support  (#1693)
    
    Co-authored-by: Twice <[email protected]>
    Co-authored-by: Twice <[email protected]>
---
 src/types/bloom_filter.cc                |  96 ++++++++++++++++
 src/types/bloom_filter.h                 | 183 ++++++++++++++++++++++++++++++
 tests/cppunit/types/bloom_filter_test.cc | 187 +++++++++++++++++++++++++++++++
 3 files changed, 466 insertions(+)

diff --git a/src/types/bloom_filter.cc b/src/types/bloom_filter.cc
new file mode 100644
index 00000000..e7c0debf
--- /dev/null
+++ b/src/types/bloom_filter.cc
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bloom_filter.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "xxh3.h"
+
+// Default construction allocates no bitset storage; the Init() overloads
+// below are what size or load the bitset.
+BlockSplitBloomFilter::BlockSplitBloomFilter() = default;
+
+// Sizes the filter to an all-zero bitset of approximately `num_bytes` bytes.
+//
+// The request is clamped into [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes]
+// and then rounded up to a power of two. Bits held from an earlier Init() call
+// are discarded.
+//
+// @param num_bytes The requested number of bytes for the Bloom filter bitset.
+void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
+  // Clamp before rounding: for requests above 2^31, NextPower2() returns 2^32,
+  // which would narrow to 0 when cast back to uint32_t.
+  if (num_bytes < kMinimumBloomFilterBytes) {
+    num_bytes = kMinimumBloomFilterBytes;
+  } else if (num_bytes > kMaximumBloomFilterBytes) {
+    num_bytes = kMaximumBloomFilterBytes;
+  }
+
+  // Both bounds are powers of two, so rounding up cannot leave the range.
+  if ((num_bytes & (num_bytes - 1)) != 0) {
+    num_bytes = static_cast<uint32_t>(NextPower2(num_bytes));
+  }
+
+  num_bytes_ = num_bytes;
+  // assign() rather than resize(): resize() would keep whatever bits were set
+  // by a previous use of this object.
+  data_.assign(num_bytes_, '\0');
+}
+
+// Loads the filter from a raw bitset by copying `num_bytes` bytes from `bitset`.
+//
+// @param bitset    Source bytes; the caller keeps ownership.
+// @param num_bytes Length of `bitset`. Must be a power of two within
+//                  [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes].
+// @return false (leaving the filter untouched) if `bitset` is null or the
+//         size is invalid, true otherwise.
+bool BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
+  // Constructing a std::string from a null pointer is undefined behaviour,
+  // so reject it up front alongside the size checks.
+  if (bitset == nullptr) {
+    return false;
+  }
+  if (num_bytes < kMinimumBloomFilterBytes || num_bytes > kMaximumBloomFilterBytes ||
+      (num_bytes & (num_bytes - 1)) != 0) {
+    return false;
+  }
+
+  num_bytes_ = num_bytes;
+  data_.assign(reinterpret_cast<const char*>(bitset), num_bytes);
+  return true;
+}
+
+// Loads the filter from an already-serialized bitset, taking ownership of the string.
+//
+// @param bitset The bitset bytes. The size must be a power of two within
+//               [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes].
+// @return false if the size is invalid (the filter is left untouched), true otherwise.
+bool BlockSplitBloomFilter::Init(std::string bitset) {
+  const size_t size = bitset.size();
+  const bool in_range = size >= kMinimumBloomFilterBytes && size <= kMaximumBloomFilterBytes;
+  const bool is_power_of_two = (size & (size - 1)) == 0;
+  if (!in_range || !is_power_of_two) {
+    return false;
+  }
+
+  num_bytes_ = size;
+  data_ = std::move(bitset);
+  return true;
+}
+
+// NOTE(review): nothing in this file reads this constant — confirm whether it is still needed.
+static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
+
+// Reports whether all eight bits derived from `hash` are set in its block.
+//
+// @param hash The 64-bit hash of the element to look up.
+// @return false if at least one derived bit is clear, true if all are set.
+bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
+  // The upper half of the hash is scaled onto the number of blocks to choose a
+  // block; the lower half drives the bit choice inside each 32-bit word.
+  const uint64_t num_blocks = num_bytes_ / kBytesPerFilterBlock;
+  const auto block = static_cast<uint32_t>(((hash >> 32) * num_blocks) >> 32);
+  const auto low_bits = static_cast<uint32_t>(hash);
+  const auto* words = reinterpret_cast<const uint32_t*>(data_.data()) + kBitsSetPerBlock * block;
+
+  for (int word = 0; word < kBitsSetPerBlock; ++word) {
+    // The top five bits of the salted product pick one of the 32 bits in this word.
+    const uint32_t probe = UINT32_C(0x1) << ((low_bits * SALT[word]) >> 27);
+    if ((words[word] & probe) == 0) {
+      return false;
+    }
+  }
+  return true;
+}
+
+// Sets the eight bits derived from `hash`, one in each 32-bit word of its block.
+//
+// @param hash The 64-bit hash of the element to insert.
+void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
+  // The upper half of the hash is scaled onto the number of blocks to choose a
+  // block; the lower half drives the bit choice inside each 32-bit word.
+  const uint64_t num_blocks = num_bytes_ / kBytesPerFilterBlock;
+  const auto block = static_cast<uint32_t>(((hash >> 32) * num_blocks) >> 32);
+  const auto low_bits = static_cast<uint32_t>(hash);
+  auto* words = reinterpret_cast<uint32_t*>(data_.data()) + block * kBitsSetPerBlock;
+
+  for (int word = 0; word < kBitsSetPerBlock; ++word) {
+    // The top five bits of the salted product pick one of the 32 bits in this word.
+    words[word] |= UINT32_C(0x1) << ((low_bits * SALT[word]) >> 27);
+  }
+}
+
+// Hashes `length` bytes starting at `data` with XXH64, using a fixed seed of 0.
+uint64_t BlockSplitBloomFilter::Hash(const char* data, size_t length) {
+  constexpr uint64_t kSeed = 0;
+  return XXH64(data, length, kSeed);
+}
diff --git a/src/types/bloom_filter.h b/src/types/bloom_filter.h
new file mode 100644
index 00000000..d5f29ee3
--- /dev/null
+++ b/src/types/bloom_filter.h
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+// Rounds n up to the nearest power of two. If n is already a power of two,
+// it is returned unchanged.
+static inline int64_t NextPower2(int64_t n) {
+  // Bit-smearing technique taken from
+  // http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
+  // After the decrement, each pass copies the highest set bit further to the
+  // right until every lower bit is set; adding one then carries into the next
+  // power of two.
+  --n;
+  for (int shift = 1; shift <= 32; shift *= 2) {
+    n |= n >> shift;
+  }
+  return n + 1;
+}
+
+// True when the low six bits of n are all clear, i.e. n is a multiple of 64.
+constexpr bool IsMultipleOf64(int64_t n) {
+  constexpr int64_t kLowBitsMask = 64 - 1;
+  return (n & kLowBitsMask) == 0;
+}
+
+// True when the low three bits of n are all clear, i.e. n is a multiple of 8.
+constexpr bool IsMultipleOf8(int64_t n) {
+  constexpr int64_t kLowBitsMask = 8 - 1;
+  return (n & kLowBitsMask) == 0;
+}
+
+// Maximum Bloom filter size in bytes (128 MiB); it is set to the HDFS default block size.
+// This value will be reconsidered when implementing the Bloom filter producer.
+static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;
+
+/// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
+/// Putze et al.'s "Cache-, Hash- and Space-Efficient Bloom Filters". The basic idea is to
+/// hash each item to a tiny Bloom filter whose size fits in a single cache line or less.
+///
+/// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+/// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+class BlockSplitBloomFilter {
+ public:
+  /// Minimum Bloom filter size; it is set to 32 bytes to fit a tiny Bloom filter.
+  static constexpr uint32_t kMinimumBloomFilterBytes = 32;
+
+  /// The constructor of BlockSplitBloomFilter. The filter holds no bitset
+  /// (GetBitsetSize() == 0) until one of the Init() overloads has been called.
+  BlockSplitBloomFilter();
+
+  /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
+  /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes]; it will be
+  /// rounded up/down to the lower/upper bound if num_bytes is out of range and
+  /// will also be rounded up to a power of 2.
+  ///
+  /// @param num_bytes The number of bytes to store the Bloom filter bitset.
+  void Init(uint32_t num_bytes);
+
+  /// Initialize the BlockSplitBloomFilter from a raw bitset. The bytes are copied
+  /// into the filter's own buffer, so it remains the caller's responsibility to
+  /// free the bitset passed in.
+  ///
+  /// @param bitset The given bitset to initialize the Bloom filter.
+  /// @param num_bytes The number of bytes of the given bitset.
+  /// @return false if num_bytes is not a power of 2 within
+  /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes]; true on success.
+  bool Init(const uint8_t* bitset, uint32_t num_bytes);
+
+  /// Initialize the BlockSplitBloomFilter from a serialized bitset. The string is
+  /// taken by value and moved into the filter, so pass an rvalue to avoid a copy.
+  ///
+  /// @param bitset The given bitset to initialize the Bloom filter.
+  /// @return false if the size of bitset is not a power of 2 within
+  /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes]; true on success.
+  bool Init(std::string bitset);
+
+  /// Calculate the optimal size in bytes according to the number of distinct
+  /// values and the false positive probability.
+  ///
+  /// @param ndv The number of distinct values.
+  /// @param fpp The false positive probability.
+  /// @return a value between kMinimumBloomFilterBytes and
+  /// kMaximumBloomFilterBytes that is always a power of 2.
+  static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) {
+    // OptimalNumOfBits() returns a power of 2 no smaller than 256, so the
+    // division by 8 is exact.
+    return OptimalNumOfBits(ndv, fpp) >> 3;
+  }
+
+  /// Calculate the optimal size in bits according to the number of distinct
+  /// values and the false positive probability.
+  ///
+  /// @param ndv The number of distinct values.
+  /// @param fpp The false positive probability; expected to satisfy 0 < fpp < 1.
+  /// @return a value between kMinimumBloomFilterBytes * 8 and
+  /// kMaximumBloomFilterBytes * 8 that is always a power of 2.
+  static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+    constexpr uint32_t kMinBits = kMinimumBloomFilterBytes << 3;
+    constexpr uint32_t kMaxBits = kMaximumBloomFilterBytes << 3;
+
+    const double m = -8.0 * ndv / std::log(1 - std::pow(fpp, 1.0 / 8));
+
+    // Anything that is negative, too large or NaN falls back to the maximum.
+    // The comparison is written positively so that NaN (for which every
+    // comparison is false) never reaches the cast, which would be undefined.
+    uint32_t num_bits = kMaxBits;
+    if (m >= 0 && m <= kMaxBits) {
+      num_bits = static_cast<uint32_t>(m);
+    }
+
+    // Round up to the lower bound.
+    if (num_bits < kMinBits) {
+      num_bits = kMinBits;
+    }
+
+    // Get the next power of 2 if the bit count is not one already. Because
+    // kMaxBits is itself a power of 2, this cannot exceed the upper bound.
+    if ((num_bits & (num_bits - 1)) != 0) {
+      num_bits = static_cast<uint32_t>(NextPower2(num_bits));
+    }
+
+    return num_bits;
+  }
+
+  /// Determine whether an element exists in the set or not.
+  ///
+  /// @param hash the hash of the element to look up.
+  /// @return false if the value is definitely not in the set, and true means
+  /// PROBABLY in the set.
+  bool FindHash(uint64_t hash) const;
+
+  /// Insert an element into the set represented by the Bloom filter bitset.
+  ///
+  /// @param hash the hash of the value to insert into the Bloom filter.
+  void InsertHash(uint64_t hash);
+
+  /// @return the size of the Bloom filter bitset in bytes.
+  uint32_t GetBitsetSize() const { return num_bytes_; }
+
+  /// Get the plain bitset value from the Bloom filter bitset.
+  ///
+  /// @return bitset value.
+  const std::string& GetData() const { return data_; }
+
+  /// Compute the hash for a string value by using its plain encoding result.
+  ///
+  /// @param data the value address.
+  /// @param length the value length.
+  /// @return hash result.
+  static uint64_t Hash(const char* data, size_t length);
+
+ private:
+  // Bytes in a tiny Bloom filter block.
+  static constexpr int kBytesPerFilterBlock = 32;
+
+  // The number of bits to be set in each tiny Bloom filter.
+  static constexpr int kBitsSetPerBlock = 8;
+
+  // The block-based algorithm needs eight odd SALT values to calculate eight indexes
+  // of bits to set, one bit in each 32-bit word.
+  static constexpr uint32_t SALT[kBitsSetPerBlock] = {0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+                                                      0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};
+
+  // The underlying buffer of the bitset.
+  std::string data_;
+
+  // The number of bytes of the Bloom filter bitset. Zero until Init() is called.
+  uint32_t num_bytes_ = 0;
+};
diff --git a/tests/cppunit/types/bloom_filter_test.cc 
b/tests/cppunit/types/bloom_filter_test.cc
new file mode 100644
index 00000000..0749c52c
--- /dev/null
+++ b/tests/cppunit/types/bloom_filter_test.cc
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "types/bloom_filter.h"
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <random>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace test {
+
+// Both bitset-loading Init() overloads must reject sizes that are not a power of 2.
+TEST(ConstructorTest, TestBloomFilter) {
+  BlockSplitBloomFilter filter;
+
+  // 1023 is not a power of 2, so the raw-buffer overload returns false.
+  const auto raw_bitset = std::make_unique<uint8_t[]>(1024);
+  EXPECT_FALSE(filter.Init(raw_bitset.get(), 1023));
+
+  // 1022 is not a power of 2 either, so the string overload returns false.
+  const std::string string_bitset(1022, 's');
+  EXPECT_FALSE(filter.Init(string_bitset));
+}
+
+// The BasicTest is used to test basic operations including InsertHash, FindHash and
+// serializing and de-serializing.
+TEST(BasicTest, TestBloomFilter) {
+  const std::vector<uint32_t> kBloomFilterSizes = {32, 64, 128, 256, 512, 1024, 2048};
+  const std::vector<std::string> kStringInserts = {"1", "2",  "3",  "5", "6",   "7",    "8",
+                                                   "9", "10", "42", "a", "abc", "qwert"};
+  const std::vector<std::string> kStringLookups = {"-1", "-2", "-3",  "-5",    "-6",   "-7",
+                                                   "-8", "b",  "acb", "tyuio", "trewq"};
+
+  for (const auto bloom_filter_bytes : kBloomFilterSizes) {
+    BlockSplitBloomFilter bloom_filter;
+    bloom_filter.Init(bloom_filter_bytes);
+
+    // Empty bloom filter deterministically returns false
+    for (const auto& v : kStringInserts) {
+      EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(v.data(), v.size())));
+    }
+
+    // Insert all values
+    for (const auto& v : kStringInserts) {
+      bloom_filter.InsertHash(bloom_filter.Hash(v.data(), v.size()));
+    }
+
+    // They should always lookup successfully
+    for (const auto& v : kStringInserts) {
+      EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(v.data(), v.size())));
+    }
+
+    // Values not inserted in the filter should only rarely lookup successfully
+    int false_positives = 0;
+    for (const auto& v : kStringLookups) {
+      false_positives += bloom_filter.FindHash(bloom_filter.Hash(v.data(), v.size()));
+    }
+    // (this is a crude check, see FPPTest below for a more rigorous formula)
+    EXPECT_LE(false_positives, 2);
+
+    // Serialize Bloom filter to string bitset
+    std::string data_saved = bloom_filter.GetData();
+    // Rebuild Bloom filter from string bitset. The load must succeed and keep
+    // the size; otherwise the lookups below would not be testing a round trip.
+    BlockSplitBloomFilter bloom_filter_new;
+    ASSERT_TRUE(bloom_filter_new.Init(data_saved));
+    EXPECT_EQ(bloom_filter_new.GetBitsetSize(), bloom_filter_bytes);
+
+    // Lookup previously inserted values
+    for (const auto& v : kStringInserts) {
+      EXPECT_TRUE(bloom_filter_new.FindHash(bloom_filter_new.Hash(v.data(), v.size())));
+    }
+    false_positives = 0;
+    for (const auto& v : kStringLookups) {
+      false_positives += bloom_filter_new.FindHash(bloom_filter_new.Hash(v.data(), v.size()));
+    }
+    EXPECT_LE(false_positives, 2);
+  }
+}
+
+// Helper function to generate a pseudo-random alphanumeric string.
+//
+// The engine is a function-local static so that successive calls continue one
+// deterministic sequence. Seeding a fresh engine with the same value on every
+// call would make every call return the same string.
+std::string GetRandomString(uint32_t length) {
+  // Character set used to generate random string
+  static const std::string charset = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+  static std::default_random_engine gen(42);
+  static std::uniform_int_distribution<uint32_t> dist(0, static_cast<uint32_t>(charset.size() - 1));
+
+  std::string ret(length, 'x');
+  for (auto& ch : ret) {
+    ch = charset[dist(gen)];
+  }
+  return ret;
+}
+
+// Inserts total_count random 8-character strings, then probes with random
+// 7-character strings (which can never equal a member) to measure the observed
+// false positive rate.
+TEST(FPPTest, TestBloomFilter) {
+  const int total_count = 100000;
+
+  // Bloom filter fpp parameter
+  const double fpp = 0.01;
+
+  // OptimalNumOfBytes() sizes the filter with the formula for a classic Bloom
+  // filter, while this implementation confines each key to one 32-byte block,
+  // so the observed rate may sit somewhat above the nominal fpp.
+  // NOTE(review): 2x headroom is a conservative guess — tighten if desired.
+  const double tolerance = 2.0;
+
+  BlockSplitBloomFilter bloom_filter;
+  bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBytes(total_count, fpp));
+
+  // Insert elements into the Bloom filter
+  std::vector<std::string> members;
+  members.reserve(total_count);
+  for (int i = 0; i < total_count; i++) {
+    members.push_back(GetRandomString(8));
+    const std::string& member = members.back();
+    bloom_filter.InsertHash(bloom_filter.Hash(member.data(), member.size()));
+  }
+
+  // It counts the number of times FindHash returns true for a non-member.
+  int exist = 0;
+  for (const auto& member : members) {
+    ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(member.data(), member.size())));
+
+    const std::string probe = GetRandomString(7);
+    if (bloom_filter.FindHash(bloom_filter.Hash(probe.data(), probe.size()))) {
+      exist++;
+    }
+  }
+
+  EXPECT_LT(exist, total_count * fpp * tolerance);
+}
+
+// OptimalValueTest checks that OptimalNumOfBits returns the expected numbers
+// according to the formula:
+//     num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
+// where ndv is the number of distinct values and fpp is the false positive probability.
+// It also checks that the result stays within
+// [kMinimumBloomFilterBytes * 8, kMaximumBloomFilterBytes * 8].
+TEST(OptimalValueTest, TestBloomFilter) {
+  // Checks the bit estimate and the matching byte estimate for one input.
+  const auto expect_estimate = [](uint32_t ndv, double fpp, uint32_t expected_bits) {
+    EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(ndv, fpp), expected_bits);
+    EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(ndv, fpp), expected_bits / 8);
+  };
+
+  // Power-of-two ndv values at fpp = 0.01.
+  uint32_t expected_bits = UINT32_C(4096);
+  for (uint32_t ndv = 256; ndv <= 2048; ndv *= 2) {
+    expect_estimate(ndv, 0.01, expected_bits);
+    expected_bits *= 2;
+  }
+
+  // Arbitrary ndv values; the rounded result is the same for each of these fpp values.
+  for (const double fpp : {0.01, 0.025, 0.05}) {
+    expect_estimate(200, fpp, UINT32_C(2048));
+    expect_estimate(300, fpp, UINT32_C(4096));
+    expect_estimate(700, fpp, UINT32_C(8192));
+    expect_estimate(1500, fpp, UINT32_C(16384));
+  }
+
+  // Boundary check
+  const uint32_t min_bits = BlockSplitBloomFilter::kMinimumBloomFilterBytes * 8;
+  const uint32_t max_bits = kMaximumBloomFilterBytes * 8;
+  const uint32_t max_ndv = std::numeric_limits<uint32_t>::max();
+  for (const double fpp : {0.01, 0.25}) {
+    expect_estimate(4, fpp, min_bits);
+  }
+  for (const double fpp : {0.01, 0.25}) {
+    expect_estimate(max_ndv, fpp, max_bits);
+  }
+}
+
+}  // namespace test

Reply via email to