This is an automated email from the ASF dual-hosted git repository.

abukor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 74e0be274da1df2653f7a72175b3e30e0c7a527e
Author: Bankim Bhavsar <[email protected]>
AuthorDate: Wed Mar 11 12:10:38 2020 -0700

    [client] Add C++ API to accept BlockBloomFilter directly
    
    For performance reasons, ability to use its own allocator etc.,
    callers may choose to supply BlockBloomFilter directly
    instead of using client::KuduBloomFilter. Case in point Impala.
    
    The allocator and BlockBloomFilter are accepted as Slice
    parameters that wrap the raw pointers.
    
    This API is marked Advanced/Unstable as supplying incorrect/incompatible
    parameters can lead to undefined results and it could change in
    future in backward incompatible manner.
    
    Change-Id: Iea85e3da8fe55f78e49a80439e6af723aa3d970b
    Reviewed-on: http://gerrit.cloudera.org:8080/15424
    Reviewed-by: Adar Dembo <[email protected]>
    Tested-by: Kudu Jenkins
    Reviewed-by: Wenzhe Zhou <[email protected]>
---
 src/kudu/client/client.cc                 |  44 ++++++-
 src/kudu/client/client.h                  |  39 ++++++
 src/kudu/client/predicate-test.cc         | 198 ++++++++++++++++++++++--------
 src/kudu/client/scan_predicate-internal.h |  47 +++++++
 src/kudu/client/scan_predicate.cc         |  51 +++++++-
 5 files changed, 322 insertions(+), 57 deletions(-)

diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 8fdc40b..c3cb8e6 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -92,6 +92,7 @@
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
 #include "kudu/util/async_util.h"
+#include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/debug-util.h"
 #include "kudu/util/init.h"
 #include "kudu/util/logging.h"
@@ -971,7 +972,7 @@ KuduPredicate* KuduTable::NewComparisonPredicate(const 
Slice& col_name,
 }
 
 KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
-                                                    
std::vector<KuduBloomFilter*>* bloom_filters) {
+                                                    vector<KuduBloomFilter*>* 
bloom_filters) {
   // We always take ownership of values; this ensures cleanup if the predicate 
is invalid.
   auto cleanup = MakeScopedCleanup([&]() {
     STLDeleteElements(bloom_filters);
@@ -1004,6 +1005,47 @@ KuduPredicate* 
KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
   });
 }
 
+KuduPredicate* KuduTable::NewInBloomFilterPredicate(const Slice& col_name,
+                                                    const Slice& allocator,
+                                                    const vector<Slice>& 
bloom_filters) {
+  // Empty vector of bloom filters will select all rows. Hence disallowed.
+  if (bloom_filters.empty()) {
+    return new KuduPredicate(
+        new ErrorPredicateData(Status::InvalidArgument("No Bloom filters 
supplied")));
+  }
+
+  // In this case, the Block Bloom filters and allocator are supplied as 
opaque pointers
+  // and the predicate will convert them to well-defined pointer types
+  // but will NOT take ownership of those pointers. Hence a custom deleter,
+  // DirectBloomFilterDataDeleter, is used that gives control over ownership.
+
+  // Extract the allocator.
+  auto* bbf_allocator =
+      // const_cast<> can be avoided with mutable_data() on a Slice. However 
mutable_data() is a
+      // non-const function which can't be used with the const allocator Slice.
+      // Same for BlockBloomFilter below.
+      
reinterpret_cast<BlockBloomFilterBufferAllocatorIf*>(const_cast<uint8_t*>(allocator.data()));
+  std::shared_ptr<BlockBloomFilterBufferAllocatorIf> bbf_allocator_shared(
+      bbf_allocator,
+      DirectBloomFilterDataDeleter<BlockBloomFilterBufferAllocatorIf>(false 
/*owned*/));
+
+  // Extract the Block Bloom filters.
+  vector<DirectBlockBloomFilterUniqPtr> bbf_vec;
+  for (const auto& bf_slice : bloom_filters) {
+    auto* bbf =
+        
reinterpret_cast<BlockBloomFilter*>(const_cast<uint8_t*>(bf_slice.data()));
+    DirectBlockBloomFilterUniqPtr bf_uniq_ptr(
+        bbf, DirectBloomFilterDataDeleter<BlockBloomFilter>(false /*owned*/));
+    bbf_vec.emplace_back(std::move(bf_uniq_ptr));
+  }
+
+  return data_->MakePredicate(col_name, [&](const ColumnSchema& col_schema) {
+    return new KuduPredicate(
+        new InDirectBloomFilterPredicateData(col_schema, 
std::move(bbf_allocator_shared),
+                                             std::move(bbf_vec)));
+  });
+}
+
 KuduPredicate* KuduTable::NewInListPredicate(const Slice& col_name,
                                              vector<KuduValue*>* values) {
   // We always take ownership of values; this ensures cleanup if the predicate 
is invalid.
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index dc4ad95..87dd697 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1113,6 +1113,45 @@ class KUDU_EXPORT KuduTable : public 
sp::enable_shared_from_this<KuduTable> {
   KuduPredicate* NewInBloomFilterPredicate(const Slice& col_name,
                                            std::vector<KuduBloomFilter*>* 
bloom_filters);
 
+  /// @name Advanced/Unstable API
+  ///
+  /// There are no guarantees on the stability of this client API.
+  ///
+  ///@{
+  /// Create a new IN Bloom filter predicate using direct BlockBloomFilter
+  /// pointers which can be used for scanners on this table.
+  ///
+  /// A Bloom filter is a space-efficient probabilistic data structure used to
+  /// test set membership with a possibility of false positive matches.
+  ///
+  /// IN list predicate can be used with small number of values; on the other
+  /// hand with IN Bloom filter predicate large number of values can be tested
+  /// for membership in a space-efficient manner.
+  ///
+  /// @param [in] col_name
+  ///   Name of the column to which the predicate applies.
+  /// @param [in] allocator
+  ///   Pointer to the BlockBloomFilterBufferAllocatorIf allocator used with
+  ///   Bloom filters.
+  /// @param bloom_filters
+  ///   Vector of BlockBloomFilter pointers that contain the values inserted to
+  ///   match against the column. The column value must match against all the
+  ///   supplied Bloom filters to be returned by the scanner. On return,
+  ///   regardless of success or error, the returned predicate will NOT take
+  ///   ownership of the pointers contained in @c bloom_filters and caller is
+  ///   responsible for the lifecycle management of the Bloom filters.
+  ///   The supplied Bloom filters are expected to remain valid for the
+  ///   lifetime of the KuduScanner.
+  /// @return Raw pointer to an IN Bloom filter predicate. The caller owns the
+  ///   predicate until it is passed into KuduScanner::AddConjunctPredicate().
+  ///   In the case of an error (e.g. an invalid column name),
+  ///   a non-NULL value is still returned. The error will be returned when
+  ///   attempting to add this predicate to a KuduScanner.
+  KuduPredicate* NewInBloomFilterPredicate(const Slice& col_name,
+                                           const Slice& allocator,
+                                           const std::vector<Slice>& 
bloom_filters);
+  ///@}
+
   /// Create a new IN list predicate which can be used for scanners on this
   /// table.
   ///
diff --git a/src/kudu/client/predicate-test.cc 
b/src/kudu/client/predicate-test.cc
index 150775a..eeb84f6 100644
--- a/src/kudu/client/predicate-test.cc
+++ b/src/kudu/client/predicate-test.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <cmath>
+#include <cstddef>
 #include <cstdint>
 #include <initializer_list>
 #include <iterator>
@@ -44,7 +45,9 @@
 #include "kudu/gutil/strings/escaping.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
+#include "kudu/util/block_bloom_filter.h"
 #include "kudu/util/decimal_util.h"
+#include "kudu/util/hash.pb.h"
 #include "kudu/util/int128.h"
 #include "kudu/util/random.h"
 #include "kudu/util/random_util.h"
@@ -326,6 +329,16 @@ class PredicateTest : public KuduTest {
     return bf;
   }
 
+  static unique_ptr<BlockBloomFilter> CreateDirectBloomFilter(
+      int nkeys,
+      float false_positive_prob = kBloomFilterFalsePositiveProb) {
+    unique_ptr<BlockBloomFilter> bf(
+        new 
BlockBloomFilter(DefaultBlockBloomFilterBufferAllocator::GetSingleton()));
+    CHECK_OK(bf->Init(BlockBloomFilter::MinLogSpace(nkeys, 
false_positive_prob),
+                      kudu::FAST_HASH, 0));
+    return bf;
+  }
+
   void CheckInBloomFilterPredicate(const shared_ptr<KuduTable>& table,
                                    KuduBloomFilter* in_bloom_filter,
                                    int expected_count) {
@@ -1260,23 +1273,9 @@ TEST_F(PredicateTest, TestDecimalPredicates) {
 }
 
 class BloomFilterPredicateTest : public PredicateTest {
+ public:
+  BloomFilterPredicateTest() : rand_(Random(SeedRandom())) {}
  protected:
-  template<class Collection>
-  static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& 
values) {
-    auto* bloom_filter = CreateBloomFilter(values.size());
-    for (const auto& v : values) {
-      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), 
sizeof(v)));
-    }
-    return bloom_filter;
-  }
-};
-
-TEST_F(BloomFilterPredicateTest, TestBloomFilterPredicate) {
-  shared_ptr<KuduTable> table = CreateAndOpenTable(KuduColumnSchema::INT32);
-  shared_ptr<KuduSession> session = CreateSession();
-
-  auto seed = SeedRandom();
-  Random rand(seed);
   // Number of values to be written to the table.
   static constexpr int kNumAllValues = 100000;
   // Subset of values from the table that'll be inserted in BloomFilter and 
searched against
@@ -1287,58 +1286,155 @@ TEST_F(BloomFilterPredicateTest, 
TestBloomFilterPredicate) {
   // Number of false positives based on the number of values that'll be 
searched against.
   static constexpr int kFalsePositives = kNumAllValues * 
kBloomFilterFalsePositiveProb;
 
-  const unordered_set<int32_t> empty_set;
-  auto all_values = CreateRandomUniqueIntegers<int32_t>(kNumAllValues, 
empty_set, &rand);
-  auto min_max_pair = std::minmax_element(all_values.begin(), 
all_values.end());
-  vector<int32_t> inclusive_values;
-  ReservoirSample(all_values, kNumInclusiveValues, empty_set, &rand, 
&inclusive_values);
-  auto* inclusive_bf = CreateBloomFilterWithValues(inclusive_values);
-  auto exclusive_values = 
CreateRandomUniqueIntegers<int32_t>(kNumExclusiveValues, all_values,
-                                                              &rand);
-  auto* exclusive_bf = CreateBloomFilterWithValues(exclusive_values);
+  shared_ptr<KuduTable> table_;
+  shared_ptr<KuduSession> session_;
+  Random rand_;
+  unordered_set<int32_t> all_values_;
+  int32_t min_value_, max_value_;
+  // Subset of "all_values_".
+  vector<int32_t> inclusive_values_;
+  // Values that are not contained in "all_values_".
+  unordered_set<int32_t> exclusive_values_;
 
-  int i = 0;
-  for (auto value : all_values) {
-    unique_ptr<KuduInsert> insert(table->NewInsert());
-    ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
-    ASSERT_OK(insert->mutable_row()->SetInt32("value", value));
-    ASSERT_OK(session->Apply(insert.release()));
+  void SetUp() override {
+    PredicateTest::SetUp();
+
+    table_ = CreateAndOpenTable(KuduColumnSchema::INT32);
+    session_ = CreateSession();
+
+    const unordered_set<int32_t> empty_set;
+    all_values_ = CreateRandomUniqueIntegers<int32_t>(kNumAllValues, 
empty_set, &rand_);
+    auto min_max_pair = std::minmax_element(all_values_.begin(), 
all_values_.end());
+    min_value_ = *min_max_pair.first;
+    max_value_ = *min_max_pair.second;
+    ReservoirSample(all_values_, kNumInclusiveValues, empty_set, &rand_, 
&inclusive_values_);
+    exclusive_values_ = 
CreateRandomUniqueIntegers<int32_t>(kNumExclusiveValues, all_values_,
+                                                            &rand_);
+  }
+
+  template<typename BloomFilterType, typename Collection>
+  static void InsertValues(BloomFilterType* bloom_filter, const Collection& 
values) {
+    for (const auto& v : values) {
+      bloom_filter->Insert(Slice(reinterpret_cast<const uint8_t*>(&v), 
sizeof(v)));
+    }
+  }
+
+  template<class Collection>
+  static KuduBloomFilter* CreateBloomFilterWithValues(const Collection& 
values) {
+    KuduBloomFilter* bloom_filter = CreateBloomFilter(values.size());
+    InsertValues(bloom_filter, values);
+    return bloom_filter;
   }
-  ASSERT_OK(session->Flush());
+
+  template<class Collection>
+  static unique_ptr<BlockBloomFilter> CreateDirectBloomFilterWithValues(const 
Collection& values) {
+    unique_ptr<BlockBloomFilter> bloom_filter = 
CreateDirectBloomFilter(values.size());
+    InsertValues(bloom_filter.get(), values);
+    return bloom_filter;
+  }
+
+  void InsertAllValuesInTable() {
+    int i = 0;
+    for (auto value : all_values_) {
+      unique_ptr<KuduInsert> insert(table_->NewInsert());
+      ASSERT_OK(insert->mutable_row()->SetInt64("key", i++));
+      ASSERT_OK(insert->mutable_row()->SetInt32("value", value));
+      ASSERT_OK(session_->Apply(insert.release()));
+    }
+    ASSERT_OK(session_->Flush());
+  }
+
+  // Combine supplied Bloom filter predicates that contains inclusive values
+  // with Range predicate.
+  void TestWithRangePredicate(KuduPredicate* inclusive_predicate1,
+                              KuduPredicate* inclusive_predicate2) {
+    auto* less_predicate = table_->NewComparisonPredicate("value", 
KuduPredicate::LESS,
+                                                          
KuduValue::FromInt(min_value_));
+    int actual_count_less = CountRows(table_, {inclusive_predicate1, 
less_predicate });
+    EXPECT_EQ(0, actual_count_less);
+
+    auto* ge_predicate = table_->NewComparisonPredicate("value", 
KuduPredicate::GREATER_EQUAL,
+                                                        
KuduValue::FromInt(min_value_));
+    auto* le_predicate = table_->NewComparisonPredicate("value", 
KuduPredicate::LESS_EQUAL,
+                                                        
KuduValue::FromInt(max_value_));
+    int actual_count_range = CountRows(table_,
+                                       { inclusive_predicate2, ge_predicate, 
le_predicate });
+    EXPECT_LE(inclusive_values_.size(), actual_count_range);
+    EXPECT_GE(inclusive_values_.size() + kFalsePositives, actual_count_range);
+  }
+};
+
+// Though this static constexpr expression is initialized in the class 
declaration, it needs to be
+// defined because it involves some computation.
+constexpr int BloomFilterPredicateTest::kFalsePositives;
+
+TEST_F(BloomFilterPredicateTest, TestKuduBloomFilterPredicate) {
+  KuduBloomFilter* inclusive_bf = 
CreateBloomFilterWithValues(inclusive_values_);
+  KuduBloomFilter* exclusive_bf = 
CreateBloomFilterWithValues(exclusive_values_);
+
+  InsertAllValuesInTable();
 
   vector<KuduBloomFilter*> inclusive_bf_vec = { inclusive_bf };
   auto* inclusive_predicate =
-      table->NewInBloomFilterPredicate("value", &inclusive_bf_vec);
+      table_->NewInBloomFilterPredicate("value", &inclusive_bf_vec);
   auto* inclusive_predicate_clone1 = inclusive_predicate->Clone();
   auto* inclusive_predicate_clone2 = inclusive_predicate->Clone();
 
   ASSERT_TRUE(inclusive_bf_vec.empty());
-  int actual_count_inclusive = CountRows(table, { inclusive_predicate });
-  EXPECT_LE(inclusive_values.size(), actual_count_inclusive);
-  EXPECT_GE(inclusive_values.size() + kFalsePositives, actual_count_inclusive);
+  int actual_count_inclusive = CountRows(table_, { inclusive_predicate });
+  EXPECT_LE(inclusive_values_.size(), actual_count_inclusive);
+  EXPECT_GE(inclusive_values_.size() + kFalsePositives, 
actual_count_inclusive);
 
   vector<KuduBloomFilter*> exclusive_bf_vec = { exclusive_bf };
   auto* exclusive_predicate =
-      table->NewInBloomFilterPredicate("value", &exclusive_bf_vec);
+      table_->NewInBloomFilterPredicate("value", &exclusive_bf_vec);
   ASSERT_TRUE(exclusive_bf_vec.empty());
-  int actual_count_exclusive = CountRows(table, { exclusive_predicate });
+  int actual_count_exclusive = CountRows(table_, { exclusive_predicate });
+  EXPECT_LE(0, actual_count_exclusive);
+  EXPECT_GE(kFalsePositives, actual_count_exclusive);
+
+  // Combine Range predicate with Bloom filter predicate.
+  TestWithRangePredicate(inclusive_predicate_clone1, 
inclusive_predicate_clone2);
+}
+
+// Same as TestKuduBloomFilterPredicate above but using the overloaded 
NewInBloomFilterPredicate()
+// client API with direct BlockBloomFilter pointer.
+TEST_F(BloomFilterPredicateTest, TestDirectBlockBloomFilterPredicate) {
+  unique_ptr<BlockBloomFilter> inclusive_bf = 
CreateDirectBloomFilterWithValues(inclusive_values_);
+  unique_ptr<BlockBloomFilter> exclusive_bf = 
CreateDirectBloomFilterWithValues(exclusive_values_);
+
+  InsertAllValuesInTable();
+
+  auto* allocator = DefaultBlockBloomFilterBufferAllocator::GetSingleton();
+  Slice allocator_slice(reinterpret_cast<const uint8_t*>(allocator), 
sizeof(*allocator));
+
+  vector<Slice> inclusive_bf_vec =
+      { Slice(reinterpret_cast<const uint8_t*>(inclusive_bf.get()), 
sizeof(*inclusive_bf)) };
+  const size_t inclusive_bf_vec_size = inclusive_bf_vec.size();
+
+  auto* inclusive_predicate =
+      table_->NewInBloomFilterPredicate("value", allocator_slice, 
inclusive_bf_vec);
+  auto* inclusive_predicate_clone1 = inclusive_predicate->Clone();
+  auto* inclusive_predicate_clone2 = inclusive_predicate->Clone();
+
+  ASSERT_EQ(inclusive_bf_vec_size, inclusive_bf_vec.size());
+  int actual_count_inclusive = CountRows(table_, { inclusive_predicate });
+  EXPECT_LE(inclusive_values_.size(), actual_count_inclusive);
+  EXPECT_GE(inclusive_values_.size() + kFalsePositives, 
actual_count_inclusive);
+
+  vector<Slice> exclusive_bf_vec =
+      { Slice(reinterpret_cast<const uint8_t*>(exclusive_bf.get()), 
sizeof(*exclusive_bf)) };
+  const size_t exclusive_bf_vec_size = exclusive_bf_vec.size();
+  auto* exclusive_predicate =
+      table_->NewInBloomFilterPredicate("value", allocator_slice, 
exclusive_bf_vec);
+  ASSERT_EQ(exclusive_bf_vec_size, exclusive_bf_vec.size());
+
+  int actual_count_exclusive = CountRows(table_, { exclusive_predicate });
   EXPECT_LE(0, actual_count_exclusive);
   EXPECT_GE(kFalsePositives, actual_count_exclusive);
 
   // Combine Range predicate with Bloom filter predicate.
-  auto* less_predicate = table->NewComparisonPredicate("value", 
KuduPredicate::LESS,
-                                                       
KuduValue::FromInt(*min_max_pair.first));
-  int actual_count_less = CountRows(table, {inclusive_predicate_clone1, 
less_predicate });
-  EXPECT_EQ(0, actual_count_less);
-
-  auto* ge_predicate = table->NewComparisonPredicate("value", 
KuduPredicate::GREATER_EQUAL,
-                                                     
KuduValue::FromInt(*min_max_pair.first));
-  auto* le_predicate = table->NewComparisonPredicate("value", 
KuduPredicate::LESS_EQUAL,
-                                                     
KuduValue::FromInt(*min_max_pair.second));
-  int actual_count_range = CountRows(table,
-                                     { inclusive_predicate_clone2, 
ge_predicate, le_predicate });
-  EXPECT_LE(inclusive_values.size(), actual_count_range);
-  EXPECT_GE(inclusive_values.size() + kFalsePositives, actual_count_range);
+  TestWithRangePredicate(inclusive_predicate_clone1, 
inclusive_predicate_clone2);
 }
 
 class ParameterizedPredicateTest : public PredicateTest,
diff --git a/src/kudu/client/scan_predicate-internal.h 
b/src/kudu/client/scan_predicate-internal.h
index 5ee276e..543ae32 100644
--- a/src/kudu/client/scan_predicate-internal.h
+++ b/src/kudu/client/scan_predicate-internal.h
@@ -109,6 +109,53 @@ class InBloomFilterPredicateData : public 
KuduPredicate::Data {
   std::vector<std::unique_ptr<KuduBloomFilter>> bloom_filters_;
 };
 
+// Custom deleter to be used with smart pointers in 
InDirectBloomFilterPredicateData.
+// Allows use of smart pointers when the underlying raw pointer may or may not 
be
+// owned by the user. See comment in InDirectBloomFilterPredicateData for more 
context.
+template<typename T>
+struct DirectBloomFilterDataDeleter {
+  explicit DirectBloomFilterDataDeleter(bool owned) : owned_(owned) {
+  }
+
+  void operator()(T* ptr) {
+    if (owned_) {
+      delete ptr;
+    }
+  }
+ private:
+  bool owned_;
+};
+
+typedef std::unique_ptr<BlockBloomFilter, 
DirectBloomFilterDataDeleter<BlockBloomFilter>>
+    DirectBlockBloomFilterUniqPtr;
+
+class InDirectBloomFilterPredicateData : public KuduPredicate::Data {
+ public:
+  InDirectBloomFilterPredicateData(
+      ColumnSchema col,
+      std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator,
+      std::vector<DirectBlockBloomFilterUniqPtr> bloom_filters)
+      : col_(std::move(col)),
+        allocator_(std::move(allocator)),
+        bloom_filters_(std::move(bloom_filters)) {
+  }
+
+  Status AddToScanSpec(ScanSpec* spec, Arena* arena) override;
+
+  InDirectBloomFilterPredicateData* Clone() const override;
+
+ private:
+  ColumnSchema col_;
+  // This class is designed to accept BlockBloomFilter directly as raw pointer
+  // from consumers like Impala. In such a case, the data is owned by the 
caller and not
+  // by the instance of this class. So storing raw pointers and not 
destructing the pointers would
+  // have worked fine. However for the case when predicate data is Clone()'d 
the internal data
+  // is owned by the instance of this class. Hence using smart pointers with 
custom deleter
+  // DirectBloomFilterDataDeleter to keep track of ownership.
+  std::shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_;
+  std::vector<DirectBlockBloomFilterUniqPtr> bloom_filters_;
+};
+
 // A list predicate for a column and a list of constant values.
 class InListPredicateData : public KuduPredicate::Data {
  public:
diff --git a/src/kudu/client/scan_predicate.cc 
b/src/kudu/client/scan_predicate.cc
index 94abe4c..fd4f352 100644
--- a/src/kudu/client/scan_predicate.cc
+++ b/src/kudu/client/scan_predicate.cc
@@ -153,18 +153,30 @@ Status InListPredicateData::AddToScanSpec(ScanSpec* spec, 
Arena* /*arena*/) {
   return Status::OK();
 }
 
-Status InBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* 
/*arena*/) {
+// Helper function to add Bloom filters of different types to the scan spec.
+// "func" is a functor that provides access to the underlying BlockBloomFilter 
ptr.
+template<typename BloomFilterType, typename BloomFilterPtrFuncType>
+static Status AddBloomFiltersToScanSpec(const ColumnSchema& col, ScanSpec* 
spec,
+                                        const vector<BloomFilterType>& 
bloom_filters,
+                                        BloomFilterPtrFuncType func) {
   // Extract the BlockBloomFilters.
   vector<BlockBloomFilter*> block_bloom_filters;
-  block_bloom_filters.reserve(bloom_filters_.size());
-  for (const auto& bf : bloom_filters_) {
-    block_bloom_filters.push_back(bf->data_->bloom_filter_.get());
+  block_bloom_filters.reserve(bloom_filters.size());
+  for (const auto& bf : bloom_filters) {
+    block_bloom_filters.push_back(func(bf));
   }
 
-  spec->AddPredicate(ColumnPredicate::InBloomFilter(col_, 
std::move(block_bloom_filters)));
+  spec->AddPredicate(ColumnPredicate::InBloomFilter(col, 
std::move(block_bloom_filters)));
   return Status::OK();
 }
 
+Status InBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* 
/*arena*/) {
+  return AddBloomFiltersToScanSpec(col_, spec, bloom_filters_,
+                                   [](const unique_ptr<KuduBloomFilter>& bf) {
+                                     return bf->data_->bloom_filter_.get();
+                                   });
+}
+
 InBloomFilterPredicateData* client::InBloomFilterPredicateData::Clone() const {
   vector<unique_ptr<KuduBloomFilter>> bloom_filter_clones;
   bloom_filter_clones.reserve(bloom_filters_.size());
@@ -175,6 +187,35 @@ InBloomFilterPredicateData* 
client::InBloomFilterPredicateData::Clone() const {
   return new InBloomFilterPredicateData(col_, std::move(bloom_filter_clones));
 }
 
+Status InDirectBloomFilterPredicateData::AddToScanSpec(ScanSpec* spec, Arena* 
/*arena*/) {
+  return AddBloomFiltersToScanSpec(col_, spec, bloom_filters_,
+                                   [](const DirectBlockBloomFilterUniqPtr& bf) 
{
+                                     return bf.get();
+                                   });
+}
+
+InDirectBloomFilterPredicateData* InDirectBloomFilterPredicateData::Clone() 
const {
+  // In the clone case, the objects are owned by 
InDirectBloomFilterPredicateData
+  // and hence use the default deleter with shared_ptr.
+  shared_ptr<BlockBloomFilterBufferAllocatorIf> allocator_clone =
+      CHECK_NOTNULL(allocator_->Clone());
+
+  vector<DirectBlockBloomFilterUniqPtr> bloom_filter_clones;
+  bloom_filter_clones.reserve(bloom_filters_.size());
+  for (const auto& bf : bloom_filters_) {
+    unique_ptr<BlockBloomFilter> bf_clone;
+    CHECK_OK(bf->Clone(allocator_clone.get(), &bf_clone));
+
+    // Similarly for unique_ptr, specify that the BlockBloomFilter is owned by
+    // InDirectBloomFilterPredicateData.
+    DirectBlockBloomFilterUniqPtr direct_bf_clone(
+        bf_clone.release(), 
DirectBloomFilterDataDeleter<BlockBloomFilter>(true /*owned*/));
+    bloom_filter_clones.emplace_back(std::move(direct_bf_clone));
+  }
+  return new InDirectBloomFilterPredicateData(col_, std::move(allocator_clone),
+                                              std::move(bloom_filter_clones));
+}
+
 KuduBloomFilter::KuduBloomFilter()  {
   data_ = new Data();
 }

Reply via email to