This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 961888dd14e5bd306d3e5b741bc4443620617664 Author: Bankim Bhavsar <[email protected]> AuthorDate: Thu Jan 9 10:33:39 2020 -0800 KUDU-2483 Integrate BlockBloomFilter with ColumnPredicate on server side This change switches the implementation of the ColumnPredicate to use the BlockBloomFilter for the BloomFilter predicate on the server side. The earlier implementation was still experimental and didn't provide public client APIs that actually use this BloomFilter predicate, so this change takes the liberty of making incompatible wire protocol changes. Updated BlockBloomFilter to take hash_algorithm and hash_seed. This makes serializing and deserializing the BlockBloomFilter convenient and removes the need for BloomFilterInner in ColumnPredicate. Added overloaded Insert()/Find() functions to BlockBloomFilter that take a Slice parameter and hash the key before insertion/lookup. Most of the change involves refactoring the implementation, including the unit tests. Currently only the FAST_HASH algorithm is supported, since 32-bit versions of MURMUR2 and CITY_HASH are not yet implemented. 
Change-Id: I7ecfd67e9c5fbe459c5b4aed91e0be2a194d433a Reviewed-on: http://gerrit.cloudera.org:8080/15034 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Adar Dembo <[email protected]> Reviewed-by: helifu <[email protected]> --- src/kudu/common/column_predicate-test.cc | 164 +++++++++---------- src/kudu/common/column_predicate.cc | 28 ++-- src/kudu/common/column_predicate.h | 82 ++-------- src/kudu/common/common.proto | 12 +- src/kudu/common/wire_protocol-test.cc | 72 ++++----- src/kudu/common/wire_protocol.cc | 64 +++----- src/kudu/tablet/cfile_set-test.cc | 173 ++++++++++----------- src/kudu/util/CMakeLists.txt | 15 ++ src/kudu/util/block_bloom_filter-test.cc | 12 +- src/kudu/util/block_bloom_filter.cc | 89 ++++++++++- src/kudu/util/block_bloom_filter.h | 74 ++++++++- .../util/{hash.proto => block_bloom_filter.proto} | 20 ++- src/kudu/util/hash.proto | 1 + src/kudu/util/hash_util-test.cc | 16 +- src/kudu/util/hash_util.h | 28 +++- 15 files changed, 469 insertions(+), 381 deletions(-) diff --git a/src/kudu/common/column_predicate-test.cc b/src/kudu/common/column_predicate-test.cc index 15c0ed3..34f8341 100644 --- a/src/kudu/common/column_predicate-test.cc +++ b/src/kudu/common/column_predicate-test.cc @@ -21,7 +21,9 @@ #include <cstdint> #include <cstdlib> #include <functional> +#include <initializer_list> #include <string> +#include <utility> #include <vector> #include <boost/optional/optional.hpp> @@ -37,13 +39,15 @@ #include "kudu/gutil/stringprintf.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/walltime.h" -#include "kudu/util/bloom_filter.h" +#include "kudu/util/block_bloom_filter.h" #include "kudu/util/hash.pb.h" +#include "kudu/util/hash_util.h" #include "kudu/util/int128.h" #include "kudu/util/memory/arena.h" #include "kudu/util/random.h" #include "kudu/util/slice.h" #include "kudu/util/stopwatch.h" +#include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" using std::vector; @@ -82,8 +86,8 @@ class TestColumnPredicate : 
public KuduTest { void FillBloomFilterAndValues(int n_keys, vector<uint64_t>* values, - BloomFilterBuilder* bfb1, - BloomFilterBuilder* bfb2) { + BlockBloomFilter* b1, + BlockBloomFilter* b2) { uint64_t current = 0; for (int i = 0; i < 2; ++i) { while (true) { @@ -93,9 +97,9 @@ class TestColumnPredicate : public KuduTest { } current = key; Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)); - BloomKeyProbe probe(key_slice, MURMUR_HASH_2); - bfb1->AddKey(probe); - bfb2->AddKey(probe); + uint32_t hash_val = HashUtil::ComputeHash32(key_slice, FAST_HASH, 0); + b1->Insert(hash_val); + b2->Insert(hash_val); values->emplace_back(key); break; } @@ -104,12 +108,11 @@ class TestColumnPredicate : public KuduTest { while (true) { uint64_t key = rand_.Next(); Slice key_slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key)); - BloomKeyProbe probe(key_slice, MURMUR_HASH_2); - BloomFilter bf(bfb1->slice(), bfb1->n_hashes()); - if (!bf.MayContainKey(probe) && key > current) { + uint32_t hash_val = HashUtil::ComputeHash32(key_slice, FAST_HASH, 0); + if (!b1->Find(hash_val) && key > current) { current = key; values->emplace_back(key); - bfb2->AddKey(probe); + b2->Insert(hash_val); break; } } @@ -795,9 +798,8 @@ class TestColumnPredicate : public KuduTest { template <typename T> void TestMergeBloomFilterCombinations(const ColumnSchema& column, - vector<ColumnPredicate::BloomFilterInner>* bf, + const vector<BlockBloomFilter*>& bf, vector<T> values) { - vector<ColumnPredicate::BloomFilterInner> orig_bloom_filters = *bf; // BloomFilter AND // NONE // = @@ -811,7 +813,6 @@ class TestColumnPredicate : public KuduTest { // Equality // = // Equality - *bf = orig_bloom_filters; TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), ColumnPredicate::Equality(column, &values[0]), ColumnPredicate::Equality(column, &values[0]), @@ -821,8 +822,8 @@ class TestColumnPredicate : public KuduTest { // Equality // = // None - *bf = orig_bloom_filters; 
TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), + // Value in index 2 is not present in one of the bloom filters. ColumnPredicate::Equality(column, &values[2]), ColumnPredicate::None(column), PredicateType::None); @@ -831,18 +832,15 @@ class TestColumnPredicate : public KuduTest { // IS NOT NULL // = // BloomFilter - *bf = orig_bloom_filters; - vector<ColumnPredicate::BloomFilterInner> bf_copy = *bf; TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), ColumnPredicate::IsNotNull(column), - ColumnPredicate::InBloomFilter(column, &bf_copy, nullptr, nullptr), + ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), PredicateType::InBloomFilter); // BloomFilter AND // IS NULL // = // None - *bf = orig_bloom_filters; TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), ColumnPredicate::IsNull(column), ColumnPredicate::None(column), @@ -852,9 +850,7 @@ class TestColumnPredicate : public KuduTest { // InList // = // None(the value in list can not hit bloom filter) - *bf = orig_bloom_filters; vector<const void*> in_list = { &values[2], &values[3], &values[4] }; - vector<const void*> hit_list; TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), ColumnPredicate::InList(column, &in_list), ColumnPredicate::None(column), @@ -865,8 +861,7 @@ class TestColumnPredicate : public KuduTest { // = // InList(the value in list all hits bloom filter) in_list = { &values[0], &values[1] }; - hit_list = { &values[0], &values[1] }; - *bf = orig_bloom_filters; + vector<const void*> hit_list = { &values[0], &values[1] }; TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), ColumnPredicate::InList(column, &in_list), ColumnPredicate::InList(column, &hit_list), @@ -878,7 +873,6 @@ class TestColumnPredicate : public KuduTest { // InList(only the some values in list hits bloom filter) in_list = { &values[0], &values[1], &values[2], &values[3] }; hit_list = { &values[0], &values[1]}; - *bf = 
orig_bloom_filters; TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), ColumnPredicate::InList(column, &in_list), ColumnPredicate::InList(column, &hit_list), @@ -889,7 +883,6 @@ class TestColumnPredicate : public KuduTest { // = // Equality(only the first value in list hits bloom filter, so it simplify to Equality) in_list = { &values[0], &values[2], &values[3] }; - *bf = orig_bloom_filters; TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), ColumnPredicate::InList(column, &in_list), ColumnPredicate::Equality(column, &values[0]), @@ -899,30 +892,26 @@ class TestColumnPredicate : public KuduTest { // BloomFilter // = // BloomFilter with lower and upper bound - *bf = orig_bloom_filters; - bf_copy = *bf; TestMerge(ColumnPredicate::Range(column, &values[0], &values[4]), ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), - ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]), + ColumnPredicate::InBloomFilter(column, bf, &values[0], + &values[4]), PredicateType::InBloomFilter); // BloomFilter with lower and upper bound AND // Range // = // BloomFilter with lower and upper bound - *bf = orig_bloom_filters; - bf_copy = *bf; TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[4]), ColumnPredicate::Range(column, &values[1], &values[3]), - ColumnPredicate::InBloomFilter(column, &bf_copy, &values[1], &values[3]), + ColumnPredicate::InBloomFilter(column, bf, &values[1], + &values[3]), PredicateType::InBloomFilter); // BloomFilter with lower and upper bound AND // Range // = // None - *bf = orig_bloom_filters; - bf_copy = *bf; TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]), ColumnPredicate::Range(column, &values[2], &values[4]), ColumnPredicate::None(column), @@ -932,38 +921,35 @@ class TestColumnPredicate : public KuduTest { // BloomFilter with lower and upper bound // = // BloomFilter with lower and upper bound - *bf = orig_bloom_filters; - bf_copy = *bf; 
- vector<ColumnPredicate::BloomFilterInner> collect = *bf; - collect.insert(collect.end(), bf->begin(), bf->end()); + auto collect = bf; + collect.insert(collect.end(), bf.begin(), bf.end()); TestMerge(ColumnPredicate::InBloomFilter(column, bf, nullptr, nullptr), - ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]), - ColumnPredicate::InBloomFilter(column, &collect, &values[0], &values[4]), + ColumnPredicate::InBloomFilter(column, bf, &values[0], + &values[4]), + ColumnPredicate::InBloomFilter(column, collect, &values[0], + &values[4]), PredicateType::InBloomFilter); // BloomFilter with lower and upper bound AND // BloomFilter with lower and upper bound // = // BloomFilter with lower and upper bound - *bf = orig_bloom_filters; - collect = *bf; - bf_copy = *bf; - collect.insert(collect.end(), bf->begin(), bf->end()); + collect = bf; + collect.insert(collect.end(), bf.begin(), bf.end()); TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[1], &values[3]), - ColumnPredicate::InBloomFilter(column, &bf_copy, &values[0], &values[4]), - ColumnPredicate::InBloomFilter(column, &collect, &values[1], &values[3]), + ColumnPredicate::InBloomFilter(column, bf, &values[0], + &values[4]), + ColumnPredicate::InBloomFilter(column, collect, &values[1], + &values[3]), PredicateType::InBloomFilter); // BloomFilter with lower and upper bound AND // BloomFilter with lower and upper bound // = // None - *bf = orig_bloom_filters; - collect = *bf; - bf_copy = *bf; - collect.insert(collect.end(), bf->begin(), bf->end()); TestMerge(ColumnPredicate::InBloomFilter(column, bf, &values[0], &values[2]), - ColumnPredicate::InBloomFilter(column, &bf_copy, &values[2], &values[4]), + ColumnPredicate::InBloomFilter(column, bf, &values[2], + &values[4]), ColumnPredicate::None(column), PredicateType::None); } @@ -1398,73 +1384,67 @@ TEST_F(TestColumnPredicate, TestRedaction) { } TEST_F(TestColumnPredicate, TestBloomFilterMerge) { + Arena arena(1024); + 
ArenaBlockBloomFilterBufferAllocator allocator(&arena); + int n_keys = 5; // 0 1 both hit bf1 and bf2, 2 3 4 only hit bf2. + // Test for UINT64 type. - BloomFilterBuilder bfb1( - BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); - double expected_fp_rate1 = bfb1.false_positive_rate(); - ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002); - ASSERT_EQ(9, bfb1.n_bits() / n_keys); - BloomFilterBuilder bfb2( - BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); - double expected_fp_rate2 = bfb2.false_positive_rate(); - ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002); - ASSERT_EQ(9, bfb2.n_bits() / n_keys); + BlockBloomFilter b1(&allocator); + int log_space_bytes1 = BlockBloomFilter::MinLogSpace(n_keys, 0.01); + ASSERT_OK(b1.Init(log_space_bytes1, FAST_HASH, 0)); + double expected_fp_rate1 = BlockBloomFilter::FalsePositiveProb(n_keys, log_space_bytes1); + ASSERT_LE(expected_fp_rate1, 0.01); + + BlockBloomFilter b2(&allocator); + int log_space_bytes2 = BlockBloomFilter::MinLogSpace(n_keys, 0.01); + ASSERT_OK(b2.Init(log_space_bytes2, FAST_HASH, 0)); + double expected_fp_rate2 = BlockBloomFilter::FalsePositiveProb(n_keys, log_space_bytes2); + ASSERT_LE(expected_fp_rate2, 0.01); + vector<uint64_t> values_int; - FillBloomFilterAndValues(n_keys, &values_int, &bfb1, &bfb2); - const Slice slice1 = bfb1.slice(); - const Slice slice2 = bfb2.slice(); - ColumnPredicate::BloomFilterInner bf1(slice1, bfb1.n_hashes(), MURMUR_HASH_2); - ColumnPredicate::BloomFilterInner bf2(slice2, bfb2.n_hashes(), MURMUR_HASH_2); - vector<ColumnPredicate::BloomFilterInner> bfs; - bfs.emplace_back(bf1); - TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int); - bfs.clear(); - bfs.emplace_back(bf1); - bfs.emplace_back(bf2); - TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), &bfs, values_int); + FillBloomFilterAndValues(n_keys, &values_int, &b1, &b2); + TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), {&b1}, values_int); + 
TestMergeBloomFilterCombinations(ColumnSchema("c", INT64, true), {&b1, &b2}, + values_int); // Test for STRING type. - BloomFilterBuilder bfb3( - BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); - double expected_fp_rate3 = bfb3.false_positive_rate(); - ASSERT_NEAR(expected_fp_rate3, 0.01, 0.002); - ASSERT_EQ(9, bfb3.n_bits() / n_keys); - // 0 1 both hit bf1 and bf2, 2 3 4 only hit bf2. + BlockBloomFilter b3(&allocator); + int log_space_bytes3 = BlockBloomFilter::MinLogSpace(n_keys, 0.01); + ASSERT_OK(b3.Init(log_space_bytes3, FAST_HASH, 0)); + double expected_fp_rate3 = BlockBloomFilter::FalsePositiveProb(n_keys, log_space_bytes3); + ASSERT_LE(expected_fp_rate3, 0.01); + vector<std::string> keys = {"0", "00", "10", "100", "1100"}; vector<Slice> keys_slice; for (int i = 0; i < keys.size(); ++i) { Slice key_slice(keys[i]); - BloomKeyProbe probe(key_slice, MURMUR_HASH_2); if (i < 2) { - bfb3.AddKey(probe); + b3.Insert(key_slice); } keys_slice.emplace_back(key_slice); } - bfs.clear(); - bfs.emplace_back(bfb3.slice(), bfb3.n_hashes(), MURMUR_HASH_2); - TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, keys_slice); + + TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), {&b3}, keys_slice); // Test for BINARY type - BloomFilterBuilder bfb4( - BloomFilterSizing::ByCountAndFPRate(n_keys, 0.01)); - double expected_fp_rate4 = bfb4.false_positive_rate(); - ASSERT_NEAR(expected_fp_rate4, 0.01, 0.002); - ASSERT_EQ(9, bfb4.n_bits() / n_keys); + BlockBloomFilter b4(&allocator); + int log_space_bytes4 = BlockBloomFilter::MinLogSpace(n_keys, 0.01); + ASSERT_OK(b4.Init(log_space_bytes4, FAST_HASH, 0)); + double expected_fp_rate4 = BlockBloomFilter::FalsePositiveProb(n_keys, log_space_bytes4); + ASSERT_LE(expected_fp_rate4, 0.01); + vector<Slice> binary_keys = { Slice("", 0), Slice("\0", 1), Slice("\0\0", 2), Slice("\0\0\0", 3), Slice("\0\0\0\0", 4) }; for (int i = 0; i < binary_keys.size(); ++i) { - BloomKeyProbe probe(binary_keys[i], 
MURMUR_HASH_2); if (i < 2) { - bfb4.AddKey(probe); + b4.Insert(binary_keys[i]); } } - bfs.clear(); - bfs.emplace_back(bfb4.slice(), bfb4.n_hashes(), MURMUR_HASH_2); - TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), &bfs, binary_keys); + TestMergeBloomFilterCombinations(ColumnSchema("c", STRING, true), {&b4}, binary_keys); } // Test ColumnPredicate operator (in-)equality. diff --git a/src/kudu/common/column_predicate.cc b/src/kudu/common/column_predicate.cc index 2c84e85..74f8c69 100644 --- a/src/kudu/common/column_predicate.cc +++ b/src/kudu/common/column_predicate.cc @@ -66,15 +66,14 @@ ColumnPredicate::ColumnPredicate(PredicateType predicate_type, ColumnPredicate::ColumnPredicate(PredicateType predicate_type, ColumnSchema column, - std::vector<BloomFilterInner>* bfs, + vector<BlockBloomFilter*> bfs, const void* lower, const void* upper) : predicate_type_(predicate_type), column_(move(column)), lower_(lower), - upper_(upper) { - bloom_filters_.swap(*bfs); -} + upper_(upper), + bloom_filters_(move(bfs)) {} ColumnPredicate ColumnPredicate::Equality(ColumnSchema column, const void* value) { CHECK(value != nullptr); @@ -111,12 +110,12 @@ ColumnPredicate ColumnPredicate::InList(ColumnSchema column, } ColumnPredicate ColumnPredicate::InBloomFilter(ColumnSchema column, - std::vector<BloomFilterInner>* bfs, + std::vector<BlockBloomFilter*> bfs, const void* lower, const void* upper) { - CHECK(bfs != nullptr); - CHECK(!bfs->empty()); - ColumnPredicate pred(PredicateType::InBloomFilter, move(column), bfs, lower, upper); + CHECK(!bfs.empty()); + ColumnPredicate pred(PredicateType::InBloomFilter, move(column), move(bfs), lower, + upper); pred.Simplify(); return pred; } @@ -874,8 +873,9 @@ string ColumnPredicate::ToString() const { case PredicateType::InBloomFilter: { return strings::Substitute("`$0` IS InBloomFilter", column_.name()); }; + default: + LOG(FATAL) << "unknown predicate type"; } - LOG(FATAL) << "unknown predicate type"; } bool 
ColumnPredicate::operator==(const ColumnPredicate& other) const { @@ -886,7 +886,15 @@ bool ColumnPredicate::operator==(const ColumnPredicate& other) const { switch (predicate_type_) { case PredicateType::Equality: return column_.type_info()->Compare(lower_, other.lower_) == 0; case PredicateType::InBloomFilter: { - if (bloom_filters_ != other.bloom_filters()) { + if (bloom_filters_.size() != other.bloom_filters().size()) { + return false; + } + // Compare the actual BlockBloomFilters pointed by the vectors. + if (!std::equal(bloom_filters_.begin(), bloom_filters_.end(), + other.bloom_filters().begin(), + [] (const BlockBloomFilter* lhs, const BlockBloomFilter* rhs) { + return *lhs == *rhs; + })) { return false; } FALLTHROUGH_INTENDED; diff --git a/src/kudu/common/column_predicate.h b/src/kudu/common/column_predicate.h index 963a172..e94b492 100644 --- a/src/kudu/common/column_predicate.h +++ b/src/kudu/common/column_predicate.h @@ -31,8 +31,7 @@ #include "kudu/common/common.pb.h" #include "kudu/common/schema.h" #include "kudu/common/types.h" -#include "kudu/util/bloom_filter.h" -#include "kudu/util/hash.pb.h" +#include "kudu/util/block_bloom_filter.h" #include "kudu/util/slice.h" namespace kudu { @@ -82,9 +81,6 @@ enum class PredicateType { // the client side), or a scan iterator (on the server side). class ColumnPredicate { public: - - class BloomFilterInner; - // Creates a new equality predicate on the column and value. // // The value is not copied, and must outlive the returned predicate. @@ -145,8 +141,10 @@ class ColumnPredicate { // Create a new BloomFilter predicate for the column. // // The values are not copied, and must outlive the returned predicate. 
- static ColumnPredicate InBloomFilter(ColumnSchema column, std::vector<BloomFilterInner>* bfs, - const void* lower, const void* upper); + static ColumnPredicate InBloomFilter(ColumnSchema column, + std::vector<BlockBloomFilter*> bfs, + const void* lower, + const void* upper); // Creates a new predicate which matches no values. static ColumnPredicate None(ColumnSchema column); @@ -218,8 +216,9 @@ class ColumnPredicate { case PredicateType::InBloomFilter: { return EvaluateCellForBloomFilter<PhysicalType>(cell); }; + default: + LOG(FATAL) << "unknown predicate type"; } - LOG(FATAL) << "unknown predicate type"; } // Evaluate the predicate on a single cell. Used if the physical type is only known at run-time. @@ -261,64 +260,10 @@ class ColumnPredicate { return values_; } // Returns bloom filters if this is a bloom filter predicate. - const std::vector<BloomFilterInner>& bloom_filters() const { + const std::vector<BlockBloomFilter*>& bloom_filters() const { return bloom_filters_; } - // This class represents the bloom filter used in predicate. - class BloomFilterInner { - public: - - BloomFilterInner(Slice bloom_data, size_t nhash, HashAlgorithm hash_algorithm) : - bloom_data_(bloom_data), - nhash_(nhash), - hash_algorithm_(hash_algorithm) { - } - - BloomFilterInner() : nhash_(0), hash_algorithm_(CITY_HASH) {} - - const Slice& bloom_data() const { - return bloom_data_; - } - - size_t nhash() const { - return nhash_; - } - - HashAlgorithm hash_algorithm() const { - return hash_algorithm_; - } - - void set_nhash(size_t nhash) { - nhash_ = nhash; - } - - void set_bloom_data(Slice bloom_data) { - bloom_data_ = bloom_data; - } - - void set_hash_algorithm(HashAlgorithm hash_algorithm) { - hash_algorithm_ = hash_algorithm; - } - - bool operator==(const BloomFilterInner& other) const { - return (bloom_data_ == other.bloom_data() && - nhash_ == other.nhash() && - hash_algorithm_ == other.hash_algorithm()); - } - - private: - - // The slice of bloom filter. 
- Slice bloom_data_; - - // The times of hash value used in bloom filter. - size_t nhash_; - - // The hash algorithm used in bloom filter. - HashAlgorithm hash_algorithm_; - }; - private: friend class TestColumnPredicate; @@ -337,7 +282,7 @@ class ColumnPredicate { // Creates a new BloomFilter column predicate. ColumnPredicate(PredicateType predicate_type, ColumnSchema column, - std::vector<BloomFilterInner>* bfs, + std::vector<BlockBloomFilter*> bfs, const void* lower, const void* upper); @@ -383,9 +328,8 @@ class ColumnPredicate { data = slice->data(); } Slice cell_slice(reinterpret_cast<const uint8_t*>(data), size); - for (const auto& bf : bloom_filters_) { - BloomKeyProbe probe(cell_slice, bf.hash_algorithm()); - if (!BloomFilter(bf.bloom_data(), bf.nhash()).MayContainKey(probe)) { + for (const auto* bf : bloom_filters_) { + if (!bf->Find(cell_slice)) { return false; } } @@ -431,8 +375,8 @@ class ColumnPredicate { // The list of values to check column against if this is an InList predicate. std::vector<const void*> values_; - // The list of bloom filter in this predicate. - std::vector<BloomFilterInner> bloom_filters_; + // The list of bloom filters in this predicate. + std::vector<BlockBloomFilter*> bloom_filters_; }; // Compares predicates according to selectivity. Predicates that match fewer diff --git a/src/kudu/common/common.proto b/src/kudu/common/common.proto index ff3ab01..89afee2 100644 --- a/src/kudu/common/common.proto +++ b/src/kudu/common/common.proto @@ -28,6 +28,7 @@ package kudu; option java_package = "org.apache.kudu"; +import "kudu/util/block_bloom_filter.proto"; import "kudu/util/compression/compression.proto"; import "kudu/util/hash.proto"; import "kudu/util/pb_util.proto"; @@ -365,15 +366,6 @@ message ColumnPredicatePB { // The predicate column name. optional string column = 1; - // Represent a bloom filter. - message BloomFilter { - // The hash times for bloom filter. - optional int32 nhash = 1; - // The bloom filter bitmap. 
- optional bytes bloom_data = 2 [(kudu.REDACT) = true]; - optional HashAlgorithm hash_algorithm = 3 [default = CITY_HASH]; - } - message Range { // Bounds should be encoded as follows: @@ -410,7 +402,7 @@ message ColumnPredicatePB { message InBloomFilter { // A list of bloom filters for the field. - repeated BloomFilter bloom_filters = 1; + repeated BlockBloomFilterPB bloom_filters = 1; // Lower and Upper is optional for InBloomFilter. // When use both InBloomFilter and Range predicate for the same column the diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc index cb5294e..df00788 100644 --- a/src/kudu/common/wire_protocol-test.cc +++ b/src/kudu/common/wire_protocol-test.cc @@ -20,7 +20,6 @@ #include <algorithm> #include <cstdint> #include <cstring> -#include <memory> #include <numeric> #include <ostream> #include <string> @@ -37,11 +36,10 @@ #include "kudu/common/schema.h" #include "kudu/common/types.h" #include "kudu/common/wire_protocol.pb.h" -#include "kudu/gutil/port.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/walltime.h" #include "kudu/util/bitmap.h" -#include "kudu/util/bloom_filter.h" +#include "kudu/util/block_bloom_filter.h" #include "kudu/util/faststring.h" #include "kudu/util/hash.pb.h" #include "kudu/util/hexdump.h" @@ -55,7 +53,6 @@ using std::string; using std::tuple; -using std::unique_ptr; using std::vector; using strings::Substitute; @@ -634,45 +631,42 @@ class BFWireProtocolTest : public KuduTest { BFWireProtocolTest() : schema_({ ColumnSchema("col1", INT32)}, 1), arena_(1024), - n_keys_(100) { - bfb1_.reset(new BloomFilterBuilder(BloomFilterSizing::ByCountAndFPRate(n_keys_, 0.01))); - bfb2_.reset(new BloomFilterBuilder(BloomFilterSizing::ByCountAndFPRate(n_keys_, 0.01))); - } + allocator_(&arena_), + n_keys_(100), + b1_(&allocator_), + b2_(&allocator_) {} + + void SetUp() override { + int log_space_bytes1 = BlockBloomFilter::MinLogSpace(n_keys_, 0.01); + 
ASSERT_OK(b1_.Init(log_space_bytes1, FAST_HASH, 0)); + ASSERT_LE(BlockBloomFilter::FalsePositiveProb(n_keys_, log_space_bytes1), 0.01); + + int log_space_bytes2 = BlockBloomFilter::MinLogSpace(n_keys_, 0.01); + ASSERT_OK(b2_.Init(log_space_bytes2, FAST_HASH, 0)); + ASSERT_LE(BlockBloomFilter::FalsePositiveProb(n_keys_, log_space_bytes2), 0.01); - virtual void SetUp() OVERRIDE { - double expected_fp_rate1 = bfb1()->false_positive_rate(); - ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002); - ASSERT_EQ(9, bfb1()->n_bits() / n_keys_); - double expected_fp_rate2 = bfb2()->false_positive_rate(); - ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002); - ASSERT_EQ(9, bfb2()->n_bits() / n_keys_); for (int i = 0; i < n_keys_; ++i) { Slice key_slice(reinterpret_cast<const uint8_t*>(&i), sizeof(i)); - BloomKeyProbe probe(key_slice, MURMUR_HASH_2); - bfb1()->AddKey(probe); - bfb2()->AddKey(probe); + b1_.Insert(key_slice); + b2_.Insert(key_slice); } } - BloomFilterBuilder* bfb1() const { return bfb1_.get(); } - - BloomFilterBuilder* bfb2() const { return bfb1_.get(); } - protected: Schema schema_; Arena arena_; + ArenaBlockBloomFilterBufferAllocator allocator_; int n_keys_; - unique_ptr<BloomFilterBuilder> bfb1_; - unique_ptr<BloomFilterBuilder> bfb2_; + BlockBloomFilter b1_; + BlockBloomFilter b2_; }; TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilter) { boost::optional<ColumnPredicate> predicate; ColumnSchema col1 = schema_.column(0); { // Single BloomFilter predicate. 
- vector<kudu::ColumnPredicate::BloomFilterInner> bfs; - bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); - kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, nullptr); + kudu::ColumnPredicate ibf = + kudu::ColumnPredicate::InBloomFilter(col1, {&b1_}, nullptr, nullptr); ColumnPredicatePB pb; NO_FATALS(ColumnPredicateToPB(ibf, &pb)); ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); @@ -681,10 +675,8 @@ TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilter) { } { // Multi BloomFilter predicate. - vector<kudu::ColumnPredicate::BloomFilterInner> bfs; - bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); - bfs.emplace_back(bfb2()->slice(), bfb2()->n_hashes(), MURMUR_HASH_2); - kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, nullptr); + kudu::ColumnPredicate ibf = + kudu::ColumnPredicate::InBloomFilter(col1, {&b1_, &b2_}, nullptr, nullptr); ColumnPredicatePB pb; NO_FATALS(ColumnPredicateToPB(ibf, &pb)); ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); @@ -698,9 +690,7 @@ TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) { ColumnSchema col1 = schema_.column(0); { // Simply BloomFilter with lower bound. int lower = 1; - vector<kudu::ColumnPredicate::BloomFilterInner> bfs; - bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); - kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, nullptr); + auto ibf = kudu::ColumnPredicate::InBloomFilter(col1, {&b1_}, &lower, nullptr); ColumnPredicatePB pb; NO_FATALS(ColumnPredicateToPB(ibf, &pb)); ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); @@ -710,9 +700,7 @@ TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) { { // Single bloom filter with upper bound. 
int upper = 4; - vector<kudu::ColumnPredicate::BloomFilterInner> bfs; - bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); - kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, nullptr, &upper); + auto ibf = kudu::ColumnPredicate::InBloomFilter(col1, {&b1_}, nullptr, &upper); ColumnPredicatePB pb; NO_FATALS(ColumnPredicateToPB(ibf, &pb)); ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); @@ -723,9 +711,7 @@ TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) { { // Single bloom filter with both lower and upper bound. int lower = 1; int upper = 4; - vector<kudu::ColumnPredicate::BloomFilterInner> bfs; - bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); - kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, &upper); + auto ibf = kudu::ColumnPredicate::InBloomFilter(col1, {&b1_}, &lower, &upper); ColumnPredicatePB pb; NO_FATALS(ColumnPredicateToPB(ibf, &pb)); ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); @@ -736,10 +722,8 @@ TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) { { // Multi bloom filter with both lower and upper bound. 
int lower = 1; int upper = 4; - vector<kudu::ColumnPredicate::BloomFilterInner> bfs; - bfs.emplace_back(bfb1()->slice(), bfb1()->n_hashes(), MURMUR_HASH_2); - bfs.emplace_back(bfb2()->slice(), bfb2()->n_hashes(), MURMUR_HASH_2); - kudu::ColumnPredicate ibf = kudu::ColumnPredicate::InBloomFilter(col1, &bfs, &lower, &upper); + auto ibf = kudu::ColumnPredicate::InBloomFilter(col1, {&b1_, &b2_}, &lower, + &upper); ColumnPredicatePB pb; NO_FATALS(ColumnPredicateToPB(ibf, &pb)); ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, pb, &predicate)); diff --git a/src/kudu/common/wire_protocol.cc b/src/kudu/common/wire_protocol.cc index 5c473a3..e3101a4 100644 --- a/src/kudu/common/wire_protocol.cc +++ b/src/kudu/common/wire_protocol.cc @@ -47,9 +47,9 @@ #include "kudu/gutil/strings/substitute.h" #include "kudu/gutil/walltime.h" #include "kudu/util/bitmap.h" +#include "kudu/util/block_bloom_filter.h" #include "kudu/util/compression/compression.pb.h" #include "kudu/util/faststring.h" -#include "kudu/util/hash.pb.h" #include "kudu/util/memory/arena.h" #include "kudu/util/net/net_util.h" #include "kudu/util/net/sockaddr.h" @@ -57,6 +57,10 @@ #include "kudu/util/safe_math.h" #include "kudu/util/slice.h" +namespace kudu { +class BlockBloomFilterPB; +} // namespace kudu + using google::protobuf::Map; using google::protobuf::RepeatedPtrField; using kudu::pb_util::SecureDebugString; @@ -448,16 +452,6 @@ void CopyPredicateBoundToPB(const ColumnSchema& col, const void* bound_src, stri bound_dst->assign(reinterpret_cast<const char*>(src), size); } -// Copies a predicate bloom filter data from 'bf_src' into 'bf_dst'. 
-void CopyPredicateBloomFilterToPB(const ColumnPredicate::BloomFilterInner& bf_src, - ColumnPredicatePB::BloomFilter* bf_dst) { - bf_dst->set_nhash(bf_src.nhash()); - const void* src = bf_src.bloom_data().data(); - size_t size = bf_src.bloom_data().size(); - bf_dst->mutable_bloom_data()->assign(reinterpret_cast<const char*>(src), size); - bf_dst->set_hash_algorithm(bf_src.hash_algorithm()); -} - // Extract a void* pointer suitable for use in a ColumnRangePredicate from the // string protobuf bound. This validates that the pb_value has the correct // length, copies the data into 'arena', and sets *result to point to it. @@ -489,20 +483,6 @@ Status CopyPredicateBoundFromPB(const ColumnSchema& schema, return Status::OK(); } -// Extract BloomFilterInner from bloom data for ColumnBloomFilterPredicate. -Status CopyPredicateBloomFilterFromPB(const ColumnPredicatePB::BloomFilter& bf_src, - ColumnPredicate::BloomFilterInner* dst_src, - Arena* arena) { - size_t bloom_data_size = bf_src.bloom_data().size(); - dst_src->set_nhash(bf_src.nhash()); - // Copy the data from the protobuf into the Arena. 
- uint8_t* data_copy = static_cast<uint8_t*>(arena->AllocateBytes(bloom_data_size)); - memcpy(data_copy, bf_src.bloom_data().data(), bloom_data_size); - dst_src->set_bloom_data(Slice(data_copy, bloom_data_size)); - dst_src->set_hash_algorithm(bf_src.hash_algorithm()); - return Status::OK(); -} - } // anonymous namespace void ColumnPredicateToPB(const ColumnPredicate& predicate, @@ -547,9 +527,9 @@ void ColumnPredicateToPB(const ColumnPredicate& predicate, case PredicateType::None: LOG(FATAL) << "None predicate may not be converted to protobuf"; case PredicateType::InBloomFilter: { auto* bloom_filter_pred = pb->mutable_in_bloom_filter(); - for (const auto& bf : predicate.bloom_filters()) { - ColumnPredicatePB::BloomFilter* bloom_filter = bloom_filter_pred->add_bloom_filters(); - CopyPredicateBloomFilterToPB(bf, bloom_filter); + for (const auto* bf_src : predicate.bloom_filters()) { + BlockBloomFilterPB* bf_dst = bloom_filter_pred->add_bloom_filters(); + bf_src->CopyToPB(bf_dst); } // Form the optional lower and upper bound. if (predicate.raw_lower() != nullptr) { @@ -634,22 +614,19 @@ Status ColumnPredicateFromPB(const Schema& schema, }; case ColumnPredicatePB::kInBloomFilter: { const auto& in_bloom_filter = pb.in_bloom_filter(); - vector<ColumnPredicate::BloomFilterInner> bloom_filters; + vector<BlockBloomFilter*> bloom_filters; if (in_bloom_filter.bloom_filters_size() == 0) { - return Status::InvalidArgument("Invalid in bloom filter predicate on column: " - "no bloom filter contained", col.name()); + return Status::InvalidArgument( + Substitute("Invalid bloom filter predicate on column: $0. 
" + "No bloom filters supplied", col.name())); } - for (const auto& bf : in_bloom_filter.bloom_filters()) { - if (!bf.has_nhash() - || !bf.has_bloom_data() - || !bf.has_hash_algorithm() - || bf.hash_algorithm() == UNKNOWN_HASH) { - return Status::InvalidArgument("Invalid in bloom filter predicate on column: " - "missing bloom filter details", col.name()); - } - ColumnPredicate::BloomFilterInner bloom_filter; - RETURN_NOT_OK(CopyPredicateBloomFilterFromPB(bf, &bloom_filter, arena)); - bloom_filters.emplace_back(bloom_filter); + auto* allocator = arena->NewObject<ArenaBlockBloomFilterBufferAllocator>(arena); + for (const auto& bf_src : in_bloom_filter.bloom_filters()) { + auto* block_bloom_filter = arena->NewObject<BlockBloomFilter>(allocator); + RETURN_NOT_OK_PREPEND( + block_bloom_filter->InitFromPB(bf_src), + Substitute("Failed to initialize bloom filter predicate on column: $0", col.name())); + bloom_filters.emplace_back(block_bloom_filter); } // Extract the optional lower and upper bound. 
const void* lower = nullptr; @@ -660,7 +637,8 @@ Status ColumnPredicateFromPB(const Schema& schema, if (in_bloom_filter.has_upper()) { RETURN_NOT_OK(CopyPredicateBoundFromPB(col, in_bloom_filter.upper(), arena, &upper)); } - *predicate = ColumnPredicate::InBloomFilter(col, &bloom_filters, lower, upper); + *predicate = ColumnPredicate::InBloomFilter(col, std::move(bloom_filters), lower, + upper); break; }; default: return Status::InvalidArgument("Unknown predicate type for column", col.name()); diff --git a/src/kudu/tablet/cfile_set-test.cc b/src/kudu/tablet/cfile_set-test.cc index c074e9d..8838ce2 100644 --- a/src/kudu/tablet/cfile_set-test.cc +++ b/src/kudu/tablet/cfile_set-test.cc @@ -24,10 +24,10 @@ #include <memory> #include <ostream> #include <string> +#include <utility> #include <vector> #include <gflags/gflags.h> -#include <gflags/gflags_declare.h> #include <glog/logging.h> #include <gtest/gtest.h> @@ -50,8 +50,10 @@ #include "kudu/tablet/diskrowset.h" #include "kudu/tablet/tablet-test-util.h" #include "kudu/util/auto_release_pool.h" +#include "kudu/util/block_bloom_filter.h" #include "kudu/util/bloom_filter.h" #include "kudu/util/hash.pb.h" +#include "kudu/util/hash_util.h" #include "kudu/util/mem_tracker.h" #include "kudu/util/memory/arena.h" #include "kudu/util/slice.h" @@ -97,9 +99,9 @@ class TestCFileSet : public KuduRowSetTest { RowBuilder rb(&schema_); for (int i = 0; i < nrows; i++) { rb.Reset(); - rb.AddInt32(i * 2); - rb.AddInt32(i * 10); - rb.AddInt32(i * 100); + rb.AddInt32(i * kRatio[0]); + rb.AddInt32(i * kRatio[1]); + rb.AddInt32(i * kRatio[2]); ASSERT_OK_FAST(WriteRow(rb.data(), &rsw)); } ASSERT_OK(rsw.Finish()); @@ -110,27 +112,26 @@ class TestCFileSet : public KuduRowSetTest { // bf1_exclude: 1 3 5 7 9 ... (2n + 1)th key for column 1 to form bloom filter. // bf2_contain: 0 2 4 6 8 ... (2n)th key for column 2 to form bloom filter. // bf2_exclude: 1 3 5 7 9 ... (2n + 1)th key for column 2 to form bloom filter. 
- void FillBloomFilter(int nrows, - BloomFilterBuilder* bf1_contain, - BloomFilterBuilder* bf1_exclude, - BloomFilterBuilder* bf2_contain, - BloomFilterBuilder* bf2_exclude) { - int ratio[] = {2, 10, 100}; + static void FillBloomFilter(int nrows, + BlockBloomFilter* bf1_contain, + BlockBloomFilter* bf1_exclude, + BlockBloomFilter* bf2_contain, + BlockBloomFilter* bf2_exclude) { bool add = true; for (int i = 0; i < nrows; ++i) { - int curr1 = i * ratio[0]; - int curr2 = i * ratio[1]; + int curr1 = i * kRatio[0]; + int curr2 = i * kRatio[1]; Slice first(reinterpret_cast<const uint8_t*>(&curr1), sizeof(curr1)); Slice second(reinterpret_cast<const uint8_t*>(&curr2), sizeof(curr2)); - BloomKeyProbe probe1(first, MURMUR_HASH_2); - BloomKeyProbe probe2(second, MURMUR_HASH_2); + uint32_t hash1 = HashUtil::ComputeHash32(first, FAST_HASH, 0); + uint32_t hash2 = HashUtil::ComputeHash32(second, FAST_HASH, 0); if (add) { - bf1_contain->AddKey(probe1); - bf2_contain->AddKey(probe2); + bf1_contain->Insert(hash1); + bf2_contain->Insert(hash2); } else { - bf1_exclude->AddKey(probe1); - bf2_exclude->AddKey(probe2); + bf1_exclude->Insert(hash1); + bf2_exclude->Insert(hash2); } add = !add; } @@ -143,32 +144,33 @@ class TestCFileSet : public KuduRowSetTest { // ret2_exclude: to get the key hits in bf2_exclude for column 2. // In some case key may hit both contain and exclude bloom filter // so we get accurate item hits the bloom filter for test behind. 
- void GetBloomFilterResult(int nrows, BloomFilterBuilder* bf1_contain, - BloomFilterBuilder* bf1_exclude, - BloomFilterBuilder* bf2_contain, - BloomFilterBuilder* bf2_exclude, - vector<size_t>* ret1_contain, - vector<size_t>* ret1_exclude, - vector<size_t>* ret2_contain, - vector<size_t>* ret2_exclude) { - int ratio[] = {2, 10, 100}; + static void GetBloomFilterResult(int nrows, + BlockBloomFilter* bf1_contain, + BlockBloomFilter* bf1_exclude, + BlockBloomFilter* bf2_contain, + BlockBloomFilter* bf2_exclude, + vector<size_t>* ret1_contain, + vector<size_t>* ret1_exclude, + vector<size_t>* ret2_contain, + vector<size_t>* ret2_exclude) { for (int i = 0; i < nrows; ++i) { - int curr1 = i * ratio[0]; - int curr2 = i * ratio[1]; + int curr1 = i * kRatio[0]; + int curr2 = i * kRatio[1]; Slice first(reinterpret_cast<const uint8_t*>(&curr1), sizeof(curr1)); Slice second(reinterpret_cast<const uint8_t*>(&curr2), sizeof(curr2)); - BloomKeyProbe probe1(first, MURMUR_HASH_2); - BloomKeyProbe probe2(second, MURMUR_HASH_2); - if (BloomFilter(bf1_contain->slice(), bf1_contain->n_hashes()).MayContainKey(probe1)) { + uint32_t hash1 = HashUtil::ComputeHash32(first, FAST_HASH, 0); + uint32_t hash2 = HashUtil::ComputeHash32(second, FAST_HASH, 0); + + if (bf1_contain->Find(hash1)) { ret1_contain->push_back(i); } - if (BloomFilter(bf1_exclude->slice(), bf1_exclude->n_hashes()).MayContainKey(probe1)) { + if (bf1_exclude->Find(hash1)) { ret1_exclude->push_back(i); } - if (BloomFilter(bf2_contain->slice(), bf2_contain->n_hashes()).MayContainKey(probe2)) { + if (bf2_contain->Find(hash2)) { ret2_contain->push_back(i); } - if (BloomFilter(bf2_exclude->slice(), bf2_exclude->n_hashes()).MayContainKey(probe2)) { + if (bf2_exclude->Find(hash2)) { ret2_exclude->push_back(i); } } @@ -234,12 +236,12 @@ class TestCFileSet : public KuduRowSetTest { if (block.selection_vector()->IsRowSelected(i)) { RowBlockRow row = block.row(i); size_t index = row.row_index(); - vector<size_t>::iterator iter = 
std::find(target.begin(), target.end(), index); - if (iter == target.end()) { + auto target_iter = std::find(target.begin(), target.end(), index); + if (target_iter == target.end()) { FAIL() << "Row " << schema_.DebugRow(row) << " should not have " << "passed predicate "; } - target.erase(iter); + target.erase(target_iter); } } } @@ -265,6 +267,8 @@ class TestCFileSet : public KuduRowSetTest { return attr; } + static constexpr int kRatio[] = {2, 10, 100}; + protected: static const int32_t kNoBound; google::FlagSaver saver; @@ -471,68 +475,62 @@ TEST_F(TestCFileSet, TestRangePredicates2) { TEST_F(TestCFileSet, TestBloomFilterPredicates) { const int kNumRows = 100; - BloomFilterBuilder bfb1_contain( - BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01)); - double expected_fp_rate1 = bfb1_contain.false_positive_rate(); - ASSERT_NEAR(expected_fp_rate1, 0.01, 0.002); - ASSERT_EQ(9, bfb1_contain.n_bits() / kNumRows); - - BloomFilterBuilder bfb1_exclude( - BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01)); - double expected_fp_rate11 = bfb1_exclude.false_positive_rate(); - ASSERT_NEAR(expected_fp_rate11, 0.01, 0.002); - ASSERT_EQ(9, bfb1_exclude.n_bits() / kNumRows); - - BloomFilterBuilder bfb2_contain( - BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01)); - double expected_fp_rate2 = bfb2_contain.false_positive_rate(); - ASSERT_NEAR(expected_fp_rate2, 0.01, 0.002); - ASSERT_EQ(9, bfb2_contain.n_bits() / kNumRows); - - BloomFilterBuilder bfb2_exclude( - BloomFilterSizing::ByCountAndFPRate(kNumRows, 0.01)); - double expected_fp_rate22 = bfb2_exclude.false_positive_rate(); - ASSERT_NEAR(expected_fp_rate22, 0.01, 0.002); - ASSERT_EQ(9, bfb2_exclude.n_bits() / kNumRows); + Arena arena(1024); + ArenaBlockBloomFilterBufferAllocator allocator(&arena); + + BlockBloomFilter bf1_contain(&allocator); + int log_space_bytes1 = BlockBloomFilter::MinLogSpace(kNumRows, 0.01); + ASSERT_OK(bf1_contain.Init(log_space_bytes1, FAST_HASH, 0)); + double expected_fp_rate1 = 
BlockBloomFilter::FalsePositiveProb(kNumRows, log_space_bytes1); + ASSERT_LE(expected_fp_rate1, 0.01); + + BlockBloomFilter bf1_exclude(&allocator); + int log_space_bytes11 = BlockBloomFilter::MinLogSpace(kNumRows, 0.01); + ASSERT_OK(bf1_exclude.Init(log_space_bytes11, FAST_HASH, 0)); + double expected_fp_rate11 = BlockBloomFilter::FalsePositiveProb(kNumRows, log_space_bytes11); + ASSERT_LE(expected_fp_rate11, 0.01); + + BlockBloomFilter bf2_contain(&allocator); + int log_space_bytes2 = BlockBloomFilter::MinLogSpace(kNumRows, 0.01); + ASSERT_OK(bf2_contain.Init(log_space_bytes2, FAST_HASH, 0)); + double expected_fp_rate2 = BlockBloomFilter::FalsePositiveProb(kNumRows, log_space_bytes2); + ASSERT_LE(expected_fp_rate2, 0.01); + + BlockBloomFilter bf2_exclude(&allocator); + int log_space_bytes22 = BlockBloomFilter::MinLogSpace(kNumRows, 0.01); + ASSERT_OK(bf2_exclude.Init(log_space_bytes22, FAST_HASH, 0)); + double expected_fp_rate22 = BlockBloomFilter::FalsePositiveProb(kNumRows, log_space_bytes22); + ASSERT_LE(expected_fp_rate22, 0.01); WriteTestRowSet(kNumRows); vector<size_t> ret1_contain; vector<size_t> ret1_exclude; vector<size_t> ret2_contain; vector<size_t> ret2_exclude; - FillBloomFilter(kNumRows, &bfb1_contain, &bfb1_exclude, &bfb2_contain, &bfb2_exclude); - GetBloomFilterResult(kNumRows, &bfb1_contain, &bfb1_exclude, &bfb2_contain, &bfb2_exclude, - &ret1_contain, &ret1_exclude, &ret2_contain, &ret2_exclude); + FillBloomFilter(kNumRows, &bf1_contain, &bf1_exclude, &bf2_contain, &bf2_exclude); + GetBloomFilterResult(kNumRows, &bf1_contain, &bf1_exclude, &bf2_contain, + &bf2_exclude, &ret1_contain, &ret1_exclude, &ret2_contain, + &ret2_exclude); shared_ptr<CFileSet> fileset; ASSERT_OK(CFileSet::Open(rowset_meta_, MemTracker::GetRootTracker(), MemTracker::GetRootTracker(), nullptr, &fileset)); - vector<ColumnPredicate::BloomFilterInner> bfs; + // BloomFilter of column 0 contain. 
- ColumnPredicate::BloomFilterInner bf1_contain(bfb1_contain.slice(), - bfb1_contain.n_hashes(), MURMUR_HASH_2); - bfs.push_back(bf1_contain); - auto pred1_contain = ColumnPredicate::InBloomFilter(schema_.column(0), &bfs, nullptr, nullptr); + auto pred1_contain = ColumnPredicate::InBloomFilter(schema_.column(0), {&bf1_contain}, + nullptr, nullptr); DoTestBloomFilterScan(fileset, { pred1_contain }, ret1_contain); // BloomFilter of column 1 contain. - ColumnPredicate::BloomFilterInner bf2_contain(bfb2_contain.slice(), - bfb2_contain.n_hashes(), MURMUR_HASH_2); - bfs.clear(); - bfs.push_back(bf2_contain); - auto pred2_contain = ColumnPredicate::InBloomFilter(schema_.column(1), &bfs, nullptr, nullptr); + auto pred2_contain = ColumnPredicate::InBloomFilter(schema_.column(1), {&bf2_contain}, + nullptr, nullptr); DoTestBloomFilterScan(fileset, { pred2_contain }, ret2_contain); // BloomFilter of column 0 contain and exclude. - ColumnPredicate::BloomFilterInner bf1_exclude(bfb1_exclude.slice(), - bfb1_exclude.n_hashes(), MURMUR_HASH_2); - bfs.clear(); - bfs.push_back(bf1_contain); - bfs.push_back(bf1_exclude); vector<size_t> ret1_contain_exclude; - auto pred1_contain_exclude = ColumnPredicate::InBloomFilter(schema_.column(0), - &bfs, nullptr, nullptr); + auto pred1_contain_exclude = ColumnPredicate::InBloomFilter( + schema_.column(0), {&bf1_contain, &bf1_exclude}, nullptr, nullptr); std::set_intersection(ret1_contain.begin(), ret1_contain.end(), ret1_exclude.begin(), ret1_exclude.end(), std::back_inserter(ret1_contain_exclude)); DoTestBloomFilterScan(fileset, { pred1_contain_exclude }, ret1_contain_exclude); @@ -548,19 +546,18 @@ TEST_F(TestCFileSet, TestBloomFilterPredicates) { int32_t lower_row_index = lower / 2; int32_t upper_row_index = upper / 2; vector<size_t> ret1_contain_range = ret1_contain; - vector<size_t>::iterator left = std::lower_bound(ret1_contain_range.begin(), - ret1_contain_range.end(), lower_row_index); + auto left = 
std::lower_bound(ret1_contain_range.begin(), + ret1_contain_range.end(), lower_row_index); ret1_contain_range.erase(ret1_contain_range.begin(), left); // don't erase left - vector<size_t>::iterator right = std::lower_bound(ret1_contain_range.begin(), - ret1_contain_range.end(), upper_row_index); + auto right = std::lower_bound(ret1_contain_range.begin(), + ret1_contain_range.end(), upper_row_index); ret1_contain_range.erase(right, ret1_contain_range.end()); // earse right auto range = ColumnPredicate::Range(schema_.column(0), &lower, &upper); DoTestBloomFilterScan(fileset, { pred1_contain, range }, ret1_contain_range); // BloomFilter of column 0 contain with Range with column. - bfs.clear(); - bfs.push_back(bf1_contain); - auto bf_with_range = ColumnPredicate::InBloomFilter(schema_.column(0), &bfs, &lower, &upper); + auto bf_with_range = ColumnPredicate::InBloomFilter(schema_.column(0), {&bf1_contain}, + &lower, &upper); DoTestBloomFilterScan(fileset, { bf_with_range }, ret1_contain_range); } diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt index a67fc3e..e4a5689 100644 --- a/src/kudu/util/CMakeLists.txt +++ b/src/kudu/util/CMakeLists.txt @@ -16,6 +16,20 @@ # under the License. ####################################### +# block_bloom_filter_proto +####################################### + +PROTOBUF_GENERATE_CPP( + BLOCK_BLOOM_FILTER_PROTO_SRCS BLOCK_BLOOM_FILTER_PROTO_HDRS BLOCK_BLOOM_FILTER_PROTO_TGTS + SOURCE_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/../.. + BINARY_ROOT ${CMAKE_CURRENT_BINARY_DIR}/../.. 
+ PROTO_FILES block_bloom_filter.proto) +ADD_EXPORTABLE_LIBRARY(block_bloom_filter_proto + SRCS ${BLOCK_BLOOM_FILTER_PROTO_SRCS} + DEPS protobuf + NONLINK_DEPS ${BLOCK_BLOOM_FILTER_PROTO_TGTS}) + +####################################### # util_compression_proto ####################################### @@ -265,6 +279,7 @@ else() endif() set(UTIL_LIBS + block_bloom_filter_proto crcutil gflags glog diff --git a/src/kudu/util/block_bloom_filter-test.cc b/src/kudu/util/block_bloom_filter-test.cc index a7ebe11..8af5cc8 100644 --- a/src/kudu/util/block_bloom_filter-test.cc +++ b/src/kudu/util/block_bloom_filter-test.cc @@ -30,6 +30,7 @@ #include <glog/logging.h> #include <gtest/gtest.h> +#include "kudu/util/hash.pb.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -59,7 +60,7 @@ class BlockBloomFilterTest : public KuduTest { unique_ptr<BlockBloomFilter> bf( new BlockBloomFilter(DefaultBlockBloomFilterBufferAllocator::GetSingleton())); - CHECK_OK(bf->Init(log_space_bytes)); + CHECK_OK(bf->Init(log_space_bytes, FAST_HASH, 0)); bloom_filters_.emplace_back(move(bf)); return bloom_filters_.back().get(); } @@ -85,12 +86,19 @@ TEST_F(BlockBloomFilterTest, InvalidSpace) { BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton()); // Random number in the range [38, 64). const int log_space_bytes = 38 + rand() % (64 - 38); - Status s = bf.Init(log_space_bytes); + Status s = bf.Init(log_space_bytes, FAST_HASH, 0); ASSERT_TRUE(s.IsInvalidArgument()); ASSERT_STR_CONTAINS(s.ToString(), "Bloom filter too large"); bf.Close(); } +TEST_F(BlockBloomFilterTest, InvalidHashAlgorithm) { + BlockBloomFilter bf(DefaultBlockBloomFilterBufferAllocator::GetSingleton()); + Status s = bf.Init(4, UNKNOWN_HASH, 0); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.ToString(), "Invalid/Unsupported hash algorithm"); +} + // We can Insert() hashes into a Bloom filter with different spaces. 
TEST_F(BlockBloomFilterTest, Insert) { for (int i = 13; i < 17; ++i) { diff --git a/src/kudu/util/block_bloom_filter.cc b/src/kudu/util/block_bloom_filter.cc index 4112f8f..90b0a3d 100644 --- a/src/kudu/util/block_bloom_filter.cc +++ b/src/kudu/util/block_bloom_filter.cc @@ -24,13 +24,17 @@ #include <cmath> #include <cstdlib> #include <cstring> -#include <ostream> +#include <string> #include <gflags/gflags.h> #include "kudu/gutil/singleton.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/util/block_bloom_filter.pb.h" #include "kudu/util/flag_tags.h" +#include "kudu/util/memory/arena.h" + +using strings::Substitute; DEFINE_bool(disable_blockbloomfilter_avx2, false, "Disable AVX2 operations in BlockBloomFilter. This flag has no effect if the target " @@ -48,7 +52,9 @@ BlockBloomFilter::BlockBloomFilter(BlockBloomFilterBufferAllocatorIf* buffer_all buffer_allocator_(buffer_allocator), log_num_buckets_(0), directory_mask_(0), - directory_(nullptr) { + directory_(nullptr), + hash_algorithm_(UNKNOWN_HASH), + hash_seed_(0) { #ifdef USE_AVX2 if (has_avx2()) { bucket_insert_func_ptr_ = &BlockBloomFilter::BucketInsertAVX2; @@ -64,11 +70,16 @@ BlockBloomFilter::BlockBloomFilter(BlockBloomFilterBufferAllocatorIf* buffer_all } BlockBloomFilter::~BlockBloomFilter() { - DCHECK(directory_ == nullptr) << - "Close() should have been called before the object is destroyed."; + Close(); } -Status BlockBloomFilter::Init(const int log_space_bytes) { +Status BlockBloomFilter::InitInternal(const int log_space_bytes, + HashAlgorithm hash_algorithm, + uint32_t hash_seed) { + if (!HashUtil::IsComputeHash32Available(hash_algorithm)) { + return Status::InvalidArgument( + Substitute("Invalid/Unsupported hash algorithm $0", HashAlgorithm_Name(hash_algorithm))); + } // Since log_space_bytes is in bytes, we need to convert it to the number of tiny // Bloom filters we will use. 
log_num_buckets_ = std::max(1, log_space_bytes - kLogBucketByteSize); @@ -76,7 +87,7 @@ Status BlockBloomFilter::Init(const int log_space_bytes) { // must be limited. if (log_num_buckets_ > 32) { return Status::InvalidArgument( - strings::Substitute("Bloom filter too large. log_space_bytes: $0", log_space_bytes)); + Substitute("Bloom filter too large. log_space_bytes: $0", log_space_bytes)); } // Don't use log_num_buckets_ if it will lead to undefined behavior by a shift // that is too large. @@ -86,7 +97,37 @@ Status BlockBloomFilter::Init(const int log_space_bytes) { Close(); // Ensure that any previously allocated memory for directory_ is released. RETURN_NOT_OK(buffer_allocator_->AllocateBuffer(alloc_size, reinterpret_cast<void**>(&directory_))); - memset(directory_, 0, alloc_size); + hash_algorithm_ = hash_algorithm; + hash_seed_ = hash_seed; + return Status::OK(); +} + +Status BlockBloomFilter::Init(const int log_space_bytes, HashAlgorithm hash_algorithm, + uint32_t hash_seed) { + RETURN_NOT_OK(InitInternal(log_space_bytes, hash_algorithm, hash_seed)); + DCHECK_NOTNULL(directory_); + memset(directory_, 0, directory_size()); + always_false_ = true; + return Status::OK(); +} + +Status BlockBloomFilter::InitFromPB(const BlockBloomFilterPB& bf_src) { + if (!bf_src.has_log_space_bytes() || !bf_src.has_bloom_data() || + !bf_src.has_hash_algorithm() || !bf_src.has_always_false()) { + return Status::InvalidArgument("Missing arguments to initialize BlockBloomFilter."); + } + + RETURN_NOT_OK(InitInternal(bf_src.log_space_bytes(), bf_src.hash_algorithm(), + bf_src.hash_seed())); + DCHECK_NOTNULL(directory_); + + if (directory_size() != bf_src.bloom_data().size()) { + return Status::InvalidArgument( + Substitute("Mismatch in BlockBloomFilter source directory size $0 and expected size $1", + bf_src.bloom_data().size(), directory_size())); + } + memcpy(directory_, bf_src.bloom_data().data(), bf_src.bloom_data().size()); + always_false_ = bf_src.always_false(); return 
Status::OK(); } @@ -182,10 +223,31 @@ bool BlockBloomFilter::Find(const uint32_t hash) const noexcept { return (this->*bucket_find_func_ptr_)(bucket_idx, hash); } +void BlockBloomFilter::CopyToPB(BlockBloomFilterPB* bf_dst) const { + bf_dst->mutable_bloom_data()->assign(reinterpret_cast<const char*>(directory_), directory_size()); + bf_dst->set_log_space_bytes(log_space_bytes()); + bf_dst->set_always_false(always_false_); + bf_dst->set_hash_algorithm(hash_algorithm_); + bf_dst->set_hash_seed(hash_seed_); +} + +bool BlockBloomFilter::operator==(const BlockBloomFilter& rhs) const { + return always_false_ == rhs.always_false_ && + directory_mask_ == rhs.directory_mask_ && + directory_size() == rhs.directory_size() && + hash_algorithm_ == rhs.hash_algorithm_ && + hash_seed_ == rhs.hash_seed_ && + memcmp(directory_, rhs.directory_, directory_size()) == 0; +} + +bool BlockBloomFilter::operator!=(const BlockBloomFilter& rhs) const { + return !(rhs == *this); +} + Status DefaultBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) { int ret_code = posix_memalign(ptr, CACHELINE_SIZE, bytes); return ret_code == 0 ? Status::OK() : - Status::RuntimeError(strings::Substitute("bad_alloc. bytes: $0", bytes)); + Status::RuntimeError(Substitute("bad_alloc. bytes: $0", bytes)); } void DefaultBlockBloomFilterBufferAllocator::FreeBuffer(void* ptr) { @@ -196,4 +258,15 @@ DefaultBlockBloomFilterBufferAllocator* DefaultBlockBloomFilterBufferAllocator:: return Singleton<DefaultBlockBloomFilterBufferAllocator>::get(); } +Status ArenaBlockBloomFilterBufferAllocator::AllocateBuffer(size_t bytes, void** ptr) { + DCHECK_NOTNULL(arena_); + // 16-bytes is the max alignment supported in arena currently whereas CACHELINE_SIZE + // is typically 64 bytes on modern CPUs. + // TODO(bankim): Needs investigation to support larger alignment values. + *ptr = arena_->AllocateBytesAligned(bytes, 16); + return *ptr == nullptr ? + Status::RuntimeError(Substitute("Arena bad_alloc. 
bytes: $0", bytes)) : + Status::OK(); +} + } // namespace kudu diff --git a/src/kudu/util/block_bloom_filter.h b/src/kudu/util/block_bloom_filter.h index 2ae7594..9b5f9be 100644 --- a/src/kudu/util/block_bloom_filter.h +++ b/src/kudu/util/block_bloom_filter.h @@ -28,8 +28,16 @@ #include "kudu/gutil/cpu.h" #include "kudu/gutil/macros.h" #include "kudu/gutil/port.h" +#include "kudu/util/hash.pb.h" +#include "kudu/util/hash_util.h" +#include "kudu/util/slice.h" #include "kudu/util/status.h" +namespace kudu { +class Arena; +class BlockBloomFilterPB; +} // namespace kudu + DECLARE_bool(disable_blockbloomfilter_avx2); namespace kudu { @@ -68,25 +76,39 @@ class BlockBloomFilter { // Reset the filter state, allocate/reallocate the internal data structures. // All calls to Insert() and Find() should only be done between the calls to Init() and - // Close().Init and Close are safe to call multiple times. - Status Init(int log_space_bytes); + // Close(). Init and Close are safe to call multiple times. + // BlockBloomFilter offers convenience of both directly inserting 32-bit integers + // and letting the Insert()/Find() hash the keys. To avoid mistakes wherein + // caller is using specific hash function and directly inserts 32-bit hash values + // but misses specifying the hash function in Init() call, default values are not used. + // Parameters: + // "log_space_bytes": Log2 of the space in bytes for the BloomFilter. + // "hash_algorithm": Hash algorithm used to hash the keys to 32-bit integers prior to doing + // Insert() or Find(). + // "hash_seed": Seed used to hash the keys. + Status Init(int log_space_bytes, HashAlgorithm hash_algorithm, uint32_t hash_seed); + // Initialize the BlockBloomFilter by de-serializing the protobuf message. + Status InitFromPB(const BlockBloomFilterPB& bf_src); void Close(); - // Representation of a filter which allows all elements to pass. 
- static constexpr BlockBloomFilter* const kAlwaysTrueFilter = nullptr; - - bool AlwaysFalse() const { return always_false_; } - // Adds an element to the BloomFilter. The function used to generate 'hash' need not // have good uniformity, but it should have low collision probability. For instance, if // the set of values is 32-bit ints, the identity function is a valid hash function for // this Bloom filter, since the collision probability (the probability that two // non-equal values will have the same hash value) is 0. void Insert(uint32_t hash) noexcept; + // Same as above with convenience of hashing the key. + void Insert(const Slice& key) noexcept { + Insert(HashUtil::ComputeHash32(key, hash_algorithm_, hash_seed_)); + } // Finds an element in the BloomFilter, returning true if it is found and false (with // high probability) if it is not. bool Find(uint32_t hash) const noexcept; + // Same as above with convenience of hashing the key. + bool Find(const Slice& key) const noexcept { + return Find(HashUtil::ComputeHash32(key, hash_algorithm_, hash_seed_)); + } // As more distinct items are inserted into a BloomFilter, the false positive rate // rises. MaxNdv() returns the NDV (number of distinct values) at which a BloomFilter @@ -110,6 +132,12 @@ class BlockBloomFilter { return sizeof(Bucket) * (1LL << std::max<int>(1, log_heap_size - kLogBucketWordBits)); } + // Serializes BlockBloomFilter to protobuf message. + void CopyToPB(BlockBloomFilterPB* bf_dst) const; + + bool operator==(const BlockBloomFilter& rhs) const; + bool operator!=(const BlockBloomFilter& rhs) const; + private: // always_false_ is true when the bloom filter hasn't had any elements inserted. bool always_false_; @@ -144,6 +172,14 @@ class BlockBloomFilter { Bucket* directory_; + // Hash algorithm used to hash data to 32-bit value before insertion and lookup. + HashAlgorithm hash_algorithm_; + // Seed used with hash algorithm. 
+ uint32_t hash_seed_; + + // Helper function for public Init() variants. + Status InitInternal(int log_space_bytes, HashAlgorithm hash_algorithm, uint32_t hash_seed); + // Same as Insert(), but skips the CPU check and assumes that AVX is not available. void InsertNoAvx2(uint32_t hash) noexcept; @@ -171,8 +207,14 @@ class BlockBloomFilter { decltype(&BlockBloomFilter::BucketInsert) bucket_insert_func_ptr_; decltype(&BlockBloomFilter::BucketFind) bucket_find_func_ptr_; + // Returns amount of space used in log2 bytes. + int log_space_bytes() const { + return log_num_buckets_ + kLogBucketByteSize; + } + + // Size of the internal directory structure in bytes. int64_t directory_size() const { - return 1ULL << (log_num_buckets_ + kLogBucketByteSize); + return 1ULL << log_space_bytes(); } // Detect at run-time whether CPU supports AVX2 @@ -228,4 +270,20 @@ class DefaultBlockBloomFilterBufferAllocator : public BlockBloomFilterBufferAllo DISALLOW_COPY_AND_ASSIGN(DefaultBlockBloomFilterBufferAllocator); }; +class ArenaBlockBloomFilterBufferAllocator : public BlockBloomFilterBufferAllocatorIf { + public: + // Arena is expected to remain valid during the lifetime of the allocator. + explicit ArenaBlockBloomFilterBufferAllocator(Arena* arena) : arena_(arena) {} + ArenaBlockBloomFilterBufferAllocator() : arena_(nullptr) {} + + Status AllocateBuffer(size_t bytes, void** ptr) override; + + void FreeBuffer(void* ptr) override { + // NOP. Buffer will be de-allocated when the arena is destructed. + } + + private: + Arena* arena_; +}; + } // namespace kudu diff --git a/src/kudu/util/hash.proto b/src/kudu/util/block_bloom_filter.proto similarity index 56% copy from src/kudu/util/hash.proto copy to src/kudu/util/block_bloom_filter.proto index 8a4ea28..24b9d49 100644 --- a/src/kudu/util/hash.proto +++ b/src/kudu/util/block_bloom_filter.proto @@ -19,9 +19,19 @@ package kudu; option java_package = "org.apache.kudu"; -// Implemented hash algorithms. 
-enum HashAlgorithm { - UNKNOWN_HASH = 0; - MURMUR_HASH_2 = 1; - CITY_HASH = 2; +import "kudu/util/hash.proto"; +import "kudu/util/pb_util.proto"; + +message BlockBloomFilterPB { + // Log2 of the space required for the BlockBloomFilter. + optional int32 log_space_bytes = 1; + // The bloom filter bitmap. + optional bytes bloom_data = 2 [(kudu.REDACT) = true]; + // Whether the BlockBloomFilter is empty and hence always returns false for lookups. + optional bool always_false = 3; + // Hash algorithm to generate 32-bit unsigned integer hash values before inserting + // in the BlockBloomFilter. + optional HashAlgorithm hash_algorithm = 4 [default = FAST_HASH]; + // Seed used to hash the input values in the hash algorithm. + optional uint32 hash_seed = 5 [default = 0]; } diff --git a/src/kudu/util/hash.proto b/src/kudu/util/hash.proto index 8a4ea28..f4eb7ab 100644 --- a/src/kudu/util/hash.proto +++ b/src/kudu/util/hash.proto @@ -24,4 +24,5 @@ enum HashAlgorithm { UNKNOWN_HASH = 0; MURMUR_HASH_2 = 1; CITY_HASH = 2; + FAST_HASH = 3; } diff --git a/src/kudu/util/hash_util-test.cc b/src/kudu/util/hash_util-test.cc index 72c975a..9fadfcf 100644 --- a/src/kudu/util/hash_util-test.cc +++ b/src/kudu/util/hash_util-test.cc @@ -15,11 +15,15 @@ // specific language governing permissions and limitations // under the License. 
+#include "kudu/util/hash_util.h" + #include <cstdint> #include <gtest/gtest.h> -#include "kudu/util/hash_util.h" +#include "kudu/util/hash.pb.h" +#include "kudu/util/slice.h" +#include "kudu/util/test_macros.h" namespace kudu { @@ -68,4 +72,14 @@ TEST(HashUtilTest, TestFastHash32) { ASSERT_EQ(1676541068U, hash); } +TEST(HashUtilTest, TestComputeHash32Available) { + Slice data("abcd"); + for (int h = HashAlgorithm_MIN; h <= HashAlgorithm_MAX; h++) { + const auto hash_algorithm = static_cast<HashAlgorithm>(h); + if (HashUtil::IsComputeHash32Available(hash_algorithm)) { + NO_FATALS(HashUtil::ComputeHash32(data, hash_algorithm, 0)); + } + } +} + } // namespace kudu diff --git a/src/kudu/util/hash_util.h b/src/kudu/util/hash_util.h index 2513af0..f9ac3f7 100644 --- a/src/kudu/util/hash_util.h +++ b/src/kudu/util/hash_util.h @@ -18,9 +18,12 @@ #ifndef KUDU_UTIL_HASH_UTIL_H #define KUDU_UTIL_HASH_UTIL_H -#include <stdint.h> +#include <cstdint> +#include <glog/logging.h> #include "kudu/gutil/port.h" +#include "kudu/util/hash.pb.h" +#include "kudu/util/slice.h" namespace kudu { @@ -112,6 +115,29 @@ class HashUtil { return h - (h >> 32); } + // Checks whether 32-bit version of the hash algorithm is available. + // Must be kept in sync with ComputeHash32() function. + static bool IsComputeHash32Available(HashAlgorithm hash_algorithm) { + switch (hash_algorithm) { + case FAST_HASH: + return true; + default: + return false; + } + } + + // Compute 32-bit hash of the supplied data using the specified hash algorithm. + // Must be kept in sync with IsComputeHash32Available() function. + static uint32_t ComputeHash32(const Slice& data, HashAlgorithm hash_algorithm, uint32_t seed) { + // TODO(bankim): Consider adding special handling for zero-length/NULL objects. 
+ switch (hash_algorithm) { + case FAST_HASH: + return FastHash32(data.data(), data.size(), seed); + default: + LOG(FATAL) << "Not implemented 32-bit hash function: " << hash_algorithm; + } + } + private: // Compression function for Merkle-Damgard construction. ATTRIBUTE_NO_SANITIZE_INTEGER
