This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 9592b507 Add splited bloom filter for RedisBloom support (#1693)
9592b507 is described below
commit 9592b5079b3b3c90d5998292cd5c1ba8274222d9
Author: Leo <[email protected]>
AuthorDate: Thu Aug 24 15:46:13 2023 +0800
Add splited bloom filter for RedisBloom support (#1693)
Co-authored-by: Twice <[email protected]>
Co-authored-by: Twice <[email protected]>
---
src/types/bloom_filter.cc | 96 ++++++++++++++++
src/types/bloom_filter.h | 183 ++++++++++++++++++++++++++++++
tests/cppunit/types/bloom_filter_test.cc | 187 +++++++++++++++++++++++++++++++
3 files changed, 466 insertions(+)
diff --git a/src/types/bloom_filter.cc b/src/types/bloom_filter.cc
new file mode 100644
index 00000000..e7c0debf
--- /dev/null
+++ b/src/types/bloom_filter.cc
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bloom_filter.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "xxh3.h"
+
+BlockSplitBloomFilter::BlockSplitBloomFilter() = default;
+
+void BlockSplitBloomFilter::Init(uint32_t num_bytes) {
+ if (num_bytes < kMinimumBloomFilterBytes) {
+ num_bytes = kMinimumBloomFilterBytes;
+ }
+
+ // Get next power of 2 if it is not power of 2.
+ if ((num_bytes & (num_bytes - 1)) != 0) {
+ num_bytes = static_cast<uint32_t>(NextPower2(num_bytes));
+ }
+
+ if (num_bytes > kMaximumBloomFilterBytes) {
+ num_bytes = kMaximumBloomFilterBytes;
+ }
+
+ num_bytes_ = num_bytes;
+ data_.resize(num_bytes_, 0);
+}
+
+bool BlockSplitBloomFilter::Init(const uint8_t* bitset, uint32_t num_bytes) {
+ if (num_bytes < kMinimumBloomFilterBytes || num_bytes >
kMaximumBloomFilterBytes ||
+ (num_bytes & (num_bytes - 1)) != 0) {
+ return false;
+ }
+
+ num_bytes_ = num_bytes;
+ data_ = {reinterpret_cast<const char*>(bitset), num_bytes};
+ return true;
+}
+
+bool BlockSplitBloomFilter::Init(std::string bitset) {
+ if (bitset.size() < kMinimumBloomFilterBytes || bitset.size() >
kMaximumBloomFilterBytes ||
+ (bitset.size() & (bitset.size() - 1)) != 0) {
+ return false;
+ }
+
+ num_bytes_ = bitset.size();
+ data_ = std::move(bitset);
+ return true;
+}
+
+static constexpr uint32_t kBloomFilterHeaderSizeGuess = 256;
+
+bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
+ const auto bucket_index = static_cast<uint32_t>(((hash >> 32) * (num_bytes_
/ kBytesPerFilterBlock)) >> 32);
+ const auto key = static_cast<uint32_t>(hash);
+ const auto* bitset32 = reinterpret_cast<const uint32_t*>(data_.data());
+
+ for (int i = 0; i < kBitsSetPerBlock; ++i) {
+ // Calculate mask for key in the given bitset.
+ const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
+ if ((0 == (bitset32[kBitsSetPerBlock * bucket_index + i] & mask))) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void BlockSplitBloomFilter::InsertHash(uint64_t hash) {
+ const auto bucket_index = static_cast<uint32_t>(((hash >> 32) * (num_bytes_
/ kBytesPerFilterBlock)) >> 32);
+ const auto key = static_cast<uint32_t>(hash);
+ auto* bitset32 = reinterpret_cast<uint32_t*>(data_.data());
+
+ for (int i = 0; i < kBitsSetPerBlock; i++) {
+ // Calculate mask for key in the given bitset.
+ const uint32_t mask = UINT32_C(0x1) << ((key * SALT[i]) >> 27);
+ bitset32[bucket_index * kBitsSetPerBlock + i] |= mask;
+ }
+}
+
+uint64_t BlockSplitBloomFilter::Hash(const char* data, size_t length) { return
XXH64(data, length, /*seed=*/0); }
diff --git a/src/types/bloom_filter.h b/src/types/bloom_filter.h
new file mode 100644
index 00000000..d5f29ee3
--- /dev/null
+++ b/src/types/bloom_filter.h
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cmath>
+#include <cstdint>
+#include <memory>
+#include <string>
+
+// Returns the smallest power of two that contains v. If v is already a
+// power of two, it is returned as is.
+static inline int64_t NextPower2(int64_t n) {
+ // Taken from
+ // http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
+ n--;
+ n |= n >> 1;
+ n |= n >> 2;
+ n |= n >> 4;
+ n |= n >> 8;
+ n |= n >> 16;
+ n |= n >> 32;
+ n++;
+ return n;
+}
+
+constexpr bool IsMultipleOf64(int64_t n) { return (n & 63) == 0; }
+
+constexpr bool IsMultipleOf8(int64_t n) { return (n & 7) == 0; }
+
+// Maximum Bloom filter size, it sets to HDFS default block size 128MB
+// This value will be reconsidered when implementing Bloom filter producer.
+static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;
+
+/// The BlockSplitBloomFilter is implemented using block-based Bloom filters
from
+/// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic
idea is to
+/// hash the item to a tiny Bloom filter which size fit a single cache line or
smaller.
+///
+/// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
+/// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
+class BlockSplitBloomFilter {
+ public:
+ /// The constructor of BlockSplitBloomFilter. It uses XXH64 as hash function.
+ BlockSplitBloomFilter();
+
+ /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be
within
+ /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes], it will be
+ /// rounded up/down to lower/upper bound if num_bytes is out of range and
also
+ /// will be rounded up to a power of 2.
+ ///
+ /// @param num_bytes The number of bytes to store Bloom filter bitset.
+ void Init(uint32_t num_bytes);
+
+ /// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
+ /// bitset because the given bitset may not satisfy the 32-byte alignment
requirement
+ /// which may lead to segfault when performing SIMD instructions. It is the
caller's
+ /// responsibility to free the bitset passed in.
+ ///
+ /// @param bitset The given bitset to initialize the Bloom filter.
+ /// @param num_bytes The number of bytes of given bitset.
+ /// @return false if the number of bytes of Bloom filter bitset is not a
power of 2, and true means successfully init
+ bool Init(const uint8_t* bitset, uint32_t num_bytes);
+
+ /// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
+ /// bitset because the given bitset may not satisfy the 32-byte alignment
requirement
+ /// which may lead to segfault when performing SIMD instructions. It is the
caller's
+ /// responsibility to free the bitset passed in.
+ ///
+ /// @param bitset The given bitset to initialize the Bloom filter.
+ /// @return false if the number of bytes of Bloom filter bitset is not a
power of 2, and true means successfully init
+ bool Init(std::string bitset);
+
+ /// Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom
filter.
+ static constexpr uint32_t kMinimumBloomFilterBytes = 32;
+
+ /// Calculate optimal size according to the number of distinct values and
false
+ /// positive probability.
+ ///
+ /// @param ndv The number of distinct values.
+ /// @param fpp The false positive probability.
+ /// @return it always return a value between kMinimumBloomFilterBytes and
+ /// kMaximumBloomFilterBytes, and the return value is always a power of 2
+ static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) {
+ uint32_t optimal_num_of_bits = OptimalNumOfBits(ndv, fpp);
+ // CHECK(IsMultipleOf8(optimal_num_of_bits));
+ return optimal_num_of_bits >> 3;
+ }
+
+ /// Calculate optimal size according to the number of distinct values and
false
+ /// positive probability.
+ ///
+ /// @param ndv The number of distinct values.
+ /// @param fpp The false positive probability.
+ /// @return it always return a value between kMinimumBloomFilterBytes * 8 and
+ /// kMaximumBloomFilterBytes * 8, and the return value is always a power of
16
+ static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+ // CHECK(fpp > 0.0 && fpp < 1.0);
+ const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
+ uint32_t num_bits = 0;
+
+ // Handle overflow.
+ if (m < 0 || m > kMaximumBloomFilterBytes << 3) {
+ num_bits = static_cast<uint32_t>(kMaximumBloomFilterBytes << 3);
+ } else {
+ num_bits = static_cast<uint32_t>(m);
+ }
+
+ // Round up to lower bound
+ if (num_bits < kMinimumBloomFilterBytes << 3) {
+ num_bits = kMinimumBloomFilterBytes << 3;
+ }
+
+ // Get next power of 2 if bits is not power of 2.
+ if ((num_bits & (num_bits - 1)) != 0) {
+ num_bits = static_cast<uint32_t>(NextPower2(num_bits));
+ }
+
+ // Round down to upper bound
+ if (num_bits > kMaximumBloomFilterBytes << 3) {
+ num_bits = kMaximumBloomFilterBytes << 3;
+ }
+
+ return num_bits;
+ }
+
+ /// Determine whether an element exist in set or not.
+ ///
+ /// @param hash the element to contain.
+ /// @return false if value is definitely not in set, and true means PROBABLY
+ /// in set.
+ bool FindHash(uint64_t hash) const;
+
+ /// Insert element to set represented by Bloom filter bitset.
+ ///
+ /// @param hash the hash of value to insert into Bloom filter.
+ void InsertHash(uint64_t hash);
+
+ uint32_t GetBitsetSize() const { return num_bytes_; }
+
+ /// Get the plain bitset value from the Bloom filter bitset.
+ ///
+ /// @return bitset value;
+ const std::string& GetData() { return data_; }
+
+ /// Compute hash for string value by using its plain encoding result.
+ ///
+ /// @param value the value address.
+ /// @param len the value length.
+ /// @return hash result.
+ static uint64_t Hash(const char* data, size_t length);
+
+ private:
+ // Bytes in a tiny Bloom filter block.
+ static constexpr int kBytesPerFilterBlock = 32;
+
+ // The number of bits to be set in each tiny Bloom filter
+ static constexpr int kBitsSetPerBlock = 8;
+
+ // The block-based algorithm needs eight odd SALT values to calculate eight
indexes
+ // of bit to set, one bit in each 32-bit word.
+ static constexpr uint32_t SALT[kBitsSetPerBlock] = {0x47b6137bU,
0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
+ 0x705495c7U,
0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};
+
+ // The underlying buffer of bitset.
+ std::string data_;
+
+ // The number of bytes of Bloom filter bitset.
+ uint32_t num_bytes_;
+};
diff --git a/tests/cppunit/types/bloom_filter_test.cc
b/tests/cppunit/types/bloom_filter_test.cc
new file mode 100644
index 00000000..0749c52c
--- /dev/null
+++ b/tests/cppunit/types/bloom_filter_test.cc
@@ -0,0 +1,187 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "types/bloom_filter.h"
+
+#include <gtest/gtest.h>
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <random>
+#include <string>
+#include <utility>
+#include <vector>
+
+namespace test {
+
+TEST(ConstructorTest, TestBloomFilter) {
+ BlockSplitBloomFilter bloom_filter;
+
+ // It return false because the number of bytes of Bloom filter bitset must
be a power of 2.
+ std::unique_ptr<uint8_t[]> bitset1(new uint8_t[1024]());
+ EXPECT_FALSE(bloom_filter.Init(bitset1.get(), 1023));
+
+ // It return false because the number of bytes of Bloom filter bitset must
be a power of 2.
+ std::string bitset2(1022, 's');
+ EXPECT_FALSE(bloom_filter.Init(bitset2));
+}
+
+// The BasicTest is used to test basic operations including InsertHash,
FindHash and
+// serializing and de-serializing.
+TEST(BasicTest, TestBloomFilter) {
+ const std::vector<uint32_t> kBloomFilterSizes = {32, 64, 128, 256, 512,
1024, 2048};
+ const std::vector<std::string> kStringInserts = {"1", "2", "3", "5", "6",
"7", "8",
+ "9", "10", "42", "a",
"abc", "qwert"};
+ const std::vector<std::string> kStringLookups = {"-1", "-2", "-3", "-5",
"-6", "-7",
+ "-8", "b", "acb", "tyuio",
"trewq"};
+
+ for (const auto bloom_filter_bytes : kBloomFilterSizes) {
+ BlockSplitBloomFilter bloom_filter;
+ bloom_filter.Init(bloom_filter_bytes);
+
+ // Empty bloom filter deterministically returns false
+ for (const auto& v : kStringInserts) {
+ EXPECT_FALSE(bloom_filter.FindHash(bloom_filter.Hash(v.data(),
v.size())));
+ }
+
+ // Insert all values
+ for (const auto& v : kStringInserts) {
+ bloom_filter.InsertHash(bloom_filter.Hash(v.data(), v.size()));
+ }
+
+ // They should always lookup successfully
+ for (const auto& v : kStringInserts) {
+ EXPECT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(v.data(),
v.size())));
+ }
+
+ // Values not inserted in the filter should only rarely lookup successfully
+ int false_positives = 0;
+ for (const auto& v : kStringLookups) {
+ false_positives += bloom_filter.FindHash(bloom_filter.Hash(v.data(),
v.size()));
+ }
+ // (this is a crude check, see FPPTest below for a more rigorous formula)
+ EXPECT_LE(false_positives, 2);
+
+ // Serialize Bloom filter to string bitset
+ std::string data_saved = bloom_filter.GetData();
+ // ReBuild Bloom filter from string bitset
+ BlockSplitBloomFilter bloom_filter_new;
+ bloom_filter_new.Init(data_saved);
+
+ // Lookup previously inserted values
+ for (const auto& v : kStringInserts) {
+ EXPECT_TRUE(bloom_filter_new.FindHash(bloom_filter_new.Hash(v.data(),
v.size())));
+ }
+ false_positives = 0;
+ for (const auto& v : kStringLookups) {
+ false_positives +=
bloom_filter_new.FindHash(bloom_filter_new.Hash(v.data(), v.size()));
+ }
+ EXPECT_LE(false_positives, 2);
+ }
+}
+
+// Helper function to generate random string.
+std::string GetRandomString(uint32_t length) {
+ // Character set used to generate random string
+ const std::string charset =
"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+ std::default_random_engine gen(42);
+ std::uniform_int_distribution<uint32_t> dist(0,
static_cast<int>(charset.size() - 1));
+ std::string ret(length, 'x');
+
+ for (uint32_t i = 0; i < length; i++) {
+ ret[i] = charset[dist(gen)];
+ }
+ return ret;
+}
+
+TEST(FPPTest, TestBloomFilter) {
+ // It counts the number of times FindHash returns true.
+ int exist = 0;
+
+ const int total_count = 100000;
+
+ // Bloom filter fpp parameter
+ const double fpp = 0.01;
+
+ std::vector<std::string> members;
+ BlockSplitBloomFilter bloom_filter;
+ bloom_filter.Init(BlockSplitBloomFilter::OptimalNumOfBytes(total_count,
fpp));
+
+ // Insert elements into the Bloom filter
+ for (int i = 0; i < total_count; i++) {
+ // Insert random string which length is 8
+ std::string tmp = GetRandomString(8);
+ members.push_back(tmp);
+ bloom_filter.InsertHash(bloom_filter.Hash(tmp.data(), tmp.size()));
+ }
+
+ for (int i = 0; i < total_count; i++) {
+ ASSERT_TRUE(bloom_filter.FindHash(bloom_filter.Hash(members[i].data(),
members[i].size())));
+ std::string tmp = GetRandomString(7);
+
+ if (bloom_filter.FindHash(bloom_filter.Hash(tmp.data(), tmp.size()))) {
+ exist++;
+ }
+ }
+
+ // The exist should be probably less than 1000 according default FPP 0.01.
+ EXPECT_LT(exist, total_count * fpp);
+}
+
+// OptimalValueTest is used to test whether OptimalNumOfBits returns expected
+// numbers according to formula:
+// num_of_bits = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8.0))
+// where ndv is the number of distinct values and fpp is the false positive
probability.
+// Also it is used to test whether OptimalNumOfBits returns value between
+// [MINIMUM_BLOOM_FILTER_SIZE, MAXIMUM_BLOOM_FILTER_SIZE].
+TEST(OptimalValueTest, TestBloomFilter) {
+ auto test_optimal_num_estimation = [](uint32_t ndv, double fpp, uint32_t
num_bits) {
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBits(ndv, fpp), num_bits);
+ EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(ndv, fpp), num_bits /
8);
+ };
+
+ test_optimal_num_estimation(256, 0.01, UINT32_C(4096));
+ test_optimal_num_estimation(512, 0.01, UINT32_C(8192));
+ test_optimal_num_estimation(1024, 0.01, UINT32_C(16384));
+ test_optimal_num_estimation(2048, 0.01, UINT32_C(32768));
+
+ test_optimal_num_estimation(200, 0.01, UINT32_C(2048));
+ test_optimal_num_estimation(300, 0.01, UINT32_C(4096));
+ test_optimal_num_estimation(700, 0.01, UINT32_C(8192));
+ test_optimal_num_estimation(1500, 0.01, UINT32_C(16384));
+
+ test_optimal_num_estimation(200, 0.025, UINT32_C(2048));
+ test_optimal_num_estimation(300, 0.025, UINT32_C(4096));
+ test_optimal_num_estimation(700, 0.025, UINT32_C(8192));
+ test_optimal_num_estimation(1500, 0.025, UINT32_C(16384));
+
+ test_optimal_num_estimation(200, 0.05, UINT32_C(2048));
+ test_optimal_num_estimation(300, 0.05, UINT32_C(4096));
+ test_optimal_num_estimation(700, 0.05, UINT32_C(8192));
+ test_optimal_num_estimation(1500, 0.05, UINT32_C(16384));
+
+ // Boundary check
+ test_optimal_num_estimation(4, 0.01,
BlockSplitBloomFilter::kMinimumBloomFilterBytes * 8);
+ test_optimal_num_estimation(4, 0.25,
BlockSplitBloomFilter::kMinimumBloomFilterBytes * 8);
+
+ test_optimal_num_estimation(std::numeric_limits<uint32_t>::max(), 0.01,
kMaximumBloomFilterBytes * 8);
+ test_optimal_num_estimation(std::numeric_limits<uint32_t>::max(), 0.25,
kMaximumBloomFilterBytes * 8);
+}
+
+} // namespace test