This is an automated email from the ASF dual-hosted git repository. alexey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kudu.git
commit 3e6d6221e1149a9a068f7d3668fddee5967e196a Author: Bankim Bhavsar <[email protected]> AuthorDate: Wed Feb 5 12:13:35 2020 -0800 [client] KUDU-2483 Add IN Bloom filter predicate to C++ client Instead of exposing BlockBloomFilter directly to clients, added a separate KuduBloomFilter class that uses PIMPL pattern to access the underlying BlockBloomFilter and allocator. To enable extending KuduBloomFilter in future added a builder class KuduBloomFilterBuilder. Change-Id: I4aa235a4c933ebce0bf3ec7fcb135098eccc4ea4 Reviewed-on: http://gerrit.cloudera.org:8080/15122 Reviewed-by: Adar Dembo <[email protected]> Tested-by: Kudu Jenkins --- src/kudu/client/CMakeLists.txt | 2 + src/kudu/client/client.cc | 43 +++++++ src/kudu/client/client.h | 39 ++++++- src/kudu/client/hash-internal.h | 31 +++++ src/kudu/client/hash.cc | 51 ++++++++ src/kudu/client/hash.h | 33 ++++++ src/kudu/client/predicate-test.cc | 188 ++++++++++++++++++++++++++++-- src/kudu/client/scan_predicate-internal.h | 56 +++++++++ src/kudu/client/scan_predicate.cc | 140 ++++++++++++++++++++++ src/kudu/client/scan_predicate.h | 106 +++++++++++++++++ src/kudu/client/value.h | 1 + 11 files changed, 679 insertions(+), 11 deletions(-) diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt index 38f8d21..ffd536a 100644 --- a/src/kudu/client/CMakeLists.txt +++ b/src/kudu/client/CMakeLists.txt @@ -36,6 +36,7 @@ set(CLIENT_SRCS client-internal.cc error_collector.cc error-internal.cc + hash.cc master_rpc.cc master_proxy_rpc.cc meta_cache.cc @@ -176,6 +177,7 @@ install(TARGETS kudu_client_exported install(FILES callbacks.h client.h + hash.h row_result.h scan_batch.h scan_predicate.h diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc index ba7fa2b..28cee2a 100644 --- a/src/kudu/client/client.cc +++ b/src/kudu/client/client.cc @@ -971,6 +971,49 @@ KuduPredicate* KuduTable::NewComparisonPredicate(const Slice& col_name, }); } +KuduPredicate* KuduTable::NewInBloomFilterPredicate( + const 
Slice& col_name, + vector<KuduBloomFilter*>* bloom_filters, + KuduValue* lower_bound_inclusive, + KuduValue* upper_bound_exclusive) { + // We always take ownership of the arguments; this ensures cleanup if the predicate is invalid. + auto cleanup = MakeScopedCleanup([&]() { + STLDeleteElements(bloom_filters); + }); + + unique_ptr<KuduValue> lower_bound_inclusive_uniq_ptr(lower_bound_inclusive); + unique_ptr<KuduValue> upper_bound_exclusive_uniq_ptr(upper_bound_exclusive); + + // An empty vector of Bloom filters with no lower or upper bound would select + // all rows, so that combination is disallowed. + if (bloom_filters->empty() && !lower_bound_inclusive && !upper_bound_exclusive) { + return new KuduPredicate( + new ErrorPredicateData(Status::InvalidArgument("No predicates supplied"))); + } + + // Transfer the Bloom filter raw ptrs over to a vector of unique ptrs. + // If emplace_back() throws an exception, the Bloom filters already transferred + // to unique_ptrs are cleaned up automatically on exiting scope, and the remaining + // non-nullptr Bloom filters in the input "bloom_filters" vector + // are cleaned up by the explicit scoped "cleanup". + vector<unique_ptr<KuduBloomFilter>> bloom_filters_owned; + bloom_filters_owned.reserve(bloom_filters->size()); + for (auto& bf : *bloom_filters) { + bloom_filters_owned.emplace_back(bf); + bf = nullptr; + } + + return data_->MakePredicate(col_name, [&](const ColumnSchema& col_schema) { + // At this point we could cancel the scoped "cleanup". But the scoped cleanup + // not only deletes pointers contained in the vector but also clears the vector, + // and we want the vector to be cleared, as the caller expects.
+ return new KuduPredicate( + new InBloomFilterPredicateData(col_schema, std::move(bloom_filters_owned), + std::move(lower_bound_inclusive_uniq_ptr), + std::move(upper_bound_exclusive_uniq_ptr))); + }); +} + KuduPredicate* KuduTable::NewInListPredicate(const Slice& col_name, vector<KuduValue*>* values) { // We always take ownership of values; this ensures cleanup if the predicate is invalid. diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h index dd58883..90cdb8a 100644 --- a/src/kudu/client/client.h +++ b/src/kudu/client/client.h @@ -90,8 +90,6 @@ class KuduWriteOperation; class ResourceMetrics; namespace internal { -template <class ReqClass, class RespClass> -class AsyncLeaderMasterRpc; // IWYU pragma: keep class Batcher; class ErrorCollector; class GetTableSchemaRpc; @@ -102,6 +100,8 @@ class RemoteTabletServer; class ReplicaController; class RetrieveAuthzTokenRpc; class WriteRpc; +template <class ReqClass, class RespClass> +class AsyncLeaderMasterRpc; // IWYU pragma: keep } // namespace internal /// Install a callback for internal client logging. @@ -1086,6 +1086,41 @@ class KUDU_EXPORT KuduTable : public sp::enable_shared_from_this<KuduTable> { KuduPredicate::ComparisonOp op, KuduValue* value); + /// Create a new IN Bloom filter predicate which can be used for scanners on + /// this table. + /// + /// A Bloom filter is a space-efficient probabilistic data structure used to + /// test set membership with a possibility of false positive matches. + /// See @c KuduBloomFilterBuilder for creating Bloom filters. + /// + /// An IN list predicate is suited to a small number of values, whereas an + /// IN Bloom filter predicate can test a large number of values for + /// membership in a space-efficient manner. + /// + /// @param [in] col_name + /// Name of the column to which the predicate applies. + /// @param [in] bloom_filters + /// Vector of Bloom filters that contain the values inserted to match + /// against the column.
The column value must match against all the + /// supplied Bloom filters to be returned by the scanner. On return, + /// regardless of success or error, the returned predicate takes ownership + /// of the pointers contained in @c bloom_filters and the vector is cleared. + /// @param [in] lower_bound_inclusive + /// @param [in] upper_bound_exclusive + /// Optional [lower_bound, upper_bound) range filter. The type of each bound + /// must correspond to the type of the column to which the predicate is + /// to be applied. + /// @return Raw pointer to an IN Bloom filter predicate. The caller owns the + /// predicate until it is passed into KuduScanner::AddConjunctPredicate(). + /// In the case of an error (e.g. an invalid column name), + /// a non-NULL value is still returned. The error will be returned when + /// attempting to add this predicate to a KuduScanner. + KuduPredicate* NewInBloomFilterPredicate( + const Slice& col_name, + std::vector<KuduBloomFilter*>* bloom_filters, + KuduValue* lower_bound_inclusive = NULL, // NOLINT(modernize-use-nullptr) + KuduValue* upper_bound_exclusive = NULL); // NOLINT(modernize-use-nullptr) + /// Create a new IN list predicate which can be used for scanners on this /// table. /// diff --git a/src/kudu/client/hash-internal.h b/src/kudu/client/hash-internal.h new file mode 100644 index 0000000..9869114 --- /dev/null +++ b/src/kudu/client/hash-internal.h @@ -0,0 +1,31 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "kudu/client/hash.h" +#include "kudu/util/hash.pb.h" + +namespace kudu { +namespace client { + +// Helper functions to convert hash algorithm between client-facing and internal PB enums. +kudu::HashAlgorithm ToInternalHashAlgorithm(HashAlgorithm hash_algorithm); +HashAlgorithm FromInternalHashAlgorithm(kudu::HashAlgorithm hash_algorithm); + +} // namespace client +} // namespace kudu diff --git a/src/kudu/client/hash.cc b/src/kudu/client/hash.cc new file mode 100644 index 0000000..7cccfe4 --- /dev/null +++ b/src/kudu/client/hash.cc @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "kudu/client/hash.h" + +#include <ostream> + +#include <glog/logging.h> + +#include "kudu/client/hash-internal.h" +#include "kudu/util/hash.pb.h" + +namespace kudu { +namespace client { + +kudu::HashAlgorithm ToInternalHashAlgorithm(HashAlgorithm hash_algorithm) { + switch (hash_algorithm) { + case UNKNOWN_HASH: return kudu::UNKNOWN_HASH; + case MURMUR_HASH_2: return kudu::MURMUR_HASH_2; + case CITY_HASH: return kudu::CITY_HASH; + case FAST_HASH: return kudu::FAST_HASH; + default: LOG(FATAL) << "Unexpected hash algorithm: " << hash_algorithm; + } +} + +HashAlgorithm FromInternalHashAlgorithm(kudu::HashAlgorithm hash_algorithm) { + switch (hash_algorithm) { + case kudu::UNKNOWN_HASH: return UNKNOWN_HASH; + case kudu::MURMUR_HASH_2: return MURMUR_HASH_2; + case kudu::CITY_HASH: return CITY_HASH; + case kudu::FAST_HASH: return FAST_HASH; + default: LOG(FATAL) << "Unexpected hash algorithm: " << hash_algorithm; + } +} + +} // namespace client +} // namespace kudu diff --git a/src/kudu/client/hash.h b/src/kudu/client/hash.h new file mode 100644 index 0000000..2fb0926 --- /dev/null +++ b/src/kudu/client/hash.h @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+#ifndef KUDU_CLIENT_HASH_H +#define KUDU_CLIENT_HASH_H + +namespace kudu { +namespace client { + +/// @brief: Hash algorithm types. +enum HashAlgorithm { + UNKNOWN_HASH = 0, + MURMUR_HASH_2 = 1, + CITY_HASH = 2, + FAST_HASH = 3 +}; + +} // namespace client +} // namespace kudu +#endif // KUDU_CLIENT_HASH_H diff --git a/src/kudu/client/predicate-test.cc b/src/kudu/client/predicate-test.cc index 0b85d7a..78138c1 100644 --- a/src/kudu/client/predicate-test.cc +++ b/src/kudu/client/predicate-test.cc @@ -16,12 +16,16 @@ // under the License. #include <algorithm> +#include <cmath> #include <cstdint> #include <initializer_list> +#include <iterator> #include <limits> #include <memory> #include <ostream> #include <string> +#include <type_traits> +#include <unordered_set> #include <utility> #include <vector> @@ -37,11 +41,15 @@ #include "kudu/client/write_op.h" #include "kudu/common/partial_row.h" #include "kudu/gutil/gscoped_ptr.h" +#include "kudu/gutil/integral_types.h" #include "kudu/gutil/strings/escaping.h" #include "kudu/gutil/strings/substitute.h" #include "kudu/mini-cluster/internal_mini_cluster.h" #include "kudu/util/decimal_util.h" #include "kudu/util/int128.h" +#include "kudu/util/random.h" +#include "kudu/util/random_util.h" +#include "kudu/util/slice.h" #include "kudu/util/status.h" #include "kudu/util/test_macros.h" #include "kudu/util/test_util.h" @@ -50,6 +58,7 @@ using std::count_if; using std::numeric_limits; using std::string; using std::unique_ptr; +using std::unordered_set; using std::vector; namespace kudu { @@ -62,6 +71,7 @@ using sp::shared_ptr; class PredicateTest : public KuduTest { protected: + static constexpr float kBloomFilterFalsePositiveProb = 0.01; void SetUp() override { // Set up the mini cluster @@ -154,12 +164,29 @@ class PredicateTest : public KuduTest { for (const T& v : values) { if (std::any_of(test_values.begin(), test_values.end(), [&] (const T& t) { return t == v; })) { - count += 1; + count++; } } return count; } + // This 
function helps distinguish floating point values like -0.0 against 0.0. + template<typename F> + int CountMatchedFloatingPointRowsWithSignBit(const vector<F>& values, + const vector<F>& test_values) { + static_assert(std::is_floating_point<F>::value, + "This function must only be used with floating point data types"); + int count = 0; + for (const F& v : values) { + if (std::any_of(test_values.begin(), test_values.end(), + [&] (const F& f) { return f == v && std::signbit(f) == std::signbit(v); })) { + count++; + } + } + return count; + } + + // Returns a vector of ints from -50 (inclusive) to 50 (exclusive), and // boundary values. template <typename T> @@ -289,6 +316,25 @@ class PredicateTest : public KuduTest { }; } + static KuduBloomFilter* CreateBloomFilter( + int nkeys, + float false_positive_prob = kBloomFilterFalsePositiveProb) { + KuduBloomFilterBuilder builder(nkeys); + builder.false_positive_probability(false_positive_prob); + KuduBloomFilter* bf; + CHECK_OK(builder.Build(&bf)); + return bf; + } + + void CheckInBloomFilterPredicate(const shared_ptr<KuduTable>& table, + KuduBloomFilter* in_bloom_filter, + int expected_count) { + vector<KuduBloomFilter*> bf_vec = { in_bloom_filter }; + auto* bf_predicate = table->NewInBloomFilterPredicate("value", &bf_vec); + ASSERT_TRUE(bf_vec.empty()); + ASSERT_EQ(expected_count, CountRows(table, { bf_predicate })); + } + // Check integer predicates against the specified table. The table must have // key/value rows with values from CreateIntValues, plus one null value. 
template <typename T> @@ -379,18 +425,22 @@ class PredicateTest : public KuduTest { } } - // IN list predicates + // IN list and IN Bloom filter predicates std::random_shuffle(test_values.begin(), test_values.end()); for (auto end = test_values.begin(); end <= test_values.end(); end++) { vector<KuduValue*> vals; + auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end)); for (auto itr = test_values.begin(); itr != end; itr++) { vals.push_back(KuduValue::FromInt(*itr)); + auto key = *itr; + bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key))); } int count = CountMatchedRows<T>(values, vector<T>(test_values.begin(), end)); ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) })); + CheckInBloomFilterPredicate(table, bloom_filter, count); } // IS NOT NULL predicate @@ -402,7 +452,7 @@ class PredicateTest : public KuduTest { } // Check string predicates against the specified table. - void CheckStringPredicates(const shared_ptr<KuduTable>& table, vector<string> values) { + void CheckStringPredicates(const shared_ptr<KuduTable>& table, const vector<string>& values) { ASSERT_EQ(values.size() + 1, CountRows(table, {})); @@ -494,18 +544,21 @@ class PredicateTest : public KuduTest { } } - // IN list predicates + // IN list and IN Bloom filter predicates std::random_shuffle(test_values.begin(), test_values.end()); for (auto end = test_values.begin(); end <= test_values.end(); end++) { vector<KuduValue*> vals; + auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end)); for (auto itr = test_values.begin(); itr != end; itr++) { vals.push_back(KuduValue::CopyString(*itr)); + bloom_filter->Insert(*itr); } int count = CountMatchedRows<string>(values, vector<string>(test_values.begin(), end)); ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) })); + CheckInBloomFilterPredicate(table, bloom_filter, count); } // IS NOT NULL predicate @@ -634,6 +687,44 @@ 
TEST_F(PredicateTest, TestBoolPredicates) { ASSERT_EQ(2, CountRows(table, { pred })); } + { // empty BloomFilter + auto* bloom_filter = CreateBloomFilter(0); + CheckInBloomFilterPredicate(table, bloom_filter, 0); + } + + { // vector with no BloomFilters + vector<KuduBloomFilter*> no_bloom_filters = {}; + auto* bf_predicate = table->NewInBloomFilterPredicate("value", &no_bloom_filters); + ASSERT_TRUE(no_bloom_filters.empty()); + KuduScanner scanner(table.get()); + Status s = scanner.AddConjunctPredicate(bf_predicate); + ASSERT_TRUE(s.IsInvalidArgument()); + ASSERT_STR_CONTAINS(s.ToString(), "No predicates supplied"); + } + + { // BloomFilter with (true) + auto* bloom_filter = CreateBloomFilter(1); + bool key = true; + bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key))); + CheckInBloomFilterPredicate(table, bloom_filter, 1); + } + + { // BloomFilter with (false) + auto* bloom_filter = CreateBloomFilter(1); + bool key = false; + bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key))); + CheckInBloomFilterPredicate(table, bloom_filter, 1); + } + + { // BloomFilter with (true, false) + auto* bloom_filter = CreateBloomFilter(2); + bool key = true; + bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key))); + key = false; + bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key))); + CheckInBloomFilterPredicate(table, bloom_filter, 2); + } + // IS NOT NULL predicate ASSERT_EQ(CountRows(table, {}) - 1, CountRows(table, { table->NewIsNotNullPredicate("value") })); @@ -864,18 +955,24 @@ TEST_F(PredicateTest, TestFloatPredicates) { } } - // IN list predicates + // IN list and IN Bloom filter predicates std::random_shuffle(test_values.begin(), test_values.end()); for (auto end = test_values.begin(); end <= test_values.end(); end++) { vector<KuduValue*> vals; + auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end)); for (auto itr = test_values.begin(); 
itr != end; itr++) { vals.push_back(KuduValue::FromFloat(*itr)); + auto key = *itr; + bloom_filter->Insert(Slice(reinterpret_cast<const uint8*>(&key), sizeof(key))); } - int count = CountMatchedRows<float>(values, vector<float>(test_values.begin(), end)); + int count = CountMatchedRows(values, vector<float>(test_values.begin(), end)); ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) })); + int count_with_sign_bit = CountMatchedFloatingPointRowsWithSignBit( + values, vector<float>(test_values.begin(), end)); + CheckInBloomFilterPredicate(table, bloom_filter, count_with_sign_bit); } // IS NOT NULL predicate @@ -988,18 +1085,24 @@ TEST_F(PredicateTest, TestDoublePredicates) { } } - // IN list predicates + // IN list and IN Bloom filter predicates std::random_shuffle(test_values.begin(), test_values.end()); for (auto end = test_values.begin(); end <= test_values.end(); end++) { vector<KuduValue*> vals; + auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end)); for (auto itr = test_values.begin(); itr != end; itr++) { vals.push_back(KuduValue::FromDouble(*itr)); + auto key = *itr; + bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key))); } - int count = CountMatchedRows<double>(values, vector<double>(test_values.begin(), end)); + int count = CountMatchedRows(values, vector<double>(test_values.begin(), end)); ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) })); + int count_with_sign_bit = CountMatchedFloatingPointRowsWithSignBit( + values, vector<double>(test_values.begin(), end)); + CheckInBloomFilterPredicate(table, bloom_filter, count_with_sign_bit); } // IS NOT NULL predicate @@ -1130,18 +1233,22 @@ TEST_F(PredicateTest, TestDecimalPredicates) { } } - // IN list predicates + // IN list and IN Bloom filter predicates std::random_shuffle(test_values.begin(), test_values.end()); for (auto end = test_values.begin(); end <= test_values.end(); end++) { 
vector<KuduValue*> vals; + auto* bloom_filter = CreateBloomFilter(std::distance(test_values.begin(), end)); for (auto itr = test_values.begin(); itr != end; itr++) { vals.push_back(KuduValue::FromDecimal(*itr, 2)); + auto key = *itr; + bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&key), sizeof(key))); } int count = CountMatchedRows<int128_t>(values, vector<int128_t>(test_values.begin(), end)); ASSERT_EQ(count, CountRows(table, { table->NewInListPredicate("value", &vals) })); + CheckInBloomFilterPredicate(table, bloom_filter, count); } // IS NOT NULL predicate @@ -1152,6 +1259,69 @@ TEST_F(PredicateTest, TestDecimalPredicates) { ASSERT_EQ(1, CountRows(table, { table->NewIsNullPredicate("value") })); } +class BloomFilterPredicateTest : public PredicateTest { + protected: + template<class Collection> + static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& values) { + auto* bloom_filter = CreateBloomFilter(values.size()); + for (const auto& v : values) { + bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), sizeof(v))); + } + return bloom_filter; + } +}; + +TEST_F(BloomFilterPredicateTest, TestBloomFilterPredicate) { + shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT32); + shared_ptr<KuduSession> session = CreateSession(); + + auto seed = SeedRandom(); + Random rand(seed); + // Number of values to be written to the table. + static constexpr int kNumAllValues = 100000; + // Subset of values from the table that'll be inserted in BloomFilter and searched against + // all values in the table. + static constexpr int kNumInclusiveValues = 10000; + // Values that are not present in the table. + static constexpr int kNumExclusiveValues = 10000; + // Number of false positives based on the number of values that'll be searched against. 
+ static constexpr int kFalsePositives = kNumAllValues * kBloomFilterFalsePositiveProb; + + const unordered_set<int32_t> empty_set; + auto all_values = CreateRandomUniqueIntegers<int32_t>(kNumAllValues, empty_set, &rand); + vector<int32_t> inclusive_values; + ReservoirSample(all_values, kNumInclusiveValues, empty_set, &rand, &inclusive_values); + auto* inclusive_bf = CreateBloomFilterWithValues(inclusive_values); + auto exclusive_values = CreateRandomUniqueIntegers<int32_t>(kNumExclusiveValues, all_values, + &rand); + auto* exclusive_bf = CreateBloomFilterWithValues(exclusive_values); + + int i = 0; + for (auto value : all_values) { + unique_ptr<KuduInsert> insert(table->NewInsert()); + ASSERT_OK(insert->mutable_row()->SetInt64("key", i++)); + ASSERT_OK(insert->mutable_row()->SetInt32("value", value)); + ASSERT_OK(session->Apply(insert.release())); + } + ASSERT_OK(session->Flush()); + + vector<KuduBloomFilter*> inclusive_bf_vec = { inclusive_bf }; + auto* inclusive_predicate = + table->NewInBloomFilterPredicate("value", &inclusive_bf_vec); + ASSERT_TRUE(inclusive_bf_vec.empty()); + int actual_count_inclusive = CountRows(table, { inclusive_predicate }); + EXPECT_LE(inclusive_values.size(), actual_count_inclusive); + EXPECT_GE(inclusive_values.size() + kFalsePositives, actual_count_inclusive); + + vector<KuduBloomFilter*> exclusive_bf_vec = { exclusive_bf }; + auto* exclusive_predicate = + table->NewInBloomFilterPredicate("value", &exclusive_bf_vec); + ASSERT_TRUE(exclusive_bf_vec.empty()); + int actual_count_exclusive = CountRows(table, { exclusive_predicate }); + EXPECT_LE(0, actual_count_exclusive); + EXPECT_GE(kFalsePositives, actual_count_exclusive); +} + class ParameterizedPredicateTest : public PredicateTest, public ::testing::WithParamInterface<KuduColumnSchema::DataType> {}; diff --git a/src/kudu/client/scan_predicate-internal.h b/src/kudu/client/scan_predicate-internal.h index acca88f..41359f1 100644 --- a/src/kudu/client/scan_predicate-internal.h +++ 
b/src/kudu/client/scan_predicate-internal.h @@ -91,6 +91,32 @@ class ComparisonPredicateData : public KuduPredicate::Data { gscoped_ptr<KuduValue> val_; }; +// An InBloomFilter predicate for selecting values present in the vector of Bloom filters. +class InBloomFilterPredicateData : public KuduPredicate::Data { + public: + InBloomFilterPredicateData(ColumnSchema col, + std::vector<std::unique_ptr<KuduBloomFilter>> bloom_filters, + std::unique_ptr<KuduValue> lower = nullptr, + std::unique_ptr<KuduValue> upper = nullptr) + : col_(std::move(col)), + bloom_filters_(std::move(bloom_filters)), + lower_(std::move(lower)), + upper_(std::move(upper)) { + } + + Status AddToScanSpec(ScanSpec* spec, Arena* arena) override; + + InBloomFilterPredicateData* Clone() const override; + + private: + Status CheckTypeAndGetPointer(KuduValue* val_in, void** val_out) const; + + ColumnSchema col_; + std::vector<std::unique_ptr<KuduBloomFilter>> bloom_filters_; + std::unique_ptr<KuduValue> lower_; + std::unique_ptr<KuduValue> upper_; +}; + // A list predicate for a column and a list of constant values. 
class InListPredicateData : public KuduPredicate::Data { public: @@ -161,6 +187,36 @@ class IsNullPredicateData : public KuduPredicate::Data { ColumnSchema col_; }; +class KuduBloomFilterBuilder::Data { + public: + Data(); + ~Data() = default; + + size_t num_keys_; + double false_positive_probability_; + HashAlgorithm hash_algorithm_; + uint32_t hash_seed_; + + private: + DISALLOW_COPY_AND_ASSIGN(Data); +}; + +class KuduBloomFilter::Data { + public: + Data() = default; + ~Data() = default; + std::unique_ptr<Data> Clone() const; + + std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_; + std::unique_ptr<BlockBloomFilter> bloom_filter_; + + private: + Data(std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator, + std::unique_ptr<BlockBloomFilter> bloom_filter); + + DISALLOW_COPY_AND_ASSIGN(Data); +}; + } // namespace client } // namespace kudu #endif /* KUDU_CLIENT_SCAN_PREDICATE_INTERNAL_H */ diff --git a/src/kudu/client/scan_predicate.cc b/src/kudu/client/scan_predicate.cc index 736d078..05657dd 100644 --- a/src/kudu/client/scan_predicate.cc +++ b/src/kudu/client/scan_predicate.cc @@ -17,11 +17,14 @@ #include "kudu/client/scan_predicate.h" +#include <memory> #include <utility> #include <vector> #include <boost/optional/optional.hpp> +#include <glog/logging.h> +#include "kudu/client/hash-internal.h" #include "kudu/client/scan_predicate-internal.h" #include "kudu/client/value-internal.h" #include "kudu/client/value.h" @@ -32,10 +35,13 @@ #include "kudu/gutil/gscoped_ptr.h" #include "kudu/gutil/stl_util.h" #include "kudu/gutil/strings/substitute.h" +#include "kudu/util/block_bloom_filter.h" #include "kudu/util/status.h" using boost::optional; using std::move; +using std::shared_ptr; +using std::unique_ptr; using std::vector; using strings::Substitute; @@ -148,5 +154,139 @@ Status InListPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) { return Status::OK(); } +Status InBloomFilterPredicateData::CheckTypeAndGetPointer(KuduValue* val_in, + 
void** val_out) const { + return val_in->data_->CheckTypeAndGetPointer(col_.name(), + col_.type_info()->type(), + col_.type_attributes(), + val_out); +} + +Status InBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* /*arena*/) { + void* lower_void = nullptr; + if (lower_) { + RETURN_NOT_OK(CheckTypeAndGetPointer(lower_.get(), &lower_void)); + } + + void* upper_void = nullptr; + if (upper_) { + RETURN_NOT_OK(CheckTypeAndGetPointer(upper_.get(), &upper_void)); + } + + // Extract the BlockBloomFilters. + vector<BlockBloomFilter*> block_bloom_filters; + block_bloom_filters.reserve(bloom_filters_.size()); + for (const auto& bf : bloom_filters_) { + block_bloom_filters.push_back(bf->data_->bloom_filter_.get()); + } + + spec->AddPredicate(ColumnPredicate::InBloomFilter(col_, std::move(block_bloom_filters), + lower_void, upper_void)); + return Status::OK(); +} + +InBloomFilterPredicateData* client::InBloomFilterPredicateData::Clone() const { + unique_ptr<KuduValue> lower_clone; + if (lower_) { + lower_clone.reset(lower_->Clone()); + } + unique_ptr<KuduValue> upper_clone; + if (upper_) { + upper_clone.reset(upper_->Clone()); + } + + vector<unique_ptr<KuduBloomFilter>> bloom_filter_clones; + bloom_filter_clones.reserve(bloom_filters_.size()); + for (const auto& bf : bloom_filters_) { + bloom_filter_clones.emplace_back(bf->Clone()); + } + + return new InBloomFilterPredicateData(col_, std::move(bloom_filter_clones), + std::move(lower_clone), std::move(upper_clone)); +} + +KuduBloomFilter::KuduBloomFilter() { + data_ = new Data(); +} + +KuduBloomFilter::KuduBloomFilter(Data* other_data) : + data_(CHECK_NOTNULL(other_data)) { +} + +KuduBloomFilter::~KuduBloomFilter() { + delete data_; +} + +KuduBloomFilter* KuduBloomFilter::Clone() const { + unique_ptr<Data> data_clone = data_->Clone(); + return new KuduBloomFilter(data_clone.release()); +} + +void KuduBloomFilter::Insert(const Slice& key) { + DCHECK_NOTNULL(data_->bloom_filter_)->Insert(key); +} + 
+KuduBloomFilterBuilder::Data::Data() :
+    num_keys_(0),
+    false_positive_probability_(0.01),
+    hash_algorithm_(FAST_HASH),
+    hash_seed_(0) {
+}
+
+KuduBloomFilterBuilder::KuduBloomFilterBuilder(size_t num_keys) {
+  data_ = new Data;
+  data_->num_keys_ = num_keys;
+}
+
+KuduBloomFilterBuilder::~KuduBloomFilterBuilder() {
+  delete data_;
+}
+
+KuduBloomFilterBuilder& KuduBloomFilterBuilder::false_positive_probability(double fpp) {
+  data_->false_positive_probability_ = fpp;
+  return *this;
+}
+
+KuduBloomFilterBuilder& KuduBloomFilterBuilder::hash_algorithm(HashAlgorithm hash_algorithm) {
+  data_->hash_algorithm_ = hash_algorithm;
+  return *this;
+}
+
+KuduBloomFilterBuilder& KuduBloomFilterBuilder::hash_seed(uint32_t hash_seed) {
+  data_->hash_seed_ = hash_seed;
+  return *this;
+}
+
+Status KuduBloomFilterBuilder::Build(KuduBloomFilter** bloom_filter_out) {
+  const double fpp = data_->false_positive_probability_;
+  if (!(fpp > 0.0 && fpp < 1.0)) {  // Negated form also rejects NaN.
+    return Status::InvalidArgument("false positive probability must be in range (0.0, 1.0)");
+  }
+  unique_ptr<KuduBloomFilter> bf(new KuduBloomFilter());
+  bf->data_->allocator_ = DefaultBlockBloomFilterBufferAllocator::GetSingletonSharedPtr();
+  bf->data_->bloom_filter_.reset(new BlockBloomFilter(bf->data_->allocator_.get()));
+  const int log_space_bytes = BlockBloomFilter::MinLogSpace(data_->num_keys_, fpp);
+  RETURN_NOT_OK(bf->data_->bloom_filter_->Init(
+      log_space_bytes, ToInternalHashAlgorithm(data_->hash_algorithm_), data_->hash_seed_));
+  *bloom_filter_out = bf.release();
+  return Status::OK();
+}
+
+KuduBloomFilter::Data::Data(shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator,
+                            unique_ptr<BlockBloomFilter> bloom_filter) :
+    allocator_(std::move(allocator)),
+    bloom_filter_(std::move(bloom_filter)) {
+}
+
+unique_ptr<KuduBloomFilter::Data> KuduBloomFilter::Data::Clone() const {
+  shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_clone =
+      CHECK_NOTNULL(allocator_->Clone());
+  unique_ptr<BlockBloomFilter> bloom_filter_clone;
+  CHECK_OK(bloom_filter_->Clone(allocator_clone.get(), &bloom_filter_clone));
+
+  return unique_ptr<KuduBloomFilter::Data>(
+      new Data(std::move(allocator_clone), std::move(bloom_filter_clone)));
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/scan_predicate.h b/src/kudu/client/scan_predicate.h
index 70be74c..99b54e4 100644
--- a/src/kudu/client/scan_predicate.h
+++ b/src/kudu/client/scan_predicate.h
@@ -19,11 +19,20 @@
 
 #ifdef KUDU_HEADERS_NO_STUBS
 #include "kudu/gutil/macros.h"
+#include "kudu/util/slice.h"
 #else
 #include "kudu/client/stubs.h"
 #endif
 
+// NOTE: using stdint.h instead of cstdint because this file is supposed
+// to be processed by a compiler lacking C++11 support.
+#include <stdint.h>
+
+#include <cstddef>
+
+#include "kudu/client/hash.h"
 #include "kudu/util/kudu_export.h"
+#include "kudu/util/status.h"
 
 namespace kudu {
 namespace client {
@@ -52,6 +61,7 @@ class KUDU_EXPORT KuduPredicate {
   /// The PIMPL class has to be public since it's actually just an interface,
   /// and gcc gives an error trying to derive from a private nested class.
   class KUDU_NO_EXPORT Data;
+
  private:
   friend class ComparisonPredicateData;
   friend class ErrorPredicateData;
@@ -67,6 +77,102 @@ class KUDU_EXPORT KuduPredicate {
   DISALLOW_COPY_AND_ASSIGN(KuduPredicate);
 };
 
+/// @brief Bloom filter to be used with IN Bloom filter predicate.
+///
+/// A Bloom filter is a space-efficient probabilistic data-structure used to
+/// test set membership with a possibility of false positive matches.
+///
+/// Create a new KuduBloomFilter using @c KuduBloomFilterBuilder class and
+/// populate column values that need to be scanned using
+/// @c KuduBloomFilter::Insert() function.
+///
+/// Supply the populated KuduBloomFilter to
+/// @c KuduTable::NewInBloomFilterPredicate() to create an IN Bloom filter
+/// predicate.
+class KUDU_EXPORT KuduBloomFilter {
+ public:
+  ~KuduBloomFilter();
+
+  /// Insert key into the Bloom filter.
+  ///
+  /// @param [in] key
+  ///   Column value as a @c Slice to insert into the Bloom filter.
+  void Insert(const Slice& key);
+
+ private:
+  friend class InBloomFilterPredicateData;
+  friend class KuduBloomFilterBuilder;
+
+  class KUDU_NO_EXPORT Data;
+
+  // Owned ptr as per the PIMPL pattern.
+  Data* data_;
+
+  /// Clone the Bloom filter.
+  ///
+  /// @return Raw pointer to the cloned Bloom filter. Caller owns the
+  ///   Bloom filter until it's passed to
+  ///   @c KuduTable::NewInBloomFilterPredicate().
+  KuduBloomFilter* Clone() const;
+
+  KuduBloomFilter();
+
+  /// Construct a Bloom filter from the KuduBloomFilter::Data pointer.
+  ///
+  /// @param other_data
+  ///   Constructed KuduBloomFilter takes ownership of the pointer.
+  explicit KuduBloomFilter(Data* other_data);
+
+  DISALLOW_COPY_AND_ASSIGN(KuduBloomFilter);
+};
+
+/// @brief Builder class to help build @c KuduBloomFilter to be used with
+/// IN Bloom filter predicate.
+class KUDU_EXPORT KuduBloomFilterBuilder {
+ public:
+  /// @param [in] num_keys
+  ///   Expected number of elements to be inserted in the Bloom filter.
+  explicit KuduBloomFilterBuilder(size_t num_keys);
+  ~KuduBloomFilterBuilder();
+
+  /// @param [in] fpp
+  ///   Desired false positive probability, strictly between 0.0 and 1.0.
+  ///   If not provided, defaults to 0.01.
+  /// @return Reference to the updated object.
+  KuduBloomFilterBuilder& false_positive_probability(double fpp);
+
+  /// @param [in] hash_algorithm
+  ///   Hash algorithm used to hash keys before inserting to the Bloom filter.
+  ///   If not provided, defaults to FAST_HASH.
+  /// @return Reference to the updated object.
+  KuduBloomFilterBuilder& hash_algorithm(HashAlgorithm hash_algorithm);
+
+  /// @param [in] hash_seed
+  ///   Seed used with hash algorithm to hash the keys before inserting to
+  ///   the Bloom filter.
+  ///   If not provided, defaults to 0.
+  /// @return Reference to the updated object.
+  KuduBloomFilterBuilder& hash_seed(uint32_t hash_seed);
+
+  /// Build a new Bloom filter to be used with IN Bloom filter predicate.
+  ///
+  /// @param [out] bloom_filter
+  ///   On success, the created Bloom filter raw pointer. Caller owns the Bloom
+  ///   filter until it's passed to @c KuduTable::NewInBloomFilterPredicate().
+  /// @return On success, Status::OK() with the created Bloom filter in
+  ///   @c bloom_filter output parameter. On failure to allocate memory or invalid
+  ///   arguments (e.g. fpp outside (0.0, 1.0)), corresponding error status.
+  Status Build(KuduBloomFilter** bloom_filter);
+
+ private:
+  class KUDU_NO_EXPORT Data;
+
+  // Owned ptr as per the PIMPL pattern.
+  Data* data_;
+
+  DISALLOW_COPY_AND_ASSIGN(KuduBloomFilterBuilder);
+};
+
 } // namespace client
 } // namespace kudu
 #endif // KUDU_CLIENT_SCAN_PREDICATE_H
diff --git a/src/kudu/client/value.h b/src/kudu/client/value.h
index 4d76fa8..62277ff 100644
--- a/src/kudu/client/value.h
+++ b/src/kudu/client/value.h
@@ -78,6 +78,7 @@ class KUDU_EXPORT KuduValue {
   ~KuduValue();
  private:
   friend class ComparisonPredicateData;
+  friend class InBloomFilterPredicateData;
   friend class InListPredicateData;
   friend class KuduColumnSpec;
